From 1520ed3c0fbd998bac02027bb7bdd6afa63dd7e2 Mon Sep 17 00:00:00 2001 From: Jannis Christiani Date: Sun, 12 Apr 2026 09:36:08 +0200 Subject: [PATCH] Refactor BLE layer for 24/7 reliability - Replace raw BleakClient with establish_connection from bleak-retry-connector (retries, GATT service cache, proxy-aware) - Replace fragile asyncio.Event with asyncio.Queue for response frames, drain stale data on each connection to prevent cross-cycle leakage - Register BLE advertisement callback to keep BLEDevice reference fresh across ESPHome proxy path changes - Remove asyncio.sleep(2) device lookup hack - Increase poll timeout floor from 10s to 20s - Increase failure tolerance from 3 to 5 consecutive misses - Bump default poll interval to 30s, min to 15s (halves connection churn) Co-Authored-By: Claude Opus 4.6 --- custom_components/xiaoxiang_bms/__init__.py | 29 +++- .../xiaoxiang_bms/bluetooth_handler.py | 133 +++++++++++------- custom_components/xiaoxiang_bms/const.py | 4 +- .../xiaoxiang_bms/coordinator.py | 37 +++-- 4 files changed, 131 insertions(+), 72 deletions(-) diff --git a/custom_components/xiaoxiang_bms/__init__.py b/custom_components/xiaoxiang_bms/__init__.py index 704ba7c..a050d9f 100644 --- a/custom_components/xiaoxiang_bms/__init__.py +++ b/custom_components/xiaoxiang_bms/__init__.py @@ -1,8 +1,15 @@ """Xiaoxiang Smart BMS — Home Assistant integration.""" from __future__ import annotations +from homeassistant.components.bluetooth import ( + BluetoothChange, + BluetoothScanningMode, + BluetoothServiceInfoBleak, + async_register_callback, +) +from homeassistant.components.bluetooth.match import ADDRESS, BluetoothCallbackMatcher from homeassistant.config_entries import ConfigEntry -from homeassistant.core import HomeAssistant +from homeassistant.core import HomeAssistant, callback from .const import CONF_ADDRESS, CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL, DOMAIN from .coordinator import BmsCoordinator @@ -16,13 +23,31 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: poll_interval = entry.options.get(CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL) coordinator = BmsCoordinator(hass, address, poll_interval, name=entry.title) + + # Keep the coordinator's BLE device reference fresh via advertisement callback. + # This avoids stale transport paths when ESPHome proxies cycle. + @callback + def _async_update_ble( + service_info: BluetoothServiceInfoBleak, + change: BluetoothChange, + ) -> None: + coordinator._ble_device = service_info.device + + entry.async_on_unload( + async_register_callback( + hass, + _async_update_ble, + BluetoothCallbackMatcher({ADDRESS: address}), + BluetoothScanningMode.PASSIVE, + ) + ) + await coordinator.async_setup() await coordinator.async_config_entry_first_refresh() hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) - # Reload the entry when options (e.g. poll interval) change entry.async_on_unload(entry.add_update_listener(_async_update_listener)) return True diff --git a/custom_components/xiaoxiang_bms/bluetooth_handler.py b/custom_components/xiaoxiang_bms/bluetooth_handler.py index bfc7eaf..c253744 100644 --- a/custom_components/xiaoxiang_bms/bluetooth_handler.py +++ b/custom_components/xiaoxiang_bms/bluetooth_handler.py @@ -5,8 +5,9 @@ import asyncio import logging import struct -from bleak import BleakClient, BleakError +from bleak import BleakError from bleak.backends.device import BLEDevice +from bleak_retry_connector import establish_connection, BleakClientWithServiceCache from .const import ( FRAME_END, @@ -19,7 +20,7 @@ _LOGGER = logging.getLogger(__name__) # Full frame layout: # [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77] -# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker) +# Header = 4 bytes, trailer = 3 bytes (checksum x 2 + end marker) _HEADER_LEN = 4 _TRAILER_LEN = 3 @@ -27,7 +28,7 @@ _TRAILER_LEN = 3 class BmsBluetoothHandler: """Protocol framing and parsing for a Xiaoxiang BMS device. - Designed for a connect → poll → disconnect pattern: the BMS only allows + Designed for a connect -> poll -> disconnect pattern: the BMS only allows one simultaneous BLE connection, so we hold it only for the duration of a single data fetch and release it immediately after. """ @@ -35,9 +36,24 @@ class BmsBluetoothHandler: def __init__(self, address: str) -> None: self._address = address self._buffer = bytearray() - self._response_event = asyncio.Event() - self._response_data: bytes | None = None - self._lock = asyncio.Lock() + self._response_queue: asyncio.Queue[bytes] = asyncio.Queue() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _drain_queue(self) -> None: + """Discard any stale frames left in the queue from a prior cycle.""" + while not self._response_queue.empty(): + try: + self._response_queue.get_nowait() + except asyncio.QueueEmpty: + break + + def _reset(self) -> None: + """Clear all transient state before a new connection.""" + self._buffer.clear() + self._drain_queue() # ------------------------------------------------------------------ # High-level poll — the only entry point the coordinator needs @@ -47,7 +63,7 @@ class BmsBluetoothHandler: self, ble_device: BLEDevice, commands: list[bytes], - timeout: float = 3.0, + timeout: float = 5.0, retries: int = 3, ) -> list[bytes | None]: """Connect, send each command in sequence, disconnect. @@ -56,13 +72,17 @@ class BmsBluetoothHandler: only during the active read window and disconnecting immediately after, the mobile app (or any other client) can connect freely between polls. """ + self._reset() _LOGGER.debug("Polling BMS at %s", self._address) - client = BleakClient(ble_device) + + client = await establish_connection( + BleakClientWithServiceCache, + ble_device, + self._address, + max_attempts=3, + ) try: - await client.connect() await client.start_notify(RX_CHAR_UUID, self._on_notify) - # Give the BMS a moment to register the subscription before - # we start sending commands await asyncio.sleep(0.3) return [ await self._request(client, cmd, timeout, retries) @@ -71,9 +91,8 @@ class BmsBluetoothHandler: finally: try: await client.disconnect() - except Exception: + except BleakError: pass - self._buffer.clear() # ------------------------------------------------------------------ # Frame reception @@ -115,8 +134,7 @@ class BmsBluetoothHandler: return _LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len) - self._response_data = frame - self._response_event.set() + self._response_queue.put_nowait(frame) # ------------------------------------------------------------------ # Request / response (private — used inside poll()) @@ -124,7 +142,7 @@ class BmsBluetoothHandler: async def _request( self, - client: BleakClient, + client: BleakClientWithServiceCache, command: bytes, timeout: float, retries: int, @@ -134,30 +152,33 @@ class BmsBluetoothHandler: Tries Write With Response first; falls back to Write Without Response if the characteristic rejects it — covers both BMS firmware variants. """ - async with self._lock: - for attempt in range(1, retries + 1): - self._response_event.clear() - self._response_data = None - try: - await client.write_gatt_char(TX_CHAR_UUID, command, response=True) - except BleakError: - try: - await client.write_gatt_char(TX_CHAR_UUID, command, response=False) - except BleakError as exc: - _LOGGER.error("BLE write failed (attempt %d/%d): %s", - attempt, retries, exc) - if attempt < retries: - await asyncio.sleep(0.3) - continue + for attempt in range(1, retries + 1): + # Drain any stale frames before sending a new command + self._drain_queue() + self._buffer.clear() + try: + await client.write_gatt_char(TX_CHAR_UUID, command, response=True) + except BleakError: try: - await asyncio.wait_for(self._response_event.wait(), timeout) - return self._response_data - except asyncio.TimeoutError: - _LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)", - command.hex(), attempt, retries) + await client.write_gatt_char(TX_CHAR_UUID, command, response=False) + except BleakError as exc: + _LOGGER.warning("BLE write failed (attempt %d/%d): %s", + attempt, retries, exc) if attempt < retries: - await asyncio.sleep(0.3) + await asyncio.sleep(0.5) + continue + + try: + frame = await asyncio.wait_for( + self._response_queue.get(), timeout + ) + return frame + except asyncio.TimeoutError: + _LOGGER.warning("BMS timeout (cmd=0x%s, attempt %d/%d)", + command.hex(), attempt, retries) + if attempt < retries: + await asyncio.sleep(0.5) return None @@ -168,25 +189,29 @@ class BmsBluetoothHandler: async def write_mos(self, ble_device: BLEDevice, value: int) -> bool: """Send a MOS control write command and return True on ACK. - Follows the same connect → send → disconnect pattern as poll() so + Follows the same connect -> send -> disconnect pattern as poll() so it doesn't interfere with the normal poll cycle. """ + self._reset() command = self._build_mos_command(value) _LOGGER.debug("Writing MOS value 0x%02X to BMS at %s", value, self._address) - client = BleakClient(ble_device) + + client = await establish_connection( + BleakClientWithServiceCache, + ble_device, + self._address, + max_attempts=3, + ) try: - await client.connect() await client.start_notify(RX_CHAR_UUID, self._on_notify) - await asyncio.sleep(0.5) - response = await self._request(client, command, timeout=3.0, retries=2) - # Response: DD E1 00 00 CHK_H CHK_L 77 (status byte 0x00 = OK) + await asyncio.sleep(0.3) + response = await self._request(client, command, timeout=5.0, retries=2) return response is not None and response[2] == 0x00 finally: try: await client.disconnect() - except Exception: + except BleakError: pass - self._buffer.clear() @staticmethod def _build_mos_command(value: int) -> bytes: @@ -198,7 +223,7 @@ class BmsBluetoothHandler: Checksum = two's complement of sum, high byte first. Verified against spec example: - XX=0x02 → sum=0xE5 → ~0xE5+1=0xFF1B → CHK FF 1B ✓ + XX=0x02 -> sum=0xE5 -> ~0xE5+1=0xFF1B -> CHK FF 1B """ checked = [0xE1, 0x02, 0x00, value & 0xFF] checksum = (~sum(checked) + 1) & 0xFFFF @@ -217,10 +242,10 @@ class BmsBluetoothHandler: """Parse a 0x03 general info response frame. Payload byte offsets (frame[4] is payload[0]): - 0-1 Total voltage uint16 BE ÷100 → V - 2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging) - 4-5 Residual capacity uint16 BE ÷100 → Ah - 6-7 Nominal capacity uint16 BE ÷100 → Ah + 0-1 Total voltage uint16 BE /100 -> V + 2-3 Current int16 BE /100 -> A (positive = charging, negative = discharging) + 4-5 Residual capacity uint16 BE /100 -> Ah + 6-7 Nominal capacity uint16 BE /100 -> Ah 8-9 Cycle count uint16 BE 10-11 Production date (ignored) 12-15 Balance status (ignored) @@ -230,7 +255,7 @@ class BmsBluetoothHandler: 20 MOS status uint8 21 Cell count uint8 22 Temp probe count uint8 - 23+ Temperatures uint16 BE each (raw − 2731) ÷ 10 → °C + 23+ Temperatures uint16 BE each (raw - 2731) / 10 -> C """ p = frame[_HEADER_LEN:-_TRAILER_LEN] @@ -284,11 +309,11 @@ class BmsBluetoothHandler: def parse_cell_info(frame: bytes) -> dict: """Parse a 0x04 cell voltage response frame. - Per spec: frame[3] (the header length byte) = cell_count × 2. + Per spec: frame[3] (the header length byte) = cell_count x 2. The payload contains ONLY the voltage bytes — no count byte. - 0+ Cell voltages uint16 BE each unit mV ÷1000 → V + 0+ Cell voltages uint16 BE each unit mV /1000 -> V """ - count = frame[3] // 2 # header length byte = N_cells × 2 + count = frame[3] // 2 # header length byte = N_cells x 2 p = frame[_HEADER_LEN:-_TRAILER_LEN] voltages: list[float] = [] for i in range(count): diff --git a/custom_components/xiaoxiang_bms/const.py b/custom_components/xiaoxiang_bms/const.py index 1c3d44c..51dbbd0 100644 --- a/custom_components/xiaoxiang_bms/const.py +++ b/custom_components/xiaoxiang_bms/const.py @@ -5,8 +5,8 @@ DOMAIN = "xiaoxiang_bms" CONF_ADDRESS = "address" CONF_POLL_INTERVAL = "poll_interval" -DEFAULT_POLL_INTERVAL = 15 # seconds — each poll does a full BLE connect/disconnect -MIN_POLL_INTERVAL = 10 # below this the BMS has no breathing room between polls +DEFAULT_POLL_INTERVAL = 30 # seconds — each poll does a full BLE connect/disconnect +MIN_POLL_INTERVAL = 15 # below this the BMS has no breathing room between polls MAX_POLL_INTERVAL = 300 # GATT UUIDs (Xiaoxiang BMS UART-over-GATT) diff --git a/custom_components/xiaoxiang_bms/coordinator.py b/custom_components/xiaoxiang_bms/coordinator.py index ee2e578..b13b3d3 100644 --- a/custom_components/xiaoxiang_bms/coordinator.py +++ b/custom_components/xiaoxiang_bms/coordinator.py @@ -5,6 +5,7 @@ import asyncio import logging from datetime import timedelta +from bleak.backends.device import BLEDevice from homeassistant.components.bluetooth import async_ble_device_from_address from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError @@ -19,13 +20,13 @@ _LOGGER = logging.getLogger(__name__) # Only mark sensors unavailable after this many *consecutive* failed polls. # Transient BLE misses (device not in cache, ESPHome proxy busy, etc.) return # the last known data instead so the UI doesn't oscillate. -_FAILURES_BEFORE_UNAVAILABLE = 3 +_FAILURES_BEFORE_UNAVAILABLE = 5 class BmsCoordinator(DataUpdateCoordinator[dict]): """Polls the BMS over BLE and distributes data to all sensor entities. - Uses a connect → read → disconnect pattern on every poll so the BMS's + Uses a connect -> read -> disconnect pattern on every poll so the BMS's single BLE connection slot is free between updates (mobile app access). """ @@ -44,10 +45,12 @@ class BmsCoordinator(DataUpdateCoordinator[dict]): ) self.address = address self._device_name = name - self._poll_timeout = max(poll_interval - 3, 10) + self._poll_timeout = max(poll_interval - 5, 20) self._handler = BmsBluetoothHandler(address) self.hw_version: str | None = None self._consecutive_failures = 0 + # Kept fresh by the BLE advertisement callback registered in __init__.py + self._ble_device: BLEDevice | None = None # ------------------------------------------------------------------ # Device info — shared by sensor, binary_sensor, number platforms @@ -74,7 +77,7 @@ class BmsCoordinator(DataUpdateCoordinator[dict]): async def async_write_mos(self, value: int) -> None: """Send a MOS control command to the BMS, then refresh sensor state.""" - device = async_ble_device_from_address(self.hass, self.address, connectable=True) + device = self._get_ble_device() if device is None: raise HomeAssistantError( f"BMS ({self.address}) not reachable — cannot send MOS command" @@ -82,9 +85,22 @@ class BmsCoordinator(DataUpdateCoordinator[dict]): success = await self._handler.write_mos(device, value) if not success: raise HomeAssistantError("BMS did not acknowledge the MOS command") - # Refresh immediately so sensors reflect the new MOS state await self.async_request_refresh() + # ------------------------------------------------------------------ + # BLE device lookup + # ------------------------------------------------------------------ + + def _get_ble_device(self) -> BLEDevice | None: + """Return the freshest available BLEDevice reference. + + Prefers the advertisement-callback reference (_ble_device) because it + tracks proxy transport path changes. Falls back to the scanner cache. + """ + return self._ble_device or async_ble_device_from_address( + self.hass, self.address, connectable=True + ) + # ------------------------------------------------------------------ # Poll # ------------------------------------------------------------------ @@ -92,7 +108,7 @@ class BmsCoordinator(DataUpdateCoordinator[dict]): def _handle_failure(self, reason: str) -> dict: """On a transient failure, return cached data up to the threshold. - Only raises UpdateFailed (→ sensors go unavailable) after + Only raises UpdateFailed (-> sensors go unavailable) after _FAILURES_BEFORE_UNAVAILABLE consecutive misses. """ self._consecutive_failures += 1 @@ -109,14 +125,7 @@ class BmsCoordinator(DataUpdateCoordinator[dict]): async def _async_update_data(self) -> dict: """Connect to the BMS, fetch all data, disconnect.""" - # The BMS may not be in the scanner cache immediately after a disconnect. - # Wait up to 2 s for an advertisement before giving up. - device = async_ble_device_from_address(self.hass, self.address, connectable=True) - if device is None: - await asyncio.sleep(2.0) - device = async_ble_device_from_address( - self.hass, self.address, connectable=True - ) + device = self._get_ble_device() if device is None: return self._handle_failure( f"BMS ({self.address}) not reachable — check Bluetooth adapter / proxy"