# host_monitor.py
  1. """
  2. Host monitoring service for collecting system metrics.
  3. """
  4. import asyncio
  5. import time
  6. from datetime import datetime, timedelta, timezone
  7. import psutil
  8. from sqlalchemy import delete, select, text
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from app.core.database import async_session_maker
  11. from app.models.host_metrics import HostMetrics
  12. from app.services.alert_service import alert_service
  13. class HostMonitor:
  14. """Collect and store host system metrics."""
  15. def __init__(self):
  16. self.previous_disk_io = None
  17. self.previous_net_io = None
  18. self.previous_cpu_stats = None
  19. self.previous_timestamp = None
  20. self.previous_pg_stats = None
  21. self.previous_ch_stats = None
  22. self.running = False
  23. async def collect_postgresql_metrics(self) -> dict:
  24. """Collect PostgreSQL database metrics."""
  25. try:
  26. async with async_session_maker() as session:
  27. # Active connections
  28. result = await session.execute(text("""
  29. SELECT count(*) as active,
  30. (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max
  31. FROM pg_stat_activity
  32. WHERE state = 'active'
  33. """))
  34. row = result.fetchone()
  35. active_connections = row[0] if row else 0
  36. total_connections = row[1] if row else 100
  37. # Database size
  38. from app.config import settings
  39. db_name = settings.DATABASE_URL.split('/')[-1].split('?')[0]
  40. result = await session.execute(text(f"""
  41. SELECT pg_database_size('{db_name}')
  42. """))
  43. db_size = result.scalar() or 0
  44. # Cache hit ratio
  45. result = await session.execute(text("""
  46. SELECT
  47. sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit) + sum(blks_read), 0) as cache_hit_ratio
  48. FROM pg_stat_database
  49. """))
  50. cache_hit_ratio = result.scalar() or 0
  51. # Transactions per second (delta-based)
  52. result = await session.execute(text("""
  53. SELECT sum(xact_commit + xact_rollback) as total_xacts,
  54. sum(deadlocks) as deadlocks,
  55. sum(temp_files) as temp_files
  56. FROM pg_stat_database
  57. """))
  58. row = result.fetchone()
  59. total_xacts = row[0] or 0
  60. deadlocks = row[1] or 0
  61. temp_files = row[2] or 0
  62. # Calculate TPS
  63. tps = 0
  64. if self.previous_pg_stats and self.previous_timestamp:
  65. time_delta = time.time() - self.previous_timestamp
  66. if time_delta > 0:
  67. tps = int((total_xacts - self.previous_pg_stats['xacts']) / time_delta)
  68. self.previous_pg_stats = {'xacts': total_xacts}
  69. return {
  70. 'pg_active_connections': active_connections,
  71. 'pg_total_connections': total_connections,
  72. 'pg_database_size_bytes': db_size,
  73. 'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
  74. 'pg_transactions_per_sec': max(0, tps),
  75. 'pg_deadlocks': deadlocks,
  76. 'pg_temp_files': temp_files,
  77. }
  78. except Exception as e:
  79. print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
  80. return {
  81. 'pg_active_connections': 0,
  82. 'pg_total_connections': 0,
  83. 'pg_database_size_bytes': 0,
  84. 'pg_cache_hit_ratio': 0,
  85. 'pg_transactions_per_sec': 0,
  86. 'pg_deadlocks': 0,
  87. 'pg_temp_files': 0,
  88. }
  89. async def collect_clickhouse_metrics(self) -> dict:
  90. """Collect ClickHouse database metrics."""
  91. try:
  92. import clickhouse_connect
  93. from app.config import settings
  94. # Check if ClickHouse is configured
  95. if not hasattr(settings, 'CLICKHOUSE_HOST'):
  96. return {
  97. 'ch_active_queries': 0,
  98. 'ch_database_size_bytes': 0,
  99. 'ch_queries_per_sec': 0,
  100. 'ch_rows_read_per_sec': 0,
  101. 'ch_memory_usage_bytes': 0,
  102. }
  103. # Connect to ClickHouse
  104. client = clickhouse_connect.get_client(
  105. host=settings.CLICKHOUSE_HOST,
  106. port=settings.CLICKHOUSE_PORT,
  107. username=settings.CLICKHOUSE_USER,
  108. password=settings.CLICKHOUSE_PASSWORD,
  109. )
  110. # Active queries
  111. result = client.query("SELECT count() FROM system.processes")
  112. active_queries = result.result_rows[0][0] if result.result_rows else 0
  113. # Database size
  114. result = client.query("""
  115. SELECT sum(bytes) FROM system.parts
  116. WHERE active AND database NOT IN ('system', 'information_schema')
  117. """)
  118. db_size = result.result_rows[0][0] if result.result_rows else 0
  119. # Query stats (delta-based)
  120. result = client.query("""
  121. SELECT
  122. sum(query_count) as queries,
  123. sum(read_rows) as rows_read,
  124. sum(memory_usage) as memory
  125. FROM system.query_log
  126. WHERE event_time > now() - INTERVAL 60 SECOND
  127. """)
  128. row = result.result_rows[0] if result.result_rows else (0, 0, 0)
  129. queries = row[0] or 0
  130. rows_read = row[1] or 0
  131. memory_usage = row[2] or 0
  132. # Calculate QPS
  133. qps = 0
  134. rows_per_sec = 0
  135. if self.previous_ch_stats and self.previous_timestamp:
  136. time_delta = time.time() - self.previous_timestamp
  137. if time_delta > 0:
  138. qps = int((queries - self.previous_ch_stats['queries']) / time_delta)
  139. rows_per_sec = int((rows_read - self.previous_ch_stats['rows']) / time_delta)
  140. self.previous_ch_stats = {'queries': queries, 'rows': rows_read}
  141. return {
  142. 'ch_active_queries': active_queries,
  143. 'ch_database_size_bytes': db_size or 0,
  144. 'ch_queries_per_sec': max(0, qps),
  145. 'ch_rows_read_per_sec': max(0, rows_per_sec),
  146. 'ch_memory_usage_bytes': memory_usage or 0,
  147. }
  148. except Exception as e:
  149. print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
  150. return {
  151. 'ch_active_queries': 0,
  152. 'ch_database_size_bytes': 0,
  153. 'ch_queries_per_sec': 0,
  154. 'ch_rows_read_per_sec': 0,
  155. 'ch_memory_usage_bytes': 0,
  156. }
  157. async def collect_metrics(self) -> dict:
  158. """Collect comprehensive system metrics."""
  159. current_timestamp = time.time()
  160. # CPU - detailed
  161. cpu_percent = psutil.cpu_percent(interval=1)
  162. cpu_count = psutil.cpu_count()
  163. cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
  164. cpu_times = psutil.cpu_times()
  165. cpu_stats = psutil.cpu_stats()
  166. # Context switches and interrupts (delta)
  167. context_switches = cpu_stats.ctx_switches
  168. interrupts = cpu_stats.interrupts
  169. ctx_switches_per_sec = 0
  170. interrupts_per_sec = 0
  171. if self.previous_cpu_stats:
  172. time_delta = current_timestamp - self.previous_timestamp
  173. if time_delta > 0:
  174. ctx_switches_per_sec = (context_switches - self.previous_cpu_stats.ctx_switches) / time_delta
  175. interrupts_per_sec = (interrupts - self.previous_cpu_stats.interrupts) / time_delta
  176. self.previous_cpu_stats = cpu_stats
  177. # Memory - detailed
  178. mem = psutil.virtual_memory()
  179. swap = psutil.swap_memory()
  180. # Load Average
  181. load_avg = psutil.getloadavg()
  182. load_1, load_5, load_15 = load_avg
  183. # Disk I/O - with IOPS and throughput
  184. disk_io = psutil.disk_io_counters()
  185. disk_usage = psutil.disk_usage('/')
  186. # Calculate disk deltas (IOPS, throughput)
  187. disk_read_iops = 0
  188. disk_write_iops = 0
  189. disk_read_mbps = 0
  190. disk_write_mbps = 0
  191. if self.previous_disk_io and self.previous_timestamp:
  192. time_delta = current_timestamp - self.previous_timestamp
  193. if time_delta > 0:
  194. disk_read_iops = (disk_io.read_count - self.previous_disk_io.read_count) / time_delta
  195. disk_write_iops = (disk_io.write_count - self.previous_disk_io.write_count) / time_delta
  196. disk_read_mbps = ((disk_io.read_bytes - self.previous_disk_io.read_bytes) / time_delta) / (1024 * 1024)
  197. disk_write_mbps = ((disk_io.write_bytes - self.previous_disk_io.write_bytes) / time_delta) / (1024 * 1024)
  198. self.previous_disk_io = disk_io
  199. # Network - with packets and throughput
  200. net_io = psutil.net_io_counters()
  201. # Calculate network deltas
  202. net_in_mbps = 0
  203. net_out_mbps = 0
  204. net_packets_in_per_sec = 0
  205. net_packets_out_per_sec = 0
  206. if self.previous_net_io and self.previous_timestamp:
  207. time_delta = current_timestamp - self.previous_timestamp
  208. if time_delta > 0:
  209. net_in_mbps = ((net_io.bytes_recv - self.previous_net_io.bytes_recv) / time_delta) / (1024 * 1024)
  210. net_out_mbps = ((net_io.bytes_sent - self.previous_net_io.bytes_sent) / time_delta) / (1024 * 1024)
  211. net_packets_in_per_sec = (net_io.packets_recv - self.previous_net_io.packets_recv) / time_delta
  212. net_packets_out_per_sec = (net_io.packets_sent - self.previous_net_io.packets_sent) / time_delta
  213. self.previous_net_io = net_io
  214. self.previous_timestamp = current_timestamp
  215. # Processes - top CPU and memory consumers
  216. processes = []
  217. for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
  218. try:
  219. processes.append(proc.info)
  220. except (psutil.NoSuchProcess, psutil.AccessDenied):
  221. pass
  222. top_cpu = sorted(processes, key=lambda p: p.get('cpu_percent', 0), reverse=True)[:5]
  223. top_mem = sorted(processes, key=lambda p: p.get('memory_percent', 0), reverse=True)[:5]
  224. # Clean up process info
  225. top_cpu_clean = [
  226. {'pid': p['pid'], 'name': p['name'], 'cpu': round(p.get('cpu_percent', 0), 1)}
  227. for p in top_cpu if p.get('cpu_percent', 0) > 0
  228. ]
  229. top_mem_clean = [
  230. {'pid': p['pid'], 'name': p['name'], 'mem': round(p.get('memory_percent', 0), 1)}
  231. for p in top_mem if p.get('memory_percent', 0) > 0
  232. ]
  233. return {
  234. 'timestamp': datetime.now(timezone.utc),
  235. # CPU
  236. 'cpu_percent': cpu_percent,
  237. 'cpu_count': cpu_count,
  238. 'cpu_per_core': cpu_per_core,
  239. 'cpu_steal': getattr(cpu_times, 'steal', 0), # VM steal time
  240. 'context_switches_per_sec': int(ctx_switches_per_sec),
  241. 'interrupts_per_sec': int(interrupts_per_sec),
  242. # Memory
  243. 'memory_total': mem.total,
  244. 'memory_used': mem.used,
  245. 'memory_percent': mem.percent,
  246. 'memory_available': mem.available,
  247. 'memory_buffers': getattr(mem, 'buffers', 0),
  248. 'memory_cached': getattr(mem, 'cached', 0),
  249. 'swap_total': swap.total,
  250. 'swap_used': swap.used,
  251. 'swap_percent': swap.percent,
  252. # Load
  253. 'load_1': load_1,
  254. 'load_5': load_5,
  255. 'load_15': load_15,
  256. # Disk I/O
  257. 'disk_read_bytes': disk_io.read_bytes,
  258. 'disk_write_bytes': disk_io.write_bytes,
  259. 'disk_read_iops': int(disk_read_iops),
  260. 'disk_write_iops': int(disk_write_iops),
  261. 'disk_read_mbps': round(disk_read_mbps, 2),
  262. 'disk_write_mbps': round(disk_write_mbps, 2),
  263. 'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
  264. 'disk_usage_percent': disk_usage.percent,
  265. # Network
  266. 'net_sent_bytes': net_io.bytes_sent,
  267. 'net_recv_bytes': net_io.bytes_recv,
  268. 'net_in_mbps': round(net_in_mbps, 2),
  269. 'net_out_mbps': round(net_out_mbps, 2),
  270. 'net_packets_in_per_sec': int(net_packets_in_per_sec),
  271. 'net_packets_out_per_sec': int(net_packets_out_per_sec),
  272. 'net_errors_in': net_io.errin,
  273. 'net_errors_out': net_io.errout,
  274. 'net_drops_in': net_io.dropin,
  275. 'net_drops_out': net_io.dropout,
  276. # Processes
  277. 'process_count': len(psutil.pids()),
  278. 'thread_count': sum(p.num_threads() for p in psutil.process_iter() if p.is_running()),
  279. 'top_cpu_processes': top_cpu_clean,
  280. 'top_mem_processes': top_mem_processes,
  281. }
  282. # Collect database metrics
  283. pg_metrics = await self.collect_postgresql_metrics()
  284. ch_metrics = await self.collect_clickhouse_metrics()
  285. # Collect HTTP metrics
  286. from app.core.http_metrics import http_metrics_collector
  287. http_metrics = http_metrics_collector.get_metrics()
  288. # Merge all metrics
  289. return {
  290. **metrics,
  291. **pg_metrics,
  292. **ch_metrics,
  293. 'http_requests_per_sec': http_metrics['requests_per_sec'],
  294. 'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
  295. 'http_error_rate': round(http_metrics['error_rate'], 2),
  296. 'http_active_requests': http_metrics['active_requests'],
  297. }
  298. async def store_metrics(self, metrics: dict):
  299. """Store metrics in database."""
  300. async with async_session_maker() as session:
  301. metric = HostMetrics(**metrics)
  302. session.add(metric)
  303. await session.commit()
  304. async def check_thresholds(self, metrics: dict):
  305. """Check if metrics exceed configured thresholds and create alerts."""
  306. # Get thresholds from settings
  307. async with async_session_maker() as session:
  308. from app.models.settings import Settings
  309. result = await session.execute(
  310. select(Settings).where(Settings.key == "host_monitoring")
  311. )
  312. settings = result.scalar_one_or_none()
  313. if not settings:
  314. return
  315. thresholds = settings.value
  316. # Check CPU
  317. if metrics['cpu_percent'] > thresholds.get('cpu_threshold', 90):
  318. await alert_service.create_alert(
  319. alert_type='host_metrics',
  320. severity='warning' if metrics['cpu_percent'] < 95 else 'critical',
  321. title=f'High CPU Usage: {metrics["cpu_percent"]:.1f}%',
  322. message=f'CPU usage is at {metrics["cpu_percent"]:.1f}%, threshold is {thresholds.get("cpu_threshold", 90)}%',
  323. alert_metadata={'metric': 'cpu_percent', 'value': metrics['cpu_percent']},
  324. )
  325. # Check Memory
  326. if metrics['memory_percent'] > thresholds.get('memory_threshold', 90):
  327. await alert_service.create_alert(
  328. alert_type='host_metrics',
  329. severity='warning' if metrics['memory_percent'] < 95 else 'critical',
  330. title=f'High Memory Usage: {metrics["memory_percent"]:.1f}%',
  331. message=f'Memory usage is at {metrics["memory_percent"]:.1f}%, threshold is {thresholds.get("memory_threshold", 90)}%',
  332. alert_metadata={'metric': 'memory_percent', 'value': metrics['memory_percent']},
  333. )
  334. # Check Load Average (relative to CPU count)
  335. load_threshold = thresholds.get('load_threshold', 2.0) * metrics['cpu_count']
  336. if metrics['load_1'] > load_threshold:
  337. await alert_service.create_alert(
  338. alert_type='host_metrics',
  339. severity='warning',
  340. title=f'High Load Average: {metrics["load_1"]:.2f}',
  341. message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
  342. alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
  343. )
  344. # Check Disk Usage
  345. if metrics['disk_usage_percent'] > thresholds.get('disk_threshold', 90):
  346. await alert_service.create_alert(
  347. alert_type='host_metrics',
  348. severity='warning' if metrics['disk_usage_percent'] < 95 else 'critical',
  349. title=f'High Disk Usage: {metrics["disk_usage_percent"]:.1f}%',
  350. message=f'Disk usage is at {metrics["disk_usage_percent"]:.1f}%, threshold is {thresholds.get("disk_threshold", 90)}%',
  351. alert_metadata={'metric': 'disk_usage_percent', 'value': metrics['disk_usage_percent']},
  352. )
  353. async def cleanup_old_metrics(self, days: int = 30):
  354. """Delete metrics older than specified days."""
  355. cutoff = datetime.now(timezone.utc) - timedelta(days=days)
  356. async with async_session_maker() as session:
  357. await session.execute(
  358. delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
  359. )
  360. await session.commit()
  361. async def run_monitoring_loop(self):
  362. """Main monitoring loop - runs in background."""
  363. print("[HostMonitor] Starting host monitoring loop")
  364. self.running = True
  365. while self.running:
  366. try:
  367. # Collect metrics
  368. metrics = await self.collect_metrics()
  369. # Store in database
  370. await self.store_metrics(metrics)
  371. # Check thresholds
  372. await self.check_thresholds(metrics)
  373. # Cleanup old data once per hour
  374. if datetime.now().minute == 0:
  375. await self.cleanup_old_metrics()
  376. # Wait 60 seconds before next collection
  377. await asyncio.sleep(60)
  378. except Exception as e:
  379. print(f"[HostMonitor] Error in monitoring loop: {e}")
  380. await asyncio.sleep(60)
  381. async def stop(self):
  382. """Stop monitoring loop."""
  383. print("[HostMonitor] Stopping host monitoring loop")
  384. self.running = False
# Module-level singleton: the application imports this shared instance to
# start/stop monitoring, so delta state persists across collection cycles.
host_monitor = HostMonitor()