Files
ha-xiaoxiang-bms/custom_components/xiaoxiang_bms/coordinator.py
T
Jannis 1520ed3c0f Refactor BLE layer for 24/7 reliability
- Replace raw BleakClient with establish_connection from
  bleak-retry-connector (retries, GATT service cache, proxy-aware)
- Replace fragile asyncio.Event with asyncio.Queue for response frames,
  drain stale data on each connection to prevent cross-cycle leakage
- Register BLE advertisement callback to keep BLEDevice reference fresh
  across ESPHome proxy path changes
- Remove asyncio.sleep(2) device lookup hack
- Increase poll timeout floor from 10s to 20s
- Increase failure tolerance from 3 to 5 consecutive misses
- Bump default poll interval to 30s, min to 15s (halves connection churn)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-12 09:36:08 +02:00

184 lines
6.9 KiB
Python

"""DataUpdateCoordinator for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
from bleak.backends.device import BLEDevice
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .bluetooth_handler import BmsBluetoothHandler
from .const import CMD_CELL, CMD_GENERAL, CMD_VERSION, DOMAIN
# Module-level logger; also handed to the coordinator base class.
_LOGGER = logging.getLogger(__name__)

# Number of *consecutive* failed polls that are still answered with the last
# known data. Transient BLE misses (device not in cache, ESPHome proxy busy,
# etc.) therefore do not make the UI oscillate. The next failure beyond this
# count raises UpdateFailed, which marks the sensors unavailable. The grace
# period only applies once a successful poll has produced data.
_FAILURES_BEFORE_UNAVAILABLE = 5
class BmsCoordinator(DataUpdateCoordinator[dict]):
    """Polls the BMS over BLE and distributes data to all sensor entities.

    Uses a connect -> read -> disconnect pattern on every poll so the BMS's
    single BLE connection slot is free between updates (mobile app access).

    Transient failures are absorbed: while cached data exists, up to
    ``_FAILURES_BEFORE_UNAVAILABLE`` consecutive failed polls return the last
    known data; the next failure raises ``UpdateFailed``.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        address: str,
        poll_interval: int,
        name: str = "Xiaoxiang Smart BMS",
    ) -> None:
        """Initialise the coordinator.

        Args:
            hass: The Home Assistant instance.
            address: Bluetooth address used to look up and identify the BMS.
            poll_interval: Seconds between two polls.
            name: Device name reported through ``device_info``.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=poll_interval),
        )
        self.address = address
        self._device_name = name
        # Finish 5 s before the next poll is due, but never wait less than 20 s.
        self._poll_timeout = max(poll_interval - 5, 20)
        self._handler = BmsBluetoothHandler(address)
        # Filled in by the first poll that gets a version response.
        self.hw_version: str | None = None
        self._consecutive_failures = 0
        # Kept fresh by the BLE advertisement callback registered in __init__.py
        self._ble_device: BLEDevice | None = None

    # ------------------------------------------------------------------
    # Device info — shared by sensor, binary_sensor, number platforms
    # ------------------------------------------------------------------

    @property
    def device_info(self) -> DeviceInfo:
        """Return the device registry entry shared by all entities.

        The model falls back to a generic label until the hardware version
        has been read from the BMS.
        """
        return DeviceInfo(
            identifiers={(DOMAIN, self.address)},
            name=self._device_name,
            manufacturer="Xiaoxiang",
            model=self.hw_version or "Smart BMS",
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def async_setup(self) -> None:
        """No-op — no persistent connection to establish."""

    async def async_teardown(self) -> None:
        """No-op — each poll disconnects itself."""

    async def async_write_mos(self, value: int) -> None:
        """Send a MOS control command to the BMS, then refresh sensor state.

        Args:
            value: Command value forwarded unchanged to the handler.

        Raises:
            HomeAssistantError: If no BLE device reference is available or
                the handler reports that the command was not acknowledged.
        """
        device = self._get_ble_device()
        if device is None:
            raise HomeAssistantError(
                f"BMS ({self.address}) not reachable — cannot send MOS command"
            )
        success = await self._handler.write_mos(device, value)
        if not success:
            raise HomeAssistantError("BMS did not acknowledge the MOS command")
        await self.async_request_refresh()

    # ------------------------------------------------------------------
    # BLE device lookup
    # ------------------------------------------------------------------

    def _get_ble_device(self) -> BLEDevice | None:
        """Return the freshest available BLEDevice reference.

        Prefers the advertisement-callback reference (_ble_device) because it
        tracks proxy transport path changes. Falls back to the scanner cache.
        Returns ``None`` when neither source knows the device.
        """
        return self._ble_device or async_ble_device_from_address(
            self.hass, self.address, connectable=True
        )

    # ------------------------------------------------------------------
    # Poll
    # ------------------------------------------------------------------

    def _handle_failure(self, reason: str) -> dict:
        """On a transient failure, return cached data up to the threshold.

        Only raises UpdateFailed (-> sensors go unavailable) after
        _FAILURES_BEFORE_UNAVAILABLE consecutive misses, or immediately when
        there is no cached data to fall back on.

        Args:
            reason: Human-readable description of what went wrong.

        Returns:
            The last successfully fetched data.

        Raises:
            UpdateFailed: When the tolerance is exhausted or no data is cached.
        """
        self._consecutive_failures += 1
        if self._consecutive_failures <= _FAILURES_BEFORE_UNAVAILABLE and self.data:
            _LOGGER.debug(
                "BMS poll failed (%d/%d), keeping last known data: %s",
                self._consecutive_failures,
                _FAILURES_BEFORE_UNAVAILABLE,
                reason,
            )
            return self.data
        raise UpdateFailed(reason)

    @staticmethod
    def _build_data(general_frame, cell_frame) -> dict:
        """Parse both response frames and add the derived values.

        Args:
            general_frame: Response to the general info request.
            cell_frame: Response to the cell info request.

        Returns:
            The merged parser output plus ``power``, ``energy_stored`` and
            ``cell_delta`` (``None`` when no cell voltages were reported).
        """
        data = BmsBluetoothHandler.parse_general_info(general_frame)
        data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))

        data["power"] = round(data["voltage"] * data["current"], 2)
        data["energy_stored"] = round(
            data["voltage"] * data["residual_capacity"] / 1000, 3
        )

        cell_voltages = data["cell_voltages"]
        if cell_voltages:
            # Spread between highest and lowest cell, scaled by 1000
            # (presumably V -> mV; confirm against the parser's units).
            spread = max(cell_voltages) - min(cell_voltages)
            data["cell_delta"] = round(spread * 1000, 1)
        else:
            data["cell_delta"] = None
        return data

    async def _async_update_data(self) -> dict:
        """Connect to the BMS, fetch all data, disconnect.

        Every failure mode (device unknown, timeout, BLE error, missing or
        malformed response) is routed through ``_handle_failure`` so that the
        consecutive-failure tolerance applies uniformly.

        Returns:
            Freshly parsed data, or the cached data during the grace period.

        Raises:
            UpdateFailed: Via ``_handle_failure`` once the tolerance is used up.
        """
        device = self._get_ble_device()
        if device is None:
            return self._handle_failure(
                f"BMS ({self.address}) not reachable — check Bluetooth adapter / proxy"
            )

        commands = [CMD_GENERAL, CMD_CELL]
        # The hardware version is only requested until it has been read once.
        if self.hw_version is None:
            commands.append(CMD_VERSION)

        try:
            responses = await asyncio.wait_for(
                self._handler.poll(device, commands),
                timeout=self._poll_timeout,
            )
        except asyncio.TimeoutError:
            return self._handle_failure(
                f"BMS poll timed out after {self._poll_timeout}s"
            )
        except Exception as exc:
            # Deliberately broad: any error from the BLE stack counts as a
            # transient poll failure rather than an unexpected crash.
            return self._handle_failure(f"BMS poll failed: {exc}")

        # Guard the indexing below: a short or missing result must count as a
        # failed poll instead of raising IndexError/TypeError past the
        # failure tolerance.
        if responses is None or len(responses) < 2:
            return self._handle_failure("BMS poll returned an incomplete response set")

        general_frame, cell_frame = responses[0], responses[1]
        if general_frame is None:
            return self._handle_failure("No response to general info request (0x03)")
        if cell_frame is None:
            return self._handle_failure("No response to cell info request (0x04)")

        try:
            data = self._build_data(general_frame, cell_frame)
        except (KeyError, IndexError, TypeError, ValueError) as exc:
            # A frame that cannot be parsed is treated like any other miss.
            return self._handle_failure(f"BMS returned malformed data: {exc!r}")

        # Successful poll — reset failure counter (only after parsing worked)
        self._consecutive_failures = 0

        if self.hw_version is None and len(responses) > 2 and responses[2]:
            self.hw_version = BmsBluetoothHandler.parse_version(responses[2])
            _LOGGER.debug("BMS hardware version: %s", self.hw_version)

        _LOGGER.debug(
            "BMS data: %.2fV %.2fA %d%% %.2fAh %.3fkWh %d cells",
            data["voltage"], data["current"], data["state_of_charge"],
            data["residual_capacity"], data["energy_stored"],
            len(data["cell_voltages"]),
        )
        return data