"""DataUpdateCoordinator for the Xiaoxiang Smart BMS.""" from __future__ import annotations import logging from datetime import timedelta from homeassistant.components.bluetooth import async_ble_device_from_address from homeassistant.core import HomeAssistant from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .bluetooth_handler import BmsBluetoothHandler from .const import CMD_CELL, CMD_GENERAL, CMD_VERSION, DOMAIN _LOGGER = logging.getLogger(__name__) class BmsCoordinator(DataUpdateCoordinator[dict]): """Polls the BMS over BLE and distributes data to all sensor entities.""" def __init__( self, hass: HomeAssistant, address: str, poll_interval: int, ) -> None: super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=poll_interval), ) self.address = address self._handler = BmsBluetoothHandler(address) self.hw_version: str | None = None # populated on first successful poll # ------------------------------------------------------------------ # Device info — centralised so sensor + binary_sensor share it # ------------------------------------------------------------------ @property def device_info(self) -> DeviceInfo: return DeviceInfo( identifiers={(DOMAIN, self.address)}, name="Xiaoxiang Smart BMS", manufacturer="Xiaoxiang", model=self.hw_version or "Smart BMS", ) # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def _get_ble_device(self): """Resolve the best available BLE device via HA's Bluetooth subsystem. Automatically uses ESPHome BLE proxies if they can reach the BMS. """ device = async_ble_device_from_address(self.hass, self.address, connectable=True) if device is None: raise UpdateFailed( f"BMS ({self.address}) not reachable by any Bluetooth adapter or proxy" ) return device async def async_setup(self) -> None: """No-op — connection is established lazily on the first poll.""" async def async_teardown(self) -> None: """Disconnect cleanly. Called on entry unload.""" await self._handler.disconnect() # ------------------------------------------------------------------ # Poll # ------------------------------------------------------------------ async def _async_update_data(self) -> dict: """Fetch data from the BMS. Reconnects automatically if disconnected.""" if not self._handler.is_connected: _LOGGER.debug("BMS not connected, attempting reconnect…") try: await self._handler.connect(self._get_ble_device()) except UpdateFailed: raise except Exception as exc: raise UpdateFailed(f"BMS reconnect failed: {exc}") from exc try: general_frame = await self._handler.request(CMD_GENERAL) if general_frame is None: raise UpdateFailed("No response to general info request (0x03)") cell_frame = await self._handler.request(CMD_CELL) if cell_frame is None: raise UpdateFailed("No response to cell info request (0x04)") # Fetch hardware version string once — used in DeviceInfo model field if self.hw_version is None: version_frame = await self._handler.request(CMD_VERSION) if version_frame: self.hw_version = BmsBluetoothHandler.parse_version(version_frame) _LOGGER.debug("BMS hardware version: %s", self.hw_version) except UpdateFailed: raise except Exception as exc: raise UpdateFailed(f"BLE communication error: {exc}") from exc data = BmsBluetoothHandler.parse_general_info(general_frame) data.update(BmsBluetoothHandler.parse_cell_info(cell_frame)) # Derived fields data["power"] = round(data["voltage"] * data["current"], 2) data["energy_stored"] = round(data["voltage"] * data["residual_capacity"] / 1000, 3) if data["cell_voltages"]: v_max = max(data["cell_voltages"]) v_min = min(data["cell_voltages"]) data["cell_delta"] = round((v_max - v_min) * 1000, 1) else: data["cell_delta"] = None _LOGGER.debug( "BMS data: %.2fV %.2fA %d%% %.2fAh %.3fkWh %d cells", data["voltage"], data["current"], data["state_of_charge"], data["residual_capacity"], data["energy_stored"], len(data["cell_voltages"]), ) return data