Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/bluetooth_handler.py
T
Jannis 5d527168e2 Full protocol coverage: binary sensors, energy, hardware version
Sensors added:
- energy_stored (kWh = V × Ah / 1000) for energy dashboard

Binary sensors added (all from existing 0x03 frame, no extra BLE requests):
- Charge MOSFET / Discharge MOSFET (MOS gate status)
- Cell Balancing (any balance bit active)
- 13× protection flags: cell/pack over/under-voltage, charge/discharge
  over/under-temperature, charge/discharge over-current, short circuit,
  frontend IC error, software lock

Other:
- Hardware version string fetched once via CMD 0x05, shown in device card
- DeviceInfo centralised on coordinator (sensor + binary_sensor share it)
- CONF_ADDRESS removed from sensor.py (coordinator holds address)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 19:52:10 +02:00

230 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
_LOGGER = logging.getLogger(__name__)

# Full frame layout:
#   [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (2 checksum bytes + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3

# Keys for the protection status word of the 0x03 frame, ordered by bit
# position (index 0 is bit 0). A set bit means the protection has triggered.
_PROTECTION_FLAGS = (
    "prot_cell_overvolt",
    "prot_cell_undervolt",
    "prot_pack_overvolt",
    "prot_pack_undervolt",
    "prot_charge_overtemp",
    "prot_charge_undertemp",
    "prot_discharge_overtemp",
    "prot_discharge_undertemp",
    "prot_charge_overcurrent",
    "prot_discharge_overcurrent",
    "prot_short_circuit",
    "prot_frontend_ic_error",
    "prot_software_lock",
)


