"""DataUpdateCoordinator for the Xiaoxiang Smart BMS.""" from __future__ import annotations import logging from datetime import timedelta from homeassistant.components.bluetooth import async_ble_device_from_address from homeassistant.core import HomeAssistant from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .bluetooth_handler import BmsBluetoothHandler from .const import CMD_CELL, CMD_GENERAL, DOMAIN _LOGGER = logging.getLogger(__name__) class BmsCoordinator(DataUpdateCoordinator[dict]): """Polls the BMS over BLE and distributes data to all sensor entities.""" def __init__( self, hass: HomeAssistant, address: str, poll_interval: int, ) -> None: super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=poll_interval), ) self.address = address self._handler = BmsBluetoothHandler(address) # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def _get_ble_device(self): """Resolve the best available BLE device via HA's Bluetooth subsystem. This automatically uses ESPHome BLE proxies if they can reach the BMS and the local adapter cannot (or vice versa). """ device = async_ble_device_from_address(self.hass, self.address, connectable=True) if device is None: raise UpdateFailed( f"BMS ({self.address}) not reachable by any Bluetooth adapter or proxy" ) return device async def async_setup(self) -> None: """Connect to the BMS. Called once during config entry setup.""" await self._handler.connect(self._get_ble_device()) async def async_teardown(self) -> None: """Disconnect cleanly. Called on entry unload.""" await self._handler.disconnect() # ------------------------------------------------------------------ # Poll # ------------------------------------------------------------------ async def _async_update_data(self) -> dict: """Fetch data from the BMS. Reconnects automatically if disconnected.""" if not self._handler.is_connected: _LOGGER.debug("BMS not connected, attempting reconnect…") try: await self._handler.connect(self._get_ble_device()) except UpdateFailed: raise except Exception as exc: raise UpdateFailed(f"BMS reconnect failed: {exc}") from exc try: general_frame = await self._handler.request(CMD_GENERAL) if general_frame is None: raise UpdateFailed("No response to general info request (0x03)") cell_frame = await self._handler.request(CMD_CELL) if cell_frame is None: raise UpdateFailed("No response to cell info request (0x04)") except UpdateFailed: raise except Exception as exc: raise UpdateFailed(f"BLE communication error: {exc}") from exc data = BmsBluetoothHandler.parse_general_info(general_frame) data.update(BmsBluetoothHandler.parse_cell_info(cell_frame)) # Derived fields data["power"] = round(data["voltage"] * data["current"], 2) if data["cell_voltages"]: v_max = max(data["cell_voltages"]) v_min = min(data["cell_voltages"]) data["cell_delta"] = round((v_max - v_min) * 1000, 1) else: data["cell_delta"] = None _LOGGER.debug( "BMS data: %.2fV %.2fA %d%% %.2fAh %d cells", data["voltage"], data["current"], data["state_of_charge"], data["residual_capacity"], len(data["cell_voltages"]), ) return data