commit 4a769bf50ff66a520d8dbd03ad7a431bc084c951 Author: Jannis Christiani Date: Sat Apr 11 19:07:18 2026 +0200 Initial commit: Xiaoxiang Smart BMS Home Assistant integration - BLE GATT communication via bleak (UART-over-GATT, service 0xff00) - Parses 0x03 (general info) and 0x04 (cell voltages) frames - Protocol verified against official RS485/UART spec V4 - DataUpdateCoordinator with 5s poll interval (configurable 2-60s) - Auto-reconnect on BLE disconnect - Config flow: BT auto-discovery + manual MAC entry + options flow - Sensors: voltage, current, power, SoC, capacity, cycles, temps, cells, cell delta - HACS-ready (hacs.json, manifest.json) Co-Authored-By: Claude Sonnet 4.6 diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..109239b --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,9 @@ +{ + "permissions": { + "allow": [ + "WebFetch(domain:raw.githubusercontent.com)", + "WebSearch", + "WebFetch(domain:api.github.com)" + ] + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6f27fec --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.egg-info/ +*.egg +dist/ +build/ +.eggs/ + +# Virtual environments +.venv/ +venv/ +env/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Home Assistant +*.db +*.log +home-assistant_v2.db + +# Testing +.pytest_cache/ +.coverage +htmlcov/ diff --git a/MASTERPLAN.md b/MASTERPLAN.md new file mode 100644 index 0000000..9a1745e --- /dev/null +++ b/MASTERPLAN.md @@ -0,0 +1,478 @@ +# Xiaoxiang Smart BMS — Home Assistant Integration Masterplan + +## Overview + +A **native HA custom integration** (not an addon) that connects to your Xiaoxiang BMS +over Bluetooth Low Energy (BLE), parses the proprietary UART-over-GATT protocol, +and exposes battery data as first-class Home Assistant sensor entities. + +No MQTT broker, no extra container, no sidecar — just a `custom_components/` folder +you drop into your HA config directory. + +--- + +## Architecture + +``` +Home Assistant +└── Bluetooth subsystem (built-in) + └── xiaoxiang_bms (custom_component) + ├── Config Flow ← GUI: scan BT, pick device, save MAC + ├── DataUpdateCoordinator ← polls BMS every 30s via BLE + │ └── BluetoothHandler ← BLE GATT read/write (bleak) + │ ├── Request 0x03 → General info frame + │ └── Request 0x04 → Cell voltages frame + └── SensorEntity (×N) ← one entity per data field +``` + +--- + +## BMS Protocol Reference + +Derived from: https://github.com/Jnnshschl/AndroidBMSApp + +### GATT UUIDs + +| Role | UUID | +|------|------| +| UART Service | `0000ff00-0000-1000-8000-00805f9b34fb` | +| RX (notify/read) | `0000ff01-0000-1000-8000-00805f9b34fb` | +| TX (write) | `0000ff02-0000-1000-8000-00805f9b34fb` | + +### Frame Format + +``` +[0xDD] [CMD] [STATUS] [LENGTH] [PAYLOAD...] [CHECKSUM_HI] [CHECKSUM_LO] [0x77] +``` + +- All multi-byte values: **Big-endian** +- Max frame size: 80 bytes + +### Request Commands + +```python +CMD_GENERAL_INFO = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]) +CMD_CELL_INFO = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77]) +``` + +### General Info Response (0x03) — byte offsets in payload + +| Offset | Length | Field | Scaling | Unit | +|--------|--------|-------|---------|------| +| 0-1 | 2 | Total voltage | ÷100 | V | +| 2-3 | 2 | Current (signed) | ÷100 | A | +| 4-5 | 2 | Residual capacity | ÷100 | Ah | +| 6-7 | 2 | Nominal capacity | ÷100 | Ah | +| 8-9 | 2 | Cycle count | ×1 | - | +| 10-11 | 2 | Production date | - | (skip) | +| 12-15 | 4 | Balance status bits | - | (skip) | +| 16-17 | 2 | Protection status | - | (skip) | +| 18 | 1 | Version | - | (skip) | +| 19 | 1 | State of charge | ×1 | % | +| 20 | 1 | MOS status | - | (skip) | +| 21 | 1 | Cell count | ×1 | - | +| 22 | 1 | Temp probe count | ×1 | - | +| 23+ | 2×N | Temperatures | (val−2731)÷10 | °C | + +> Current is a signed 16-bit int; negative = charging, positive = discharging + +### Cell Info Response (0x04) — byte offsets in payload + +| Offset | Length | Field | Scaling | Unit | +|--------|--------|-------|---------|------| +| 0 | 1 | Cell count (÷2) | - | - | +| 1+ | 2×N | Cell voltages | ÷1000 | V | + +--- + +## Entities Exposed in Home Assistant + +| Entity | Device Class | Unit | Notes | +|--------|-------------|------|-------| +| `sensor.bms_voltage` | voltage | V | Total pack voltage | +| `sensor.bms_current` | current | A | Signed (neg=charge) | +| `sensor.bms_power` | power | W | Calculated: V × I | +| `sensor.bms_state_of_charge` | battery | % | 0–100 | +| `sensor.bms_residual_capacity` | energy_storage | Ah | Remaining Ah | +| `sensor.bms_nominal_capacity` | energy_storage | Ah | Full capacity | +| `sensor.bms_cycle_count` | - | - | Lifetime cycles | +| `sensor.bms_temperature_1..N` | temperature | °C | Per probe | +| `sensor.bms_cell_voltage_1..N` | voltage | V | Per cell | +| `sensor.bms_cell_delta` | voltage | mV | Max−min cell diff | + +--- + +## File Structure + +``` +custom_components/ +└── xiaoxiang_bms/ + ├── __init__.py # Integration setup / unload + ├── manifest.json # HA metadata, bluetooth dependency + ├── const.py # All constants (UUIDs, commands, offsets) + ├── config_flow.py # GUI setup wizard (BT scan → pick device → save) + ├── coordinator.py # DataUpdateCoordinator — poll loop + data store + ├── bluetooth_handler.py # Low-level BLE GATT comms (bleak) + ├── sensor.py # SensorEntity definitions + └── strings.json # UI strings +translations/ +└── en.json # English translations for config flow +``` + +--- + +## Implementation Steps + +### Step 1 — `manifest.json` + +```json +{ + "domain": "xiaoxiang_bms", + "name": "Xiaoxiang Smart BMS", + "version": "1.0.0", + "config_flow": true, + "dependencies": ["bluetooth"], + "bluetooth": [ + {"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"} + ], + "requirements": [], + "codeowners": ["@you"], + "iot_class": "local_polling" +} +``` + +> Declaring `bluetooth` in `dependencies` lets HA's Bluetooth subsystem manage +> the adapter and lets the config flow use `bluetooth.async_discovered_service_info`. + +--- + +### Step 2 — `const.py` + +```python +DOMAIN = "xiaoxiang_bms" +DEFAULT_POLL_INTERVAL = 30 # seconds + +# GATT UUIDs +UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb" +RX_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb" +TX_CHAR_UUID = "0000ff02-0000-1000-8000-00805f9b34fb" + +# Request frames +CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]) +CMD_CELL = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77]) + +# Frame markers +FRAME_START = 0xDD +FRAME_END = 0x77 + +CMD_GENERAL_ID = 0x03 +CMD_CELL_ID = 0x04 +``` + +--- + +### Step 3 — `bluetooth_handler.py` + +Core BLE communication class using **bleak** (already bundled with HA): + +```python +import asyncio +import struct +from bleak import BleakClient + +class BmsBluetoothHandler: + def __init__(self, address: str): + self._address = address + self._client: BleakClient | None = None + self._response_buffer = bytearray() + self._response_event = asyncio.Event() + self._response_data: bytes | None = None + + async def connect(self): + self._client = BleakClient(self._address) + await self._client.connect() + await self._client.start_notify(RX_CHAR_UUID, self._on_notify) + + async def disconnect(self): + if self._client and self._client.is_connected: + await self._client.disconnect() + + def _on_notify(self, _, data: bytearray): + """Accumulate chunks until complete frame received.""" + self._response_buffer.extend(data) + # Check for end marker + if self._response_buffer and self._response_buffer[-1] == FRAME_END: + if self._response_buffer[0] == FRAME_START: + self._response_data = bytes(self._response_buffer) + self._response_buffer.clear() + self._response_event.set() + + async def request(self, command: bytes, timeout=5.0) -> bytes | None: + self._response_event.clear() + self._response_data = None + await self._client.write_gatt_char(TX_CHAR_UUID, command) + try: + await asyncio.wait_for(self._response_event.wait(), timeout) + return self._response_data + except asyncio.TimeoutError: + return None + + @staticmethod + def parse_general_info(frame: bytes) -> dict: + # frame[0]=0xDD, frame[1]=CMD, frame[2]=status, frame[3]=length + # payload starts at frame[4] + p = frame[4:] + temp_count = p[22] + temps = [] + for i in range(temp_count): + raw = struct.unpack_from(">H", p, 23 + i * 2)[0] + temps.append((raw - 2731) / 10.0) + + return { + "voltage": struct.unpack_from(">H", p, 0)[0] / 100.0, + "current": struct.unpack_from(">h", p, 2)[0] / 100.0, # signed! + "residual_capacity":struct.unpack_from(">H", p, 4)[0] / 100.0, + "nominal_capacity": struct.unpack_from(">H", p, 6)[0] / 100.0, + "cycle_count": struct.unpack_from(">H", p, 8)[0], + "state_of_charge": p[19], + "cell_count": p[21], + "temperatures": temps, + } + + @staticmethod + def parse_cell_info(frame: bytes) -> dict: + p = frame[4:] + count = p[0] // 2 + voltages = [] + for i in range(count): + raw = struct.unpack_from(">H", p, 1 + i * 2)[0] + voltages.append(raw / 1000.0) + return {"cell_voltages": voltages} +``` + +--- + +### Step 4 — `coordinator.py` + +```python +from datetime import timedelta +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +class BmsCoordinator(DataUpdateCoordinator): + def __init__(self, hass, address: str): + super().__init__(hass, logger, name="xiaoxiang_bms", + update_interval=timedelta(seconds=DEFAULT_POLL_INTERVAL)) + self._handler = BmsBluetoothHandler(address) + self.data = {} + + async def _async_setup(self): + await self._handler.connect() + + async def _async_update_data(self): + try: + general_frame = await self._handler.request(CMD_GENERAL) + cell_frame = await self._handler.request(CMD_CELL) + if not general_frame or not cell_frame: + raise UpdateFailed("BMS did not respond") + data = BmsBluetoothHandler.parse_general_info(general_frame) + data.update(BmsBluetoothHandler.parse_cell_info(cell_frame)) + # Derived fields + data["power"] = round(data["voltage"] * data["current"], 2) + if data["cell_voltages"]: + data["cell_delta"] = round( + (max(data["cell_voltages"]) - min(data["cell_voltages"])) * 1000, 1 + ) + return data + except Exception as e: + raise UpdateFailed(f"BMS polling error: {e}") from e +``` + +--- + +### Step 5 — `config_flow.py` + +Two-step config flow: +1. **Auto-discovery**: HA detects BLE advertisements matching the BMS service UUID + and suggests the device automatically (appears as a notification in HA). +2. **Manual**: User can go to Settings → Integrations → Add → "Xiaoxiang BMS" + and paste/enter the BT MAC address. + +```python +class XiaoxiangBmsConfigFlow(ConfigFlow, domain=DOMAIN): + VERSION = 1 + + async def async_step_bluetooth(self, discovery_info): + """Called automatically when HA sees the BMS advertising.""" + self._address = discovery_info.address + self._name = discovery_info.name or "Xiaoxiang BMS" + await self.async_set_unique_id(self._address) + self._abort_if_unique_id_configured() + return await self.async_step_confirm() + + async def async_step_confirm(self, user_input=None): + if user_input is not None: + return self.async_create_entry(title=self._name, + data={"address": self._address}) + return self.async_show_form(step_id="confirm", ...) + + async def async_step_user(self, user_input=None): + """Manual entry fallback.""" + if user_input: + address = user_input["address"] + await self.async_set_unique_id(address) + self._abort_if_unique_id_configured() + return self.async_create_entry(title="Xiaoxiang BMS", + data={"address": address}) + return self.async_show_form(step_id="user", + data_schema=vol.Schema({"address": str})) +``` + +--- + +### Step 6 — `sensor.py` + +One `SensorEntity` subclass per data field. Dynamic cell/temp sensors are +created in `async_setup_entry` based on the first successful poll. + +```python +SENSOR_DESCRIPTIONS = [ + SensorEntityDescription(key="voltage", name="Voltage", + native_unit_of_measurement=UnitOfElectricPotential.VOLT, + device_class=SensorDeviceClass.VOLTAGE), + SensorEntityDescription(key="current", name="Current", + native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, + device_class=SensorDeviceClass.CURRENT), + SensorEntityDescription(key="power", name="Power", + native_unit_of_measurement=UnitOfPower.WATT, + device_class=SensorDeviceClass.POWER), + SensorEntityDescription(key="state_of_charge", name="State of Charge", + native_unit_of_measurement=PERCENTAGE, + device_class=SensorDeviceClass.BATTERY), + SensorEntityDescription(key="residual_capacity", name="Remaining Capacity", + native_unit_of_measurement="Ah"), + SensorEntityDescription(key="nominal_capacity", name="Nominal Capacity", + native_unit_of_measurement="Ah"), + SensorEntityDescription(key="cycle_count", name="Cycle Count"), + SensorEntityDescription(key="cell_delta", name="Cell Voltage Delta", + native_unit_of_measurement="mV"), +] +``` + +--- + +### Step 7 — `__init__.py` + +```python +async def async_setup_entry(hass, entry): + coordinator = BmsCoordinator(hass, entry.data["address"]) + await coordinator._async_setup() + await coordinator.async_config_entry_first_refresh() + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator + await hass.config_entries.async_forward_entry_setups(entry, ["sensor"]) + return True + +async def async_unload_entry(hass, entry): + coordinator = hass.data[DOMAIN].pop(entry.entry_id) + await coordinator._handler.disconnect() + return await hass.config_entries.async_unload_platforms(entry, ["sensor"]) +``` + +--- + +## Installation + +### Option A — Manual (any HA install) + +```bash +# On your HA host (SSH or Samba share) +cp -r custom_components/xiaoxiang_bms \ + /config/custom_components/xiaoxiang_bms + +# Restart HA, then: +# Settings → Integrations → Add → search "Xiaoxiang BMS" +``` + +### Option B — HACS Custom Repository + +1. HACS → Custom Repositories → add this repo URL → Category: Integration +2. Install → Restart HA +3. Settings → Integrations → Add → "Xiaoxiang BMS" + +--- + +## Key Technical Decisions + +| Decision | Choice | Reason | +|----------|--------|--------| +| Addon vs Integration | Integration | Direct HA sensor entities, no MQTT needed | +| BLE library | bleak (HA-bundled) | No extra deps, HA manages BT adapter lifecycle | +| Discovery | Bluetooth service UUID match | User gets auto-discovery notification in HA | +| Poll vs Push | Poll (30s) | BMS doesn't send unsolicited notifications; request→response protocol | +| Data model | DataUpdateCoordinator | Single shared connection, all sensors refresh together | +| Dynamic entities | First-poll count | Cell/temp count varies by battery; discover at runtime | + +--- + +## Development & Testing + +### Test without real hardware + +Use a BLE UART simulator (e.g., `nRF Connect` app or a Raspberry Pi with +`bluezero` or `bumble`) to send back canned frames to validate parsing. + +```python +# Minimal test for parser +frame = bytes([0xDD, 0x03, 0x00, 0x1B, + 0x05, 0xDC, # voltage: 15.00V + 0xFF, 0x9C, # current: -1.00A (charging) + 0x09, 0x60, # residual: 24.00Ah + 0x0B, 0xB8, # nominal: 30.00Ah + 0x00, 0x05, # cycles: 5 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # dates/balance/protection/ver + 0x64, # SoC: 100% + 0x03, # MOS + 0x10, # 16 cells + 0x02, # 2 temp probes + 0x0A, 0xAB, # temp1: (2731 - 2731) / 10 → wait: (0x0AAB=2731) → 0°C + 0x0A, 0xC5, # temp2: (2757 - 2731) / 10 → 2.6°C + 0xFF, 0xFD, 0x77]) # checksum + end +result = BmsBluetoothHandler.parse_general_info(frame) +assert result["voltage"] == 15.0 +assert result["current"] == -1.0 +``` + +### Validate on real HA + +```bash +# Enable debug logging in configuration.yaml: +logger: + logs: + custom_components.xiaoxiang_bms: debug +``` + +--- + +## Risks & Mitigations + +| Risk | Mitigation | +|------|-----------| +| BLE not available on HA host | Docs note: requires Bluetooth hardware; HA OS has it, Docker needs `--privileged` or BT passthrough | +| Chunked BLE frames | Buffer accumulation in `_on_notify` until `0x77` end byte seen | +| BMS goes out of range | `UpdateFailed` → HA marks sensors unavailable; reconnect on next poll | +| Multiple BMS units | Each config entry = separate coordinator = separate MAC | +| Frame byte offsets vary by firmware | Add firmware version detection; make offsets configurable | + +--- + +## Implementation Order + +- [ ] 1. `const.py` — UUIDs and command bytes +- [ ] 2. `bluetooth_handler.py` — protocol parser (unit-testable standalone) +- [ ] 3. `coordinator.py` — DataUpdateCoordinator wrapping the handler +- [ ] 4. `manifest.json` — HA metadata +- [ ] 5. `__init__.py` — entry setup/unload +- [ ] 6. `config_flow.py` — BT scan + manual entry +- [ ] 7. `sensor.py` — all sensor entities +- [ ] 8. `strings.json` / `translations/en.json` — UI text +- [ ] 9. Local test without hardware (canned frames) +- [ ] 10. Test on real HA + real BMS diff --git a/RS485-UART-RS232-Communication-protocol.pdf b/RS485-UART-RS232-Communication-protocol.pdf new file mode 100644 index 0000000..8a3692e Binary files /dev/null and b/RS485-UART-RS232-Communication-protocol.pdf differ diff --git a/custom_components/xiaoxiang_bms/__init__.py b/custom_components/xiaoxiang_bms/__init__.py new file mode 100644 index 0000000..c7891d3 --- /dev/null +++ b/custom_components/xiaoxiang_bms/__init__.py @@ -0,0 +1,42 @@ +"""Xiaoxiang Smart BMS — Home Assistant integration.""" +from __future__ import annotations + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant + +from .const import CONF_ADDRESS, CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL, DOMAIN +from .coordinator import BmsCoordinator + +PLATFORMS = ["sensor"] + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up the BMS integration from a config entry.""" + address = entry.data[CONF_ADDRESS] + poll_interval = entry.options.get(CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL) + + coordinator = BmsCoordinator(hass, address, poll_interval) + await coordinator.async_setup() + await coordinator.async_config_entry_first_refresh() + + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + + # Reload the entry when options (e.g. poll interval) change + entry.async_on_unload(entry.add_update_listener(_async_update_listener)) + + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload the BMS integration.""" + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + if unload_ok: + coordinator: BmsCoordinator = hass.data[DOMAIN].pop(entry.entry_id) + await coordinator.async_teardown() + return unload_ok + + +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Reload the entry when options are changed.""" + await hass.config_entries.async_reload(entry.entry_id) diff --git a/custom_components/xiaoxiang_bms/bluetooth_handler.py b/custom_components/xiaoxiang_bms/bluetooth_handler.py new file mode 100644 index 0000000..2bcba99 --- /dev/null +++ b/custom_components/xiaoxiang_bms/bluetooth_handler.py @@ -0,0 +1,192 @@ +"""BLE GATT communication handler for the Xiaoxiang Smart BMS.""" +from __future__ import annotations + +import asyncio +import logging +import struct + +from bleak import BleakClient, BleakError + +from .const import ( + FRAME_END, + FRAME_START, + RX_CHAR_UUID, + TX_CHAR_UUID, +) + +_LOGGER = logging.getLogger(__name__) + +# Full frame layout: +# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77] +# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker) +_HEADER_LEN = 4 +_TRAILER_LEN = 3 + + +class BmsBluetoothHandler: + """Manages BLE connection and protocol framing for a Xiaoxiang BMS device.""" + + def __init__(self, address: str) -> None: + self._address = address + self._client: BleakClient | None = None + self._buffer = bytearray() + self._response_event = asyncio.Event() + self._response_data: bytes | None = None + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Connection management + # ------------------------------------------------------------------ + + @property + def is_connected(self) -> bool: + return self._client is not None and self._client.is_connected + + async def connect(self) -> None: + """Open BLE connection and start notifications.""" + if self.is_connected: + return + _LOGGER.debug("Connecting to BMS at %s", self._address) + self._client = BleakClient( + self._address, + disconnected_callback=self._on_disconnect, + ) + await self._client.connect() + await self._client.start_notify(RX_CHAR_UUID, self._on_notify) + _LOGGER.debug("Connected to BMS at %s", self._address) + + async def disconnect(self) -> None: + """Close the BLE connection cleanly.""" + if self._client: + try: + await self._client.disconnect() + except Exception: + pass + self._client = None + + def _on_disconnect(self, _client: BleakClient) -> None: + _LOGGER.debug("BMS at %s disconnected", self._address) + self._client = None + + # ------------------------------------------------------------------ + # Frame reception + # ------------------------------------------------------------------ + + def _on_notify(self, _char, data: bytearray) -> None: + """Accumulate BLE notification chunks into complete protocol frames. + + BLE max payload is 20 bytes (default MTU), so a single BMS frame + (up to ~50 bytes for 16 cells) arrives across several notifications. + We buffer until we can calculate and verify the expected frame length. + """ + self._buffer.extend(data) + + # Discard leading garbage until we see a frame start byte + while self._buffer and self._buffer[0] != FRAME_START: + self._buffer.pop(0) + + # Need at least the 4-byte header to know payload length + if len(self._buffer) < _HEADER_LEN: + return + + payload_len = self._buffer[3] + expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN + + if len(self._buffer) < expected_total: + return # still waiting for more chunks + + frame = bytes(self._buffer[:expected_total]) + del self._buffer[:expected_total] + + if frame[-1] != FRAME_END: + _LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex()) + return + + if frame[2] != 0x00: + _LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X", + frame[2], frame[1]) + return + + _LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len) + self._response_data = frame + self._response_event.set() + + # ------------------------------------------------------------------ + # Request / response + # ------------------------------------------------------------------ + + async def request(self, command: bytes, timeout: float = 5.0) -> bytes | None: + """Send a command frame and wait for the corresponding response frame.""" + async with self._lock: + self._response_event.clear() + self._response_data = None + try: + await self._client.write_gatt_char(TX_CHAR_UUID, command, response=False) + except BleakError as exc: + _LOGGER.error("BLE write failed: %s", exc) + return None + try: + await asyncio.wait_for(self._response_event.wait(), timeout) + return self._response_data + except asyncio.TimeoutError: + _LOGGER.warning("BMS response timeout (cmd=%s)", command.hex()) + return None + + # ------------------------------------------------------------------ + # Frame parsers + # ------------------------------------------------------------------ + + @staticmethod + def parse_general_info(frame: bytes) -> dict: + """Parse a 0x03 general info response frame. + + Payload byte offsets (frame[4] is payload[0]): + 0-1 Total voltage uint16 BE ÷100 → V + 2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging) + 4-5 Residual capacity uint16 BE ÷100 → Ah + 6-7 Nominal capacity uint16 BE ÷100 → Ah + 8-9 Cycle count uint16 BE + 10-11 Production date (ignored) + 12-15 Balance status (ignored) + 16-17 Protection status (ignored) + 18 Software version (ignored) + 19 State of charge uint8 % + 20 MOS status uint8 + 21 Cell count uint8 + 22 Temp probe count uint8 + 23+ Temperatures uint16 BE each (raw − 2731) ÷ 10 → °C + """ + p = frame[_HEADER_LEN:-_TRAILER_LEN] + + temp_count = p[22] + temperatures: list[float] = [] + for i in range(temp_count): + raw = struct.unpack_from(">H", p, 23 + i * 2)[0] + temperatures.append(round((raw - 2731) / 10.0, 1)) + + return { + "voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2), + "current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2), + "residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2), + "nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2), + "cycle_count": struct.unpack_from(">H", p, 8)[0], + "state_of_charge": p[19], + "cell_count": p[21], + "temperatures": temperatures, + } + + @staticmethod + def parse_cell_info(frame: bytes) -> dict: + """Parse a 0x04 cell voltage response frame. + + Per spec: frame[3] (the header length byte) = cell_count × 2. + The payload contains ONLY the voltage bytes — no count byte. + 0+ Cell voltages uint16 BE each unit mV ÷1000 → V + """ + count = frame[3] // 2 # header length byte = N_cells × 2 + p = frame[_HEADER_LEN:-_TRAILER_LEN] + voltages: list[float] = [] + for i in range(count): + raw = struct.unpack_from(">H", p, i * 2)[0] + voltages.append(round(raw / 1000.0, 3)) + return {"cell_voltages": voltages} diff --git a/custom_components/xiaoxiang_bms/config_flow.py b/custom_components/xiaoxiang_bms/config_flow.py new file mode 100644 index 0000000..53536aa --- /dev/null +++ b/custom_components/xiaoxiang_bms/config_flow.py @@ -0,0 +1,130 @@ +"""Config flow for the Xiaoxiang Smart BMS integration.""" +from __future__ import annotations + +import voluptuous as vol +from homeassistant import config_entries +from homeassistant.components.bluetooth import ( + BluetoothServiceInfoBleak, + async_discovered_service_info, +) +from homeassistant.core import callback +from homeassistant.data_entry_flow import FlowResult + +from .const import ( + CONF_ADDRESS, + CONF_POLL_INTERVAL, + DEFAULT_POLL_INTERVAL, + DOMAIN, + MAX_POLL_INTERVAL, + MIN_POLL_INTERVAL, + UART_SERVICE_UUID, +) + + +class XiaoxiangBmsConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a config flow for the Xiaoxiang Smart BMS.""" + + VERSION = 1 + + def __init__(self) -> None: + self._address: str | None = None + self._name: str | None = None + + # ------------------------------------------------------------------ + # Auto-discovery path (HA sees BMS advertising matching service UUID) + # ------------------------------------------------------------------ + + async def async_step_bluetooth( + self, discovery_info: BluetoothServiceInfoBleak + ) -> FlowResult: + """Handle BLE auto-discovery.""" + self._address = discovery_info.address + self._name = discovery_info.name or f"Xiaoxiang BMS ({discovery_info.address})" + + await self.async_set_unique_id(discovery_info.address) + self._abort_if_unique_id_configured() + + self.context["title_placeholders"] = {"name": self._name} + return await self.async_step_confirm() + + async def async_step_confirm(self, user_input: dict | None = None) -> FlowResult: + """Confirm auto-discovered device.""" + if user_input is not None: + return self.async_create_entry( + title=self._name, + data={CONF_ADDRESS: self._address}, + ) + return self.async_show_form( + step_id="confirm", + description_placeholders={ + "name": self._name, + "address": self._address, + }, + ) + + # ------------------------------------------------------------------ + # Manual path (user goes to Integrations → Add → Xiaoxiang BMS) + # ------------------------------------------------------------------ + + async def async_step_user(self, user_input: dict | None = None) -> FlowResult: + """Handle manual setup. Shows discovered BMS devices as a dropdown if found.""" + errors: dict[str, str] = {} + + if user_input is not None: + address = user_input[CONF_ADDRESS].upper().strip() + await self.async_set_unique_id(address) + self._abort_if_unique_id_configured() + return self.async_create_entry( + title=f"Xiaoxiang BMS ({address})", + data={CONF_ADDRESS: address}, + ) + + # Build a dropdown of already-discovered BMS devices (filtered by service UUID) + discovered: dict[str, str] = { + info.address: f"{info.name or 'Xiaoxiang BMS'} ({info.address})" + for info in async_discovered_service_info(self.hass) + if UART_SERVICE_UUID in (info.service_uuids or []) + } + + if discovered: + schema = vol.Schema({vol.Required(CONF_ADDRESS): vol.In(discovered)}) + else: + schema = vol.Schema({ + vol.Required(CONF_ADDRESS): str, + }) + + return self.async_show_form( + step_id="user", + data_schema=schema, + errors=errors, + ) + + # ------------------------------------------------------------------ + # Options flow (change poll interval after setup) + # ------------------------------------------------------------------ + + @staticmethod + @callback + def async_get_options_flow(config_entry: config_entries.ConfigEntry) -> BmsOptionsFlow: + return BmsOptionsFlow(config_entry) + + +class BmsOptionsFlow(config_entries.OptionsFlow): + """Allow the user to change the poll interval after initial setup.""" + + def __init__(self, config_entry: config_entries.ConfigEntry) -> None: + self._config_entry = config_entry + + async def async_step_init(self, user_input: dict | None = None) -> FlowResult: + if user_input is not None: + return self.async_create_entry(title="", data=user_input) + + current = self._config_entry.options.get(CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL) + return self.async_show_form( + step_id="init", + data_schema=vol.Schema({ + vol.Required(CONF_POLL_INTERVAL, default=current): vol.All( + int, vol.Range(min=MIN_POLL_INTERVAL, max=MAX_POLL_INTERVAL) + ), + }), + ) diff --git a/custom_components/xiaoxiang_bms/const.py b/custom_components/xiaoxiang_bms/const.py new file mode 100644 index 0000000..60980b5 --- /dev/null +++ b/custom_components/xiaoxiang_bms/const.py @@ -0,0 +1,27 @@ +"""Constants for the Xiaoxiang Smart BMS integration.""" + +DOMAIN = "xiaoxiang_bms" + +CONF_ADDRESS = "address" +CONF_POLL_INTERVAL = "poll_interval" + +DEFAULT_POLL_INTERVAL = 5 # seconds — fast for solar/energy monitoring +MIN_POLL_INTERVAL = 2 +MAX_POLL_INTERVAL = 60 + +# GATT UUIDs (Xiaoxiang BMS UART-over-GATT) +UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb" +RX_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb" # BMS → HA (notify) +TX_CHAR_UUID = "0000ff02-0000-1000-8000-00805f9b34fb" # HA → BMS (write) + +# Request frames: [0xDD, 0xA5, CMD, 0x00, CHK_HI, CHK_LO, 0x77] +CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]) # pack info +CMD_CELL = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77]) # cell voltages + +# Frame markers +FRAME_START = 0xDD +FRAME_END = 0x77 + +# Response command IDs (byte 1 of frame) +CMD_ID_GENERAL = 0x03 +CMD_ID_CELL = 0x04 diff --git a/custom_components/xiaoxiang_bms/coordinator.py b/custom_components/xiaoxiang_bms/coordinator.py new file mode 100644 index 0000000..7179f29 --- /dev/null +++ b/custom_components/xiaoxiang_bms/coordinator.py @@ -0,0 +1,93 @@ +"""DataUpdateCoordinator for the Xiaoxiang Smart BMS.""" +from __future__ import annotations + +import logging +from datetime import timedelta + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .bluetooth_handler import BmsBluetoothHandler +from .const import CMD_CELL, CMD_GENERAL, DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +class BmsCoordinator(DataUpdateCoordinator[dict]): + """Polls the BMS over BLE and distributes data to all sensor entities.""" + + def __init__( + self, + hass: HomeAssistant, + address: str, + poll_interval: int, + ) -> None: + super().__init__( + hass, + _LOGGER, + name=DOMAIN, + update_interval=timedelta(seconds=poll_interval), + ) + self.address = address + self._handler = BmsBluetoothHandler(address) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def async_setup(self) -> None: + """Connect to the BMS. Called once during config entry setup.""" + await self._handler.connect() + + async def async_teardown(self) -> None: + """Disconnect cleanly. Called on entry unload.""" + await self._handler.disconnect() + + # ------------------------------------------------------------------ + # Poll + # ------------------------------------------------------------------ + + async def _async_update_data(self) -> dict: + """Fetch data from the BMS. Reconnects automatically if disconnected.""" + if not self._handler.is_connected: + _LOGGER.debug("BMS not connected, attempting reconnect…") + try: + await self._handler.connect() + except Exception as exc: + raise UpdateFailed(f"BMS reconnect failed: {exc}") from exc + + try: + general_frame = await self._handler.request(CMD_GENERAL) + if general_frame is None: + raise UpdateFailed("No response to general info request (0x03)") + + cell_frame = await self._handler.request(CMD_CELL) + if cell_frame is None: + raise UpdateFailed("No response to cell info request (0x04)") + except UpdateFailed: + raise + except Exception as exc: + raise UpdateFailed(f"BLE communication error: {exc}") from exc + + data = BmsBluetoothHandler.parse_general_info(general_frame) + data.update(BmsBluetoothHandler.parse_cell_info(cell_frame)) + + # Derived fields + data["power"] = round(data["voltage"] * data["current"], 2) + + if data["cell_voltages"]: + v_max = max(data["cell_voltages"]) + v_min = min(data["cell_voltages"]) + data["cell_delta"] = round((v_max - v_min) * 1000, 1) + else: + data["cell_delta"] = None + + _LOGGER.debug( + "BMS data: %.2fV %.2fA %d%% %.2fAh %d cells", + data["voltage"], + data["current"], + data["state_of_charge"], + data["residual_capacity"], + len(data["cell_voltages"]), + ) + return data diff --git a/custom_components/xiaoxiang_bms/manifest.json b/custom_components/xiaoxiang_bms/manifest.json new file mode 100644 index 0000000..ae7c5ce --- /dev/null +++ b/custom_components/xiaoxiang_bms/manifest.json @@ -0,0 +1,14 @@ +{ + "domain": "xiaoxiang_bms", + "name": "Xiaoxiang Smart BMS", + "version": "1.0.0", + "config_flow": true, + "dependencies": ["bluetooth"], + "bluetooth": [ + {"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"} + ], + "requirements": [], + "codeowners": [], + "iot_class": "local_polling", + "documentation": "https://github.com/Jnnshschl/AndroidBMSApp" +} diff --git a/custom_components/xiaoxiang_bms/sensor.py b/custom_components/xiaoxiang_bms/sensor.py new file mode 100644 index 0000000..2b466de --- /dev/null +++ b/custom_components/xiaoxiang_bms/sensor.py @@ -0,0 +1,228 @@ +"""Sensor entities for the Xiaoxiang Smart BMS integration.""" +from __future__ import annotations + +from homeassistant.components.sensor import ( + SensorDeviceClass, + SensorEntity, + SensorEntityDescription, + SensorStateClass, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + PERCENTAGE, + UnitOfElectricCurrent, + UnitOfElectricPotential, + UnitOfPower, + UnitOfTemperature, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import CONF_ADDRESS, DOMAIN +from .coordinator import BmsCoordinator + +# --------------------------------------------------------------------------- +# Static sensor definitions — one entity per key in coordinator.data +# --------------------------------------------------------------------------- + +STATIC_SENSORS: tuple[SensorEntityDescription, ...] = ( + SensorEntityDescription( + key="voltage", + name="Voltage", + native_unit_of_measurement=UnitOfElectricPotential.VOLT, + device_class=SensorDeviceClass.VOLTAGE, + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=2, + ), + SensorEntityDescription( + key="current", + name="Current", + native_unit_of_measurement=UnitOfElectricCurrent.AMPERE, + device_class=SensorDeviceClass.CURRENT, + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=2, + ), + SensorEntityDescription( + key="power", + name="Power", + native_unit_of_measurement=UnitOfPower.WATT, + device_class=SensorDeviceClass.POWER, + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=1, + ), + SensorEntityDescription( + key="state_of_charge", + name="State of Charge", + native_unit_of_measurement=PERCENTAGE, + device_class=SensorDeviceClass.BATTERY, + state_class=SensorStateClass.MEASUREMENT, + ), + SensorEntityDescription( + key="residual_capacity", + name="Remaining Capacity", + native_unit_of_measurement="Ah", + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=2, + icon="mdi:battery-charging", + ), + SensorEntityDescription( + key="nominal_capacity", + name="Nominal Capacity", + native_unit_of_measurement="Ah", + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=2, + icon="mdi:battery", + ), + SensorEntityDescription( + key="cycle_count", + name="Cycle Count", + state_class=SensorStateClass.TOTAL_INCREASING, + icon="mdi:battery-sync", + ), + SensorEntityDescription( + key="cell_delta", + name="Cell Voltage Delta", + native_unit_of_measurement="mV", + state_class=SensorStateClass.MEASUREMENT, + suggested_display_precision=1, + icon="mdi:battery-alert-variant", + ), +) + + +# --------------------------------------------------------------------------- +# Platform setup +# --------------------------------------------------------------------------- + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Create all BMS sensor entities after the first successful poll.""" + coordinator: BmsCoordinator = hass.data[DOMAIN][entry.entry_id] + address: str = entry.data[CONF_ADDRESS] + + entities: list[SensorEntity] = [ + BmsStaticSensor(coordinator, description, address) + for description in STATIC_SENSORS + ] + + # Temperature sensors — count determined from first poll + for i in range(len(coordinator.data.get("temperatures", []))): + entities.append( + BmsDynamicSensor( + coordinator=coordinator, + address=address, + unique_key=f"temperature_{i + 1}", + friendly_name=f"Temperature {i + 1}", + data_key="temperatures", + index=i, + unit=UnitOfTemperature.CELSIUS, + device_class=SensorDeviceClass.TEMPERATURE, + icon=None, + ) + ) + + # Cell voltage sensors — count determined from first poll + for i in range(len(coordinator.data.get("cell_voltages", []))): + entities.append( + BmsDynamicSensor( + coordinator=coordinator, + address=address, + unique_key=f"cell_voltage_{i + 1}", + friendly_name=f"Cell {i + 1} Voltage", + data_key="cell_voltages", + index=i, + unit=UnitOfElectricPotential.VOLT, + device_class=SensorDeviceClass.VOLTAGE, + icon=None, + ) + ) + + async_add_entities(entities) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _device_info(address: str) -> DeviceInfo: + return DeviceInfo( + identifiers={(DOMAIN, address)}, + name="Xiaoxiang Smart BMS", + manufacturer="Xiaoxiang", + model="Smart BMS", + ) + + +# --------------------------------------------------------------------------- +# Entity classes +# --------------------------------------------------------------------------- + +class BmsStaticSensor(CoordinatorEntity[BmsCoordinator], SensorEntity): + """A sensor whose key maps directly to a scalar value in coordinator.data.""" + + _attr_has_entity_name = True + + def __init__( + self, + coordinator: BmsCoordinator, + description: SensorEntityDescription, + address: str, + ) -> None: + super().__init__(coordinator) + self.entity_description = description + self._attr_unique_id = f"{address}_{description.key}" + self._attr_device_info = _device_info(address) + + @property + def native_value(self): + return self.coordinator.data.get(self.entity_description.key) + + +class BmsDynamicSensor(CoordinatorEntity[BmsCoordinator], SensorEntity): + """A sensor that reads an element from a list inside coordinator.data. + + Used for per-cell voltages and per-probe temperatures, whose count is + only known after the first successful BMS poll. + """ + + _attr_has_entity_name = True + _attr_state_class = SensorStateClass.MEASUREMENT + + def __init__( + self, + coordinator: BmsCoordinator, + address: str, + unique_key: str, + friendly_name: str, + data_key: str, + index: int, + unit: str, + device_class: SensorDeviceClass | None, + icon: str | None, + ) -> None: + super().__init__(coordinator) + self._data_key = data_key + self._index = index + self._attr_unique_id = f"{address}_{unique_key}" + self._attr_name = friendly_name + self._attr_native_unit_of_measurement = unit + self._attr_device_class = device_class + self._attr_device_info = _device_info(address) + if icon: + self._attr_icon = icon + if device_class == SensorDeviceClass.VOLTAGE: + self._attr_suggested_display_precision = 3 + elif device_class == SensorDeviceClass.TEMPERATURE: + self._attr_suggested_display_precision = 1 + + @property + def native_value(self): + values: list = self.coordinator.data.get(self._data_key, []) + if self._index < len(values): + return values[self._index] + return None diff --git a/custom_components/xiaoxiang_bms/strings.json b/custom_components/xiaoxiang_bms/strings.json new file mode 100644 index 0000000..5109763 --- /dev/null +++ b/custom_components/xiaoxiang_bms/strings.json @@ -0,0 +1,40 @@ +{ + "config": { + "step": { + "user": { + "title": "Xiaoxiang Smart BMS", + "description": "Enter the Bluetooth address of your BMS, or select a discovered device from the dropdown.", + "data": { + "address": "Bluetooth Address" + }, + "data_description": { + "address": "MAC address of the BMS (e.g. AA:BB:CC:DD:EE:FF). Tip: pair the device first via your system Bluetooth settings." + } + }, + "confirm": { + "title": "Confirm Xiaoxiang BMS", + "description": "Connect to **{name}** ({address})?\n\nMake sure the BMS is powered on and within Bluetooth range." + } + }, + "abort": { + "already_configured": "This BMS is already configured." + }, + "error": { + "cannot_connect": "Unable to connect to the BMS. Check the address and try again." + } + }, + "options": { + "step": { + "init": { + "title": "BMS Options", + "description": "Adjust the polling interval. Lower values give more accurate real-time readings but use more Bluetooth bandwidth.", + "data": { + "poll_interval": "Poll interval (seconds)" + }, + "data_description": { + "poll_interval": "How often to request data from the BMS (2–60 seconds). Default is 5 s for solar/energy monitoring." + } + } + } + } +} diff --git a/hacs.json b/hacs.json new file mode 100644 index 0000000..800ba78 --- /dev/null +++ b/hacs.json @@ -0,0 +1,5 @@ +{ + "name": "Xiaoxiang Smart BMS", + "render_readme": true, + "homeassistant": "2023.6.0" +} diff --git a/translations/en.json b/translations/en.json new file mode 100644 index 0000000..5109763 --- /dev/null +++ b/translations/en.json @@ -0,0 +1,40 @@ +{ + "config": { + "step": { + "user": { + "title": "Xiaoxiang Smart BMS", + "description": "Enter the Bluetooth address of your BMS, or select a discovered device from the dropdown.", + "data": { + "address": "Bluetooth Address" + }, + "data_description": { + "address": "MAC address of the BMS (e.g. AA:BB:CC:DD:EE:FF). Tip: pair the device first via your system Bluetooth settings." + } + }, + "confirm": { + "title": "Confirm Xiaoxiang BMS", + "description": "Connect to **{name}** ({address})?\n\nMake sure the BMS is powered on and within Bluetooth range." + } + }, + "abort": { + "already_configured": "This BMS is already configured." + }, + "error": { + "cannot_connect": "Unable to connect to the BMS. Check the address and try again." + } + }, + "options": { + "step": { + "init": { + "title": "BMS Options", + "description": "Adjust the polling interval. Lower values give more accurate real-time readings but use more Bluetooth bandwidth.", + "data": { + "poll_interval": "Poll interval (seconds)" + }, + "data_description": { + "poll_interval": "How often to request data from the BMS (2–60 seconds). Default is 5 s for solar/energy monitoring." + } + } + } + } +}