| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- """
- Device events endpoints (BLE and WiFi).
- """
- import gzip
- import json
- from typing import Annotated
- from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
- from pydantic import BaseModel
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import get_db
- from app.models.device import Device
- router = APIRouter()
- class EventsResponse(BaseModel):
- """Events batch response."""
- ok: bool = True
- received: int
- async def _auth_device_token(
- authorization: str | None, db: AsyncSession
- ) -> Device:
- """Authenticate device by token from Authorization header."""
- if not authorization or not authorization.lower().startswith("bearer "):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Missing token",
- )
- token = authorization.split(None, 1)[1]
- result = await db.execute(select(Device).where(Device.device_token == token))
- device = result.scalar_one_or_none()
- if not device:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid token",
- )
- return device
- def _read_body(body: bytes, content_encoding: str | None) -> dict:
- """Decompress and parse request body."""
- raw = body
- if content_encoding and content_encoding.lower() == "gzip":
- raw = gzip.decompress(body)
- return json.loads(raw.decode("utf-8"))
- @router.post("/ble", response_model=EventsResponse)
- async def ble_batch(
- request: Request,
- db: Annotated[AsyncSession, Depends(get_db)],
- content_encoding: Annotated[str | None, Header()] = None,
- authorization: Annotated[str | None, Header()] = None,
- ):
- """
- Receive batch of BLE scan events.
- Body can be gzip compressed (Content-Encoding: gzip).
- """
- device = await _auth_device_token(authorization, db)
- payload = _read_body(await request.body(), content_encoding)
- count = int(payload.get("count", 0) or len(payload.get("events", [])))
- # TODO: Store in ClickHouse
- print(
- f"[BLE BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
- )
- return EventsResponse(ok=True, received=count)
- @router.post("/wifi", response_model=EventsResponse)
- async def wifi_batch(
- request: Request,
- db: Annotated[AsyncSession, Depends(get_db)],
- content_encoding: Annotated[str | None, Header()] = None,
- authorization: Annotated[str | None, Header()] = None,
- ):
- """
- Receive batch of WiFi probe request events.
- Body can be gzip compressed (Content-Encoding: gzip).
- """
- device = await _auth_device_token(authorization, db)
- payload = _read_body(await request.body(), content_encoding)
- count = int(payload.get("count", 0) or len(payload.get("events", [])))
- # TODO: Store in ClickHouse
- print(
- f"[WIFI BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
- )
- return EventsResponse(ok=True, received=count)
|