Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/bluetooth_handler.py
T
Jannis b52b25973e Add MOS gate control via select entity
Adds a Select entity with four options (Normal / Charge Disabled /
Discharge Disabled / Both Disabled) that sends the 0xE1 write command
to the BMS and immediately refreshes sensor state.  Current option is
derived from the mos_charge_enabled / mos_discharge_enabled bits
already parsed on every poll.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 20:46:10 +02:00

298 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
_LOGGER = logging.getLogger(__name__)
# Full frame layout:
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3
class BmsBluetoothHandler:
"""Protocol framing and parsing for a Xiaoxiang BMS device.
Designed for a connect → poll → disconnect pattern: the BMS only allows
one simultaneous BLE connection, so we hold it only for the duration of
a single data fetch and release it immediately after.
"""
def __init__(self, address: str) -> None:
self._address = address
self._buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# High-level poll — the only entry point the coordinator needs
# ------------------------------------------------------------------
async def poll(
self,
ble_device: BLEDevice,
commands: list[bytes],
timeout: float = 3.0,
retries: int = 3,
) -> list[bytes | None]:
"""Connect, send each command in sequence, disconnect.
The BMS only supports a single BLE connection at a time. By connecting
only during the active read window and disconnecting immediately after,
the mobile app (or any other client) can connect freely between polls.
"""
_LOGGER.debug("Polling BMS at %s", self._address)
client = BleakClient(ble_device)
try:
await client.connect()
await client.start_notify(RX_CHAR_UUID, self._on_notify)
# Give the BMS a moment to register the subscription before
# we start sending commands
await asyncio.sleep(0.3)
return [
await self._request(client, cmd, timeout, retries)
for cmd in commands
]
finally:
try:
await client.disconnect()
except Exception:
pass
self._buffer.clear()
# ------------------------------------------------------------------
# Frame reception
# ------------------------------------------------------------------
def _on_notify(self, _char, data: bytearray) -> None:
"""Accumulate BLE notification chunks into complete protocol frames.
BLE max payload is 20 bytes (default MTU), so a single BMS frame
(up to ~50 bytes for 16 cells) arrives across several notifications.
We buffer until we can calculate and verify the expected frame length.
"""
self._buffer.extend(data)
# Discard leading garbage until we see a frame start byte
while self._buffer and self._buffer[0] != FRAME_START:
self._buffer.pop(0)
# Need at least the 4-byte header to know payload length
if len(self._buffer) < _HEADER_LEN:
return
payload_len = self._buffer[3]
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
if len(self._buffer) < expected_total:
return # still waiting for more chunks
frame = bytes(self._buffer[:expected_total])
del self._buffer[:expected_total]
if frame[-1] != FRAME_END:
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
return
if frame[2] != 0x00:
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
frame[2], frame[1])
return
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
self._response_data = frame
self._response_event.set()
# ------------------------------------------------------------------
# Request / response (private — used inside poll())
# ------------------------------------------------------------------
async def _request(
self,
client: BleakClient,
command: bytes,
timeout: float,
retries: int,
) -> bytes | None:
"""Send one command and wait for the response frame, with retries.
Tries Write With Response first; falls back to Write Without Response
if the characteristic rejects it — covers both BMS firmware variants.
"""
async with self._lock:
for attempt in range(1, retries + 1):
self._response_event.clear()
self._response_data = None
try:
await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
except BleakError:
try:
await client.write_gatt_char(TX_CHAR_UUID, command, response=False)
except BleakError as exc:
_LOGGER.error("BLE write failed (attempt %d/%d): %s",
attempt, retries, exc)
if attempt < retries:
await asyncio.sleep(0.3)
continue
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
_LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)",
command.hex(), attempt, retries)
if attempt < retries:
await asyncio.sleep(0.3)
return None
# ------------------------------------------------------------------
# MOS write command
# ------------------------------------------------------------------
async def write_mos(self, ble_device: BLEDevice, value: int) -> bool:
"""Send a MOS control write command and return True on ACK.
Follows the same connect → send → disconnect pattern as poll() so
it doesn't interfere with the normal poll cycle.
"""
command = self._build_mos_command(value)
_LOGGER.debug("Writing MOS value 0x%02X to BMS at %s", value, self._address)
client = BleakClient(ble_device)
try:
await client.connect()
await client.start_notify(RX_CHAR_UUID, self._on_notify)
await asyncio.sleep(0.5)
response = await self._request(client, command, timeout=3.0, retries=2)
# Response: DD E1 00 00 CHK_H CHK_L 77 (status byte 0x00 = OK)
return response is not None and response[2] == 0x00
finally:
try:
await client.disconnect()
except Exception:
pass
self._buffer.clear()
@staticmethod
def _build_mos_command(value: int) -> bytes:
"""Build a MOS control write frame with correct checksum.
Frame: DD 5A E1 02 00 XX CHK_H CHK_L 77
Checked bytes (per spec): command_code + length + data bytes
= 0xE1 + 0x02 + 0x00 + XX
Checksum = two's complement of sum, high byte first.
Verified against spec example:
XX=0x02 → sum=0xE5 → ~0xE5+1=0xFF1B → CHK FF 1B ✓
"""
checked = [0xE1, 0x02, 0x00, value & 0xFF]
checksum = (~sum(checked) + 1) & 0xFFFF
return bytes([
0xDD, 0x5A, 0xE1, 0x02, 0x00, value & 0xFF,
(checksum >> 8) & 0xFF, checksum & 0xFF,
0x77,
])
# ------------------------------------------------------------------
# Frame parsers
# ------------------------------------------------------------------
@staticmethod
def parse_general_info(frame: bytes) -> dict:
"""Parse a 0x03 general info response frame.
Payload byte offsets (frame[4] is payload[0]):
0-1 Total voltage uint16 BE ÷100 → V
2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging)
4-5 Residual capacity uint16 BE ÷100 → Ah
6-7 Nominal capacity uint16 BE ÷100 → Ah
8-9 Cycle count uint16 BE
10-11 Production date (ignored)
12-15 Balance status (ignored)
16-17 Protection status (ignored)
18 Software version (ignored)
19 State of charge uint8 %
20 MOS status uint8
21 Cell count uint8
22 Temp probe count uint8
23+ Temperatures uint16 BE each (raw 2731) ÷ 10 → °C
"""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
temp_count = p[22]
temperatures: list[float] = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temperatures.append(round((raw - 2731) / 10.0, 1))
balance = struct.unpack_from(">H", p, 12)[0] | struct.unpack_from(">H", p, 14)[0]
prot = struct.unpack_from(">H", p, 16)[0]
mos = p[20]
return {
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temperatures,
# MOS status
"mos_charge_enabled": bool(mos & 0x01),
"mos_discharge_enabled": bool(mos & 0x02),
# Cell balancing (any cell currently balancing)
"balance_active": balance != 0,
# Protection flags (bit per event, True = protection triggered)
"prot_cell_overvolt": bool(prot & (1 << 0)),
"prot_cell_undervolt": bool(prot & (1 << 1)),
"prot_pack_overvolt": bool(prot & (1 << 2)),
"prot_pack_undervolt": bool(prot & (1 << 3)),
"prot_charge_overtemp": bool(prot & (1 << 4)),
"prot_charge_undertemp": bool(prot & (1 << 5)),
"prot_discharge_overtemp": bool(prot & (1 << 6)),
"prot_discharge_undertemp": bool(prot & (1 << 7)),
"prot_charge_overcurrent": bool(prot & (1 << 8)),
"prot_discharge_overcurrent": bool(prot & (1 << 9)),
"prot_short_circuit": bool(prot & (1 << 10)),
"prot_frontend_ic_error": bool(prot & (1 << 11)),
"prot_software_lock": bool(prot & (1 << 12)),
}
@staticmethod
def parse_version(frame: bytes) -> str:
"""Parse a 0x05 hardware version response frame into an ASCII string."""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
return p.decode("ascii", errors="replace").strip("\x00").strip()
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
"""Parse a 0x04 cell voltage response frame.
Per spec: frame[3] (the header length byte) = cell_count × 2.
The payload contains ONLY the voltage bytes — no count byte.
0+ Cell voltages uint16 BE each unit mV ÷1000 → V
"""
count = frame[3] // 2 # header length byte = N_cells × 2
p = frame[_HEADER_LEN:-_TRAILER_LEN]
voltages: list[float] = []
for i in range(count):
raw = struct.unpack_from(">H", p, i * 2)[0]
voltages.append(round(raw / 1000.0, 3))
return {"cell_voltages": voltages}