| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- """
- ClickHouse service for storing device events.
- """
- from datetime import datetime
- from typing import Any
- from clickhouse_driver import Client
- from app.config import settings
- def get_clickhouse_client() -> Client:
- """Get ClickHouse client."""
- return Client(
- host=getattr(settings, "CLICKHOUSE_HOST", "localhost"),
- port=getattr(settings, "CLICKHOUSE_PORT", 9000),
- database="mybeacon",
- )
- async def insert_ble_events(
- device_id: int,
- device_mac: str,
- organization_id: int | None,
- events: list[dict[str, Any]],
- ) -> int:
- """
- Insert BLE events batch into ClickHouse.
- Args:
- device_id: Device simple_id
- device_mac: Device MAC address
- organization_id: Organization ID (None = 0 for unassigned)
- events: List of BLE event dicts
- Returns:
- Number of inserted rows
- """
- if not events:
- return 0
- client = get_clickhouse_client()
- rows = []
- for event in events:
- # Parse timestamp (ts is unix timestamp in milliseconds)
- ts_ms = event.get("ts")
- if ts_ms:
- timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
- else:
- timestamp = datetime.utcnow()
- row = (
- timestamp,
- device_id,
- device_mac,
- organization_id or 0,
- event.get("mac", ""),
- event.get("rssi", 0),
- event.get("uuid", ""),
- event.get("major", 0),
- event.get("minor", 0),
- event.get("tx_power", 0),
- str(event), # raw_data
- )
- rows.append(row)
- # Batch insert
- client.execute(
- """
- INSERT INTO ble_events (
- timestamp, device_id, device_mac, organization_id,
- beacon_mac, rssi, uuid, major, minor, tx_power, raw_data
- ) VALUES
- """,
- rows,
- )
- return len(rows)
- async def insert_wifi_events(
- device_id: int,
- device_mac: str,
- organization_id: int | None,
- events: list[dict[str, Any]],
- ) -> int:
- """
- Insert WiFi events batch into ClickHouse.
- Args:
- device_id: Device simple_id
- device_mac: Device MAC address
- organization_id: Organization ID (None = 0 for unassigned)
- events: List of WiFi event dicts
- Returns:
- Number of inserted rows
- """
- if not events:
- return 0
- client = get_clickhouse_client()
- rows = []
- for event in events:
- # Parse timestamp (ts is unix timestamp in milliseconds)
- ts_ms = event.get("ts")
- if ts_ms:
- timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
- else:
- timestamp = datetime.utcnow()
- row = (
- timestamp,
- device_id,
- device_mac,
- organization_id or 0,
- event.get("mac", ""),
- event.get("ssid", ""),
- event.get("rssi", 0),
- event.get("channel", 0),
- event.get("frame_type", "probe_request"),
- str(event), # raw_data
- )
- rows.append(row)
- # Batch insert
- client.execute(
- """
- INSERT INTO wifi_events (
- timestamp, device_id, device_mac, organization_id,
- client_mac, ssid, rssi, channel, frame_type, raw_data
- ) VALUES
- """,
- rows,
- )
- return len(rows)
|