"""DataUpdateCoordinator for the Xiaoxiang Smart BMS."""
from __future__ import annotations

import asyncio
import logging
from datetime import timedelta

from bleak import BleakError
from bleak.backends.device import BLEDevice
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .bluetooth_handler import BmsBluetoothHandler
from .const import CMD_CELL, CMD_GENERAL, CMD_VERSION, DOMAIN

_LOGGER = logging.getLogger(__name__)

# Only mark sensors unavailable after this many *consecutive* failed polls.
_FAILURES_BEFORE_UNAVAILABLE = 5

# Hard ceiling on a single poll (commands only — no connection overhead
# when already connected). Reconnection adds ~5 s on top.
_POLL_TIMEOUT = 15


class BmsCoordinator(DataUpdateCoordinator[dict]):
    """Polls the BMS over BLE and distributes data to all sensor entities.

    Holds a **persistent** BLE connection per device. The connection is
    established on the first poll and kept alive across cycles. If it
    drops, the next poll automatically reconnects.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        address: str,
        poll_interval: int,
        name: str = "Xiaoxiang Smart BMS",
        ble_lock: asyncio.Lock | None = None,
    ) -> None:
        """Initialise the coordinator.

        Args:
            hass: Home Assistant instance.
            address: Bluetooth address of the BMS.
            poll_interval: Seconds between polls.
            name: Device name reported in ``device_info``.
            ble_lock: Lock shared between coordinators; a private lock is
                created when none is supplied.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=poll_interval),
        )
        self.address = address
        self._device_name = name
        self._handler = BmsBluetoothHandler(address)
        self.hw_version: str | None = None
        self._consecutive_failures = 0
        # Kept fresh by the BLE advertisement callback registered in __init__.py
        self._ble_device: BLEDevice | None = None
        # Shared across all BMS coordinator instances — serialises BLE
        # operations so multiple devices don't fight for proxy slots.
        self._ble_lock = ble_lock or asyncio.Lock()

    # ------------------------------------------------------------------
    # Device info
    # ------------------------------------------------------------------

    @property
    def device_info(self) -> DeviceInfo:
        """Return the device registry entry shared by this BMS's entities.

        The model falls back to ``"Smart BMS"`` until the hardware version
        has been read from the device.
        """
        return DeviceInfo(
            identifiers={(DOMAIN, self.address)},
            name=self._device_name,
            manufacturer="Xiaoxiang",
            model=self.hw_version or "Smart BMS",
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def async_setup(self) -> None:
        """No-op — connection is established lazily on first poll."""

    async def async_teardown(self) -> None:
        """Disconnect the persistent BLE connection."""
        await self._handler.disconnect()

    async def async_write_mos(self, value: int) -> None:
        """Send a MOS control command to the BMS, then refresh sensor state.

        Args:
            value: Command value forwarded unchanged to the handler.

        Raises:
            HomeAssistantError: If the device cannot be found, the write
                fails or times out, or the BMS does not acknowledge it.
        """
        device = self._get_ble_device()
        if device is None:
            raise HomeAssistantError(
                f"BMS ({self.address}) not reachable — cannot send MOS command"
            )

        async with self._ble_lock:
            try:
                success = await self._handler.write_mos(
                    device,
                    value,
                    ble_device_callback=self._get_ble_device,
                )
            except (BleakError, asyncio.TimeoutError) as exc:
                raise HomeAssistantError(f"MOS command failed: {exc}") from exc

        if not success:
            raise HomeAssistantError("BMS did not acknowledge the MOS command")

        # Requested outside the lock: the refresh runs _async_update_data,
        # which acquires the same (non-reentrant) lock.
        await self.async_request_refresh()

    # ------------------------------------------------------------------
    # BLE device lookup
    # ------------------------------------------------------------------

    def _get_ble_device(self) -> BLEDevice | None:
        """Return the freshest available BLEDevice reference.

        Prefers the device stored on the coordinator and falls back to
        the Home Assistant Bluetooth lookup for a connectable device.
        Returns ``None`` when neither source knows the device.
        """
        if self._ble_device is not None:
            return self._ble_device

        device = async_ble_device_from_address(
            self.hass, self.address, connectable=True
        )
        if device is not None:
            _LOGGER.debug("BMS %s found via scanner cache", self.address)
        return device

    # ------------------------------------------------------------------
    # Poll
    # ------------------------------------------------------------------

    def _handle_failure(self, reason: str) -> dict:
        """Return cached data up to the threshold, then go unavailable.

        Args:
            reason: Human-readable description of the failure.

        Returns:
            The last known data while the consecutive-failure count is
            within ``_FAILURES_BEFORE_UNAVAILABLE`` and data exists.

        Raises:
            UpdateFailed: Once the threshold is exceeded, or when there is
                no previous data to fall back on.
        """
        self._consecutive_failures += 1
        if self._consecutive_failures <= _FAILURES_BEFORE_UNAVAILABLE and self.data:
            _LOGGER.debug(
                "BMS poll failed (%d/%d), keeping last known data: %s",
                self._consecutive_failures,
                _FAILURES_BEFORE_UNAVAILABLE,
                reason,
            )
            return self.data
        raise UpdateFailed(reason)

    async def _async_update_data(self) -> dict:
        """Poll the BMS over the persistent connection.

        Returns:
            The parsed general and cell data, extended with the derived
            ``power``, ``energy_stored`` and ``cell_delta`` values. On a
            tolerated failure the previously cached data is returned.

        Raises:
            UpdateFailed: Via ``_handle_failure`` once failures are no
                longer tolerated.
        """
        device = self._get_ble_device()
        if device is None:
            # Wait up to 5 s for an advertisement (after HA restart / proxy reconnect)
            _LOGGER.debug("BMS %s not discovered yet, waiting…", self.address)
            for _ in range(5):
                await asyncio.sleep(1.0)
                device = self._get_ble_device()
                if device is not None:
                    break

        if device is None:
            return self._handle_failure(
                f"BMS ({self.address}) not reachable — check Bluetooth adapter / proxy"
            )

        commands = [CMD_GENERAL, CMD_CELL]
        if self.hw_version is None:
            # The version is only requested until it has been read once.
            commands.append(CMD_VERSION)

        # Serialise BLE operations across all BMS instances.
        async with self._ble_lock:
            try:
                responses = await asyncio.wait_for(
                    self._handler.poll(
                        device,
                        commands,
                        ble_device_callback=self._get_ble_device,
                    ),
                    timeout=_POLL_TIMEOUT,
                )
            except asyncio.TimeoutError:
                # Connection might be dead — force reconnect next cycle
                await self._handler.disconnect()
                return self._handle_failure(
                    f"BMS poll timed out after {_POLL_TIMEOUT}s"
                )
            except Exception as exc:  # noqa: BLE001
                # Deliberately broad (BleakError included): any poll error
                # should go through the cached-data grace period.
                return self._handle_failure(f"BMS poll failed: {exc}")

        # Guard the indexing below: a short or empty result must count as a
        # failed poll instead of raising IndexError.
        if not responses or len(responses) < 2:
            return self._handle_failure("BMS poll returned an incomplete response set")

        general_frame, cell_frame = responses[0], responses[1]
        if general_frame is None:
            return self._handle_failure("No response to general info request (0x03)")
        if cell_frame is None:
            return self._handle_failure("No response to cell info request (0x04)")

        if self.hw_version is None and len(responses) > 2 and responses[2]:
            self.hw_version = BmsBluetoothHandler.parse_version(responses[2])
            _LOGGER.debug("BMS hardware version: %s", self.hw_version)

        data = BmsBluetoothHandler.parse_general_info(general_frame)
        data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))

        data["power"] = round(data["voltage"] * data["current"], 2)
        data["energy_stored"] = round(
            data["voltage"] * data["residual_capacity"] / 1000, 3
        )

        cell_voltages = data["cell_voltages"]
        if cell_voltages:
            # Spread between highest and lowest cell, scaled by 1000
            # (presumably volts to millivolts — confirm against the parser).
            data["cell_delta"] = round(
                (max(cell_voltages) - min(cell_voltages)) * 1000, 1
            )
        else:
            data["cell_delta"] = None

        # Successful poll — reset failure counter. Done only after the
        # frames parsed, so an unparseable frame is not counted as success.
        self._consecutive_failures = 0

        _LOGGER.debug(
            "BMS data: %.2fV %.2fA %d%% %.2fAh %.3fkWh %d cells (connected: %s)",
            data["voltage"],
            data["current"],
            data["state_of_charge"],
            data["residual_capacity"],
            data["energy_stored"],
            len(cell_voltages),
            self._handler.is_connected,
        )
        return data