clickhouse_service.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. """
  2. ClickHouse service for storing device events.
  3. """
  4. from datetime import datetime
  5. from typing import Any
  6. import clickhouse_connect
  7. from app.config import settings
  8. def get_clickhouse_client():
  9. """Get ClickHouse client."""
  10. return clickhouse_connect.get_client(
  11. host=getattr(settings, "CLICKHOUSE_HOST", "localhost"),
  12. port=getattr(settings, "CLICKHOUSE_PORT", 8123),
  13. database="mybeacon",
  14. username=getattr(settings, "CLICKHOUSE_USER", "default"),
  15. password=getattr(settings, "CLICKHOUSE_PASSWORD", ""),
  16. )
  17. async def insert_ble_events(
  18. device_id: int,
  19. device_mac: str,
  20. organization_id: int | None,
  21. events: list[dict[str, Any]],
  22. ) -> int:
  23. """
  24. Insert BLE events batch into ClickHouse.
  25. Args:
  26. device_id: Device simple_id
  27. device_mac: Device MAC address
  28. organization_id: Organization ID (None = 0 for unassigned)
  29. events: List of BLE event dicts
  30. Returns:
  31. Number of inserted rows
  32. """
  33. if not events:
  34. return 0
  35. client = get_clickhouse_client()
  36. rows = []
  37. for event in events:
  38. # Parse timestamp (ts is unix timestamp in milliseconds)
  39. ts_ms = event.get("ts")
  40. if ts_ms:
  41. timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
  42. else:
  43. timestamp = datetime.utcnow()
  44. row = (
  45. timestamp,
  46. device_id,
  47. device_mac,
  48. organization_id or 0,
  49. event.get("mac", ""),
  50. event.get("rssi", 0),
  51. event.get("uuid", ""),
  52. event.get("major", 0),
  53. event.get("minor", 0),
  54. event.get("tx_power", 0),
  55. str(event), # raw_data
  56. )
  57. rows.append(row)
  58. # Batch insert
  59. client.insert(
  60. table="ble_events",
  61. data=rows,
  62. column_names=[
  63. "timestamp", "device_id", "device_mac", "organization_id",
  64. "beacon_mac", "rssi", "uuid", "major", "minor", "tx_power", "raw_data"
  65. ],
  66. )
  67. return len(rows)
  68. async def insert_wifi_events(
  69. device_id: int,
  70. device_mac: str,
  71. organization_id: int | None,
  72. events: list[dict[str, Any]],
  73. ) -> int:
  74. """
  75. Insert WiFi events batch into ClickHouse.
  76. Args:
  77. device_id: Device simple_id
  78. device_mac: Device MAC address
  79. organization_id: Organization ID (None = 0 for unassigned)
  80. events: List of WiFi event dicts
  81. Returns:
  82. Number of inserted rows
  83. """
  84. if not events:
  85. return 0
  86. client = get_clickhouse_client()
  87. rows = []
  88. for event in events:
  89. # Parse timestamp (ts is unix timestamp in milliseconds)
  90. ts_ms = event.get("ts")
  91. if ts_ms:
  92. timestamp = datetime.fromtimestamp(ts_ms / 1000.0)
  93. else:
  94. timestamp = datetime.utcnow()
  95. row = (
  96. timestamp,
  97. device_id,
  98. device_mac,
  99. organization_id or 0,
  100. event.get("mac", ""),
  101. event.get("ssid", ""),
  102. event.get("rssi", 0),
  103. event.get("channel", 0),
  104. event.get("frame_type", "probe_request"),
  105. str(event), # raw_data
  106. )
  107. rows.append(row)
  108. # Batch insert
  109. client.insert(
  110. table="wifi_events",
  111. data=rows,
  112. column_names=[
  113. "timestamp", "device_id", "device_mac", "organization_id",
  114. "client_mac", "ssid", "rssi", "channel", "frame_type", "raw_data"
  115. ],
  116. )
  117. return len(rows)