Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/bluetooth_handler.py
T
Jannis e1e6d69d2c Use BLE device name, add hard poll timeout, tighten request timing
- Device name: coordinator now takes name=entry.title so the HA device card
  shows the actual BLE advertised name instead of "Xiaoxiang Smart BMS"
- Hard poll timeout: each poll is capped at (poll_interval - 3)s via
  asyncio.wait_for so a stalled poll can't bleed into the next cycle
- Request timeout: 5s → 3s (BMS should reply in <1s under normal conditions)
- Retry delay: 0.5s → 0.3s between retries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 20:21:37 +02:00

251 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
_LOGGER = logging.getLogger(__name__)
# Full frame layout:
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3
class BmsBluetoothHandler:
"""Protocol framing and parsing for a Xiaoxiang BMS device.
Designed for a connect → poll → disconnect pattern: the BMS only allows
one simultaneous BLE connection, so we hold it only for the duration of
a single data fetch and release it immediately after.
"""
def __init__(self, address: str) -> None:
self._address = address
self._buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# High-level poll — the only entry point the coordinator needs
# ------------------------------------------------------------------
async def poll(
self,
ble_device: BLEDevice,
commands: list[bytes],
timeout: float = 3.0,
retries: int = 3,
) -> list[bytes | None]:
"""Connect, send each command in sequence, disconnect.
The BMS only supports a single BLE connection at a time. By connecting
only during the active read window and disconnecting immediately after,
the mobile app (or any other client) can connect freely between polls.
"""
_LOGGER.debug("Polling BMS at %s", self._address)
client = BleakClient(ble_device)
try:
await client.connect()
await client.start_notify(RX_CHAR_UUID, self._on_notify)
# Give the BMS a moment to register the subscription before
# we start sending commands
await asyncio.sleep(0.3)
return [
await self._request(client, cmd, timeout, retries)
for cmd in commands
]
finally:
try:
await client.disconnect()
except Exception:
pass
self._buffer.clear()
# ------------------------------------------------------------------
# Frame reception
# ------------------------------------------------------------------
def _on_notify(self, _char, data: bytearray) -> None:
"""Accumulate BLE notification chunks into complete protocol frames.
BLE max payload is 20 bytes (default MTU), so a single BMS frame
(up to ~50 bytes for 16 cells) arrives across several notifications.
We buffer until we can calculate and verify the expected frame length.
"""
self._buffer.extend(data)
# Discard leading garbage until we see a frame start byte
while self._buffer and self._buffer[0] != FRAME_START:
self._buffer.pop(0)
# Need at least the 4-byte header to know payload length
if len(self._buffer) < _HEADER_LEN:
return
payload_len = self._buffer[3]
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
if len(self._buffer) < expected_total:
return # still waiting for more chunks
frame = bytes(self._buffer[:expected_total])
del self._buffer[:expected_total]
if frame[-1] != FRAME_END:
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
return
if frame[2] != 0x00:
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
frame[2], frame[1])
return
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
self._response_data = frame
self._response_event.set()
# ------------------------------------------------------------------
# Request / response (private — used inside poll())
# ------------------------------------------------------------------
async def _request(
self,
client: BleakClient,
command: bytes,
timeout: float,
retries: int,
) -> bytes | None:
"""Send one command and wait for the response frame, with retries.
Tries Write With Response first; falls back to Write Without Response
if the characteristic rejects it — covers both BMS firmware variants.
"""
async with self._lock:
for attempt in range(1, retries + 1):
self._response_event.clear()
self._response_data = None
try:
await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
except BleakError:
try:
await client.write_gatt_char(TX_CHAR_UUID, command, response=False)
except BleakError as exc:
_LOGGER.error("BLE write failed (attempt %d/%d): %s",
attempt, retries, exc)
if attempt < retries:
await asyncio.sleep(0.3)
continue
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
_LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)",
command.hex(), attempt, retries)
if attempt < retries:
await asyncio.sleep(0.3)
return None
# ------------------------------------------------------------------
# Frame parsers
# ------------------------------------------------------------------
@staticmethod
def parse_general_info(frame: bytes) -> dict:
"""Parse a 0x03 general info response frame.
Payload byte offsets (frame[4] is payload[0]):
0-1 Total voltage uint16 BE ÷100 → V
2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging)
4-5 Residual capacity uint16 BE ÷100 → Ah
6-7 Nominal capacity uint16 BE ÷100 → Ah
8-9 Cycle count uint16 BE
10-11 Production date (ignored)
12-15 Balance status (ignored)
16-17 Protection status (ignored)
18 Software version (ignored)
19 State of charge uint8 %
20 MOS status uint8
21 Cell count uint8
22 Temp probe count uint8
23+ Temperatures uint16 BE each (raw 2731) ÷ 10 → °C
"""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
temp_count = p[22]
temperatures: list[float] = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temperatures.append(round((raw - 2731) / 10.0, 1))
balance = struct.unpack_from(">H", p, 12)[0] | struct.unpack_from(">H", p, 14)[0]
prot = struct.unpack_from(">H", p, 16)[0]
mos = p[20]
return {
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temperatures,
# MOS status
"mos_charge_enabled": bool(mos & 0x01),
"mos_discharge_enabled": bool(mos & 0x02),
# Cell balancing (any cell currently balancing)
"balance_active": balance != 0,
# Protection flags (bit per event, True = protection triggered)
"prot_cell_overvolt": bool(prot & (1 << 0)),
"prot_cell_undervolt": bool(prot & (1 << 1)),
"prot_pack_overvolt": bool(prot & (1 << 2)),
"prot_pack_undervolt": bool(prot & (1 << 3)),
"prot_charge_overtemp": bool(prot & (1 << 4)),
"prot_charge_undertemp": bool(prot & (1 << 5)),
"prot_discharge_overtemp": bool(prot & (1 << 6)),
"prot_discharge_undertemp": bool(prot & (1 << 7)),
"prot_charge_overcurrent": bool(prot & (1 << 8)),
"prot_discharge_overcurrent": bool(prot & (1 << 9)),
"prot_short_circuit": bool(prot & (1 << 10)),
"prot_frontend_ic_error": bool(prot & (1 << 11)),
"prot_software_lock": bool(prot & (1 << 12)),
}
@staticmethod
def parse_version(frame: bytes) -> str:
"""Parse a 0x05 hardware version response frame into an ASCII string."""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
return p.decode("ascii", errors="replace").strip("\x00").strip()
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
"""Parse a 0x04 cell voltage response frame.
Per spec: frame[3] (the header length byte) = cell_count × 2.
The payload contains ONLY the voltage bytes — no count byte.
0+ Cell voltages uint16 BE each unit mV ÷1000 → V
"""
count = frame[3] // 2 # header length byte = N_cells × 2
p = frame[_HEADER_LEN:-_TRAILER_LEN]
voltages: list[float] = []
for i in range(count):
raw = struct.unpack_from(">H", p, i * 2)[0]
voltages.append(round(raw / 1000.0, 3))
return {"cell_voltages": voltages}