Files
Jannis 4a769bf50f Initial commit: Xiaoxiang Smart BMS Home Assistant integration
- BLE GATT communication via bleak (UART-over-GATT, service 0xff00)
- Parses 0x03 (general info) and 0x04 (cell voltages) frames
- Protocol verified against official RS485/UART spec V4
- DataUpdateCoordinator with 5s poll interval (configurable 2-60s)
- Auto-reconnect on BLE disconnect
- Config flow: BT auto-discovery + manual MAC entry + options flow
- Sensors: voltage, current, power, SoC, capacity, cycles, temps, cells, cell delta
- HACS-ready (hacs.json, manifest.json)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 19:07:18 +02:00

16 KiB
Raw Permalink Blame History

Xiaoxiang Smart BMS — Home Assistant Integration Masterplan

Overview

A native HA custom integration (not an addon) that connects to your Xiaoxiang BMS over Bluetooth Low Energy (BLE), parses the proprietary UART-over-GATT protocol, and exposes battery data as first-class Home Assistant sensor entities.

No MQTT broker, no extra container, no sidecar — just a custom_components/ folder you drop into your HA config directory.


Architecture

Home Assistant
└── Bluetooth subsystem (built-in)
    └── xiaoxiang_bms (custom_component)
        ├── Config Flow        ← GUI: scan BT, pick device, save MAC
        ├── DataUpdateCoordinator  ← polls BMS every 30s via BLE
        │   └── BluetoothHandler   ← BLE GATT read/write (bleak)
        │       ├── Request 0x03   → General info frame
        │       └── Request 0x04   → Cell voltages frame
        └── SensorEntity (×N)  ← one entity per data field

BMS Protocol Reference

Derived from: https://github.com/Jnnshschl/AndroidBMSApp

GATT UUIDs

Role UUID
UART Service 0000ff00-0000-1000-8000-00805f9b34fb
RX (notify/read) 0000ff01-0000-1000-8000-00805f9b34fb
TX (write) 0000ff02-0000-1000-8000-00805f9b34fb

Frame Format

[0xDD] [CMD] [STATUS] [LENGTH] [PAYLOAD...] [CHECKSUM_HI] [CHECKSUM_LO] [0x77]
  • All multi-byte values: Big-endian
  • Max frame size: 80 bytes

Request Commands

CMD_GENERAL_INFO = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL_INFO    = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])

General Info Response (0x03) — byte offsets in payload

Offset Length Field Scaling Unit
0-1 2 Total voltage ÷100 V
2-3 2 Current (signed) ÷100 A
4-5 2 Residual capacity ÷100 Ah
6-7 2 Nominal capacity ÷100 Ah
8-9 2 Cycle count ×1 -
10-11 2 Production date - (skip)
12-15 4 Balance status bits - (skip)
16-17 2 Protection status - (skip)
18 1 Version - (skip)
19 1 State of charge ×1 %
20 1 MOS status - (skip)
21 1 Cell count ×1 -
22 1 Temp probe count ×1 -
23+ 2×N Temperatures (val2731)÷10 °C

Current is a signed 16-bit int; negative = charging, positive = discharging

Cell Info Response (0x04) — byte offsets in payload

Offset Length Field Scaling Unit
0 1 Cell count (÷2) - -
1+ 2×N Cell voltages ÷1000 V

Entities Exposed in Home Assistant

Entity Device Class Unit Notes
sensor.bms_voltage voltage V Total pack voltage
sensor.bms_current current A Signed (neg=charge)
sensor.bms_power power W Calculated: V × I
sensor.bms_state_of_charge battery % 0100
sensor.bms_residual_capacity energy_storage Ah Remaining Ah
sensor.bms_nominal_capacity energy_storage Ah Full capacity
sensor.bms_cycle_count - - Lifetime cycles
sensor.bms_temperature_1..N temperature °C Per probe
sensor.bms_cell_voltage_1..N voltage V Per cell
sensor.bms_cell_delta voltage mV Maxmin cell diff

File Structure

