Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/bluetooth_handler.py
T
Jannis 5cee4bd543 Fix BMS not responding to commands
- Use write_gatt_char with response=True (Write With Response) — many BMS
  firmwares ignore Write Without Response packets
- Add 0.5s delay after start_notify before sending first command to avoid
  race where BMS hasn't registered the notification subscription yet

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 19:43:38 +02:00

201 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
_LOGGER = logging.getLogger(__name__)
# Full frame layout:
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3
class BmsBluetoothHandler:
"""Manages BLE connection and protocol framing for a Xiaoxiang BMS device."""
def __init__(self, address: str) -> None:
self._address = address
self._client: BleakClient | None = None
self._buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.is_connected
async def connect(self, ble_device: BLEDevice) -> None:
"""Open BLE connection and start notifications.
Accepts a BLEDevice resolved by HA's Bluetooth subsystem so that
ESPHome BLE proxies are used transparently alongside local adapters.
"""
if self.is_connected:
return
_LOGGER.debug("Connecting to BMS at %s (via %s)", self._address, ble_device.name)
self._client = BleakClient(
ble_device,
disconnected_callback=self._on_disconnect,
)
await self._client.connect()
await self._client.start_notify(RX_CHAR_UUID, self._on_notify)
# Give the BMS a moment to register the notification subscription
# before we start sending commands — avoids dropped first response
await asyncio.sleep(0.5)
_LOGGER.debug("Connected to BMS at %s", self._address)
async def disconnect(self) -> None:
"""Close the BLE connection cleanly."""
if self._client:
try:
await self._client.disconnect()
except Exception:
pass
self._client = None
def _on_disconnect(self, _client: BleakClient) -> None:
_LOGGER.debug("BMS at %s disconnected", self._address)
self._client = None
# ------------------------------------------------------------------
# Frame reception
# ------------------------------------------------------------------
def _on_notify(self, _char, data: bytearray) -> None:
"""Accumulate BLE notification chunks into complete protocol frames.
BLE max payload is 20 bytes (default MTU), so a single BMS frame
(up to ~50 bytes for 16 cells) arrives across several notifications.
We buffer until we can calculate and verify the expected frame length.
"""
self._buffer.extend(data)
# Discard leading garbage until we see a frame start byte
while self._buffer and self._buffer[0] != FRAME_START:
self._buffer.pop(0)
# Need at least the 4-byte header to know payload length
if len(self._buffer) < _HEADER_LEN:
return
payload_len = self._buffer[3]
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
if len(self._buffer) < expected_total:
return # still waiting for more chunks
frame = bytes(self._buffer[:expected_total])
del self._buffer[:expected_total]
if frame[-1] != FRAME_END:
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
return
if frame[2] != 0x00:
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
frame[2], frame[1])
return
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
self._response_data = frame
self._response_event.set()
# ------------------------------------------------------------------
# Request / response
# ------------------------------------------------------------------
async def request(self, command: bytes, timeout: float = 5.0) -> bytes | None:
"""Send a command frame and wait for the corresponding response frame."""
async with self._lock:
self._response_event.clear()
self._response_data = None
try:
await self._client.write_gatt_char(TX_CHAR_UUID, command, response=True)
except BleakError as exc:
_LOGGER.error("BLE write failed: %s", exc)
return None
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
_LOGGER.warning("BMS response timeout (cmd=%s)", command.hex())
return None
# ------------------------------------------------------------------
# Frame parsers
# ------------------------------------------------------------------
@staticmethod
def parse_general_info(frame: bytes) -> dict:
"""Parse a 0x03 general info response frame.
Payload byte offsets (frame[4] is payload[0]):
0-1 Total voltage uint16 BE ÷100 → V
2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging)
4-5 Residual capacity uint16 BE ÷100 → Ah
6-7 Nominal capacity uint16 BE ÷100 → Ah
8-9 Cycle count uint16 BE
10-11 Production date (ignored)
12-15 Balance status (ignored)
16-17 Protection status (ignored)
18 Software version (ignored)
19 State of charge uint8 %
20 MOS status uint8
21 Cell count uint8
22 Temp probe count uint8
23+ Temperatures uint16 BE each (raw 2731) ÷ 10 → °C
"""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
temp_count = p[22]
temperatures: list[float] = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temperatures.append(round((raw - 2731) / 10.0, 1))
return {
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temperatures,
}
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
"""Parse a 0x04 cell voltage response frame.
Per spec: frame[3] (the header length byte) = cell_count × 2.
The payload contains ONLY the voltage bytes — no count byte.
0+ Cell voltages uint16 BE each unit mV ÷1000 → V
"""
count = frame[3] // 2 # header length byte = N_cells × 2
p = frame[_HEADER_LEN:-_TRAILER_LEN]
voltages: list[float] = []
for i in range(count):
raw = struct.unpack_from(">H", p, i * 2)[0]
voltages.append(round(raw / 1000.0, 3))
return {"cell_voltages": voltages}