# events.py
"""
Device events endpoints (BLE and WiFi).
"""
import gzip
import json
import zlib
from typing import Annotated

from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.models.device import Device
from app.services import clickhouse_service
# Router that the /ble and /wifi event endpoints below are registered on.
router = APIRouter()
class EventsResponse(BaseModel):
    """Events batch response."""

    # Success flag; defaults to True.
    ok: bool = True
    # Number of events reported back to the caller. The endpoints in this
    # module fill it with the value returned by the ClickHouse insert call.
    received: int
  19. async def _auth_device_token(
  20. authorization: str | None, db: AsyncSession
  21. ) -> Device:
  22. """Authenticate device by token from Authorization header."""
  23. if not authorization or not authorization.lower().startswith("bearer "):
  24. raise HTTPException(
  25. status_code=status.HTTP_401_UNAUTHORIZED,
  26. detail="Missing token",
  27. )
  28. token = authorization.split(None, 1)[1]
  29. result = await db.execute(select(Device).where(Device.device_token == token))
  30. device = result.scalar_one_or_none()
  31. if not device:
  32. raise HTTPException(
  33. status_code=status.HTTP_401_UNAUTHORIZED,
  34. detail="Invalid token",
  35. )
  36. return device
  37. def _read_body(body: bytes, content_encoding: str | None) -> dict:
  38. """Decompress and parse request body."""
  39. raw = body
  40. if content_encoding and content_encoding.lower() == "gzip":
  41. raw = gzip.decompress(body)
  42. return json.loads(raw.decode("utf-8"))
  43. @router.post("/ble", response_model=EventsResponse)
  44. async def ble_batch(
  45. request: Request,
  46. db: Annotated[AsyncSession, Depends(get_db)],
  47. content_encoding: Annotated[str | None, Header()] = None,
  48. authorization: Annotated[str | None, Header()] = None,
  49. ):
  50. """
  51. Receive batch of BLE scan events.
  52. Body can be gzip compressed (Content-Encoding: gzip).
  53. """
  54. device = await _auth_device_token(authorization, db)
  55. payload = _read_body(await request.body(), content_encoding)
  56. events = payload.get("events", [])
  57. count = int(payload.get("count", 0) or len(events))
  58. # Store in ClickHouse
  59. inserted = await clickhouse_service.insert_ble_events(
  60. device_id=device.simple_id,
  61. device_mac=device.mac_address,
  62. organization_id=device.organization_id,
  63. events=events,
  64. )
  65. print(
  66. f"[BLE BATCH] device={device.mac_address} simple_id={device.simple_id} count={count} inserted={inserted}"
  67. )
  68. return EventsResponse(ok=True, received=inserted)
  69. @router.post("/wifi", response_model=EventsResponse)
  70. async def wifi_batch(
  71. request: Request,
  72. db: Annotated[AsyncSession, Depends(get_db)],
  73. content_encoding: Annotated[str | None, Header()] = None,
  74. authorization: Annotated[str | None, Header()] = None,
  75. ):
  76. """
  77. Receive batch of WiFi probe request events.
  78. Body can be gzip compressed (Content-Encoding: gzip).
  79. """
  80. device = await _auth_device_token(authorization, db)
  81. payload = _read_body(await request.body(), content_encoding)
  82. events = payload.get("events", [])
  83. count = int(payload.get("count", 0) or len(events))
  84. # Store in ClickHouse
  85. inserted = await clickhouse_service.insert_wifi_events(
  86. device_id=device.simple_id,
  87. device_mac=device.mac_address,
  88. organization_id=device.organization_id,
  89. events=events,
  90. )
  91. print(
  92. f"[WIFI BATCH] device={device.mac_address} simple_id={device.simple_id} count={count} inserted={inserted}"
  93. )
  94. return EventsResponse(ok=True, received=inserted)