custom_components/
└── xiaoxiang_bms/
    ├── __init__.py          # Integration setup / unload
    ├── manifest.json        # HA metadata, bluetooth dependency
    ├── const.py             # All constants (UUIDs, commands, offsets)
    ├── config_flow.py       # GUI setup wizard (BT scan → pick device → save)
    ├── coordinator.py       # DataUpdateCoordinator — poll loop + data store
    ├── bluetooth_handler.py # Low-level BLE GATT comms (bleak)
    ├── sensor.py            # SensorEntity definitions
    └── strings.json         # UI strings
translations/
└── en.json                  # English translations for config flow

Implementation Steps

Step 1 — manifest.json

{
  "domain": "xiaoxiang_bms",
  "name": "Xiaoxiang Smart BMS",
  "version": "1.0.0",
  "config_flow": true,
  "dependencies": ["bluetooth"],
  "bluetooth": [
    {"service_uuid": "0000ff00-0000-1000-8000-00805f9b34fb"}
  ],
  "requirements": [],
  "codeowners": ["@you"],
  "iot_class": "local_polling"
}

Declaring bluetooth in dependencies lets HA's Bluetooth subsystem manage the adapter and lets the config flow use bluetooth.async_discovered_service_info.


Step 2 — const.py

DOMAIN = "xiaoxiang_bms"
DEFAULT_POLL_INTERVAL = 30  # seconds

# GATT UUIDs
UART_SERVICE_UUID = "0000ff00-0000-1000-8000-00805f9b34fb"
RX_CHAR_UUID      = "0000ff01-0000-1000-8000-00805f9b34fb"
TX_CHAR_UUID      = "0000ff02-0000-1000-8000-00805f9b34fb"

# Request frames
CMD_GENERAL = bytes([0xDD, 0xA5, 0x03, 0x00, 0xFF, 0xFD, 0x77])
CMD_CELL    = bytes([0xDD, 0xA5, 0x04, 0x00, 0xFF, 0xFC, 0x77])

# Frame markers
FRAME_START = 0xDD
FRAME_END   = 0x77

CMD_GENERAL_ID = 0x03
CMD_CELL_ID    = 0x04

Step 3 — bluetooth_handler.py

Core BLE communication class using bleak (already bundled with HA):

import asyncio
import struct
from bleak import BleakClient

class BmsBluetoothHandler:
    def __init__(self, address: str):
        self._address = address
        self._client: BleakClient | None = None
        self._response_buffer = bytearray()
        self._response_event = asyncio.Event()
        self._response_data: bytes | None = None

    async def connect(self):
        self._client = BleakClient(self._address)
        await self._client.connect()
        await self._client.start_notify(RX_CHAR_UUID, self._on_notify)

    async def disconnect(self):
        if self._client and self._client.is_connected:
            await self._client.disconnect()

    def _on_notify(self, _, data: bytearray):
        """Accumulate chunks until complete frame received."""
        self._response_buffer.extend(data)
        # Check for end marker
        if self._response_buffer and self._response_buffer[-1] == FRAME_END:
            if self._response_buffer[0] == FRAME_START:
                self._response_data = bytes(self._response_buffer)
                self._response_buffer.clear()
                self._response_event.set()

    async def request(self, command: bytes, timeout=5.0) -> bytes | None:
        self._response_event.clear()
        self._response_data = None
        await self._client.write_gatt_char(TX_CHAR_UUID, command)
        try:
            await asyncio.wait_for(self._response_event.wait(), timeout)
            return self._response_data
        except asyncio.TimeoutError:
            return None

    @staticmethod
    def parse_general_info(frame: bytes) -> dict:
        # frame[0]=0xDD, frame[1]=CMD, frame[2]=status, frame[3]=length
        # payload starts at frame[4]
        p = frame[4:]
        temp_count = p[22]
        temps = []
        for i in range(temp_count):
            raw = struct.unpack_from(">H", p, 23 + i * 2)[0]
            temps.append((raw - 2731) / 10.0)

        return {
            "voltage":          struct.unpack_from(">H", p, 0)[0] / 100.0,
            "current":          struct.unpack_from(">h", p, 2)[0] / 100.0,  # signed!
            "residual_capacity":struct.unpack_from(">H", p, 4)[0] / 100.0,
            "nominal_capacity": struct.unpack_from(">H", p, 6)[0] / 100.0,
            "cycle_count":      struct.unpack_from(">H", p, 8)[0],
            "state_of_charge":  p[19],
            "cell_count":       p[21],
            "temperatures":     temps,
        }

    @staticmethod
    def parse_cell_info(frame: bytes) -> dict:
        p = frame[4:]
        count = p[0] // 2
        voltages = []
        for i in range(count):
            raw = struct.unpack_from(">H", p, 1 + i * 2)[0]
            voltages.append(raw / 1000.0)
        return {"cell_voltages": voltages}

