b8bee14839
The connect/disconnect-every-cycle approach caused ~50% failure rate over 48h — each of the ~2880 daily connection attempts per device had a significant chance of failure through ESPHome proxies. New model (same as the user's Android app): - Connect once, keep the connection alive across poll cycles - _ensure_connected() reconnects automatically if the link drops - _on_disconnect() callback detects unexpected disconnections - On timeout, force-disconnect so next cycle gets a clean reconnect - Polls now only send commands (no connection overhead) — expected completion in <1s instead of 10-25s Connection lifecycle: startup → first poll → _ensure_connected() → persistent drop detected → next poll → _ensure_connected() → reconnected shutdown → async_teardown() → disconnect() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
347 lines
13 KiB
Python
347 lines
13 KiB
Python
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import struct
|
|
from collections.abc import Callable
|
|
|
|
from bleak import BleakError
|
|
from bleak.backends.device import BLEDevice
|
|
from bleak_retry_connector import establish_connection, BleakClientWithServiceCache
|
|
|
|
from .const import (
|
|
FRAME_END,
|
|
FRAME_START,
|
|
RX_CHAR_UUID,
|
|
TX_CHAR_UUID,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Full frame layout:
|
|
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
|
|
# Header = 4 bytes, trailer = 3 bytes (checksum x 2 + end marker)
|
|
_HEADER_LEN = 4
|
|
_TRAILER_LEN = 3
|
|
|
|
|
|
class BmsBluetoothHandler:
|
|
"""Protocol framing and parsing for a Xiaoxiang BMS device.
|
|
|
|
Holds a **persistent** BLE connection. The connection is established on
|
|
the first poll and kept alive across cycles. If it drops, the next poll
|
|
automatically reconnects. This avoids the massive overhead of
|
|
connect/disconnect every cycle through ESPHome BLE proxies.
|
|
"""
|
|
|
|
def __init__(self, address: str) -> None:
|
|
self._address = address
|
|
self._buffer = bytearray()
|
|
self._response_queue: asyncio.Queue[bytes] = asyncio.Queue()
|
|
self._client: BleakClientWithServiceCache | None = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Connection management
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
return self._client is not None and self._client.is_connected
|
|
|
|
def _on_disconnect(self, _client) -> None:
|
|
"""Called by bleak when the connection drops unexpectedly."""
|
|
_LOGGER.debug("BMS %s disconnected", self._address)
|
|
self._client = None
|
|
|
|
async def _ensure_connected(
|
|
self,
|
|
ble_device: BLEDevice,
|
|
ble_device_callback: Callable[[], BLEDevice | None] | None = None,
|
|
) -> None:
|
|
"""Connect if not already connected."""
|
|
if self.is_connected:
|
|
return
|
|
|
|
# Clean up stale client
|
|
if self._client is not None:
|
|
try:
|
|
await self._client.disconnect()
|
|
except BleakError:
|
|
pass
|
|
self._client = None
|
|
|
|
self._reset()
|
|
|
|
_LOGGER.debug("Connecting to BMS %s", self._address)
|
|
client = await establish_connection(
|
|
BleakClientWithServiceCache,
|
|
ble_device,
|
|
self._address,
|
|
disconnected_callback=self._on_disconnect,
|
|
max_attempts=2,
|
|
ble_device_callback=ble_device_callback,
|
|
)
|
|
await client.start_notify(RX_CHAR_UUID, self._on_notify)
|
|
await asyncio.sleep(0.3)
|
|
self._client = client
|
|
_LOGGER.info("BMS %s connected", self._address)
|
|
|
|
async def disconnect(self) -> None:
|
|
"""Explicitly close the connection (used during teardown)."""
|
|
if self._client is not None:
|
|
try:
|
|
await self._client.disconnect()
|
|
except BleakError:
|
|
pass
|
|
self._client = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _drain_queue(self) -> None:
|
|
"""Discard any stale frames left in the queue from a prior cycle."""
|
|
while not self._response_queue.empty():
|
|
try:
|
|
self._response_queue.get_nowait()
|
|
except asyncio.QueueEmpty:
|
|
break
|
|
|
|
def _reset(self) -> None:
|
|
"""Clear all transient state."""
|
|
self._buffer.clear()
|
|
self._drain_queue()
|
|
|
|
# ------------------------------------------------------------------
|
|
# High-level poll
|
|
# ------------------------------------------------------------------
|
|
|
|
async def poll(
|
|
self,
|
|
ble_device: BLEDevice,
|
|
commands: list[bytes],
|
|
timeout: float = 3.0,
|
|
retries: int = 2,
|
|
ble_device_callback: Callable[[], BLEDevice | None] | None = None,
|
|
) -> list[bytes | None]:
|
|
"""Send each command over the persistent connection, return responses.
|
|
|
|
Connects automatically if not already connected. If the connection
|
|
drops mid-poll, raises BleakError so the coordinator can handle it.
|
|
"""
|
|
self._drain_queue()
|
|
|
|
await self._ensure_connected(ble_device, ble_device_callback)
|
|
|
|
return [
|
|
await self._request(self._client, cmd, timeout, retries)
|
|
for cmd in commands
|
|
]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Frame reception
|
|
# ------------------------------------------------------------------
|
|
|
|
def _on_notify(self, _char, data: bytearray) -> None:
|
|
"""Accumulate BLE notification chunks into complete protocol frames."""
|
|
self._buffer.extend(data)
|
|
|
|
# Discard leading garbage until we see a frame start byte
|
|
while self._buffer and self._buffer[0] != FRAME_START:
|
|
self._buffer.pop(0)
|
|
|
|
# Need at least the 4-byte header to know payload length
|
|
if len(self._buffer) < _HEADER_LEN:
|
|
return
|
|
|
|
payload_len = self._buffer[3]
|
|
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
|
|
|
|
if len(self._buffer) < expected_total:
|
|
return # still waiting for more chunks
|
|
|
|
frame = bytes(self._buffer[:expected_total])
|
|
del self._buffer[:expected_total]
|
|
|
|
if frame[-1] != FRAME_END:
|
|
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
|
|
return
|
|
|
|
if frame[2] != 0x00:
|
|
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
|
|
frame[2], frame[1])
|
|
return
|
|
|
|
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
|
|
self._response_queue.put_nowait(frame)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Request / response
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _request(
|
|
self,
|
|
client: BleakClientWithServiceCache,
|
|
command: bytes,
|
|
timeout: float,
|
|
retries: int,
|
|
) -> bytes | None:
|
|
"""Send one command and wait for the response frame, with retries."""
|
|
for attempt in range(1, retries + 1):
|
|
if not client.is_connected:
|
|
_LOGGER.warning("BMS %s connection lost during request", self._address)
|
|
self._client = None
|
|
return None
|
|
|
|
self._drain_queue()
|
|
self._buffer.clear()
|
|
|
|
try:
|
|
await client.write_gatt_char(TX_CHAR_UUID, command, response=True)
|
|
except BleakError:
|
|
if not client.is_connected:
|
|
self._client = None
|
|
return None
|
|
try:
|
|
await client.write_gatt_char(TX_CHAR_UUID, command, response=False)
|
|
except BleakError as exc:
|
|
_LOGGER.warning("BLE write failed (attempt %d/%d): %s",
|
|
attempt, retries, exc)
|
|
if attempt < retries:
|
|
await asyncio.sleep(0.3)
|
|
continue
|
|
|
|
try:
|
|
frame = await asyncio.wait_for(
|
|
self._response_queue.get(), timeout
|
|
)
|
|
return frame
|
|
except asyncio.TimeoutError:
|
|
_LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)",
|
|
command.hex(), attempt, retries)
|
|
if attempt < retries:
|
|
await asyncio.sleep(0.3)
|
|
|
|
return None
|
|
|
|
# ------------------------------------------------------------------
|
|
# MOS write command
|
|
# ------------------------------------------------------------------
|
|
|
|
async def write_mos(
|
|
self,
|
|
ble_device: BLEDevice,
|
|
value: int,
|
|
ble_device_callback: Callable[[], BLEDevice | None] | None = None,
|
|
) -> bool:
|
|
"""Send a MOS control write command over the persistent connection."""
|
|
self._drain_queue()
|
|
command = self._build_mos_command(value)
|
|
_LOGGER.debug("Writing MOS value 0x%02X to BMS at %s", value, self._address)
|
|
|
|
await self._ensure_connected(ble_device, ble_device_callback)
|
|
|
|
response = await self._request(self._client, command, timeout=3.0, retries=2)
|
|
return response is not None and response[2] == 0x00
|
|
|
|
@staticmethod
|
|
def _build_mos_command(value: int) -> bytes:
|
|
"""Build a MOS control write frame with correct checksum.
|
|
|
|
Frame: DD 5A E1 02 00 XX CHK_H CHK_L 77
|
|
Checked bytes (per spec): command_code + length + data bytes
|
|
= 0xE1 + 0x02 + 0x00 + XX
|
|
Checksum = two's complement of sum, high byte first.
|
|
"""
|
|
checked = [0xE1, 0x02, 0x00, value & 0xFF]
|
|
checksum = (~sum(checked) + 1) & 0xFFFF
|
|
return bytes([
|
|
0xDD, 0x5A, 0xE1, 0x02, 0x00, value & 0xFF,
|
|
(checksum >> 8) & 0xFF, checksum & 0xFF,
|
|
0x77,
|
|
])
|
|
|
|
# ------------------------------------------------------------------
|
|
# Frame parsers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def parse_general_info(frame: bytes) -> dict:
|
|
"""Parse a 0x03 general info response frame.
|
|
|
|
Payload byte offsets (frame[4] is payload[0]):
|
|
0-1 Total voltage uint16 BE /100 -> V
|
|
2-3 Current int16 BE /100 -> A (positive = charging)
|
|
4-5 Residual capacity uint16 BE /100 -> Ah
|
|
6-7 Nominal capacity uint16 BE /100 -> Ah
|
|
8-9 Cycle count uint16 BE
|
|
10-11 Production date (ignored)
|
|
12-15 Balance status (ignored)
|
|
16-17 Protection status (ignored)
|
|
18 Software version (ignored)
|
|
19 State of charge uint8 %
|
|
20 MOS status uint8
|
|
21 Cell count uint8
|
|
22 Temp probe count uint8
|
|
23+ Temperatures uint16 BE each (raw - 2731) / 10 -> C
|
|
"""
|
|
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
|
|
|
temp_count = p[22]
|
|
temperatures: list[float] = []
|
|
for i in range(temp_count):
|
|
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
|
|
temperatures.append(round((raw - 2731) / 10.0, 1))
|
|
|
|
balance = struct.unpack_from(">H", p, 12)[0] | struct.unpack_from(">H", p, 14)[0]
|
|
prot = struct.unpack_from(">H", p, 16)[0]
|
|
mos = p[20]
|
|
|
|
return {
|
|
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
|
|
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
|
|
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
|
|
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
|
|
"cycle_count": struct.unpack_from(">H", p, 8)[0],
|
|
"state_of_charge": p[19],
|
|
"cell_count": p[21],
|
|
"temperatures": temperatures,
|
|
"mos_charge_enabled": bool(mos & 0x01),
|
|
"mos_discharge_enabled": bool(mos & 0x02),
|
|
"balance_active": balance != 0,
|
|
"prot_cell_overvolt": bool(prot & (1 << 0)),
|
|
"prot_cell_undervolt": bool(prot & (1 << 1)),
|
|
"prot_pack_overvolt": bool(prot & (1 << 2)),
|
|
"prot_pack_undervolt": bool(prot & (1 << 3)),
|
|
"prot_charge_overtemp": bool(prot & (1 << 4)),
|
|
"prot_charge_undertemp": bool(prot & (1 << 5)),
|
|
"prot_discharge_overtemp": bool(prot & (1 << 6)),
|
|
"prot_discharge_undertemp": bool(prot & (1 << 7)),
|
|
"prot_charge_overcurrent": bool(prot & (1 << 8)),
|
|
"prot_discharge_overcurrent": bool(prot & (1 << 9)),
|
|
"prot_short_circuit": bool(prot & (1 << 10)),
|
|
"prot_frontend_ic_error": bool(prot & (1 << 11)),
|
|
"prot_software_lock": bool(prot & (1 << 12)),
|
|
}
|
|
|
|
@staticmethod
|
|
def parse_version(frame: bytes) -> str:
|
|
"""Parse a 0x05 hardware version response frame into an ASCII string."""
|
|
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
|
return p.decode("ascii", errors="replace").strip("\x00").strip()
|
|
|
|
@staticmethod
|
|
def parse_cell_info(frame: bytes) -> dict:
|
|
"""Parse a 0x04 cell voltage response frame.
|
|
|
|
Per spec: frame[3] (the header length byte) = cell_count x 2.
|
|
"""
|
|
count = frame[3] // 2
|
|
p = frame[_HEADER_LEN:-_TRAILER_LEN]
|
|
voltages: list[float] = []
|
|
for i in range(count):
|
|
raw = struct.unpack_from(">H", p, i * 2)[0]
|
|
voltages.append(round(raw / 1000.0, 3))
|
|
return {"cell_voltages": voltages}
|