Files
Jannis 4a769bf50f Initial commit: Xiaoxiang Smart BMS Home Assistant integration
- BLE GATT communication via bleak (UART-over-GATT, service 0xff00)
- Parses 0x03 (general info) and 0x04 (cell voltages) frames
- Protocol verified against official RS485/UART spec V4
- DataUpdateCoordinator with 5s poll interval (configurable 2-60s)
- Auto-reconnect on BLE disconnect
- Config flow: BT auto-discovery + manual MAC entry + options flow
- Sensors: voltage, current, power, SoC, capacity, cycles, temps, cells, cell delta
- HACS-ready (hacs.json, manifest.json)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 19:07:18 +02:00

479 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Xiaoxiang Smart BMS — Home Assistant Integration Masterplan
## Overview
A **native HA custom integration** (not an addon) that connects to your Xiaoxiang BMS
over Bluetooth Low Energy (BLE), parses the proprietary UART-over-GATT protocol,
and exposes battery data as first-class Home Assistant sensor entities.
No MQTT broker, no extra container, no sidecar — just a `custom_components/` folder
you drop into your HA config directory.
---
## Architecture
```
Home Assistant
└── Bluetooth subsystem (built-in)
└── xiaoxiang_bms (custom_component)
├── Config Flow ← GUI: scan BT, pick device, save MAC
├── DataUpdateCoordinator ← polls BMS every 30s via BLE
│ └── BluetoothHandler ← BLE GATT read/write (bleak)
│ ├── Request 0x03 → General info frame
│ └── Request 0x04 → Cell voltages frame
└── SensorEntity (×N) ← one entity per data field
```
---
## BMS Protocol Reference
Derived from: https://github.com/Jnnshschl/AndroidBMSApp
### GATT UUIDs
| Role | UUID |
|------|------|
| UART Service | `0000ff00-0000-1000-8000-00805f9b34fb` |
| RX (notify/read) | `0000ff01-0000-1000-8000-00805f9b34fb` |
| TX (write) | `0000ff02-0000-1000-8000-00805f9b34fb` |
### Frame Format
```
[0xDD] [CMD] [STATUS] [LENGTH] [PAYLOAD...] [CHECKSUM_HI] [CHECKSUM_LO] [0x77]
```
- All multi-byte values: **Big-endian**
- Max frame size: 80 bytes
### Request Commands
```python
CMD_GENERAL_INFO = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL_INFO = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])
```
### General Info Response (0x03) — byte offsets in payload
| Offset | Length | Field | Scaling | Unit |
|--------|--------|-------|---------|------|
| 0-1 | 2 | Total voltage | ÷100 | V |
| 2-3 | 2 | Current (signed) | ÷100 | A |
| 4-5 | 2 | Residual capacity | ÷100 | Ah |
| 6-7 | 2 | Nominal capacity | ÷100 | Ah |
| 8-9 | 2 | Cycle count | ×1 | - |
| 10-11 | 2 | Production date | - | (skip) |
| 12-15 | 4 | Balance status bits | - | (skip) |
| 16-17 | 2 | Protection status | - | (skip) |
| 18 | 1 | Version | - | (skip) |
| 19 | 1 | State of charge | ×1 | % |
| 20 | 1 | MOS status | - | (skip) |
| 21 | 1 | Cell count | ×1 | - |
| 22 | 1 | Temp probe count | ×1 | - |
| 23+ | 2×N | Temperatures | (val2731)÷10 | °C |
> Current is a signed 16-bit int; negative = charging, positive = discharging
### Cell Info Response (0x04) — byte offsets in payload
| Offset | Length | Field | Scaling | Unit |
|--------|--------|-------|---------|------|
| 0 | 1 | Cell count (÷2) | - | - |
| 1+ | 2×N | Cell voltages | ÷1000 | V |
---
## Entities Exposed in Home Assistant
| Entity | Device Class | Unit | Notes |
|--------|-------------|------|-------|
| `sensor.bms_voltage` | voltage | V | Total pack voltage |
| `sensor.bms_current` | current | A | Signed (neg=charge) |
| `sensor.bms_power` | power | W | Calculated: V × I |
| `sensor.bms_state_of_charge` | battery | % | 0100 |
| `sensor.bms_residual_capacity` | energy_storage | Ah | Remaining Ah |
| `sensor.bms_nominal_capacity` | energy_storage | Ah | Full capacity |
| `sensor.bms_cycle_count` | - | - | Lifetime cycles |
| `sensor.bms_temperature_1..N` | temperature | °C | Per probe |
| `sensor.bms_cell_voltage_1..N` | voltage | V | Per cell |
| `sensor.bms_cell_delta` | voltage | mV | Maxmin cell diff |
---
## File Structure
```
custom_components/
└── xiaoxiang_bms/
├── __init__.py # Integration setup / unload
├── manifest.json # HA metadata, bluetooth dependency
├── const.py # All constants (UUIDs, commands, offsets)
├── config_flow.py # GUI setup wizard (BT scan → pick device → save)
├── coordinator.py # DataUpdateCoordinator — poll loop + data store
├── bluetooth_handler.py # Low-level BLE GATT comms (bleak)
├── sensor.py # SensorEntity definitions
└── strings.json # UI strings
translations/
└── en.json # English translations for config flow
```
---
## Implementation Steps
### Step 1 — `manifest.json`
```json
{
"domain": "xiaoxiang_bms",
"name": "Xiaoxiang Smart BMS",
"version": "1.0.0",
"config_flow": true,
"dependencies": ["bluetooth"],
"bluetooth": [
{"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"}
],
"requirements": [],
"codeowners": ["@you"],
"iot_class": "local_polling"
}
```
> Declaring `bluetooth` in `dependencies` lets HA's Bluetooth subsystem manage
> the adapter and lets the config flow use `bluetooth.async_discovered_service_info`.
---
### Step 2 — `const.py`
```python
DOMAIN = "xiaoxiang_bms"
DEFAULT_POLL_INTERVAL = 30 # seconds
# GATT UUIDs
UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb"
RX_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
TX_CHAR_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
# Request frames
CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])
# Frame markers
FRAME_START = 0xDD
FRAME_END = 0x77
CMD_GENERAL_ID = 0x03
CMD_CELL_ID = 0x04
```
---
### Step 3 — `bluetooth_handler.py`
Core BLE communication class using **bleak** (already bundled with HA):
```python
import asyncio
import struct
from bleak import BleakClient
class BmsBluetoothHandler:
def __init__(self, address: str):
self._address = address
self._client: BleakClient | None = None
self._response_buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
async def connect(self):
self._client = BleakClient(self._address)
await self._client.connect()
await self._client.start_notify(RX_CHAR_UUID, self._on_notify)
async def disconnect(self):
if self._client and self._client.is_connected:
await self._client.disconnect()
def _on_notify(self, _, data: bytearray):
"""Accumulate chunks until complete frame received."""
self._response_buffer.extend(data)
# Check for end marker
if self._response_buffer and self._response_buffer[-1] == FRAME_END:
if self._response_buffer[0] == FRAME_START:
self._response_data = bytes(self._response_buffer)
self._response_buffer.clear()
self._response_event.set()
async def request(self, command: bytes, timeout=5.0) -> bytes | None:
self._response_event.clear()
self._response_data = None
await self._client.write_gatt_char(TX_CHAR_UUID, command)
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
return None
@staticmethod
def parse_general_info(frame: bytes) -> dict:
# frame[0]=0xDD, frame[1]=CMD, frame[2]=status, frame[3]=length
# payload starts at frame[4]
p = frame[4:]
temp_count = p[22]
temps = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temps.append((raw - 2731) / 10.0)
return {
"voltage": struct.unpack_from(">H", p, 0)[0] / 100.0,
"current": struct.unpack_from(">h", p, 2)[0] / 100.0, # signed!
"residual_capacity":struct.unpack_from(">H", p, 4)[0] / 100.0,
"nominal_capacity": struct.unpack_from(">H", p, 6)[0] / 100.0,
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temps,
}
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
p = frame[4:]
count = p[0] // 2
voltages = []
for i in range(count):
raw = struct.unpack_from(">H", p, 1 + i * 2)[0]
voltages.append(raw / 1000.0)
return {"cell_voltages": voltages}
```
---
### Step 4 — `coordinator.py`
```python
from datetime import timedelta
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
class BmsCoordinator(DataUpdateCoordinator):
def __init__(self, hass, address: str):
super().__init__(hass, logger, name="xiaoxiang_bms",
update_interval=timedelta(seconds=DEFAULT_POLL_INTERVAL))
self._handler = BmsBluetoothHandler(address)
self.data = {}
async def _async_setup(self):
await self._handler.connect()
async def _async_update_data(self):
try:
general_frame = await self._handler.request(CMD_GENERAL)
cell_frame = await self._handler.request(CMD_CELL)
if not general_frame or not cell_frame:
raise UpdateFailed("BMS did not respond")
data = BmsBluetoothHandler.parse_general_info(general_frame)
data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))
# Derived fields
data["power"] = round(data["voltage"] * data["current"], 2)
if data["cell_voltages"]:
data["cell_delta"] = round(
(max(data["cell_voltages"]) - min(data["cell_voltages"])) * 1000, 1
)
return data
except Exception as e:
raise UpdateFailed(f"BMS polling error: {e}") from e
```
---
### Step 5 — `config_flow.py`
Two-step config flow:
1. **Auto-discovery**: HA detects BLE advertisements matching the BMS service UUID
and suggests the device automatically (appears as a notification in HA).
2. **Manual**: User can go to Settings → Integrations → Add → "Xiaoxiang BMS"
and paste/enter the BT MAC address.
```python
class XiaoxiangBmsConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_bluetooth(self, discovery_info):
"""Called automatically when HA sees the BMS advertising."""
self._address = discovery_info.address
self._name = discovery_info.name or "Xiaoxiang BMS"
await self.async_set_unique_id(self._address)
self._abort_if_unique_id_configured()
return await self.async_step_confirm()
async def async_step_confirm(self, user_input=None):
if user_input is not None:
return self.async_create_entry(title=self._name,
data={"address": self._address})
return self.async_show_form(step_id="confirm", ...)
async def async_step_user(self, user_input=None):
"""Manual entry fallback."""
if user_input:
address = user_input["address"]
await self.async_set_unique_id(address)
self._abort_if_unique_id_configured()
return self.async_create_entry(title="Xiaoxiang BMS",
data={"address": address})
return self.async_show_form(step_id="user",
data_schema=vol.Schema({"address": str}))
```
---
### Step 6 — `sensor.py`
One `SensorEntity` subclass per data field. Dynamic cell/temp sensors are
created in `async_setup_entry` based on the first successful poll.
```python
SENSOR_DESCRIPTIONS = [
SensorEntityDescription(key="voltage", name="Voltage",
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
device_class=SensorDeviceClass.VOLTAGE),
SensorEntityDescription(key="current", name="Current",
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
device_class=SensorDeviceClass.CURRENT),
SensorEntityDescription(key="power", name="Power",
native_unit_of_measurement=UnitOfPower.WATT,
device_class=SensorDeviceClass.POWER),
SensorEntityDescription(key="state_of_charge", name="State of Charge",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.BATTERY),
SensorEntityDescription(key="residual_capacity", name="Remaining Capacity",
native_unit_of_measurement="Ah"),
SensorEntityDescription(key="nominal_capacity", name="Nominal Capacity",
native_unit_of_measurement="Ah"),
SensorEntityDescription(key="cycle_count", name="Cycle Count"),
SensorEntityDescription(key="cell_delta", name="Cell Voltage Delta",
native_unit_of_measurement="mV"),
]
```
---
### Step 7 — `__init__.py`
```python
async def async_setup_entry(hass, entry):
coordinator = BmsCoordinator(hass, entry.data["address"])
await coordinator._async_setup()
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, ["sensor"])
return True
async def async_unload_entry(hass, entry):
coordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator._handler.disconnect()
return await hass.config_entries.async_unload_platforms(entry, ["sensor"])
```
---
## Installation
### Option A — Manual (any HA install)
```bash
# On your HA host (SSH or Samba share)
cp -r custom_components/xiaoxiang_bms \
/config/custom_components/xiaoxiang_bms
# Restart HA, then:
# Settings → Integrations → Add → search "Xiaoxiang BMS"
```
### Option B — HACS Custom Repository
1. HACS → Custom Repositories → add this repo URL → Category: Integration
2. Install → Restart HA
3. Settings → Integrations → Add → "Xiaoxiang BMS"
---
## Key Technical Decisions
| Decision | Choice | Reason |
|----------|--------|--------|
| Addon vs Integration | Integration | Direct HA sensor entities, no MQTT needed |
| BLE library | bleak (HA-bundled) | No extra deps, HA manages BT adapter lifecycle |
| Discovery | Bluetooth service UUID match | User gets auto-discovery notification in HA |
| Poll vs Push | Poll (30s) | BMS doesn't send unsolicited notifications; request→response protocol |
| Data model | DataUpdateCoordinator | Single shared connection, all sensors refresh together |
| Dynamic entities | First-poll count | Cell/temp count varies by battery; discover at runtime |
---
## Development & Testing
### Test without real hardware
Use a BLE UART simulator (e.g., `nRF Connect` app or a Raspberry Pi with
`bluezero` or `bumble`) to send back canned frames to validate parsing.
```python
# Minimal test for parser
frame = bytes([0xDD, 0x03, 0x00, 0x1B,
0x05, 0xDC, # voltage: 15.00V
0xFF, 0x9C, # current: -1.00A (charging)
0x09, 0x60, # residual: 24.00Ah
0x0B, 0xB8, # nominal: 30.00Ah
0x00, 0x05, # cycles: 5
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # dates/balance/protection/ver
0x64, # SoC: 100%
0x03, # MOS
0x10, # 16 cells
0x02, # 2 temp probes
0x0A, 0xAB, # temp1: (2731 - 2731) / 10 → wait: (0x0AAB=2731) → 0°C
0x0A, 0xC5, # temp2: (2757 - 2731) / 10 → 2.6°C
0xFF, 0xFD, 0x77]) # checksum + end
result = BmsBluetoothHandler.parse_general_info(frame)
assert result["voltage"] == 15.0
assert result["current"] == -1.0
```
### Validate on real HA
```bash
# Enable debug logging in configuration.yaml:
logger:
logs:
custom_components.xiaoxiang_bms: debug
```
---
## Risks & Mitigations
| Risk | Mitigation |
|------|-----------|
| BLE not available on HA host | Docs note: requires Bluetooth hardware; HA OS has it, Docker needs `--privileged` or BT passthrough |
| Chunked BLE frames | Buffer accumulation in `_on_notify` until `0x77` end byte seen |
| BMS goes out of range | `UpdateFailed` → HA marks sensors unavailable; reconnect on next poll |
| Multiple BMS units | Each config entry = separate coordinator = separate MAC |
| Frame byte offsets vary by firmware | Add firmware version detection; make offsets configurable |
---
## Implementation Order
- [ ] 1. `const.py` — UUIDs and command bytes
- [ ] 2. `bluetooth_handler.py` — protocol parser (unit-testable standalone)
- [ ] 3. `coordinator.py` — DataUpdateCoordinator wrapping the handler
- [ ] 4. `manifest.json` — HA metadata
- [ ] 5. `__init__.py` — entry setup/unload
- [ ] 6. `config_flow.py` — BT scan + manual entry
- [ ] 7. `sensor.py` — all sensor entities
- [ ] 8. `strings.json` / `translations/en.json` — UI text
- [ ] 9. Local test without hardware (canned frames)
- [ ] 10. Test on real HA + real BMS