Browse Source

Add ClickHouse integration for BLE/WiFi events storage

- Created clickhouse_service.py with insert_ble_events() and insert_wifi_events()
- Updated events.py endpoints to store events in ClickHouse
- Parse unix timestamp (ts field) from device events
- Store full event data in raw_data field
- Tables: mybeacon.ble_events and mybeacon.wifi_events
- Monthly partitioning by timestamp
- Successfully tested with real hardware (192.168.5.244)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
root 1 month ago
parent
commit
d110a9aae5
2 changed files with 165 additions and 8 deletions
  1. 25 8
      backend/app/api/v1/events.py
  2. 140 0
      backend/app/services/clickhouse_service.py

+ 25 - 8
backend/app/api/v1/events.py

@@ -13,6 +13,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.core.database import get_db
 from app.models.device import Device
+from app.services import clickhouse_service
 
 router = APIRouter()
 
@@ -71,14 +72,22 @@ async def ble_batch(
     device = await _auth_device_token(authorization, db)
 
     payload = _read_body(await request.body(), content_encoding)
-    count = int(payload.get("count", 0) or len(payload.get("events", [])))
+    events = payload.get("events", [])
+    count = int(payload.get("count", 0) or len(events))
+
+    # Store in ClickHouse
+    inserted = await clickhouse_service.insert_ble_events(
+        device_id=device.simple_id,
+        device_mac=device.mac_address,
+        organization_id=device.organization_id,
+        events=events,
+    )
 
-    # TODO: Store in ClickHouse
     print(
-        f"[BLE BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
+        f"[BLE BATCH] device={device.mac_address} simple_id={device.simple_id} count={count} inserted={inserted}"
     )
 
-    return EventsResponse(ok=True, received=count)
+    return EventsResponse(ok=True, received=inserted)
 
 
 @router.post("/wifi", response_model=EventsResponse)
@@ -96,11 +105,19 @@ async def wifi_batch(
     device = await _auth_device_token(authorization, db)
 
     payload = _read_body(await request.body(), content_encoding)
-    count = int(payload.get("count", 0) or len(payload.get("events", [])))
+    events = payload.get("events", [])
+    count = int(payload.get("count", 0) or len(events))
+
+    # Store in ClickHouse
+    inserted = await clickhouse_service.insert_wifi_events(
+        device_id=device.simple_id,
+        device_mac=device.mac_address,
+        organization_id=device.organization_id,
+        events=events,
+    )
 
-    # TODO: Store in ClickHouse
     print(
-        f"[WIFI BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
+        f"[WIFI BATCH] device={device.mac_address} simple_id={device.simple_id} count={count} inserted={inserted}"
     )
 
-    return EventsResponse(ok=True, received=count)
+    return EventsResponse(ok=True, received=inserted)

+ 140 - 0
backend/app/services/clickhouse_service.py

@@ -0,0 +1,140 @@
+"""
+ClickHouse service for storing device events.
+"""
+
+from datetime import datetime
+from typing import Any
+
+from clickhouse_driver import Client
+
+from app.config import settings
+
+
+def get_clickhouse_client() -> Client:
+    """Get ClickHouse client."""
+    return Client(
+        host=getattr(settings, "CLICKHOUSE_HOST", "localhost"),
+        port=getattr(settings, "CLICKHOUSE_PORT", 9000),
+        database="mybeacon",
+    )
+
+
+async def insert_ble_events(
+    device_id: int,
+    device_mac: str,
+    organization_id: int | None,
+    events: list[dict[str, Any]],
+) -> int:
+    """
+    Insert BLE events batch into ClickHouse.
+
+    Args:
+        device_id: Device simple_id
+        device_mac: Device MAC address
+        organization_id: Organization ID (None = 0 for unassigned)
+        events: List of BLE event dicts
+
+    Returns:
+        Number of inserted rows
+    """
+    if not events:
+        return 0
+
+    client = get_clickhouse_client()
+
+    rows = []
+    for event in events:
+        # Parse timestamp (ts is unix timestamp in milliseconds)
+        ts_ms = event.get("ts")
+        if ts_ms:
+            timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
+        else:
+            timestamp = datetime.utcnow()
+
+        row = (
+            timestamp,
+            device_id,
+            device_mac,
+            organization_id or 0,
+            event.get("mac", ""),
+            event.get("rssi", 0),
+            event.get("uuid", ""),
+            event.get("major", 0),
+            event.get("minor", 0),
+            event.get("tx_power", 0),
+            str(event),  # raw_data
+        )
+        rows.append(row)
+
+    # Batch insert
+    client.execute(
+        """
+        INSERT INTO ble_events (
+            timestamp, device_id, device_mac, organization_id,
+            beacon_mac, rssi, uuid, major, minor, tx_power, raw_data
+        ) VALUES
+        """,
+        rows,
+    )
+
+    return len(rows)
+
+
+async def insert_wifi_events(
+    device_id: int,
+    device_mac: str,
+    organization_id: int | None,
+    events: list[dict[str, Any]],
+) -> int:
+    """
+    Insert WiFi events batch into ClickHouse.
+
+    Args:
+        device_id: Device simple_id
+        device_mac: Device MAC address
+        organization_id: Organization ID (None = 0 for unassigned)
+        events: List of WiFi event dicts
+
+    Returns:
+        Number of inserted rows
+    """
+    if not events:
+        return 0
+
+    client = get_clickhouse_client()
+
+    rows = []
+    for event in events:
+        # Parse timestamp (ts is unix timestamp in milliseconds)
+        ts_ms = event.get("ts")
+        if ts_ms:
+            timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
+        else:
+            timestamp = datetime.utcnow()
+
+        row = (
+            timestamp,
+            device_id,
+            device_mac,
+            organization_id or 0,
+            event.get("mac", ""),
+            event.get("ssid", ""),
+            event.get("rssi", 0),
+            event.get("channel", 0),
+            event.get("frame_type", "probe_request"),
+            str(event),  # raw_data
+        )
+        rows.append(row)
+
+    # Batch insert
+    client.execute(
+        """
+        INSERT INTO wifi_events (
+            timestamp, device_id, device_mac, organization_id,
+            client_mac, ssid, rssi, channel, frame_type, raw_data
+        ) VALUES
+        """,
+        rows,
+    )
+
+    return len(rows)