Initial commit: Xiaoxiang Smart BMS Home Assistant integration

- BLE GATT communication via bleak (UART-over-GATT, service 0xff00)
- Parses 0x03 (general info) and 0x04 (cell voltages) frames
- Protocol verified against official RS485/UART spec V4
- DataUpdateCoordinator with 5s poll interval (configurable 2-60s)
- Auto-reconnect on BLE disconnect
- Config flow: BT auto-discovery + manual MAC entry + options flow
- Sensors: voltage, current, power, SoC, capacity, cycles, temps, cells, cell delta
- HACS-ready (hacs.json, manifest.json)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-11 19:07:18 +02:00
commit 4a769bf50f
14 changed files with 1333 additions and 0 deletions
+9
View File
@@ -0,0 +1,9 @@
{
"permissions": {
"allow": [
"WebFetch(domain:raw.githubusercontent.com)",
"WebSearch",
"WebFetch(domain:api.github.com)"
]
}
}
+35
View File
@@ -0,0 +1,35 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.egg-info/
*.egg
dist/
build/
.eggs/
# Virtual environments
.venv/
venv/
env/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Home Assistant
*.db
*.log
home-assistant_v2.db
# Testing
.pytest_cache/
.coverage
htmlcov/
+478
View File
@@ -0,0 +1,478 @@
# Xiaoxiang Smart BMS — Home Assistant Integration Masterplan
## Overview
A **native HA custom integration** (not an addon) that connects to your Xiaoxiang BMS
over Bluetooth Low Energy (BLE), parses the proprietary UART-over-GATT protocol,
and exposes battery data as first-class Home Assistant sensor entities.
No MQTT broker, no extra container, no sidecar — just a `custom_components/` folder
you drop into your HA config directory.
---
## Architecture
```
Home Assistant
└── Bluetooth subsystem (built-in)
└── xiaoxiang_bms (custom_component)
├── Config Flow ← GUI: scan BT, pick device, save MAC
├── DataUpdateCoordinator ← polls BMS every 30s via BLE
│ └── BluetoothHandler ← BLE GATT read/write (bleak)
│ ├── Request 0x03 → General info frame
│ └── Request 0x04 → Cell voltages frame
└── SensorEntity (×N) ← one entity per data field
```
---
## BMS Protocol Reference
Derived from: https://github.com/Jnnshschl/AndroidBMSApp
### GATT UUIDs
| Role | UUID |
|------|------|
| UART Service | `0000ff00-0000-1000-8000-00805f9b34fb` |
| RX (notify/read) | `0000ff01-0000-1000-8000-00805f9b34fb` |
| TX (write) | `0000ff02-0000-1000-8000-00805f9b34fb` |
### Frame Format
```
[0xDD] [CMD] [STATUS] [LENGTH] [PAYLOAD...] [CHECKSUM_HI] [CHECKSUM_LO] [0x77]
```
- All multi-byte values: **Big-endian**
- Max frame size: 80 bytes
### Request Commands
```python
CMD_GENERAL_INFO = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL_INFO = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])
```
### General Info Response (0x03) — byte offsets in payload
| Offset | Length | Field | Scaling | Unit |
|--------|--------|-------|---------|------|
| 0-1 | 2 | Total voltage | ÷100 | V |
| 2-3 | 2 | Current (signed) | ÷100 | A |
| 4-5 | 2 | Residual capacity | ÷100 | Ah |
| 6-7 | 2 | Nominal capacity | ÷100 | Ah |
| 8-9 | 2 | Cycle count | ×1 | - |
| 10-11 | 2 | Production date | - | (skip) |
| 12-15 | 4 | Balance status bits | - | (skip) |
| 16-17 | 2 | Protection status | - | (skip) |
| 18 | 1 | Version | - | (skip) |
| 19 | 1 | State of charge | ×1 | % |
| 20 | 1 | MOS status | - | (skip) |
| 21 | 1 | Cell count | ×1 | - |
| 22 | 1 | Temp probe count | ×1 | - |
| 23+ | 2×N | Temperatures | (val2731)÷10 | °C |
> Current is a signed 16-bit int; negative = charging, positive = discharging
### Cell Info Response (0x04) — byte offsets in payload
| Offset | Length | Field | Scaling | Unit |
|--------|--------|-------|---------|------|
| 0 | 1 | Cell count (÷2) | - | - |
| 1+ | 2×N | Cell voltages | ÷1000 | V |
---
## Entities Exposed in Home Assistant
| Entity | Device Class | Unit | Notes |
|--------|-------------|------|-------|
| `sensor.bms_voltage` | voltage | V | Total pack voltage |
| `sensor.bms_current` | current | A | Signed (neg=charge) |
| `sensor.bms_power` | power | W | Calculated: V × I |
| `sensor.bms_state_of_charge` | battery | % | 0100 |
| `sensor.bms_residual_capacity` | energy_storage | Ah | Remaining Ah |
| `sensor.bms_nominal_capacity` | energy_storage | Ah | Full capacity |
| `sensor.bms_cycle_count` | - | - | Lifetime cycles |
| `sensor.bms_temperature_1..N` | temperature | °C | Per probe |
| `sensor.bms_cell_voltage_1..N` | voltage | V | Per cell |
| `sensor.bms_cell_delta` | voltage | mV | Maxmin cell diff |
---
## File Structure
```
custom_components/
└── xiaoxiang_bms/
├── __init__.py # Integration setup / unload
├── manifest.json # HA metadata, bluetooth dependency
├── const.py # All constants (UUIDs, commands, offsets)
├── config_flow.py # GUI setup wizard (BT scan → pick device → save)
├── coordinator.py # DataUpdateCoordinator — poll loop + data store
├── bluetooth_handler.py # Low-level BLE GATT comms (bleak)
├── sensor.py # SensorEntity definitions
└── strings.json # UI strings
translations/
└── en.json # English translations for config flow
```
---
## Implementation Steps
### Step 1 — `manifest.json`
```json
{
"domain": "xiaoxiang_bms",
"name": "Xiaoxiang Smart BMS",
"version": "1.0.0",
"config_flow": true,
"dependencies": ["bluetooth"],
"bluetooth": [
{"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"}
],
"requirements": [],
"codeowners": ["@you"],
"iot_class": "local_polling"
}
```
> Declaring `bluetooth` in `dependencies` lets HA's Bluetooth subsystem manage
> the adapter and lets the config flow use `bluetooth.async_discovered_service_info`.
---
### Step 2 — `const.py`
```python
DOMAIN = "xiaoxiang_bms"
DEFAULT_POLL_INTERVAL = 30 # seconds
# GATT UUIDs
UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb"
RX_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
TX_CHAR_UUID = "0000ff02-0000-1000-8000-00805f9b34fb"
# Request frames
CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])
# Frame markers
FRAME_START = 0xDD
FRAME_END = 0x77
CMD_GENERAL_ID = 0x03
CMD_CELL_ID = 0x04
```
---
### Step 3 — `bluetooth_handler.py`
Core BLE communication class using **bleak** (already bundled with HA):
```python
import asyncio
import struct
from bleak import BleakClient
class BmsBluetoothHandler:
def __init__(self, address: str):
self._address = address
self._client: BleakClient | None = None
self._response_buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
async def connect(self):
self._client = BleakClient(self._address)
await self._client.connect()
await self._client.start_notify(RX_CHAR_UUID, self._on_notify)
async def disconnect(self):
if self._client and self._client.is_connected:
await self._client.disconnect()
def _on_notify(self, _, data: bytearray):
"""Accumulate chunks until complete frame received."""
self._response_buffer.extend(data)
# Check for end marker
if self._response_buffer and self._response_buffer[-1] == FRAME_END:
if self._response_buffer[0] == FRAME_START:
self._response_data = bytes(self._response_buffer)
self._response_buffer.clear()
self._response_event.set()
async def request(self, command: bytes, timeout=5.0) -> bytes | None:
self._response_event.clear()
self._response_data = None
await self._client.write_gatt_char(TX_CHAR_UUID, command)
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
return None
@staticmethod
def parse_general_info(frame: bytes) -> dict:
# frame[0]=0xDD, frame[1]=CMD, frame[2]=status, frame[3]=length
# payload starts at frame[4]
p = frame[4:]
temp_count = p[22]
temps = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temps.append((raw - 2731) / 10.0)
return {
"voltage": struct.unpack_from(">H", p, 0)[0] / 100.0,
"current": struct.unpack_from(">h", p, 2)[0] / 100.0, # signed!
"residual_capacity":struct.unpack_from(">H", p, 4)[0] / 100.0,
"nominal_capacity": struct.unpack_from(">H", p, 6)[0] / 100.0,
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temps,
}
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
p = frame[4:]
count = p[0] // 2
voltages = []
for i in range(count):
raw = struct.unpack_from(">H", p, 1 + i * 2)[0]
voltages.append(raw / 1000.0)
return {"cell_voltages": voltages}
```
---
### Step 4 — `coordinator.py`
```python
from datetime import timedelta
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
class BmsCoordinator(DataUpdateCoordinator):
def __init__(self, hass, address: str):
super().__init__(hass, logger, name="xiaoxiang_bms",
update_interval=timedelta(seconds=DEFAULT_POLL_INTERVAL))
self._handler = BmsBluetoothHandler(address)
self.data = {}
async def _async_setup(self):
await self._handler.connect()
async def _async_update_data(self):
try:
general_frame = await self._handler.request(CMD_GENERAL)
cell_frame = await self._handler.request(CMD_CELL)
if not general_frame or not cell_frame:
raise UpdateFailed("BMS did not respond")
data = BmsBluetoothHandler.parse_general_info(general_frame)
data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))
# Derived fields
data["power"] = round(data["voltage"] * data["current"], 2)
if data["cell_voltages"]:
data["cell_delta"] = round(
(max(data["cell_voltages"]) - min(data["cell_voltages"])) * 1000, 1
)
return data
except Exception as e:
raise UpdateFailed(f"BMS polling error: {e}") from e
```
---
### Step 5 — `config_flow.py`
Two-step config flow:
1. **Auto-discovery**: HA detects BLE advertisements matching the BMS service UUID
and suggests the device automatically (appears as a notification in HA).
2. **Manual**: User can go to Settings → Integrations → Add → "Xiaoxiang BMS"
and paste/enter the BT MAC address.
```python
class XiaoxiangBmsConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_bluetooth(self, discovery_info):
"""Called automatically when HA sees the BMS advertising."""
self._address = discovery_info.address
self._name = discovery_info.name or "Xiaoxiang BMS"
await self.async_set_unique_id(self._address)
self._abort_if_unique_id_configured()
return await self.async_step_confirm()
async def async_step_confirm(self, user_input=None):
if user_input is not None:
return self.async_create_entry(title=self._name,
data={"address": self._address})
return self.async_show_form(step_id="confirm", ...)
async def async_step_user(self, user_input=None):
"""Manual entry fallback."""
if user_input:
address = user_input["address"]
await self.async_set_unique_id(address)
self._abort_if_unique_id_configured()
return self.async_create_entry(title="Xiaoxiang BMS",
data={"address": address})
return self.async_show_form(step_id="user",
data_schema=vol.Schema({"address": str}))
```
---
### Step 6 — `sensor.py`
One `SensorEntity` subclass per data field. Dynamic cell/temp sensors are
created in `async_setup_entry` based on the first successful poll.
```python
SENSOR_DESCRIPTIONS = [
SensorEntityDescription(key="voltage", name="Voltage",
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
device_class=SensorDeviceClass.VOLTAGE),
SensorEntityDescription(key="current", name="Current",
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
device_class=SensorDeviceClass.CURRENT),
SensorEntityDescription(key="power", name="Power",
native_unit_of_measurement=UnitOfPower.WATT,
device_class=SensorDeviceClass.POWER),
SensorEntityDescription(key="state_of_charge", name="State of Charge",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.BATTERY),
SensorEntityDescription(key="residual_capacity", name="Remaining Capacity",
native_unit_of_measurement="Ah"),
SensorEntityDescription(key="nominal_capacity", name="Nominal Capacity",
native_unit_of_measurement="Ah"),
SensorEntityDescription(key="cycle_count", name="Cycle Count"),
SensorEntityDescription(key="cell_delta", name="Cell Voltage Delta",
native_unit_of_measurement="mV"),
]
```
---
### Step 7 — `__init__.py`
```python
async def async_setup_entry(hass, entry):
coordinator = BmsCoordinator(hass, entry.data["address"])
await coordinator._async_setup()
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, ["sensor"])
return True
async def async_unload_entry(hass, entry):
coordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator._handler.disconnect()
return await hass.config_entries.async_unload_platforms(entry, ["sensor"])
```
---
## Installation
### Option A — Manual (any HA install)
```bash
# On your HA host (SSH or Samba share)
cp -r custom_components/xiaoxiang_bms \
/config/custom_components/xiaoxiang_bms
# Restart HA, then:
# Settings → Integrations → Add → search "Xiaoxiang BMS"
```
### Option B — HACS Custom Repository
1. HACS → Custom Repositories → add this repo URL → Category: Integration
2. Install → Restart HA
3. Settings → Integrations → Add → "Xiaoxiang BMS"
---
## Key Technical Decisions
| Decision | Choice | Reason |
|----------|--------|--------|
| Addon vs Integration | Integration | Direct HA sensor entities, no MQTT needed |
| BLE library | bleak (HA-bundled) | No extra deps, HA manages BT adapter lifecycle |
| Discovery | Bluetooth service UUID match | User gets auto-discovery notification in HA |
| Poll vs Push | Poll (30s) | BMS doesn't send unsolicited notifications; request→response protocol |
| Data model | DataUpdateCoordinator | Single shared connection, all sensors refresh together |
| Dynamic entities | First-poll count | Cell/temp count varies by battery; discover at runtime |
---
## Development & Testing
### Test without real hardware
Use a BLE UART simulator (e.g., `nRF Connect` app or a Raspberry Pi with
`bluezero` or `bumble`) to send back canned frames to validate parsing.
```python
# Minimal test for parser
frame = bytes([0xDD, 0x03, 0x00, 0x1B,
0x05, 0xDC, # voltage: 15.00V
0xFF, 0x9C, # current: -1.00A (charging)
0x09, 0x60, # residual: 24.00Ah
0x0B, 0xB8, # nominal: 30.00Ah
0x00, 0x05, # cycles: 5
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # dates/balance/protection/ver
0x64, # SoC: 100%
0x03, # MOS
0x10, # 16 cells
0x02, # 2 temp probes
0x0A, 0xAB, # temp1: (2731 - 2731) / 10 → wait: (0x0AAB=2731) → 0°C
0x0A, 0xC5, # temp2: (2757 - 2731) / 10 → 2.6°C
0xFF, 0xFD, 0x77]) # checksum + end
result = BmsBluetoothHandler.parse_general_info(frame)
assert result["voltage"] == 15.0
assert result["current"] == -1.0
```
### Validate on real HA
```bash
# Enable debug logging in configuration.yaml:
logger:
logs:
custom_components.xiaoxiang_bms: debug
```
---
## Risks & Mitigations
| Risk | Mitigation |
|------|-----------|
| BLE not available on HA host | Docs note: requires Bluetooth hardware; HA OS has it, Docker needs `--privileged` or BT passthrough |
| Chunked BLE frames | Buffer accumulation in `_on_notify` until `0x77` end byte seen |
| BMS goes out of range | `UpdateFailed` → HA marks sensors unavailable; reconnect on next poll |
| Multiple BMS units | Each config entry = separate coordinator = separate MAC |
| Frame byte offsets vary by firmware | Add firmware version detection; make offsets configurable |
---
## Implementation Order
- [ ] 1. `const.py` — UUIDs and command bytes
- [ ] 2. `bluetooth_handler.py` — protocol parser (unit-testable standalone)
- [ ] 3. `coordinator.py` — DataUpdateCoordinator wrapping the handler
- [ ] 4. `manifest.json` — HA metadata
- [ ] 5. `__init__.py` — entry setup/unload
- [ ] 6. `config_flow.py` — BT scan + manual entry
- [ ] 7. `sensor.py` — all sensor entities
- [ ] 8. `strings.json` / `translations/en.json` — UI text
- [ ] 9. Local test without hardware (canned frames)
- [ ] 10. Test on real HA + real BMS
Binary file not shown.
@@ -0,0 +1,42 @@
"""Xiaoxiang Smart BMS — Home Assistant integration."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import CONF_ADDRESS, CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL, DOMAIN
from .coordinator import BmsCoordinator
PLATFORMS = ["sensor"]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up the BMS integration from a config entry."""
address = entry.data[CONF_ADDRESS]
poll_interval = entry.options.get(CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL)
coordinator = BmsCoordinator(hass, address, poll_interval)
await coordinator.async_setup()
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Reload the entry when options (e.g. poll interval) change
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload the BMS integration."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
coordinator: BmsCoordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.async_teardown()
return unload_ok
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload the entry when options are changed."""
await hass.config_entries.async_reload(entry.entry_id)
@@ -0,0 +1,192 @@
"""BLE GATT communication handler for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import asyncio
import logging
import struct
from bleak import BleakClient, BleakError
from .const import (
FRAME_END,
FRAME_START,
RX_CHAR_UUID,
TX_CHAR_UUID,
)
_LOGGER = logging.getLogger(__name__)
# Full frame layout:
# [0xDD] [CMD] [STATUS] [PAYLOAD_LEN] [PAYLOAD...] [CHK_HI] [CHK_LO] [0x77]
# Header = 4 bytes, trailer = 3 bytes (checksum × 2 + end marker)
_HEADER_LEN = 4
_TRAILER_LEN = 3
class BmsBluetoothHandler:
"""Manages BLE connection and protocol framing for a Xiaoxiang BMS device."""
def __init__(self, address: str) -> None:
self._address = address
self._client: BleakClient | None = None
self._buffer = bytearray()
self._response_event = asyncio.Event()
self._response_data: bytes | None = None
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.is_connected
async def connect(self) -> None:
"""Open BLE connection and start notifications."""
if self.is_connected:
return
_LOGGER.debug("Connecting to BMS at %s", self._address)
self._client = BleakClient(
self._address,
disconnected_callback=self._on_disconnect,
)
await self._client.connect()
await self._client.start_notify(RX_CHAR_UUID, self._on_notify)
_LOGGER.debug("Connected to BMS at %s", self._address)
async def disconnect(self) -> None:
"""Close the BLE connection cleanly."""
if self._client:
try:
await self._client.disconnect()
except Exception:
pass
self._client = None
def _on_disconnect(self, _client: BleakClient) -> None:
_LOGGER.debug("BMS at %s disconnected", self._address)
self._client = None
# ------------------------------------------------------------------
# Frame reception
# ------------------------------------------------------------------
def _on_notify(self, _char, data: bytearray) -> None:
"""Accumulate BLE notification chunks into complete protocol frames.
BLE max payload is 20 bytes (default MTU), so a single BMS frame
(up to ~50 bytes for 16 cells) arrives across several notifications.
We buffer until we can calculate and verify the expected frame length.
"""
self._buffer.extend(data)
# Discard leading garbage until we see a frame start byte
while self._buffer and self._buffer[0] != FRAME_START:
self._buffer.pop(0)
# Need at least the 4-byte header to know payload length
if len(self._buffer) < _HEADER_LEN:
return
payload_len = self._buffer[3]
expected_total = _HEADER_LEN + payload_len + _TRAILER_LEN
if len(self._buffer) < expected_total:
return # still waiting for more chunks
frame = bytes(self._buffer[:expected_total])
del self._buffer[:expected_total]
if frame[-1] != FRAME_END:
_LOGGER.warning("BMS frame missing end marker, discarding: %s", frame.hex())
return
if frame[2] != 0x00:
_LOGGER.warning("BMS returned error status 0x%02X for cmd 0x%02X",
frame[2], frame[1])
return
_LOGGER.debug("BMS frame received (cmd=0x%02X, len=%d)", frame[1], payload_len)
self._response_data = frame
self._response_event.set()
# ------------------------------------------------------------------
# Request / response
# ------------------------------------------------------------------
async def request(self, command: bytes, timeout: float = 5.0) -> bytes | None:
"""Send a command frame and wait for the corresponding response frame."""
async with self._lock:
self._response_event.clear()
self._response_data = None
try:
await self._client.write_gatt_char(TX_CHAR_UUID, command, response=False)
except BleakError as exc:
_LOGGER.error("BLE write failed: %s", exc)
return None
try:
await asyncio.wait_for(self._response_event.wait(), timeout)
return self._response_data
except asyncio.TimeoutError:
_LOGGER.warning("BMS response timeout (cmd=%s)", command.hex())
return None
# ------------------------------------------------------------------
# Frame parsers
# ------------------------------------------------------------------
@staticmethod
def parse_general_info(frame: bytes) -> dict:
"""Parse a 0x03 general info response frame.
Payload byte offsets (frame[4] is payload[0]):
0-1 Total voltage uint16 BE ÷100 → V
2-3 Current int16 BE ÷100 → A (positive = charging, negative = discharging)
4-5 Residual capacity uint16 BE ÷100 → Ah
6-7 Nominal capacity uint16 BE ÷100 → Ah
8-9 Cycle count uint16 BE
10-11 Production date (ignored)
12-15 Balance status (ignored)
16-17 Protection status (ignored)
18 Software version (ignored)
19 State of charge uint8 %
20 MOS status uint8
21 Cell count uint8
22 Temp probe count uint8
23+ Temperatures uint16 BE each (raw 2731) ÷ 10 → °C
"""
p = frame[_HEADER_LEN:-_TRAILER_LEN]
temp_count = p[22]
temperatures: list[float] = []
for i in range(temp_count):
raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
temperatures.append(round((raw - 2731) / 10.0, 1))
return {
"voltage": round(struct.unpack_from(">H", p, 0)[0] / 100.0, 2),
"current": round(struct.unpack_from(">h", p, 2)[0] / 100.0, 2),
"residual_capacity": round(struct.unpack_from(">H", p, 4)[0] / 100.0, 2),
"nominal_capacity": round(struct.unpack_from(">H", p, 6)[0] / 100.0, 2),
"cycle_count": struct.unpack_from(">H", p, 8)[0],
"state_of_charge": p[19],
"cell_count": p[21],
"temperatures": temperatures,
}
@staticmethod
def parse_cell_info(frame: bytes) -> dict:
"""Parse a 0x04 cell voltage response frame.
Per spec: frame[3] (the header length byte) = cell_count × 2.
The payload contains ONLY the voltage bytes — no count byte.
0+ Cell voltages uint16 BE each unit mV ÷1000 → V
"""
count = frame[3] // 2 # header length byte = N_cells × 2
p = frame[_HEADER_LEN:-_TRAILER_LEN]
voltages: list[float] = []
for i in range(count):
raw = struct.unpack_from(">H", p, i * 2)[0]
voltages.append(round(raw / 1000.0, 3))
return {"cell_voltages": voltages}
@@ -0,0 +1,130 @@
"""Config flow for the Xiaoxiang Smart BMS integration."""
from __future__ import annotations
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
async_discovered_service_info,
)
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from .const import (
CONF_ADDRESS,
CONF_POLL_INTERVAL,
DEFAULT_POLL_INTERVAL,
DOMAIN,
MAX_POLL_INTERVAL,
MIN_POLL_INTERVAL,
UART_SERVICE_UUID,
)
class XiaoxiangBmsConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for the Xiaoxiang Smart BMS."""
VERSION = 1
def __init__(self) -> None:
self._address: str | None = None
self._name: str | None = None
# ------------------------------------------------------------------
# Auto-discovery path (HA sees BMS advertising matching service UUID)
# ------------------------------------------------------------------
async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
) -> FlowResult:
"""Handle BLE auto-discovery."""
self._address = discovery_info.address
self._name = discovery_info.name or f"Xiaoxiang BMS ({discovery_info.address})"
await self.async_set_unique_id(discovery_info.address)
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {"name": self._name}
return await self.async_step_confirm()
async def async_step_confirm(self, user_input: dict | None = None) -> FlowResult:
"""Confirm auto-discovered device."""
if user_input is not None:
return self.async_create_entry(
title=self._name,
data={CONF_ADDRESS: self._address},
)
return self.async_show_form(
step_id="confirm",
description_placeholders={
"name": self._name,
"address": self._address,
},
)
# ------------------------------------------------------------------
# Manual path (user goes to Integrations → Add → Xiaoxiang BMS)
# ------------------------------------------------------------------
async def async_step_user(self, user_input: dict | None = None) -> FlowResult:
"""Handle manual setup. Shows discovered BMS devices as a dropdown if found."""
errors: dict[str, str] = {}
if user_input is not None:
address = user_input[CONF_ADDRESS].upper().strip()
await self.async_set_unique_id(address)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=f"Xiaoxiang BMS ({address})",
data={CONF_ADDRESS: address},
)
# Build a dropdown of already-discovered BMS devices (filtered by service UUID)
discovered: dict[str, str] = {
info.address: f"{info.name or 'Xiaoxiang BMS'} ({info.address})"
for info in async_discovered_service_info(self.hass)
if UART_SERVICE_UUID in (info.service_uuids or [])
}
if discovered:
schema = vol.Schema({vol.Required(CONF_ADDRESS): vol.In(discovered)})
else:
schema = vol.Schema({
vol.Required(CONF_ADDRESS): str,
})
return self.async_show_form(
step_id="user",
data_schema=schema,
errors=errors,
)
# ------------------------------------------------------------------
# Options flow (change poll interval after setup)
# ------------------------------------------------------------------
@staticmethod
@callback
def async_get_options_flow(config_entry: config_entries.ConfigEntry) -> BmsOptionsFlow:
return BmsOptionsFlow(config_entry)
class BmsOptionsFlow(config_entries.OptionsFlow):
"""Allow the user to change the poll interval after initial setup."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
self._config_entry = config_entry
async def async_step_init(self, user_input: dict | None = None) -> FlowResult:
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
current = self._config_entry.options.get(CONF_POLL_INTERVAL, DEFAULT_POLL_INTERVAL)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required(CONF_POLL_INTERVAL, default=current): vol.All(
int, vol.Range(min=MIN_POLL_INTERVAL, max=MAX_POLL_INTERVAL)
),
}),
)
+27
View File
@@ -0,0 +1,27 @@
"""Constants for the Xiaoxiang Smart BMS integration."""
DOMAIN = "xiaoxiang_bms"
CONF_ADDRESS = "address"
CONF_POLL_INTERVAL = "poll_interval"
DEFAULT_POLL_INTERVAL = 5 # seconds — fast for solar/energy monitoring
MIN_POLL_INTERVAL = 2
MAX_POLL_INTERVAL = 60
# GATT UUIDs (Xiaoxiang BMS UART-over-GATT)
UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb"
RX_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb" # BMS → HA (notify)
TX_CHAR_UUID = "0000ff02-0000-1000-8000-00805f9b34fb" # HA → BMS (write)
# Request frames: [0xDD, 0xA5, CMD, 0x00, CHK_HI, CHK_LO, 0x77]
CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77]) # pack info
CMD_CELL = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77]) # cell voltages
# Frame markers
FRAME_START = 0xDD
FRAME_END = 0x77
# Response command IDs (byte 1 of frame)
CMD_ID_GENERAL = 0x03
CMD_ID_CELL = 0x04
@@ -0,0 +1,93 @@
"""DataUpdateCoordinator for the Xiaoxiang Smart BMS."""
from __future__ import annotations
import logging
from datetime import timedelta
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .bluetooth_handler import BmsBluetoothHandler
from .const import CMD_CELL, CMD_GENERAL, DOMAIN
_LOGGER = logging.getLogger(__name__)
class BmsCoordinator(DataUpdateCoordinator[dict]):
"""Polls the BMS over BLE and distributes data to all sensor entities."""
def __init__(
self,
hass: HomeAssistant,
address: str,
poll_interval: int,
) -> None:
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=poll_interval),
)
self.address = address
self._handler = BmsBluetoothHandler(address)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def async_setup(self) -> None:
"""Connect to the BMS. Called once during config entry setup."""
await self._handler.connect()
async def async_teardown(self) -> None:
"""Disconnect cleanly. Called on entry unload."""
await self._handler.disconnect()
# ------------------------------------------------------------------
# Poll
# ------------------------------------------------------------------
async def _async_update_data(self) -> dict:
"""Fetch data from the BMS. Reconnects automatically if disconnected."""
if not self._handler.is_connected:
_LOGGER.debug("BMS not connected, attempting reconnect…")
try:
await self._handler.connect()
except Exception as exc:
raise UpdateFailed(f"BMS reconnect failed: {exc}") from exc
try:
general_frame = await self._handler.request(CMD_GENERAL)
if general_frame is None:
raise UpdateFailed("No response to general info request (0x03)")
cell_frame = await self._handler.request(CMD_CELL)
if cell_frame is None:
raise UpdateFailed("No response to cell info request (0x04)")
except UpdateFailed:
raise
except Exception as exc:
raise UpdateFailed(f"BLE communication error: {exc}") from exc
data = BmsBluetoothHandler.parse_general_info(general_frame)
data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))
# Derived fields
data["power"] = round(data["voltage"] * data["current"], 2)
if data["cell_voltages"]:
v_max = max(data["cell_voltages"])
v_min = min(data["cell_voltages"])
data["cell_delta"] = round((v_max - v_min) * 1000, 1)
else:
data["cell_delta"] = None
_LOGGER.debug(
"BMS data: %.2fV %.2fA %d%% %.2fAh %d cells",
data["voltage"],
data["current"],
data["state_of_charge"],
data["residual_capacity"],
len(data["cell_voltages"]),
)
return data
@@ -0,0 +1,14 @@
{
"domain": "xiaoxiang_bms",
"name": "Xiaoxiang Smart BMS",
"version": "1.0.0",
"config_flow": true,
"dependencies": ["bluetooth"],
"bluetooth": [
{"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"}
],
"requirements": [],
"codeowners": [],
"iot_class": "local_polling",
"documentation": "https://github.com/Jnnshschl/AndroidBMSApp"
}
+228
View File
@@ -0,0 +1,228 @@
"""Sensor entities for the Xiaoxiang Smart BMS integration."""
from __future__ import annotations
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
PERCENTAGE,
UnitOfElectricCurrent,
UnitOfElectricPotential,
UnitOfPower,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import CONF_ADDRESS, DOMAIN
from .coordinator import BmsCoordinator
# ---------------------------------------------------------------------------
# Static sensor definitions — one entity per key in coordinator.data
# ---------------------------------------------------------------------------
STATIC_SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="voltage",
name="Voltage",
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
),
SensorEntityDescription(
key="current",
name="Current",
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
device_class=SensorDeviceClass.CURRENT,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
),
SensorEntityDescription(
key="power",
name="Power",
native_unit_of_measurement=UnitOfPower.WATT,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=1,
),
SensorEntityDescription(
key="state_of_charge",
name="State of Charge",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.BATTERY,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="residual_capacity",
name="Remaining Capacity",
native_unit_of_measurement="Ah",
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
icon="mdi:battery-charging",
),
SensorEntityDescription(
key="nominal_capacity",
name="Nominal Capacity",
native_unit_of_measurement="Ah",
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
icon="mdi:battery",
),
SensorEntityDescription(
key="cycle_count",
name="Cycle Count",
state_class=SensorStateClass.TOTAL_INCREASING,
icon="mdi:battery-sync",
),
SensorEntityDescription(
key="cell_delta",
name="Cell Voltage Delta",
native_unit_of_measurement="mV",
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=1,
icon="mdi:battery-alert-variant",
),
)
# ---------------------------------------------------------------------------
# Platform setup
# ---------------------------------------------------------------------------
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Create all BMS sensor entities after the first successful poll."""
coordinator: BmsCoordinator = hass.data[DOMAIN][entry.entry_id]
address: str = entry.data[CONF_ADDRESS]
entities: list[SensorEntity] = [
BmsStaticSensor(coordinator, description, address)
for description in STATIC_SENSORS
]
# Temperature sensors — count determined from first poll
for i in range(len(coordinator.data.get("temperatures", []))):
entities.append(
BmsDynamicSensor(
coordinator=coordinator,
address=address,
unique_key=f"temperature_{i + 1}",
friendly_name=f"Temperature {i + 1}",
data_key="temperatures",
index=i,
unit=UnitOfTemperature.CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
icon=None,
)
)
# Cell voltage sensors — count determined from first poll
for i in range(len(coordinator.data.get("cell_voltages", []))):
entities.append(
BmsDynamicSensor(
coordinator=coordinator,
address=address,
unique_key=f"cell_voltage_{i + 1}",
friendly_name=f"Cell {i + 1} Voltage",
data_key="cell_voltages",
index=i,
unit=UnitOfElectricPotential.VOLT,
device_class=SensorDeviceClass.VOLTAGE,
icon=None,
)
)
async_add_entities(entities)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _device_info(address: str) -> DeviceInfo:
return DeviceInfo(
identifiers={(DOMAIN, address)},
name="Xiaoxiang Smart BMS",
manufacturer="Xiaoxiang",
model="Smart BMS",
)
# ---------------------------------------------------------------------------
# Entity classes
# ---------------------------------------------------------------------------
class BmsStaticSensor(CoordinatorEntity[BmsCoordinator], SensorEntity):
"""A sensor whose key maps directly to a scalar value in coordinator.data."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: BmsCoordinator,
description: SensorEntityDescription,
address: str,
) -> None:
super().__init__(coordinator)
self.entity_description = description
self._attr_unique_id = f"{address}_{description.key}"
self._attr_device_info = _device_info(address)
@property
def native_value(self):
return self.coordinator.data.get(self.entity_description.key)
class BmsDynamicSensor(CoordinatorEntity[BmsCoordinator], SensorEntity):
"""A sensor that reads an element from a list inside coordinator.data.
Used for per-cell voltages and per-probe temperatures, whose count is
only known after the first successful BMS poll.
"""
_attr_has_entity_name = True
_attr_state_class = SensorStateClass.MEASUREMENT
def __init__(
self,
coordinator: BmsCoordinator,
address: str,
unique_key: str,
friendly_name: str,
data_key: str,
index: int,
unit: str,
device_class: SensorDeviceClass | None,
icon: str | None,
) -> None:
super().__init__(coordinator)
self._data_key = data_key
self._index = index
self._attr_unique_id = f"{address}_{unique_key}"
self._attr_name = friendly_name
self._attr_native_unit_of_measurement = unit
self._attr_device_class = device_class
self._attr_device_info = _device_info(address)
if icon:
self._attr_icon = icon
if device_class == SensorDeviceClass.VOLTAGE:
self._attr_suggested_display_precision = 3
elif device_class == SensorDeviceClass.TEMPERATURE:
self._attr_suggested_display_precision = 1
@property
def native_value(self):
values: list = self.coordinator.data.get(self._data_key, [])
if self._index < len(values):
return values[self._index]
return None
@@ -0,0 +1,40 @@
{
"config": {
"step": {
"user": {
"title": "Xiaoxiang Smart BMS",
"description": "Enter the Bluetooth address of your BMS, or select a discovered device from the dropdown.",
"data": {
"address": "Bluetooth Address"
},
"data_description": {
"address": "MAC address of the BMS (e.g. AA:BB:CC:DD:EE:FF). Tip: pair the device first via your system Bluetooth settings."
}
},
"confirm": {
"title": "Confirm Xiaoxiang BMS",
"description": "Connect to **{name}** ({address})?\n\nMake sure the BMS is powered on and within Bluetooth range."
}
},
"abort": {
"already_configured": "This BMS is already configured."
},
"error": {
"cannot_connect": "Unable to connect to the BMS. Check the address and try again."
}
},
"options": {
"step": {
"init": {
"title": "BMS Options",
"description": "Adjust the polling interval. Lower values give more accurate real-time readings but use more Bluetooth bandwidth.",
"data": {
"poll_interval": "Poll interval (seconds)"
},
"data_description": {
"poll_interval": "How often to request data from the BMS (260 seconds). Default is 5 s for solar/energy monitoring."
}
}
}
}
}
+5
View File
@@ -0,0 +1,5 @@
{
"name": "Xiaoxiang Smart BMS",
"render_readme": true,
"homeassistant": "2023.6.0"
}
+40
View File
@@ -0,0 +1,40 @@
{
"config": {
"step": {
"user": {
"title": "Xiaoxiang Smart BMS",
"description": "Enter the Bluetooth address of your BMS, or select a discovered device from the dropdown.",
"data": {
"address": "Bluetooth Address"
},
"data_description": {
"address": "MAC address of the BMS (e.g. AA:BB:CC:DD:EE:FF). Tip: pair the device first via your system Bluetooth settings."
}
},
"confirm": {
"title": "Confirm Xiaoxiang BMS",
"description": "Connect to **{name}** ({address})?\n\nMake sure the BMS is powered on and within Bluetooth range."
}
},
"abort": {
"already_configured": "This BMS is already configured."
},
"error": {
"cannot_connect": "Unable to connect to the BMS. Check the address and try again."
}
},
"options": {
"step": {
"init": {
"title": "BMS Options",
"description": "Adjust the polling interval. Lower values give more accurate real-time readings but use more Bluetooth bandwidth.",
"data": {
"poll_interval": "Poll interval (seconds)"
},
"data_description": {
"poll_interval": "How often to request data from the BMS (260 seconds). Default is 5 s for solar/energy monitoring."
}
}
}
}
}