| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- """
- ClickHouse service for storing device events.
- """
- from datetime import datetime
- from typing import Any
- import clickhouse_connect
- from app.config import settings
def get_clickhouse_client():
    """Build a ClickHouse client for the ``mybeacon`` database.

    Host, port, username and password are read from ``settings``; when an
    attribute is absent the fallbacks are ``localhost``, ``8123``,
    ``default`` and an empty password respectively.
    """
    connection_params = {
        "host": getattr(settings, "CLICKHOUSE_HOST", "localhost"),
        "port": getattr(settings, "CLICKHOUSE_PORT", 8123),
        "database": "mybeacon",
        "username": getattr(settings, "CLICKHOUSE_USER", "default"),
        "password": getattr(settings, "CLICKHOUSE_PASSWORD", ""),
    }
    return clickhouse_connect.get_client(**connection_params)
- async def insert_ble_events(
- device_id: int,
- device_mac: str,
- organization_id: int | None,
- events: list[dict[str, Any]],
- ) -> int:
- """
- Insert BLE events batch into ClickHouse.
- Args:
- device_id: Device simple_id
- device_mac: Device MAC address
- organization_id: Organization ID (None = 0 for unassigned)
- events: List of BLE event dicts
- Returns:
- Number of inserted rows
- """
- if not events:
- return 0
- client = get_clickhouse_client()
- rows = []
- for event in events:
- # Parse timestamp (ts is unix timestamp in milliseconds)
- ts_ms = event.get("ts")
- if ts_ms:
- timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
- else:
- timestamp = datetime.utcnow()
- row = (
- timestamp,
- device_id,
- device_mac,
- organization_id or 0,
- event.get("mac", ""),
- event.get("rssi", 0),
- event.get("uuid", ""),
- event.get("major", 0),
- event.get("minor", 0),
- event.get("tx_power", 0),
- str(event), # raw_data
- )
- rows.append(row)
- # Batch insert
- client.insert(
- table="ble_events",
- data=rows,
- column_names=[
- "timestamp", "device_id", "device_mac", "organization_id",
- "beacon_mac", "rssi", "uuid", "major", "minor", "tx_power", "raw_data"
- ],
- )
- return len(rows)
- async def insert_wifi_events(
- device_id: int,
- device_mac: str,
- organization_id: int | None,
- events: list[dict[str, Any]],
- ) -> int:
- """
- Insert WiFi events batch into ClickHouse.
- Args:
- device_id: Device simple_id
- device_mac: Device MAC address
- organization_id: Organization ID (None = 0 for unassigned)
- events: List of WiFi event dicts
- Returns:
- Number of inserted rows
- """
- if not events:
- return 0
- client = get_clickhouse_client()
- rows = []
- for event in events:
- # Parse timestamp (ts is unix timestamp in milliseconds)
- ts_ms = event.get("ts")
- if ts_ms:
- timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
- else:
- timestamp = datetime.utcnow()
- row = (
- timestamp,
- device_id,
- device_mac,
- organization_id or 0,
- event.get("mac", ""),
- event.get("ssid", ""),
- event.get("rssi", 0),
- event.get("channel", 0),
- event.get("frame_type", "probe_request"),
- str(event), # raw_data
- )
- rows.append(row)
- # Batch insert
- client.insert(
- table="wifi_events",
- data=rows,
- column_names=[
- "timestamp", "device_id", "device_mac", "organization_id",
- "client_mac", "ssid", "rssi", "channel", "frame_type", "raw_data"
- ],
- )
- return len(rows)
|