events.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. """
  2. Device events endpoints (BLE and WiFi).
  3. """
  4. import gzip
  5. import json
  6. from typing import Annotated
  7. from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
  8. from pydantic import BaseModel
  9. from sqlalchemy import select
  10. from sqlalchemy.ext.asyncio import AsyncSession
  11. from app.core.database import get_db
  12. from app.models.device import Device
  13. router = APIRouter()
  14. class EventsResponse(BaseModel):
  15. """Events batch response."""
  16. ok: bool = True
  17. received: int
  18. async def _auth_device_token(
  19. authorization: str | None, db: AsyncSession
  20. ) -> Device:
  21. """Authenticate device by token from Authorization header."""
  22. if not authorization or not authorization.lower().startswith("bearer "):
  23. raise HTTPException(
  24. status_code=status.HTTP_401_UNAUTHORIZED,
  25. detail="Missing token",
  26. )
  27. token = authorization.split(None, 1)[1]
  28. result = await db.execute(select(Device).where(Device.device_token == token))
  29. device = result.scalar_one_or_none()
  30. if not device:
  31. raise HTTPException(
  32. status_code=status.HTTP_401_UNAUTHORIZED,
  33. detail="Invalid token",
  34. )
  35. return device
  36. def _read_body(body: bytes, content_encoding: str | None) -> dict:
  37. """Decompress and parse request body."""
  38. raw = body
  39. if content_encoding and content_encoding.lower() == "gzip":
  40. raw = gzip.decompress(body)
  41. return json.loads(raw.decode("utf-8"))
  42. @router.post("/ble", response_model=EventsResponse)
  43. async def ble_batch(
  44. request: Request,
  45. db: Annotated[AsyncSession, Depends(get_db)],
  46. content_encoding: Annotated[str | None, Header()] = None,
  47. authorization: Annotated[str | None, Header()] = None,
  48. ):
  49. """
  50. Receive batch of BLE scan events.
  51. Body can be gzip compressed (Content-Encoding: gzip).
  52. """
  53. device = await _auth_device_token(authorization, db)
  54. payload = _read_body(await request.body(), content_encoding)
  55. count = int(payload.get("count", 0) or len(payload.get("events", [])))
  56. # TODO: Store in ClickHouse
  57. print(
  58. f"[BLE BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
  59. )
  60. return EventsResponse(ok=True, received=count)
  61. @router.post("/wifi", response_model=EventsResponse)
  62. async def wifi_batch(
  63. request: Request,
  64. db: Annotated[AsyncSession, Depends(get_db)],
  65. content_encoding: Annotated[str | None, Header()] = None,
  66. authorization: Annotated[str | None, Header()] = None,
  67. ):
  68. """
  69. Receive batch of WiFi probe request events.
  70. Body can be gzip compressed (Content-Encoding: gzip).
  71. """
  72. device = await _auth_device_token(authorization, db)
  73. payload = _read_body(await request.body(), content_encoding)
  74. count = int(payload.get("count", 0) or len(payload.get("events", [])))
  75. # TODO: Store in ClickHouse
  76. print(
  77. f"[WIFI BATCH] device={device.mac_address} simple_id={device.simple_id} count={count}"
  78. )
  79. return EventsResponse(ok=True, received=count)