"""
Device events endpoints (BLE and WiFi).
"""

import gzip
import json
import zlib
from typing import Annotated

from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.models.device import Device

router = APIRouter()

# Content-Encoding values that mean "gzip"; RFC 9110 asks recipients to treat
# the legacy "x-gzip" token as equivalent to "gzip".
_GZIP_ENCODINGS = frozenset({"gzip", "x-gzip"})


class EventsResponse(BaseModel):
    """Events batch response."""

    ok: bool = True
    received: int


async def _auth_device_token(
    authorization: str | None, db: AsyncSession
) -> Device:
    """
    Authenticate device by token from Authorization header.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``Bearer <token>`` (scheme matched case-insensitively).
        db: Session used to look the token up in the ``Device`` table.

    Returns:
        The ``Device`` row whose ``device_token`` equals the presented token.

    Raises:
        HTTPException: 401 "Missing token" when the header is absent, is not
            a bearer header, or carries no token after the scheme;
            401 "Invalid token" when no device matches the token.
    """
    if not authorization or not authorization.lower().startswith("bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing token",
        )

    # "Bearer " followed only by whitespace splits into a single element;
    # indexing [1] blindly would raise IndexError and surface as a 500.
    parts = authorization.split(None, 1)
    if len(parts) != 2:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing token",
        )
    token = parts[1]

    result = await db.execute(select(Device).where(Device.device_token == token))
    device = result.scalar_one_or_none()
    if not device:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return device


def _read_body(body: bytes, content_encoding: str | None) -> dict:
    """
    Decompress and parse request body.

    Args:
        body: Raw request body bytes.
        content_encoding: Value of the ``Content-Encoding`` header, if any.
            ``gzip`` / ``x-gzip`` (case-insensitive, surrounding whitespace
            ignored) triggers decompression; anything else is parsed as-is.

    Returns:
        The decoded JSON object.

    Raises:
        HTTPException: 400 when the gzip stream is corrupt or truncated, the
            bytes are not valid UTF-8 JSON, or the JSON document is not an
            object.
    """
    raw = body
    if content_encoding and content_encoding.strip().lower() in _GZIP_ENCODINGS:
        # NOTE(review): gzip.decompress inflates the whole body in memory with
        # no size cap; consider bounding request/decompressed size upstream.
        try:
            raw = gzip.decompress(body)
        except (OSError, EOFError, zlib.error) as exc:
            # BadGzipFile is an OSError; truncated streams raise EOFError.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid gzip body",
            ) from exc

    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError, RecursionError) as exc:
        # RecursionError: pathologically nested JSON from the client.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON body",
        ) from exc

    # Callers use ``.get`` on the result, so anything but an object is a
    # client error rather than an internal one.
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="JSON body must be an object",
        )
    return payload


def _batch_count(payload: dict) -> int:
    """
    Return the number of events reported by a batch payload.

    A truthy ``count`` field wins; otherwise the length of ``events`` is used
    (0 when ``events`` is missing).

    Raises:
        HTTPException: 400 when ``count`` cannot be converted to an integer
            or ``events`` has no length.
    """
    declared = payload.get("count", 0)
    try:
        if declared:
            return int(declared)
        return len(payload.get("events", []))
    except (TypeError, ValueError, OverflowError) as exc:
        # e.g. "count": "abc", "count": Infinity, or "events": null
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid count or events",
        ) from exc


async def _receive_batch(
    kind: str,
    request: Request,
    db: AsyncSession,
    content_encoding: str | None,
    authorization: str | None,
) -> EventsResponse:
    """
    Shared handler for batch endpoints: authenticate, parse, count, log.

    ``kind`` is the tag used in the log line (e.g. ``"BLE"`` or ``"WIFI"``).
    """
    device = await _auth_device_token(authorization, db)
    payload = _read_body(await request.body(), content_encoding)
    count = _batch_count(payload)

    # TODO: Store in ClickHouse
    print(
        f"[{kind} BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
    )

    return EventsResponse(ok=True, received=count)


@router.post("/ble", response_model=EventsResponse)
async def ble_batch(
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    content_encoding: Annotated[str | None, Header()] = None,
    authorization: Annotated[str | None, Header()] = None,
) -> EventsResponse:
    """
    Receive batch of BLE scan events.
    Body can be gzip compressed (Content-Encoding: gzip).
    """
    return await _receive_batch("BLE", request, db, content_encoding, authorization)


@router.post("/wifi", response_model=EventsResponse)
async def wifi_batch(
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    content_encoding: Annotated[str | None, Header()] = None,
    authorization: Annotated[str | None, Header()] = None,
) -> EventsResponse:
    """
    Receive batch of WiFi probe request events.
    Body can be gzip compressed (Content-Encoding: gzip).
    """
    return await _receive_batch("WIFI", request, db, content_encoding, authorization)