""" ClickHouse service for storing device events. """ from datetime import datetime from typing import Any import clickhouse_connect from app.config import settings def get_clickhouse_client(): """Get ClickHouse client.""" return clickhouse_connect.get_client( host=getattr(settings, "CLICKHOUSE_HOST", "localhost"), port=getattr(settings, "CLICKHOUSE_PORT", 8123), database="mybeacon", username=getattr(settings, "CLICKHOUSE_USER", "default"), password=getattr(settings, "CLICKHOUSE_PASSWORD", ""), ) async def insert_ble_events( device_id: int, device_mac: str, organization_id: int | None, events: list[dict[str, Any]], ) -> int: """ Insert BLE events batch into ClickHouse. Args: device_id: Device simple_id device_mac: Device MAC address organization_id: Organization ID (None = 0 for unassigned) events: List of BLE event dicts Returns: Number of inserted rows """ if not events: return 0 client = get_clickhouse_client() rows = [] for event in events: # Parse timestamp (ts is unix timestamp in milliseconds) ts_ms = event.get("ts") if ts_ms: timestamp = datetime.fromtimestamp(ts_ms / 1000.0) else: timestamp = datetime.utcnow() row = ( timestamp, device_id, device_mac, organization_id or 0, event.get("mac", ""), event.get("rssi", 0), event.get("uuid", ""), event.get("major", 0), event.get("minor", 0), event.get("tx_power", 0), str(event), # raw_data ) rows.append(row) # Batch insert client.insert( table="ble_events", data=rows, column_names=[ "timestamp", "device_id", "device_mac", "organization_id", "beacon_mac", "rssi", "uuid", "major", "minor", "tx_power", "raw_data" ], ) return len(rows) async def insert_wifi_events( device_id: int, device_mac: str, organization_id: int | None, events: list[dict[str, Any]], ) -> int: """ Insert WiFi events batch into ClickHouse. Args: device_id: Device simple_id device_mac: Device MAC address organization_id: Organization ID (None = 0 for unassigned) events: List of WiFi event dicts Returns: Number of inserted rows """ if not events: return 0 client = get_clickhouse_client() rows = [] for event in events: # Parse timestamp (ts is unix timestamp in milliseconds) ts_ms = event.get("ts") if ts_ms: timestamp = datetime.fromtimestamp(ts_ms / 1000.0) else: timestamp = datetime.utcnow() row = ( timestamp, device_id, device_mac, organization_id or 0, event.get("mac", ""), event.get("ssid", ""), event.get("rssi", 0), event.get("channel", 0), event.get("frame_type", "probe_request"), str(event), # raw_data ) rows.append(row) # Batch insert client.insert( table="wifi_events", data=rows, column_names=[ "timestamp", "device_id", "device_mac", "organization_id", "client_mac", "ssid", "rssi", "channel", "frame_type", "raw_data" ], ) return len(rows)