Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/bluetooth_handler.py
T
Jannis dcc528b96a Add global BLE lock + faster timeouts for multi-device reliability
Root cause: 3 BMS devices fighting for 3 ESPHome proxy connection slots
simultaneously, causing 80% timeout failures and 22s+ poll times.

Fixes:
- Add shared asyncio.Lock so only one BMS polls at a time — eliminates
  proxy slot contention entirely
- Pass ble_device_callback to establish_connection so retry attempts
  get a fresh BLEDevice (handles proxy path changes)
- Reduce command timeout 5s -> 3s, retries 3 -> 2 (BMS responds in
  <200ms when connection is clean)
- Reduce establish_connection max_attempts 3 -> 2 (fail fast, retry
  next cycle instead of blocking 25s)
- Use a fixed poll timeout of 15s (was poll_interval - 5 = 25s)

Expected: polls complete in 2-5s instead of 22s, ~95%+ success rate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-12 10:05:54 +02:00

332 lines
13 KiB
Python

"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from collections.abc import Callable
from bleak import BleakError
from bleak.backends.device import BLEDevice
from bleak_retry_connector import establish_connection, BleakClientWithServiceCache
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Full frame layout:
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes (start marker, command, status, payload length).
# Trailer = 3 bytes (two checksum bytes + end marker).
# A complete frame is therefore _HEADER_LEN + PAYLOAD_LEN + _TRAILER_LEN bytes.
_HEADER_LEN = 4
_TRAILER_LEN = 3
class BmsBluetoothHandler:
    """Protocol framing and parsing for a Xiaoxiang BMS device.

    Designed for a connect -> poll -> disconnect pattern: the BMS only allows
    one simultaneous BLE connection, so we hold it only for the duration of
    a single data fetch and release it immediately after.

    Each instance owns a receive buffer (raw notification bytes) and a queue
    of complete, validated response frames. Both are reset at the start of
    every ``poll()`` / ``write_mos()`` call.
    """

    # Connection attempts handed to ``establish_connection``.
    _CONNECT_ATTEMPTS = 2
    # Pause (seconds) after subscribing to notifications, before the first write.
    _SETTLE_DELAY = 0.3
    # Pause (seconds) between request retries.
    _RETRY_DELAY = 0.3
    # Protection-status flag names; the tuple index is the bit position.
    _PROTECTION_FLAGS = (
        "prot_cell_overvolt",
        "prot_cell_undervolt",
        "prot_pack_overvolt",
        "prot_pack_undervolt",
        "prot_charge_overtemp",
        "prot_charge_undertemp",
        "prot_discharge_overtemp",
        "prot_discharge_undertemp",
        "prot_charge_overcurrent",
        "prot_discharge_overcurrent",
        "prot_short_circuit",
        "prot_frontend_ic_error",
        "prot_software_lock",
    )

    def __init__(self, address: str) -> None:
        """Create a handler for the BMS at *address* (no I/O is performed)."""
        self._address = address
        # Raw notification bytes that do not yet form a complete frame.
        self._buffer = bytearray()
        # Complete frames that passed the end-marker and status checks.
        self._response_queue: asyncio.Queue[bytes] = asyncio.Queue()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _drain_queue(self) -> None:
        """Discard any stale frames left in the queue from a prior cycle."""
        while not self._response_queue.empty():
            try:
                self._response_queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    def _reset(self) -> None:
        """Clear all transient state before a new connection."""
        self._buffer.clear()
        self._drain_queue()

    async def _connect(
        self,
        ble_device: BLEDevice,
        ble_device_callback: Callable[[], BLEDevice | None] | None,
    ) -> BleakClientWithServiceCache:
        """Open a connection to the BMS via ``establish_connection``."""
        return await establish_connection(
            BleakClientWithServiceCache,
            ble_device,
            self._address,
            max_attempts=self._CONNECT_ATTEMPTS,
            ble_device_callback=ble_device_callback,
        )

    async def _subscribe(self, client: BleakClientWithServiceCache) -> None:
        """Subscribe to RX notifications, then pause briefly before writing."""
        await client.start_notify(RX_CHAR_UUID, self._on_notify)
        await asyncio.sleep(self._SETTLE_DELAY)

    async def _disconnect(self, client: BleakClientWithServiceCache) -> None:
        """Disconnect on a best-effort basis; a BleakError is logged, not raised."""
        try:
            await client.disconnect()
        except BleakError as exc:
            _LOGGER.debug(
                "Ignoring BLE disconnect error for %s: %s", self._address, exc
            )

    # ------------------------------------------------------------------
    # High-level poll — the only entry point the coordinator needs
    # ------------------------------------------------------------------
    async def poll(
        self,
        ble_device: BLEDevice,
        commands: list[bytes],
        timeout: float = 3.0,
        retries: int = 2,
        ble_device_callback: Callable[[], BLEDevice | None] | None = None,
    ) -> list[bytes | None]:
        """Connect, send each command in sequence, disconnect.

        The BMS only supports a single BLE connection at a time. By connecting
        only during the active read window and disconnecting immediately after,
        the mobile app (or any other client) can connect freely between polls.

        Args:
            ble_device: Device handle passed to ``establish_connection``.
            commands: Complete command frames, sent one after another.
            timeout: Seconds to wait for each response frame.
            retries: Attempts per command before giving up on it.
            ble_device_callback: Optional callable forwarded to
                ``establish_connection``.

        Returns:
            One entry per command, in order: the response frame, or ``None``
            when every attempt for that command failed.

        Raises:
            Whatever ``establish_connection`` or ``start_notify`` raise; these
            are not caught here. The client is disconnected in all cases once
            a connection has been established.
        """
        self._reset()
        _LOGGER.debug("Polling BMS at %s", self._address)
        client = await self._connect(ble_device, ble_device_callback)
        try:
            await self._subscribe(client)
            return [
                await self._request(client, cmd, timeout, retries)
                for cmd in commands
            ]
        finally:
            await self._disconnect(client)

    # ------------------------------------------------------------------
    # Frame reception
    # ------------------------------------------------------------------
    def _take_frame(self) -> bytes | None:
        """Remove and return the next complete frame from the buffer.

        Bytes preceding the first start marker are discarded. Returns ``None``
        when the buffer does not (yet) hold a complete frame.
        """
        buf = self._buffer
        start = buf.find(FRAME_START)
        if start < 0:
            # No start marker at all: everything buffered is garbage.
            buf.clear()
            return None
        if start:
            del buf[:start]
        # Need at least the 4-byte header to know the payload length.
        if len(buf) < _HEADER_LEN:
            return None
        total = _HEADER_LEN + buf[3] + _TRAILER_LEN
        if len(buf) < total:
            return None  # still waiting for more chunks
        frame = bytes(buf[:total])
        del buf[:total]
        return frame

    def _on_notify(self, _char, data: bytearray) -> None:
        """Accumulate BLE notification chunks into complete protocol frames.

        BLE max payload is 20 bytes (default MTU), so a single BMS frame
        (up to ~50 bytes for 16 cells) arrives across several notifications.
        We buffer until the length announced in the header has been received.

        Every complete frame currently in the buffer is processed, so a valid
        frame queued behind a rejected one is not left waiting for another
        notification. Frames with a wrong end marker or a non-zero status byte
        are logged and dropped; all others go onto the response queue.
        """
        self._buffer.extend(data)
        while True:
            frame = self._take_frame()
            if frame is None:
                return
            if frame[-1] != FRAME_END:
                _LOGGER.warning(
                    "BMS frame missing end marker, discarding: %s", frame.hex()
                )
                continue
            if frame[2] != 0x00:
                _LOGGER.warning(
                    "BMS returned error status 0x%02X for cmd 0x%02X",
                    frame[2], frame[1],
                )
                continue
            _LOGGER.debug(
                "BMS frame received (cmd=0x%02X, len=%d)", frame[1], frame[3]
            )
            self._response_queue.put_nowait(frame)

    # ------------------------------------------------------------------
    # Request / response (private — used inside poll())
    # ------------------------------------------------------------------
    async def _write_command(
        self, client: BleakClientWithServiceCache, command: bytes
    ) -> None:
        """Write *command* to the TX characteristic.

        Tries Write With Response first; falls back to Write Without Response
        if that raises BleakError. A BleakError from the fallback propagates.
        """
        try:
            await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
        except BleakError:
            await client.write_gatt_char(TX_CHAR_UUID, command, response=False)

    async def _request(
        self,
        client: BleakClientWithServiceCache,
        command: bytes,
        timeout: float,
        retries: int,
    ) -> bytes | None:
        """Send one command and wait for the response frame, with retries.

        Tries Write With Response first; falls back to Write Without Response
        if the characteristic rejects it — covers both BMS firmware variants.

        Returns the first frame that arrives on the response queue within
        *timeout* seconds, or ``None`` once all *retries* attempts have failed
        (write error or timeout).

        NOTE(review): the returned frame is not checked against the command
        that was sent, so a late reply to an earlier command would be accepted
        here — confirm whether callers need that check.
        """
        for attempt in range(1, retries + 1):
            # Drain any stale frames before sending a new command
            self._drain_queue()
            self._buffer.clear()
            try:
                await self._write_command(client, command)
            except BleakError as exc:
                _LOGGER.warning("BLE write failed (attempt %d/%d): %s",
                                attempt, retries, exc)
                if attempt < retries:
                    await asyncio.sleep(self._RETRY_DELAY)
                continue
            try:
                return await asyncio.wait_for(
                    self._response_queue.get(), timeout
                )
            except asyncio.TimeoutError:
                _LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)",
                                command.hex(), attempt, retries)
                if attempt < retries:
                    await asyncio.sleep(self._RETRY_DELAY)
        return None

    # ------------------------------------------------------------------
    # MOS write command
    # ------------------------------------------------------------------
    async def write_mos(
        self,
        ble_device: BLEDevice,
        value: int,
        ble_device_callback: Callable[[], BLEDevice | None] | None = None,
    ) -> bool:
        """Send a MOS control write command and return True on ACK.

        Follows the same connect -> send -> disconnect pattern as poll() so
        it doesn't interfere with the normal poll cycle.

        Args:
            ble_device: Device handle passed to ``establish_connection``.
            value: MOS control byte; only the low 8 bits are sent.
            ble_device_callback: Optional callable forwarded to
                ``establish_connection``.

        Returns:
            True when a response frame with status 0x00 was received,
            False when no response arrived.
        """
        self._reset()
        command = self._build_mos_command(value)
        _LOGGER.debug("Writing MOS value 0x%02X to BMS at %s", value, self._address)
        client = await self._connect(ble_device, ble_device_callback)
        try:
            await self._subscribe(client)
            response = await self._request(client, command, timeout=3.0, retries=2)
            return response is not None and response[2] == 0x00
        finally:
            await self._disconnect(client)

    @staticmethod
    def _build_mos_command(value: int) -> bytes:
        """Build a MOS control write frame with correct checksum.

        Frame: DD 5A E1 02 00 XX CHK_H CHK_L 77
        Checked bytes (per spec): command_code + length + data bytes
        = 0xE1 + 0x02 + 0x00 + XX
        Checksum = two's complement of sum, high byte first.
        Verified against spec example:
        XX=0x02 -> sum=0xE5 -> ~0xE5+1=0xFF1B -> CHK FF 1B
        """
        data = value & 0xFF
        checked = (0xE1, 0x02, 0x00, data)
        checksum = (~sum(checked) + 1) & 0xFFFF
        return bytes([
            0xDD, 0x5A, *checked,
            (checksum >> 8) & 0xFF, checksum & 0xFF,
            0x77,
        ])

    # ------------------------------------------------------------------
    # Frame parsers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_general_info(frame: bytes) -> dict:
        """Parse a 0x03 general info response frame.

        Payload byte offsets (frame[4] is payload[0]):
        0-1 Total voltage uint16 BE /100 -> V
        2-3 Current int16 BE /100 -> A (positive = charging, negative = discharging)
        4-5 Residual capacity uint16 BE /100 -> Ah
        6-7 Nominal capacity uint16 BE /100 -> Ah
        8-9 Cycle count uint16 BE
        10-11 Production date (ignored)
        12-15 Balance status 2 x uint16 BE, any bit set -> balance_active
        16-17 Protection status uint16 BE, one prot_* flag per bit (0-12)
        18 Software version (ignored)
        19 State of charge uint8 %
        20 MOS status uint8 (bit 0 = charge, bit 1 = discharge)
        21 Cell count uint8
        22 Temp probe count uint8
        23+ Temperatures uint16 BE each (raw - 2731) / 10 -> C

        Raises IndexError when the payload is shorter than 23 bytes and
        struct.error when it is too short for the announced probe count.
        """
        p = frame[_HEADER_LEN:-_TRAILER_LEN]
        # Indexing byte 22 first guarantees the fixed-size fields below exist.
        temp_count = p[22]
        temperatures: list[float] = [
            round((raw - 2731) / 10.0, 1)
            for raw in struct.unpack_from(f">{temp_count}H", p, 23)
        ]
        voltage, current, residual, nominal, cycles = struct.unpack_from(
            ">HhHHH", p, 0
        )
        balance_hi, balance_lo, prot = struct.unpack_from(">HHH", p, 12)
        mos = p[20]
        info = {
            "voltage": round(voltage / 100.0, 2),
            "current": round(current / 100.0, 2),
            "residual_capacity": round(residual / 100.0, 2),
            "nominal_capacity": round(nominal / 100.0, 2),
            "cycle_count": cycles,
            "state_of_charge": p[19],
            "cell_count": p[21],
            "temperatures": temperatures,
            # MOS status
            "mos_charge_enabled": bool(mos & 0x01),
            "mos_discharge_enabled": bool(mos & 0x02),
            # Cell balancing (any cell currently balancing)
            "balance_active": (balance_hi | balance_lo) != 0,
        }
        # Protection flags (bit per event, True = protection triggered)
        info.update(
            (name, bool(prot & (1 << bit)))
            for bit, name in enumerate(BmsBluetoothHandler._PROTECTION_FLAGS)
        )
        return info

    @staticmethod
    def parse_version(frame: bytes) -> str:
        """Parse a 0x05 hardware version response frame into an ASCII string.

        Non-ASCII bytes are replaced; surrounding NULs and whitespace are
        stripped.
        """
        p = frame[_HEADER_LEN:-_TRAILER_LEN]
        return p.decode("ascii", errors="replace").strip("\x00").strip()

    @staticmethod
    def parse_cell_info(frame: bytes) -> dict:
        """Parse a 0x04 cell voltage response frame.

        Per spec: frame[3] (the header length byte) = cell_count x 2.
        The payload contains ONLY the voltage bytes — no count byte.
        0+ Cell voltages uint16 BE each unit mV /1000 -> V

        Returns ``{"cell_voltages": [...]}`` with one float per cell.
        Raises struct.error when the payload is shorter than announced.
        """
        count = frame[3] // 2  # header length byte = N_cells x 2
        p = frame[_HEADER_LEN:-_TRAILER_LEN]
        voltages: list[float] = [
            round(raw / 1000.0, 3)
            for raw in struct.unpack_from(f">{count}H", p, 0)
        ]
        return {"cell_voltages": voltages}