5d527168e2
Sensors added: - energy_stored (kWh = V × Ah / 1000) for energy dashboard Binary sensors added (all from existing 0x03 frame, no extra BLE requests): - Charge MOSFET / Discharge MOSFET (MOS gate status) - Cell Balancing (any balance bit active) - 13× protection flags: cell/pack over/under-voltage, charge/discharge over/under-temperature, charge/discharge over-current, short circuit, frontend IC error, software lock Other: - Hardware version string fetched once via CMD 0x05, shown in device card - DeviceInfo centralised on coordinator (sensor + binary_sensor share it) - CONF_ADDRESS removed from sensor.py (coordinator holds address) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
133 lines
4.9 KiB
Python
133 lines
4.9 KiB
Python
"""DataUpdateCoordinator for the Xiaoxiang Smart BMS."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import timedelta
|
|
|
|
from homeassistant.components.bluetooth import async_ble_device_from_address
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .bluetooth_handler import BmsBluetoothHandler
|
|
from .const import CMD_CELL, CMD_GENERAL, CMD_VERSION, DOMAIN
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class BmsCoordinator(DataUpdateCoordinator[dict]):
|
|
"""Polls the BMS over BLE and distributes data to all sensor entities."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
address: str,
|
|
poll_interval: int,
|
|
) -> None:
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
name=DOMAIN,
|
|
update_interval=timedelta(seconds=poll_interval),
|
|
)
|
|
self.address = address
|
|
self._handler = BmsBluetoothHandler(address)
|
|
self.hw_version: str | None = None # populated on first successful poll
|
|
|
|
# ------------------------------------------------------------------
|
|
# Device info — centralised so sensor + binary_sensor share it
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def device_info(self) -> DeviceInfo:
|
|
return DeviceInfo(
|
|
identifiers={(DOMAIN, self.address)},
|
|
name="Xiaoxiang Smart BMS",
|
|
manufacturer="Xiaoxiang",
|
|
model=self.hw_version or "Smart BMS",
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_ble_device(self):
|
|
"""Resolve the best available BLE device via HA's Bluetooth subsystem.
|
|
|
|
Automatically uses ESPHome BLE proxies if they can reach the BMS.
|
|
"""
|
|
device = async_ble_device_from_address(self.hass, self.address, connectable=True)
|
|
if device is None:
|
|
raise UpdateFailed(
|
|
f"BMS ({self.address}) not reachable by any Bluetooth adapter or proxy"
|
|
)
|
|
return device
|
|
|
|
async def async_setup(self) -> None:
|
|
"""No-op — connection is established lazily on the first poll."""
|
|
|
|
async def async_teardown(self) -> None:
|
|
"""Disconnect cleanly. Called on entry unload."""
|
|
await self._handler.disconnect()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Poll
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _async_update_data(self) -> dict:
|
|
"""Fetch data from the BMS. Reconnects automatically if disconnected."""
|
|
if not self._handler.is_connected:
|
|
_LOGGER.debug("BMS not connected, attempting reconnect…")
|
|
try:
|
|
await self._handler.connect(self._get_ble_device())
|
|
except UpdateFailed:
|
|
raise
|
|
except Exception as exc:
|
|
raise UpdateFailed(f"BMS reconnect failed: {exc}") from exc
|
|
|
|
try:
|
|
general_frame = await self._handler.request(CMD_GENERAL)
|
|
if general_frame is None:
|
|
raise UpdateFailed("No response to general info request (0x03)")
|
|
|
|
cell_frame = await self._handler.request(CMD_CELL)
|
|
if cell_frame is None:
|
|
raise UpdateFailed("No response to cell info request (0x04)")
|
|
|
|
# Fetch hardware version string once — used in DeviceInfo model field
|
|
if self.hw_version is None:
|
|
version_frame = await self._handler.request(CMD_VERSION)
|
|
if version_frame:
|
|
self.hw_version = BmsBluetoothHandler.parse_version(version_frame)
|
|
_LOGGER.debug("BMS hardware version: %s", self.hw_version)
|
|
|
|
except UpdateFailed:
|
|
raise
|
|
except Exception as exc:
|
|
raise UpdateFailed(f"BLE communication error: {exc}") from exc
|
|
|
|
data = BmsBluetoothHandler.parse_general_info(general_frame)
|
|
data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))
|
|
|
|
# Derived fields
|
|
data["power"] = round(data["voltage"] * data["current"], 2)
|
|
data["energy_stored"] = round(data["voltage"] * data["residual_capacity"] / 1000, 3)
|
|
|
|
if data["cell_voltages"]:
|
|
v_max = max(data["cell_voltages"])
|
|
v_min = min(data["cell_voltages"])
|
|
data["cell_delta"] = round((v_max - v_min) * 1000, 1)
|
|
else:
|
|
data["cell_delta"] = None
|
|
|
|
_LOGGER.debug(
|
|
"BMS data: %.2fV %.2fA %d%% %.2fAh %.3fkWh %d cells",
|
|
data["voltage"],
|
|
data["current"],
|
|
data["state_of_charge"],
|
|
data["residual_capacity"],
|
|
data["energy_stored"],
|
|
len(data["cell_voltages"]),
|
|
)
|
|
return data
|