Step 4 — coordinator.py

from datetime import timedelta
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

class BmsCoordinator(DataUpdateCoordinator):
    def __init__(self, hass, address: str):
        super().__init__(hass, logger, name="xiaoxiang_bms",
                         update_interval=timedelta(seconds=DEFAULT_POLL_INTERVAL))
        self._handler = BmsBluetoothHandler(address)
        self.data = {}

    async def _async_setup(self):
        await self._handler.connect()

    async def _async_update_data(self):
        try:
            general_frame = await self._handler.request(CMD_GENERAL)
            cell_frame    = await self._handler.request(CMD_CELL)
            if not general_frame or not cell_frame:
                raise UpdateFailed("BMS did not respond")
            data = BmsBluetoothHandler.parse_general_info(general_frame)
            data.update(BmsBluetoothHandler.parse_cell_info(cell_frame))
            # Derived fields
            data["power"] = round(data["voltage"] * data["current"], 2)
            if data["cell_voltages"]:
                data["cell_delta"] = round(
                    (max(data["cell_voltages"]) - min(data["cell_voltages"])) * 1000, 1
                )
            return data
        except Exception as e:
            raise UpdateFailed(f"BMS polling error: {e}") from e

Step 5 — config_flow.py

Two-step config flow:

  1. Auto-discovery: HA detects BLE advertisements matching the BMS service UUID and suggests the device automatically (appears as a notification in HA).
  2. Manual: User can go to Settings → Integrations → Add → "Xiaoxiang BMS" and paste/enter the BT MAC address.
class XiaoxiangBmsConfigFlow(ConfigFlow, domain=DOMAIN):
    VERSION = 1

    async def async_step_bluetooth(self, discovery_info):
        """Called automatically when HA sees the BMS advertising."""
        self._address = discovery_info.address
        self._name = discovery_info.name or "Xiaoxiang BMS"
        await self.async_set_unique_id(self._address)
        self._abort_if_unique_id_configured()
        return await self.async_step_confirm()

    async def async_step_confirm(self, user_input=None):
        if user_input is not None:
            return self.async_create_entry(title=self._name,
                                           data={"address": self._address})
        return self.async_show_form(step_id="confirm", ...)

    async def async_step_user(self, user_input=None):
        """Manual entry fallback."""
        if user_input:
            address = user_input["address"]
            await self.async_set_unique_id(address)
            self._abort_if_unique_id_configured()
            return self.async_create_entry(title="Xiaoxiang BMS",
                                           data={"address": address})
        return self.async_show_form(step_id="user",
                                    data_schema=vol.Schema({"address": str}))

Step 6 — sensor.py

One SensorEntity subclass per data field. Dynamic cell/temp sensors are created in async_setup_entry based on the first successful poll.

SENSOR_DESCRIPTIONS = [
    SensorEntityDescription(key="voltage",           name="Voltage",
        native_unit_of_measurement=UnitOfElectricPotential.VOLT,
        device_class=SensorDeviceClass.VOLTAGE),
    SensorEntityDescription(key="current",           name="Current",
        native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
        device_class=SensorDeviceClass.CURRENT),
    SensorEntityDescription(key="power",             name="Power",
        native_unit_of_measurement=UnitOfPower.WATT,
        device_class=SensorDeviceClass.POWER),
    SensorEntityDescription(key="state_of_charge",  name="State of Charge",
        native_unit_of_measurement=PERCENTAGE,
        device_class=SensorDeviceClass.BATTERY),
    SensorEntityDescription(key="residual_capacity", name="Remaining Capacity",
        native_unit_of_measurement="Ah"),
    SensorEntityDescription(key="nominal_capacity",  name="Nominal Capacity",
        native_unit_of_measurement="Ah"),
    SensorEntityDescription(key="cycle_count",       name="Cycle Count"),
    SensorEntityDescription(key="cell_delta",        name="Cell Voltage Delta",
        native_unit_of_measurement="mV"),
]

