"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations

import asyncio
import logging
import struct

from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice

from .const import (
    FRAME_END,
    FRAME_START,
    RX_CHAR_UUID,
    TX_CHAR_UUID,
)

_LOGGER = logging.getLogger(__name__)

# Full frame layout:
#   [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3

# Protection status flags of the general info frame, indexed by bit number
# (bit 0 first). A set bit means the corresponding protection has triggered.
_PROTECTION_FLAGS = (
    "prot_cell_overvolt",
    "prot_cell_undervolt",
    "prot_pack_overvolt",
    "prot_pack_undervolt",
    "prot_charge_overtemp",
    "prot_charge_undertemp",
    "prot_discharge_overtemp",
    "prot_discharge_undertemp",
    "prot_charge_overcurrent",
    "prot_discharge_overcurrent",
    "prot_short_circuit",
    "prot_frontend_ic_error",
    "prot_software_lock",
)


class BmsBluetoothHandler:
    """Protocol framing and parsing for a Xiaoxiang BMS device.

    Designed for a connect → poll → disconnect pattern: the BMS only allows
    one simultaneous BLE connection, so we hold it only for the duration of
    a single data fetch and release it immediately after.
    """

    def __init__(self, address: str) -> None:
        """Create a handler for the BMS at *address* (used for logging)."""
        self._address = address
        # Raw notification bytes not yet assembled into a complete frame.
        self._buffer = bytearray()
        # Set by _on_notify() when a complete, valid frame has been stored
        # in _response_data.
        self._response_event = asyncio.Event()
        self._response_data: bytes | None = None
        # Held for a whole connect → disconnect session so that poll() and
        # write_mos() never overlap: they share the reassembly buffer and
        # response event above.
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # High-level poll — the only entry point the coordinator needs
    # ------------------------------------------------------------------

    async def poll(
        self,
        ble_device: BLEDevice,
        commands: list[bytes],
        timeout: float = 3.0,
        retries: int = 3,
    ) -> list[bytes | None]:
        """Connect, send each command in sequence, disconnect.

        The BMS only supports a single BLE connection at a time. By
        connecting only during the active read window and disconnecting
        immediately after, the mobile app (or any other client) can connect
        freely between polls.

        Args:
            ble_device: Device to connect to.
            commands: Complete request frames, sent one after another.
            timeout: Seconds to wait for each response, per attempt.
            retries: Maximum number of attempts per command.

        Returns:
            One entry per command, in order: the raw response frame, or
            ``None`` if no valid response arrived within all attempts.

        Errors raised while connecting or subscribing propagate to the
        caller; the disconnect in the cleanup path is best-effort.
        """
        async with self._lock:
            _LOGGER.debug("Polling BMS at %s", self._address)
            client = BleakClient(ble_device)
            try:
                await self._open_session(client, settle_delay=0.3)
                return [
                    await self._request(client, cmd, timeout, retries)
                    for cmd in commands
                ]
            finally:
                await self._close_session(client)

    async def _open_session(self, client: BleakClient, settle_delay: float) -> None:
        """Connect, subscribe to notifications, then wait *settle_delay* s."""
        await client.connect()
        await client.start_notify(RX_CHAR_UUID, self._on_notify)
        # Give the BMS a moment to register the subscription before
        # we start sending commands
        await asyncio.sleep(settle_delay)

    async def _close_session(self, client: BleakClient) -> None:
        """Disconnect (best-effort) and drop any partially received frame."""
        try:
            await client.disconnect()
        except Exception as exc:  # noqa: BLE001 - cleanup must never raise
            _LOGGER.debug(
                "Ignoring error while disconnecting from BMS at %s: %s",
                self._address,
                exc,
            )
        finally:
            self._buffer.clear()

    # ------------------------------------------------------------------
    # Frame reception
    # ------------------------------------------------------------------

    def _on_notify(self, _char, data: bytearray) -> None:
        """Accumulate BLE notification chunks into complete protocol frames.

        BLE max payload is 20 bytes (default MTU), so a single BMS frame
        (up to ~50 bytes for 16 cells) arrives across several notifications.
        We buffer until the length announced in the header is available,
        then check the end marker and status byte.

        A frame that fails those checks is logged and dropped, and parsing
        continues with whatever follows it in the buffer. The first valid
        frame is stored in ``_response_data`` and ``_response_event`` is set.
        """
        self._buffer.extend(data)

        while True:
            # Discard leading garbage until we see a frame start byte
            start = self._buffer.find(FRAME_START)
            if start < 0:
                self._buffer.clear()
                return
            if start:
                del self._buffer[:start]

            # Need at least the 4-byte header to know payload length
            if len(self._buffer) < _HEADER_LEN:
                return

            payload_len = self._buffer[3]
            expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
            if len(self._buffer) < expected_total:
                return  # still waiting for more chunks

            frame = bytes(self._buffer[:expected_total])
            del self._buffer[:expected_total]

            if frame[-1] != FRAME_END:
                _LOGGER.warning(
                    "BMS frame missing end marker, discarding: %s", frame.hex()
                )
                continue

            if frame[2] != 0x00:
                _LOGGER.warning(
                    "BMS returned error status 0x%02X for cmd 0x%02X",
                    frame[2],
                    frame[1],
                )
                continue

            _LOGGER.debug(
                "BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len
            )
            self._response_data = frame
            self._response_event.set()
            return

    # ------------------------------------------------------------------
    # Request / response (private — used inside poll())
    # ------------------------------------------------------------------

    async def _request(
        self,
        client: BleakClient,
        command: bytes,
        timeout: float,
        retries: int,
    ) -> bytes | None:
        """Send one command and wait for the response frame, with retries.

        Tries Write With Response first; falls back to Write Without Response
        if the characteristic rejects it — covers both BMS firmware variants.

        Must only be called while ``self._lock`` is held (poll() and
        write_mos() take it for the whole session); this method does not
        acquire the lock itself.

        Returns:
            The raw response frame, or ``None`` once every attempt has
            failed to write or timed out.
        """
        for attempt in range(1, retries + 1):
            # Start every attempt from a clean slate: leftover bytes from a
            # previous (timed-out) exchange would otherwise be glued onto
            # the front of the next response and corrupt it.
            self._buffer.clear()
            self._response_event.clear()
            self._response_data = None

            try:
                await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
            except BleakError:
                try:
                    await client.write_gatt_char(
                        TX_CHAR_UUID, command, response=False
                    )
                except BleakError as exc:
                    _LOGGER.error(
                        "BLE write failed (attempt %d/%d): %s",
                        attempt,
                        retries,
                        exc,
                    )
                    if attempt < retries:
                        await asyncio.sleep(0.3)
                    continue

            try:
                await asyncio.wait_for(self._response_event.wait(), timeout)
            except asyncio.TimeoutError:
                _LOGGER.warning(
                    "BMS timeout (cmd=0x%s, attempt %d/%d)",
                    command.hex(),
                    attempt,
                    retries,
                )
                if attempt < retries:
                    await asyncio.sleep(0.3)
            else:
                return self._response_data

        return None

    # ------------------------------------------------------------------
    # MOS write command
    # ------------------------------------------------------------------

    async def write_mos(self, ble_device: BLEDevice, value: int) -> bool:
        """Send a MOS control write command and return True on ACK.

        Follows the same connect → send → disconnect pattern as poll() and
        shares its session lock, so a MOS write waits for a running poll to
        finish instead of competing with it for the single BLE connection.

        Args:
            ble_device: Device to connect to.
            value: MOS control value; only the low 8 bits are sent.

        Returns:
            ``True`` if a response frame with status byte 0x00 was received,
            ``False`` otherwise.
        """
        command = self._build_mos_command(value)
        async with self._lock:
            _LOGGER.debug(
                "Writing MOS value 0x%02X to BMS at %s", value, self._address
            )
            client = BleakClient(ble_device)
            try:
                await self._open_session(client, settle_delay=0.5)
                response = await self._request(
                    client, command, timeout=3.0, retries=2
                )
                # Response: DD E1 00 00 CHK_H CHK_L 77 (status byte 0x00 = OK)
                return response is not None and response[2] == 0x00
            finally:
                await self._close_session(client)

    @staticmethod
    def _build_mos_command(value: int) -> bytes:
        """Build a MOS control write frame with correct checksum.

        Frame: DD 5A E1 02 00 XX CHK_H CHK_L 77

        Checked bytes (per spec): command_code + length + data bytes
          = 0xE1 + 0x02 + 0x00 + XX
        Checksum = two's complement of sum, high byte first.

        Verified against spec example:
          XX=0x02 → sum=0xE5 → ~0xE5+1=0xFF1B → CHK FF 1B ✓
        """
        checked = (0xE1, 0x02, 0x00, value & 0xFF)
        checksum = -sum(checked) & 0xFFFF
        return bytes((0xDD, 0x5A, *checked, checksum >> 8, checksum & 0xFF, 0x77))

    # ------------------------------------------------------------------
    # Frame parsers
    # ------------------------------------------------------------------

    @staticmethod
    def parse_general_info(frame: bytes) -> dict:
        """Parse a 0x03 general info response frame.

        Payload byte offsets (frame[4] is payload[0]):
          0-1   Total voltage       uint16 BE  ÷100 → V
          2-3   Current             int16  BE  ÷100 → A
                                    (positive = charging, negative = discharging)
          4-5   Residual capacity   uint16 BE  ÷100 → Ah
          6-7   Nominal capacity    uint16 BE  ÷100 → Ah
          8-9   Cycle count         uint16 BE
          10-11 Production date     (ignored)
          12-15 Balance status      2 × uint16 BE, non-zero = balancing
          16-17 Protection status   uint16 BE, one flag per bit
          18    Software version    (ignored)
          19    State of charge     uint8  %
          20    MOS status          uint8  bit 0 = charge, bit 1 = discharge
          21    Cell count          uint8
          22    Temp probe count    uint8
          23+   Temperatures        uint16 BE each  (raw − 2731) ÷ 10 → °C

        Raises:
            IndexError: The payload is shorter than 23 bytes.
            struct.error: The payload ends before the announced number of
                temperature readings.
        """
        p = frame[_HEADER_LEN:-_TRAILER_LEN]

        # Reading the probe count first makes a short payload fail fast
        # before any field is decoded.
        temp_count = p[22]
        temperatures: list[float] = [
            round((struct.unpack_from(">H", p, 23 + 2 * i)[0] - 2731) / 10.0, 1)
            for i in range(temp_count)
        ]

        voltage, current, residual, nominal, cycles = struct.unpack_from(
            ">HhHHH", p, 0
        )
        balance_a, balance_b, prot = struct.unpack_from(">HHH", p, 12)
        mos = p[20]

        info = {
            "voltage": round(voltage / 100.0, 2),
            "current": round(current / 100.0, 2),
            "residual_capacity": round(residual / 100.0, 2),
            "nominal_capacity": round(nominal / 100.0, 2),
            "cycle_count": cycles,
            "state_of_charge": p[19],
            "cell_count": p[21],
            "temperatures": temperatures,
            # MOS status
            "mos_charge_enabled": bool(mos & 0x01),
            "mos_discharge_enabled": bool(mos & 0x02),
            # Cell balancing (any cell currently balancing)
            "balance_active": (balance_a | balance_b) != 0,
        }
        # Protection flags (bit per event, True = protection triggered)
        info.update(
            (name, bool(prot & (1 << bit)))
            for bit, name in enumerate(_PROTECTION_FLAGS)
        )
        return info

    @staticmethod
    def parse_version(frame: bytes) -> str:
        """Parse a 0x05 hardware version response frame into an ASCII string.

        Non-ASCII bytes are replaced rather than raising; surrounding NUL
        bytes and whitespace are stripped.
        """
        p = frame[_HEADER_LEN:-_TRAILER_LEN]
        return p.decode("ascii", errors="replace").strip("\x00").strip()

    @staticmethod
    def parse_cell_info(frame: bytes) -> dict:
        """Parse a 0x04 cell voltage response frame.

        Per spec: frame[3] (the header length byte) = cell_count × 2.
        The payload contains ONLY the voltage bytes — no count byte.

          0+  Cell voltages  uint16 BE each  unit mV  ÷1000 → V

        Returns:
            ``{"cell_voltages": [...]}`` with one voltage (in V, rounded to
            3 decimals) per cell.
        """
        count = frame[3] // 2  # header length byte = N_cells × 2
        p = frame[_HEADER_LEN:-_TRAILER_LEN]
        voltages: list[float] = [
            round(struct.unpack_from(">H", p, 2 * i)[0] / 1000.0, 3)
            for i in range(count)
        ]
        return {"cell_voltages": voltages}