class BmsBluetoothHandler:
    """Manages BLE connection and protocol framing for a Xiaoxiang BMS device.

    One instance talks to one device. Requests are serialised through an
    internal lock, so at most one command is in flight at a time.
    """

    def __init__(self, address: str) -> None:
        """Create a handler for the BMS with the given Bluetooth address."""
        self._address = address
        self._client: BleakClient | None = None
        # Reassembly buffer for notification chunks (see _on_notify).
        self._buffer = bytearray()
        # Set by _on_notify once a complete, valid frame has been stored.
        self._response_event = asyncio.Event()
        self._response_data: bytes | None = None
        # Serialises request(): one command / one response at a time.
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    @property
    def is_connected(self) -> bool:
        """Return True while a client exists and reports being connected."""
        return self._client is not None and self._client.is_connected

    async def connect(self, ble_device: BLEDevice) -> None:
        """Open BLE connection and start notifications.

        Accepts a BLEDevice resolved by HA's Bluetooth subsystem so that
        ESPHome BLE proxies are used transparently alongside local adapters.

        Does nothing when already connected. Any exception raised while
        connecting or subscribing is re-raised after the client has been
        torn down again.
        """
        if self.is_connected:
            return
        _LOGGER.debug("Connecting to BMS at %s (via %s)", self._address, ble_device.name)
        client = BleakClient(
            ble_device,
            disconnected_callback=self._on_disconnect,
        )
        self._client = client
        # Bytes left over from a previous connection are meaningless now.
        self._buffer.clear()
        try:
            await client.connect()
            await client.start_notify(RX_CHAR_UUID, self._on_notify)
        except Exception:
            # Never keep a client that is connected but not subscribed:
            # is_connected would report True and every request would time out.
            self._client = None
            try:
                await client.disconnect()
            except Exception as exc:  # best-effort cleanup only
                _LOGGER.debug(
                    "Cleanup after failed connect to %s failed: %s",
                    self._address,
                    exc,
                )
            raise
        # Give the BMS a moment to register the notification subscription
        # before we start sending commands — avoids dropped first response
        await asyncio.sleep(0.5)
        _LOGGER.debug("Connected to BMS at %s", self._address)

    async def disconnect(self) -> None:
        """Close the BLE connection; errors are logged, never raised."""
        client, self._client = self._client, None
        if client is None:
            return
        try:
            await client.disconnect()
        except Exception as exc:  # deliberate best-effort teardown
            _LOGGER.debug(
                "Error while disconnecting from BMS at %s: %s", self._address, exc
            )

    def _on_disconnect(self, _client: BleakClient) -> None:
        """Bleak callback: forget the client once the link has dropped."""
        _LOGGER.debug("BMS at %s disconnected", self._address)
        self._client = None

    # ------------------------------------------------------------------
    # Frame reception
    # ------------------------------------------------------------------
    def _on_notify(self, _char, data: bytearray) -> None:
        """Accumulate BLE notification chunks into complete protocol frames.

        BLE max payload is 20 bytes (default MTU), so a single BMS frame
        (up to ~50 bytes for 16 cells) arrives across several notifications.
        Chunks are buffered until the length announced in the header is
        available. Every complete frame in the buffer is processed; a frame
        with status 0x00 is stored as the pending response and wakes the
        waiter in request(). The checksum bytes are not verified.
        """
        buf = self._buffer
        buf.extend(data)
        while buf:
            # Discard leading garbage until we see a frame start byte.
            start = buf.find(FRAME_START)
            if start < 0:
                buf.clear()
                return
            if start:
                del buf[:start]
            # Need at least the 4-byte header to know payload length.
            if len(buf) < _HEADER_LEN:
                return
            payload_len = buf[3]
            expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
            if len(buf) < expected_total:
                return  # still waiting for more chunks
            if buf[expected_total - 1] != FRAME_END:
                _LOGGER.warning(
                    "BMS frame missing end marker, discarding: %s",
                    bytes(buf[:expected_total]).hex(),
                )
                # The start byte was probably a false match. Drop only that
                # byte and rescan, so a real frame beginning inside the
                # rejected span is not thrown away with it.
                del buf[0]
                continue
            frame = bytes(buf[:expected_total])
            del buf[:expected_total]
            if frame[2] != 0x00:
                _LOGGER.warning(
                    "BMS returned error status 0x%02X for cmd 0x%02X",
                    frame[2],
                    frame[1],
                )
                continue
            _LOGGER.debug(
                "BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len
            )
            self._response_data = frame
            self._response_event.set()

    # ------------------------------------------------------------------
    # Request / response
    # ------------------------------------------------------------------
    async def request(self, command: bytes, timeout: float = 5.0) -> bytes | None:
        """Send a command frame and wait for the corresponding response frame.

        Args:
            command: Complete command frame to write to the TX characteristic.
            timeout: Seconds to wait for a complete response frame.

        Returns:
            The full response frame (header, payload and trailer), or None
            when not connected, when the write fails, or on timeout.
        """
        async with self._lock:
            # Use a local reference: _on_disconnect may reset self._client
            # while we are suspended in an await below.
            client = self._client
            if client is None or not client.is_connected:
                _LOGGER.warning(
                    "BMS request skipped, not connected (cmd=%s)", command.hex()
                )
                return None
            self._response_event.clear()
            self._response_data = None
            # A partial frame from an earlier, timed-out exchange would
            # otherwise be glued in front of this response.
            self._buffer.clear()
            try:
                await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
            except BleakError as exc:
                _LOGGER.error("BLE write failed: %s", exc)
                return None
            try:
                await asyncio.wait_for(self._response_event.wait(), timeout)
            except asyncio.TimeoutError:
                _LOGGER.warning("BMS response timeout (cmd=%s)", command.hex())
                return None
            return self._response_data

    # ------------------------------------------------------------------
    # Frame parsers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_general_info(frame: bytes) -> dict:
        """Parse a 0x03 general info response frame.

        Payload byte offsets (frame[4] is payload[0]):
          0-1    Total voltage      uint16 BE  ÷100 → V
          2-3    Current            int16 BE   ÷100 → A
                 (positive = charging, negative = discharging)
          4-5    Residual capacity  uint16 BE  ÷100 → Ah
          6-7    Nominal capacity   uint16 BE  ÷100 → Ah
          8-9    Cycle count        uint16 BE
          10-11  Production date    (ignored)
          12-15  Balance status     2 × uint16 BE, non-zero → balancing
          16-17  Protection status  uint16 BE, one flag per bit (bits 0-12)
          18     Software version   (ignored)
          19     State of charge    uint8 %
          20     MOS status         uint8, bit 0 = charge, bit 1 = discharge
          21     Cell count         uint8
          22     Temp probe count   uint8
          23+    Temperatures       uint16 BE each, (raw - 2731) ÷ 10 → °C

        Returns:
            Dict with the decoded measurements, the two MOS flags,
            "balance_active" and one boolean per protection flag.

        Raises:
            IndexError: The payload is shorter than 23 bytes.
            struct.error: The payload ends before the announced temperatures.
        """
        payload = frame[_HEADER_LEN:-_TRAILER_LEN]
        temp_count = payload[22]
        raw_temps = struct.unpack_from(f">{temp_count}H", payload, 23)
        voltage, current, residual, nominal, cycles = struct.unpack_from(
            ">HhHHH", payload, 0
        )
        balance_a, balance_b, protection = struct.unpack_from(">HHH", payload, 12)
        mos = payload[20]

        info = {
            "voltage": round(voltage / 100.0, 2),
            "current": round(current / 100.0, 2),
            "residual_capacity": round(residual / 100.0, 2),
            "nominal_capacity": round(nominal / 100.0, 2),
            "cycle_count": cycles,
            "state_of_charge": payload[19],
            "cell_count": payload[21],
            "temperatures": [round((raw - 2731) / 10.0, 1) for raw in raw_temps],
            # MOS status
            "mos_charge_enabled": bool(mos & 0x01),
            "mos_discharge_enabled": bool(mos & 0x02),
            # Cell balancing (any cell currently balancing)
            "balance_active": (balance_a | balance_b) != 0,
        }
        # Protection flags (bit per event, True = protection triggered)
        info.update(
            {
                key: bool(protection & (1 << bit))
                for bit, key in enumerate(_PROTECTION_FLAGS)
            }
        )
        return info

    @staticmethod
    def parse_version(frame: bytes) -> str:
        """Parse a 0x05 hardware version response frame into an ASCII string.

        Non-ASCII bytes are replaced rather than raising; surrounding NUL
        bytes and whitespace are stripped.
        """
        payload = frame[_HEADER_LEN:-_TRAILER_LEN]
        return payload.decode("ascii", errors="replace").strip("\x00").strip()

    @staticmethod
    def parse_cell_info(frame: bytes) -> dict:
        """Parse a 0x04 cell voltage response frame.

        Per spec: frame[3] (the header length byte) = cell_count × 2.
        The payload contains ONLY the voltage bytes — no count byte.
          0+  Cell voltages  uint16 BE each, unit mV, ÷1000 → V

        Returns:
            {"cell_voltages": [...]} with one voltage in volts per cell.

        Raises:
            struct.error: The payload is shorter than the length byte claims.
        """
        cell_count = frame[3] // 2  # header length byte = N_cells × 2
        payload = frame[_HEADER_LEN:-_TRAILER_LEN]
        raw_millivolts = struct.unpack_from(f">{cell_count}H", payload, 0)
        return {"cell_voltages": [round(mv / 1000.0, 3) for mv in raw_millivolts]}