e1e6d69d2c
- Device name: coordinator now takes name=entry.title so the HA device card shows the actual BLE advertised name instead of "Xiaoxiang Smart BMS" - Hard poll timeout: each poll is capped at (poll_interval - 3)s via asyncio.wait_for so a stalled poll can't bleed into the next cycle - Request timeout: 5s → 3s (BMS should reply in <1s under normal conditions) - Retry delay: 0.5s → 0.3s between retries Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
251 lines
10 KiB
Python
251 lines
10 KiB
Python
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import struct
|
||
|
||
from bleak import BleakClient, BleakError
|
||
from bleak.backends.device import BLEDevice
|
||
|
||
from .const import (
|
||
FRAME_END,
|
||
FRAME_START,
|
||
RX_CHAR_UUID,
|
||
TX_CHAR_UUID,
|
||
)
|
||
|
||
_LOGGER = logging.getLogger(__name__)
|
||
|
||
# Full frame layout:
|
||
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
|
||
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
|
||
_HEADER_LEN = 4
|
||
_TRAILER_LEN = 3
|
||
|
||
|
||
class BmsBluetoothHandler:
|
||
"""Protocol framing and parsing for a Xiaoxiang BMS device.
|
||
|
||
Designed for a connect → poll → disconnect pattern: the BMS only allows
|
||
one simultaneous BLE connection, so we hold it only for the duration of
|
||
a single data fetch and release it immediately after.
|
||
"""
|
||
|
||
def __init__(self, address: str) -> None:
|
||
self._address = address
|
||
self._buffer = bytearray()
|
||
self._response_event = asyncio.Event()
|
||
self._response_data: bytes | None = None
|
||
self._lock = asyncio.Lock()
|
||
|
||
# ------------------------------------------------------------------
|
||
# High-level poll — the only entry point the coordinator needs
|
||
# ------------------------------------------------------------------
|
||
|
||
async def poll(
|
||
self,
|
||
ble_device: BLEDevice,
|
||
commands: list[bytes],
|
||
timeout: float = 3.0,
|
||
retries: int = 3,
|
||
) -> list[bytes | None]:
|
||
"""Connect, send each command in sequence, disconnect.
|
||
|
||
The BMS only supports a single BLE connection at a time. By connecting
|
||
only during the active read window and disconnecting immediately after,
|
||
the mobile app (or any other client) can connect freely between polls.
|
||
"""
|
||
_LOGGER.debug("Polling BMS at %s", self._address)
|
||
client = BleakClient(ble_device)
|
||
try:
|
||
await client.connect()
|
||
await client.start_notify(RX_CHAR_UUID, self._on_notify)
|
||
# Give the BMS a moment to register the subscription before
|
||
# we start sending commands
|
||
await asyncio.sleep(0.3)
|
||
return [
|
||
await self._request(client, cmd, timeout, retries)
|
||
for cmd in commands
|
||
]
|
||
finally:
|
||
try:
|
||
await client.disconnect()
|
||
except Exception:
|
||
pass
|
||
self._buffer.clear()
|
||
|
||
# ------------------------------------------------------------------
|
||
# Frame reception
|
||
# ------------------------------------------------------------------
|
||
|
||
def _on_notify(self, _char, data: bytearray) -> None:
|
||
"""Accumulate BLE notification chunks into complete protocol frames.
|
||
|
||
BLE max payload is 20 bytes (default MTU), so a single BMS frame
|
||
(up to ~50 bytes for 16 cells) arrives across several notifications.
|
||
We buffer until we can calculate and verify the expected frame length.
|
||
"""
|
||
self._buffer.extend(data)
|
||
|
||
# Discard leading garbage until we see a frame start byte
|
||
while self._buffer and self._buffer[0] != FRAME_START:
|
||
self._buffer.pop(0)
|
||
|
||
# Need at least the 4-byte header to know payload length
|
||
if len(self._buffer) < _HEADER_LEN:
|
||
return
|
||
|
||
payload_len = self._buffer[3]
|
||
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
|
||
|
||
if len(self._buffer) < expected_total:
|
||
return # still waiting for more chunks
|
||
|
||
frame = bytes(self._buffer[:expected_total])
|
||
del self._buffer[:expected_total]
|
||
|
||
if frame[-1] != FRAME_END:
|
||
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
|
||
return
|
||
|
||
if frame[2] != 0x00:
|
||
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
|
||
frame[2], frame[1])
|
||
return
|
||
|
||
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
|
||
self._response_data = frame
|
||
self._response_event.set()
|
||
|
||
# ------------------------------------------------------------------
|
||
# Request / response (private — used inside poll())
|
||
# ------------------------------------------------------------------
|
||
|
||
async def _request(
|
||
self,
|
||
client: BleakClient,
|
||
command: bytes,
|
||
timeout: float,
|
||
retries: int,
|
||
) -> bytes | None:
|
||
"""Send one command and wait for the response frame, with retries.
|
||
|
||
Tries Write With Response first; falls back to Write Without Response
|
||
if the characteristic rejects it — covers both BMS firmware variants.
|
||
"""
|
||
async with self._lock:
|
||
for attempt in range(1, retries + 1):
|
||
self._response_event.clear()
|
||
self._response_data = None
|
||
try:
|
||
await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
|
||
except BleakError:
|
||
try:
|
||
await client.write_gatt_char(TX_CHAR_UUID, command, response=False)
|
||
except BleakError as exc:
|
||
_LOGGER.error("BLE write failed (attempt %d/%d): %s",
|
||
attempt, retries, exc)
|
||
if attempt < retries:
|
||
await asyncio.sleep(0.3)
|
||
continue
|
||
|
||
try:
|
||
await asyncio.wait_for(self._response_event.wait(), timeout)
|
||
return self._response_data
|
||
except asyncio.TimeoutError:
|
||
_LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)",
|
||
command.hex(), attempt, retries)
|
||
if attempt < retries:
|
||
await asyncio.sleep(0.3)
|
||
|
||
return None
|
||
|
||
# ------------------------------------------------------------------
|
||
# Frame parsers
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def parse_general_info(frame: bytes) -> dict:
|
||
"""Parse a 0x03 general info response frame.
|
||
|
||
Payload byte offsets (frame[4] is payload[0]):
|
||
0-1 Total voltage uint16 BE ÷100 → V
|
||
2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging)
|
||
4-5 Residual capacity uint16 BE ÷100 → Ah
|
||
6-7 Nominal capacity uint16 BE ÷100 → Ah
|
||
8-9 Cycle count uint16 BE
|
||
10-11 Production date (ignored)
|
||
12-15 Balance status (ignored)
|
||
16-17 Protection status (ignored)
|
||
18 Software version (ignored)
|
||
19 State of charge uint8 %
|
||
20 MOS status uint8
|
||
21 Cell count uint8
|
||
22 Temp probe count uint8
|
||
23+ Temperatures uint16 BE each (raw − 2731) ÷ 10 → °C
|
||
"""
|
||
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
||
|
||
temp_count = p[22]
|
||
temperatures: list[float] = []
|
||
for i in range(temp_count):
|
||
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
|
||
temperatures.append(round((raw - 2731) / 10.0, 1))
|
||
|
||
balance = struct.unpack_from(">H", p, 12)[0] | struct.unpack_from(">H", p, 14)[0]
|
||
prot = struct.unpack_from(">H", p, 16)[0]
|
||
mos = p[20]
|
||
|
||
return {
|
||
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
|
||
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
|
||
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
|
||
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
|
||
"cycle_count": struct.unpack_from(">H", p, 8)[0],
|
||
"state_of_charge": p[19],
|
||
"cell_count": p[21],
|
||
"temperatures": temperatures,
|
||
# MOS status
|
||
"mos_charge_enabled": bool(mos & 0x01),
|
||
"mos_discharge_enabled": bool(mos & 0x02),
|
||
# Cell balancing (any cell currently balancing)
|
||
"balance_active": balance != 0,
|
||
# Protection flags (bit per event, True = protection triggered)
|
||
"prot_cell_overvolt": bool(prot & (1 << 0)),
|
||
"prot_cell_undervolt": bool(prot & (1 << 1)),
|
||
"prot_pack_overvolt": bool(prot & (1 << 2)),
|
||
"prot_pack_undervolt": bool(prot & (1 << 3)),
|
||
"prot_charge_overtemp": bool(prot & (1 << 4)),
|
||
"prot_charge_undertemp": bool(prot & (1 << 5)),
|
||
"prot_discharge_overtemp": bool(prot & (1 << 6)),
|
||
"prot_discharge_undertemp": bool(prot & (1 << 7)),
|
||
"prot_charge_overcurrent": bool(prot & (1 << 8)),
|
||
"prot_discharge_overcurrent": bool(prot & (1 << 9)),
|
||
"prot_short_circuit": bool(prot & (1 << 10)),
|
||
"prot_frontend_ic_error": bool(prot & (1 << 11)),
|
||
"prot_software_lock": bool(prot & (1 << 12)),
|
||
}
|
||
|
||
@staticmethod
|
||
def parse_version(frame: bytes) -> str:
|
||
"""Parse a 0x05 hardware version response frame into an ASCII string."""
|
||
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
||
return p.decode("ascii", errors="replace").strip("\x00").strip()
|
||
|
||
@staticmethod
|
||
def parse_cell_info(frame: bytes) -> dict:
|
||
"""Parse a 0x04 cell voltage response frame.
|
||
|
||
Per spec: frame[3] (the header length byte) = cell_count × 2.
|
||
The payload contains ONLY the voltage bytes — no count byte.
|
||
0+ Cell voltages uint16 BE each unit mV ÷1000 → V
|
||
"""
|
||
count = frame[3] // 2 # header length byte = N_cells × 2
|
||
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
||
voltages: list[float] = []
|
||
for i in range(count):
|
||
raw = struct.unpack_from(">H", p, i * 2)[0]
|
||
voltages.append(round(raw / 1000.0, 3))
|
||
return {"cell_voltages": voltages}
|