"""DataUpdateCoordinator for the Xiaoxiang Smart BMS.""" from __future__ import annotations import logging from datetime import timedelta from homeassistant.components.bluetooth import async_ble_device_from_address from homeassistant.core import HomeAssistant from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .bluetooth_handler import BmsBluetoothHandler from .const import CMD_CELL, CMD_GENERAL, CMD_VERSION, DOMAIN _LOGGER = logging.getLogger(__name__) class BmsCoordinator(DataUpdateCoordinator[dict]): """Polls the BMS over BLE and distributes data to all sensor entities. Uses a connect → read → disconnect pattern on every poll so the BMS's single BLE connection slot is free between updates (mobile app access). """ def __init__( self, hass: HomeAssistant, address: str, poll_interval: int, ) -> None: super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=poll_interval), ) self.address = address self._handler = BmsBluetoothHandler(address) self.hw_version: str | None = None # ------------------------------------------------------------------ # Device info — shared by sensor, binary_sensor, number platforms # ------------------------------------------------------------------ @property def device_info(self) -> DeviceInfo: return DeviceInfo( identifiers={(DOMAIN, self.address)}, name="Xiaoxiang Smart BMS", manufacturer="Xiaoxiang", model=self.hw_version or "Smart BMS", ) # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def async_setup(self) -> None: """No-op — no persistent connection to establish.""" async def async_teardown(self) -> None: """No-op — each poll disconnects itself.""" # ------------------------------------------------------------------ # Poll # ------------------------------------------------------------------ async def _async_update_data(self) -> dict: """Connect to the BMS, fetch all data, disconnect.""" device = async_ble_device_from_address(self.hass, self.address, connectable=True) if device is None: raise UpdateFailed( f"BMS ({self.address}) not reachable — check Bluetooth adapter / proxy" ) # Fetch hardware version once; skip on subsequent polls commands = [CMD_GENERAL, CMD_CELL] if self.hw_version is None: commands.append(CMD_VERSION) try: responses = await self._handler.poll(device, commands) except Exception as exc: raise UpdateFailed(f"BMS poll failed: {exc}") from exc general_frame, cell_frame = responses[0], responses[1] if general_frame is None: raise UpdateFailed("No response to general info request (0x03)") if cell_frame is None: raise UpdateFailed("No response to cell info request (0x04)") if self.hw_version is None and len(responses) > 2 and responses[2]: self.hw_version = BmsBluetoothHandler.parse_version(responses[2]) _LOGGER.debug("BMS hardware version: %s", self.hw_version) data = BmsBluetoothHandler.parse_general_info(general_frame) data.update(BmsBluetoothHandler.parse_cell_info(cell_frame)) # Derived fields data["power"] = round(data["voltage"] * data["current"], 2) data["energy_stored"] = round(data["voltage"] * data["residual_capacity"] / 1000, 3) if data["cell_voltages"]: v_max = max(data["cell_voltages"]) v_min = min(data["cell_voltages"]) data["cell_delta"] = round((v_max - v_min) * 1000, 1) else: data["cell_delta"] = None _LOGGER.debug( "BMS data: %.2fV %.2fA %d%% %.2fAh %.3fkWh %d cells", data["voltage"], data["current"], data["state_of_charge"], data["residual_capacity"], data["energy_stored"], len(data["cell_voltages"]), ) return data