clickhouse_service.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. """
  2. ClickHouse service for storing device events.
  3. """
  4. from datetime import datetime
  5. from typing import Any
  6. from clickhouse_driver import Client
  7. from app.config import settings
  8. def get_clickhouse_client() -> Client:
  9. """Get ClickHouse client."""
  10. return Client(
  11. host=getattr(settings, "CLICKHOUSE_HOST", "localhost"),
  12. port=getattr(settings, "CLICKHOUSE_PORT", 9000),
  13. database="mybeacon",
  14. )
  15. async def insert_ble_events(
  16. device_id: int,
  17. device_mac: str,
  18. organization_id: int | None,
  19. events: list[dict[str, Any]],
  20. ) -> int:
  21. """
  22. Insert BLE events batch into ClickHouse.
  23. Args:
  24. device_id: Device simple_id
  25. device_mac: Device MAC address
  26. organization_id: Organization ID (None = 0 for unassigned)
  27. events: List of BLE event dicts
  28. Returns:
  29. Number of inserted rows
  30. """
  31. if not events:
  32. return 0
  33. client = get_clickhouse_client()
  34. rows = []
  35. for event in events:
  36. # Parse timestamp (ts is unix timestamp in milliseconds)
  37. ts_ms = event.get("ts")
  38. if ts_ms:
  39. timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
  40. else:
  41. timestamp = datetime.utcnow()
  42. row = (
  43. timestamp,
  44. device_id,
  45. device_mac,
  46. organization_id or 0,
  47. event.get("mac", ""),
  48. event.get("rssi", 0),
  49. event.get("uuid", ""),
  50. event.get("major", 0),
  51. event.get("minor", 0),
  52. event.get("tx_power", 0),
  53. str(event), # raw_data
  54. )
  55. rows.append(row)
  56. # Batch insert
  57. client.execute(
  58. """
  59. INSERT INTO ble_events (
  60. timestamp, device_id, device_mac, organization_id,
  61. beacon_mac, rssi, uuid, major, minor, tx_power, raw_data
  62. ) VALUES
  63. """,
  64. rows,
  65. )
  66. return len(rows)
  67. async def insert_wifi_events(
  68. device_id: int,
  69. device_mac: str,
  70. organization_id: int | None,
  71. events: list[dict[str, Any]],
  72. ) -> int:
  73. """
  74. Insert WiFi events batch into ClickHouse.
  75. Args:
  76. device_id: Device simple_id
  77. device_mac: Device MAC address
  78. organization_id: Organization ID (None = 0 for unassigned)
  79. events: List of WiFi event dicts
  80. Returns:
  81. Number of inserted rows
  82. """
  83. if not events:
  84. return 0
  85. client = get_clickhouse_client()
  86. rows = []
  87. for event in events:
  88. # Parse timestamp (ts is unix timestamp in milliseconds)
  89. ts_ms = event.get("ts")
  90. if ts_ms:
  91. timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
  92. else:
  93. timestamp = datetime.utcnow()
  94. row = (
  95. timestamp,
  96. device_id,
  97. device_mac,
  98. organization_id or 0,
  99. event.get("mac", ""),
  100. event.get("ssid", ""),
  101. event.get("rssi", 0),
  102. event.get("channel", 0),
  103. event.get("frame_type", "probe_request"),
  104. str(event), # raw_data
  105. )
  106. rows.append(row)
  107. # Batch insert
  108. client.execute(
  109. """
  110. INSERT INTO wifi_events (
  111. timestamp, device_id, device_mac, organization_id,
  112. client_mac, ssid, rssi, channel, frame_type, raw_data
  113. ) VALUES
  114. """,
  115. rows,
  116. )
  117. return len(rows)