""" ClickHouse service for storing device events. """ from datetime import datetime from typing import Any from clickhouse_driver import Client from app.config import settings def get_clickhouse_client() -> Client: """Get ClickHouse client.""" return Client( host=getattr(settings, "CLICKHOUSE_HOST", "localhost"), port=getattr(settings, "CLICKHOUSE_PORT", 9000), database="mybeacon", ) async def insert_ble_events( device_id: int, device_mac: str, organization_id: int | None, events: list[dict[str, Any]], ) -> int: """ Insert BLE events batch into ClickHouse. Args: device_id: Device simple_id device_mac: Device MAC address organization_id: Organization ID (None = 0 for unassigned) events: List of BLE event dicts Returns: Number of inserted rows """ if not events: return 0 client = get_clickhouse_client() rows = [] for event in events: # Parse timestamp (ts is unix timestamp in milliseconds) ts_ms = event.get("ts") if ts_ms: timestamp = datetime.fromtimestamp(ts_ms / 1000.0) else: timestamp = datetime.utcnow() row = ( timestamp, device_id, device_mac, organization_id or 0, event.get("mac", ""), event.get("rssi", 0), event.get("uuid", ""), event.get("major", 0), event.get("minor", 0), event.get("tx_power", 0), str(event), # raw_data ) rows.append(row) # Batch insert client.execute( """ INSERT INTO ble_events ( timestamp, device_id, device_mac, organization_id, beacon_mac, rssi, uuid, major, minor, tx_power, raw_data ) VALUES """, rows, ) return len(rows) async def insert_wifi_events( device_id: int, device_mac: str, organization_id: int | None, events: list[dict[str, Any]], ) -> int: """ Insert WiFi events batch into ClickHouse. Args: device_id: Device simple_id device_mac: Device MAC address organization_id: Organization ID (None = 0 for unassigned) events: List of WiFi event dicts Returns: Number of inserted rows """ if not events: return 0 client = get_clickhouse_client() rows = [] for event in events: # Parse timestamp (ts is unix timestamp in milliseconds) ts_ms = event.get("ts") if ts_ms: timestamp = datetime.fromtimestamp(ts_ms / 1000.0) else: timestamp = datetime.utcnow() row = ( timestamp, device_id, device_mac, organization_id or 0, event.get("mac", ""), event.get("ssid", ""), event.get("rssi", 0), event.get("channel", 0), event.get("frame_type", "probe_request"), str(event), # raw_data ) rows.append(row) # Batch insert client.execute( """ INSERT INTO wifi_events ( timestamp, device_id, device_mac, organization_id, client_mac, ssid, rssi, channel, frame_type, raw_data ) VALUES """, rows, ) return len(rows)