Step 7 — __init__.py

async def async_setup_entry(hass, entry):
    coordinator = BmsCoordinator(hass, entry.data["address"])
    await coordinator._async_setup()
    await coordinator.async_config_entry_first_refresh()
    hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
    await hass.config_entries.async_forward_entry_setups(entry, ["sensor"])
    return True

async def async_unload_entry(hass, entry):
    coordinator = hass.data[DOMAIN].pop(entry.entry_id)
    await coordinator._handler.disconnect()
    return await hass.config_entries.async_unload_platforms(entry, ["sensor"])

Installation

Option A — Manual (any HA install)

# On your HA host (SSH or Samba share)
cp -r custom_components/xiaoxiang_bms \
       /config/custom_components/xiaoxiang_bms

# Restart HA, then:
# Settings → Integrations → Add → search "Xiaoxiang BMS"

Option B — HACS Custom Repository

  1. HACS → Custom Repositories → add this repo URL → Category: Integration
  2. Install → Restart HA
  3. Settings → Integrations → Add → "Xiaoxiang BMS"

Key Technical Decisions

Decision Choice Reason
Addon vs Integration Integration Direct HA sensor entities, no MQTT needed
BLE library bleak (HA-bundled) No extra deps, HA manages BT adapter lifecycle
Discovery Bluetooth service UUID match User gets auto-discovery notification in HA
Poll vs Push Poll (30s) BMS doesn't send unsolicited notifications; request→response protocol
Data model DataUpdateCoordinator Single shared connection, all sensors refresh together
Dynamic entities First-poll count Cell/temp count varies by battery; discover at runtime

Development & Testing

Test without real hardware

Use a BLE UART simulator (e.g., nRF Connect app or a Raspberry Pi with bluezero or bumble) to send back canned frames to validate parsing.

# Minimal test for parser
frame = bytes([0xDD, 0x03, 0x00, 0x1B,
               0x05, 0xDC,  # voltage: 15.00V
               0xFF, 0x9C,  # current: -1.00A (charging)
               0x09, 0x60,  # residual: 24.00Ah
               0x0B, 0xB8,  # nominal: 30.00Ah
               0x00, 0x05,  # cycles: 5
               0x00, 0x00, 0x00, 0x00, 0x00, 0x00,  # dates/balance/protection/ver
               0x64,        # SoC: 100%
               0x03,        # MOS
               0x10,        # 16 cells
               0x02,        # 2 temp probes
               0x0A, 0xAB,  # temp1: (2731 - 2731) / 10 → wait: (0x0AAB=2731) → 0°C
               0x0A, 0xC5,  # temp2: (2757 - 2731) / 10 → 2.6°C
               0xFF, 0xFD, 0x77])  # checksum + end
result = BmsBluetoothHandler.parse_general_info(frame)
assert result["voltage"] == 15.0
assert result["current"] == -1.0

Validate on real HA

# Enable debug logging in configuration.yaml:
logger:
  logs:
    custom_components.xiaoxiang_bms: debug

Risks & Mitigations

Risk Mitigation
BLE not available on HA host Docs note: requires Bluetooth hardware; HA OS has it, Docker needs --privileged or BT passthrough
Chunked BLE frames Buffer accumulation in _on_notify until 0x77 end byte seen
BMS goes out of range UpdateFailed → HA marks sensors unavailable; reconnect on next poll
Multiple BMS units Each config entry = separate coordinator = separate MAC
Frame byte offsets vary by firmware Add firmware version detection; make offsets configurable

Implementation Order

  • 1. const.py — UUIDs and command bytes
  • 2. bluetooth_handler.py — protocol parser (unit-testable standalone)
  • 3. coordinator.py — DataUpdateCoordinator wrapping the handler
  • 4. manifest.json — HA metadata
  • 5. __init__.py — entry setup/unload
  • 6. config_flow.py — BT scan + manual entry
  • 7. sensor.py — all sensor entities
  • 8. strings.json / translations/en.json — UI text
  • 9. Local test without hardware (canned frames)
  • 10. Test on real HA + real BMS