# host_monitor.py
  1. """
  2. Host monitoring service for collecting system metrics.
  3. """
  4. import asyncio
  5. import time
  6. from datetime import datetime, timedelta, timezone
  7. import psutil
  8. from sqlalchemy import delete, select, text
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from app.core.database import async_session_maker
  11. from app.models.host_metrics import HostMetrics
  12. from app.services.alert_service import alert_service
  13. class HostMonitor:
  14. """Collect and store host system metrics."""
  15. def __init__(self):
  16. self.previous_disk_io = None
  17. self.previous_net_io = None
  18. self.previous_cpu_stats = None
  19. self.previous_timestamp = None
  20. self.previous_pg_stats = None
  21. self.previous_ch_stats = None
  22. self.running = False
  23. self.latest_metrics = {} # Latest collected metrics for dashboard
  24. async def collect_postgresql_metrics(self) -> dict:
  25. """Collect PostgreSQL database metrics."""
  26. try:
  27. async with async_session_maker() as session:
  28. # Active connections
  29. result = await session.execute(text("""
  30. SELECT count(*) as active,
  31. (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max
  32. FROM pg_stat_activity
  33. WHERE state = 'active'
  34. """))
  35. row = result.fetchone()
  36. active_connections = row[0] if row else 0
  37. total_connections = row[1] if row else 100
  38. # Database size
  39. from app.config import settings
  40. db_name = settings.DATABASE_URL.split('/')[-1].split('?')[0]
  41. result = await session.execute(text(f"""
  42. SELECT pg_database_size('{db_name}')
  43. """))
  44. db_size = int(result.scalar() or 0)
  45. # Cache hit ratio
  46. result = await session.execute(text("""
  47. SELECT
  48. sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit) + sum(blks_read), 0) as cache_hit_ratio
  49. FROM pg_stat_database
  50. """))
  51. cache_hit_ratio = float(result.scalar() or 0)
  52. # Transactions per second (delta-based)
  53. result = await session.execute(text("""
  54. SELECT sum(xact_commit + xact_rollback) as total_xacts,
  55. sum(deadlocks) as deadlocks,
  56. sum(temp_files) as temp_files
  57. FROM pg_stat_database
  58. """))
  59. row = result.fetchone()
  60. total_xacts = int(row[0] or 0)
  61. deadlocks = int(row[1] or 0)
  62. temp_files = int(row[2] or 0)
  63. # Calculate TPS
  64. tps = 0
  65. if self.previous_pg_stats and self.previous_timestamp:
  66. time_delta = time.time() - self.previous_timestamp
  67. if time_delta > 0:
  68. tps = int((total_xacts - self.previous_pg_stats['xacts']) / time_delta)
  69. self.previous_pg_stats = {'xacts': total_xacts}
  70. return {
  71. 'pg_active_connections': active_connections,
  72. 'pg_total_connections': total_connections,
  73. 'pg_database_size_bytes': db_size,
  74. 'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
  75. 'pg_transactions_per_sec': max(0, tps),
  76. 'pg_deadlocks': deadlocks,
  77. 'pg_temp_files': temp_files,
  78. }
  79. except Exception as e:
  80. print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
  81. return {
  82. 'pg_active_connections': 0,
  83. 'pg_total_connections': 0,
  84. 'pg_database_size_bytes': 0,
  85. 'pg_cache_hit_ratio': 0,
  86. 'pg_transactions_per_sec': 0,
  87. 'pg_deadlocks': 0,
  88. 'pg_temp_files': 0,
  89. }
  90. async def collect_clickhouse_metrics(self) -> dict:
  91. """Collect ClickHouse database metrics."""
  92. try:
  93. # Check if clickhouse_connect module is available
  94. try:
  95. import clickhouse_connect
  96. except ImportError:
  97. return {
  98. 'ch_active_queries': 0,
  99. 'ch_database_size_bytes': 0,
  100. 'ch_queries_per_sec': 0,
  101. 'ch_rows_read_per_sec': 0,
  102. 'ch_memory_usage_bytes': 0,
  103. }
  104. from app.config import settings
  105. # Check if ClickHouse is configured
  106. if not hasattr(settings, 'CLICKHOUSE_HOST'):
  107. return {
  108. 'ch_active_queries': 0,
  109. 'ch_database_size_bytes': 0,
  110. 'ch_queries_per_sec': 0,
  111. 'ch_rows_read_per_sec': 0,
  112. 'ch_memory_usage_bytes': 0,
  113. }
  114. # Connect to ClickHouse
  115. client = clickhouse_connect.get_client(
  116. host=settings.CLICKHOUSE_HOST,
  117. port=settings.CLICKHOUSE_PORT,
  118. username=settings.CLICKHOUSE_USER,
  119. password=settings.CLICKHOUSE_PASSWORD,
  120. )
  121. # Active queries
  122. result = client.query("SELECT count() FROM system.processes")
  123. active_queries = result.result_rows[0][0] if result.result_rows else 0
  124. # Database size
  125. result = client.query("""
  126. SELECT sum(bytes) FROM system.parts
  127. WHERE active AND database NOT IN ('system', 'information_schema')
  128. """)
  129. db_size = result.result_rows[0][0] if result.result_rows else 0
  130. # Query stats (delta-based)
  131. result = client.query("""
  132. SELECT
  133. count() as queries,
  134. sum(read_rows) as rows_read,
  135. sum(memory_usage) as memory
  136. FROM system.query_log
  137. WHERE event_time > now() - INTERVAL 60 SECOND
  138. """)
  139. row = result.result_rows[0] if result.result_rows else (0, 0, 0)
  140. queries = int(row[0] or 0)
  141. rows_read = int(row[1] or 0)
  142. memory_usage = int(row[2] or 0)
  143. # Calculate QPS
  144. qps = 0
  145. rows_per_sec = 0
  146. if self.previous_ch_stats and self.previous_timestamp:
  147. time_delta = time.time() - self.previous_timestamp
  148. if time_delta > 0:
  149. qps = int((queries - self.previous_ch_stats['queries']) / time_delta)
  150. rows_per_sec = int((rows_read - self.previous_ch_stats['rows']) / time_delta)
  151. self.previous_ch_stats = {'queries': queries, 'rows': rows_read}
  152. return {
  153. 'ch_active_queries': active_queries,
  154. 'ch_database_size_bytes': db_size or 0,
  155. 'ch_queries_per_sec': max(0, qps),
  156. 'ch_rows_read_per_sec': max(0, rows_per_sec),
  157. 'ch_memory_usage_bytes': memory_usage or 0,
  158. }
  159. except Exception as e:
  160. print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
  161. return {
  162. 'ch_active_queries': 0,
  163. 'ch_database_size_bytes': 0,
  164. 'ch_queries_per_sec': 0,
  165. 'ch_rows_read_per_sec': 0,
  166. 'ch_memory_usage_bytes': 0,
  167. }
  168. async def collect_metrics(self) -> dict:
  169. """Collect comprehensive system metrics."""
  170. current_timestamp = time.time()
  171. # CPU - detailed
  172. cpu_percent = psutil.cpu_percent(interval=1)
  173. cpu_count = psutil.cpu_count()
  174. cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
  175. cpu_times = psutil.cpu_times()
  176. cpu_stats = psutil.cpu_stats()
  177. # Context switches and interrupts (delta)
  178. context_switches = cpu_stats.ctx_switches
  179. interrupts = cpu_stats.interrupts
  180. ctx_switches_per_sec = 0
  181. interrupts_per_sec = 0
  182. if self.previous_cpu_stats:
  183. time_delta = current_timestamp - self.previous_timestamp
  184. if time_delta > 0:
  185. ctx_switches_per_sec = (context_switches - self.previous_cpu_stats.ctx_switches) / time_delta
  186. interrupts_per_sec = (interrupts - self.previous_cpu_stats.interrupts) / time_delta
  187. self.previous_cpu_stats = cpu_stats
  188. # Memory - detailed
  189. mem = psutil.virtual_memory()
  190. swap = psutil.swap_memory()
  191. # Load Average
  192. load_avg = psutil.getloadavg()
  193. load_1, load_5, load_15 = load_avg
  194. # Disk I/O - with IOPS and throughput
  195. disk_io = psutil.disk_io_counters()
  196. disk_usage = psutil.disk_usage('/')
  197. # Calculate disk deltas (IOPS, throughput)
  198. disk_read_iops = 0
  199. disk_write_iops = 0
  200. disk_read_mbps = 0
  201. disk_write_mbps = 0
  202. if self.previous_disk_io and self.previous_timestamp:
  203. time_delta = current_timestamp - self.previous_timestamp
  204. if time_delta > 0:
  205. disk_read_iops = (disk_io.read_count - self.previous_disk_io.read_count) / time_delta
  206. disk_write_iops = (disk_io.write_count - self.previous_disk_io.write_count) / time_delta
  207. disk_read_mbps = ((disk_io.read_bytes - self.previous_disk_io.read_bytes) / time_delta) / (1024 * 1024)
  208. disk_write_mbps = ((disk_io.write_bytes - self.previous_disk_io.write_bytes) / time_delta) / (1024 * 1024)
  209. self.previous_disk_io = disk_io
  210. # Network - with packets and throughput
  211. net_io = psutil.net_io_counters()
  212. # Calculate network deltas
  213. net_in_mbps = 0
  214. net_out_mbps = 0
  215. net_packets_in_per_sec = 0
  216. net_packets_out_per_sec = 0
  217. if self.previous_net_io and self.previous_timestamp:
  218. time_delta = current_timestamp - self.previous_timestamp
  219. if time_delta > 0:
  220. net_in_mbps = ((net_io.bytes_recv - self.previous_net_io.bytes_recv) / time_delta) / (1024 * 1024)
  221. net_out_mbps = ((net_io.bytes_sent - self.previous_net_io.bytes_sent) / time_delta) / (1024 * 1024)
  222. net_packets_in_per_sec = (net_io.packets_recv - self.previous_net_io.packets_recv) / time_delta
  223. net_packets_out_per_sec = (net_io.packets_sent - self.previous_net_io.packets_sent) / time_delta
  224. self.previous_net_io = net_io
  225. self.previous_timestamp = current_timestamp
  226. # Processes - top CPU and memory consumers
  227. # Convert iterator to list to ensure it's fully consumed and closed
  228. processes = []
  229. total_threads = 0
  230. try:
  231. proc_list = list(psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'num_threads']))
  232. for proc in proc_list:
  233. try:
  234. info = proc.info
  235. processes.append(info)
  236. # Count threads while we're iterating
  237. total_threads += info.get('num_threads', 0)
  238. except (psutil.NoSuchProcess, psutil.AccessDenied):
  239. pass
  240. except Exception as e:
  241. print(f"[HostMonitor] Error collecting process list: {e}")
  242. top_cpu = sorted(processes, key=lambda p: p.get('cpu_percent', 0), reverse=True)[:5]
  243. top_mem = sorted(processes, key=lambda p: p.get('memory_percent', 0), reverse=True)[:5]
  244. # Clean up process info
  245. top_cpu_clean = [
  246. {'pid': p['pid'], 'name': p['name'], 'cpu': round(p.get('cpu_percent', 0), 1)}
  247. for p in top_cpu if p.get('cpu_percent', 0) > 0
  248. ]
  249. top_mem_clean = [
  250. {'pid': p['pid'], 'name': p['name'], 'mem': round(p.get('memory_percent', 0), 1)}
  251. for p in top_mem if p.get('memory_percent', 0) > 0
  252. ]
  253. metrics = {
  254. 'timestamp': datetime.now(timezone.utc),
  255. # CPU
  256. 'cpu_percent': cpu_percent,
  257. 'cpu_count': cpu_count,
  258. 'cpu_per_core': cpu_per_core,
  259. 'cpu_steal': getattr(cpu_times, 'steal', 0), # VM steal time
  260. 'context_switches_per_sec': int(ctx_switches_per_sec),
  261. 'interrupts_per_sec': int(interrupts_per_sec),
  262. # Memory
  263. 'memory_total': mem.total,
  264. 'memory_used': mem.used,
  265. 'memory_percent': mem.percent,
  266. 'memory_available': mem.available,
  267. 'memory_buffers': getattr(mem, 'buffers', 0),
  268. 'memory_cached': getattr(mem, 'cached', 0),
  269. 'swap_total': swap.total,
  270. 'swap_used': swap.used,
  271. 'swap_percent': swap.percent,
  272. # Load
  273. 'load_1': load_1,
  274. 'load_5': load_5,
  275. 'load_15': load_15,
  276. # Disk I/O
  277. 'disk_read_bytes': disk_io.read_bytes,
  278. 'disk_write_bytes': disk_io.write_bytes,
  279. 'disk_read_iops': int(disk_read_iops),
  280. 'disk_write_iops': int(disk_write_iops),
  281. 'disk_read_mbps': round(disk_read_mbps, 2),
  282. 'disk_write_mbps': round(disk_write_mbps, 2),
  283. 'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
  284. 'disk_usage_percent': disk_usage.percent,
  285. # Network
  286. 'net_sent_bytes': net_io.bytes_sent,
  287. 'net_recv_bytes': net_io.bytes_recv,
  288. 'net_in_mbps': round(net_in_mbps, 2),
  289. 'net_out_mbps': round(net_out_mbps, 2),
  290. 'net_packets_in_per_sec': int(net_packets_in_per_sec),
  291. 'net_packets_out_per_sec': int(net_packets_out_per_sec),
  292. 'net_errors_in': net_io.errin,
  293. 'net_errors_out': net_io.errout,
  294. 'net_drops_in': net_io.dropin,
  295. 'net_drops_out': net_io.dropout,
  296. # Processes
  297. 'process_count': len(processes), # Use already collected process list
  298. 'thread_count': total_threads, # Already counted during iteration
  299. 'top_cpu_processes': top_cpu_clean,
  300. 'top_mem_processes': top_mem_clean,
  301. }
  302. # Collect database metrics
  303. pg_metrics = await self.collect_postgresql_metrics()
  304. ch_metrics = await self.collect_clickhouse_metrics()
  305. # Collect HTTP metrics
  306. from app.core.http_metrics import http_metrics_collector
  307. http_metrics = http_metrics_collector.get_metrics()
  308. # Merge all metrics
  309. return {
  310. **metrics,
  311. **pg_metrics,
  312. **ch_metrics,
  313. 'http_requests_per_sec': http_metrics['requests_per_sec'],
  314. 'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
  315. 'http_error_rate': round(http_metrics['error_rate'], 2),
  316. 'http_active_requests': http_metrics['active_requests'],
  317. }
  318. async def store_metrics(self, metrics: dict):
  319. """Store metrics in database."""
  320. async with async_session_maker() as session:
  321. metric = HostMetrics(**metrics)
  322. session.add(metric)
  323. await session.commit()
  324. async def check_thresholds(self, metrics: dict):
  325. """Check if metrics exceed configured thresholds and create alerts."""
  326. # Get thresholds from settings
  327. async with async_session_maker() as session:
  328. from app.models.settings import Settings
  329. result = await session.execute(
  330. select(Settings).where(Settings.key == "host_monitoring")
  331. )
  332. settings = result.scalar_one_or_none()
  333. if not settings:
  334. return
  335. thresholds = settings.value
  336. # Check CPU
  337. if metrics['cpu_percent'] > thresholds.get('cpu_threshold', 90):
  338. await alert_service.create_alert(
  339. alert_type='host_metrics',
  340. severity='warning' if metrics['cpu_percent'] < 95 else 'critical',
  341. title=f'High CPU Usage: {metrics["cpu_percent"]:.1f}%',
  342. message=f'CPU usage is at {metrics["cpu_percent"]:.1f}%, threshold is {thresholds.get("cpu_threshold", 90)}%',
  343. alert_metadata={'metric': 'cpu_percent', 'value': metrics['cpu_percent']},
  344. )
  345. # Check Memory
  346. if metrics['memory_percent'] > thresholds.get('memory_threshold', 90):
  347. await alert_service.create_alert(
  348. alert_type='host_metrics',
  349. severity='warning' if metrics['memory_percent'] < 95 else 'critical',
  350. title=f'High Memory Usage: {metrics["memory_percent"]:.1f}%',
  351. message=f'Memory usage is at {metrics["memory_percent"]:.1f}%, threshold is {thresholds.get("memory_threshold", 90)}%',
  352. alert_metadata={'metric': 'memory_percent', 'value': metrics['memory_percent']},
  353. )
  354. # Check Load Average (relative to CPU count)
  355. load_threshold = thresholds.get('load_threshold', 2.0) * metrics['cpu_count']
  356. if metrics['load_1'] > load_threshold:
  357. await alert_service.create_alert(
  358. alert_type='host_metrics',
  359. severity='warning',
  360. title=f'High Load Average: {metrics["load_1"]:.2f}',
  361. message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
  362. alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
  363. )
  364. # Check Disk Usage
  365. if metrics['disk_usage_percent'] > thresholds.get('disk_threshold', 90):
  366. await alert_service.create_alert(
  367. alert_type='host_metrics',
  368. severity='warning' if metrics['disk_usage_percent'] < 95 else 'critical',
  369. title=f'High Disk Usage: {metrics["disk_usage_percent"]:.1f}%',
  370. message=f'Disk usage is at {metrics["disk_usage_percent"]:.1f}%, threshold is {thresholds.get("disk_threshold", 90)}%',
  371. alert_metadata={'metric': 'disk_usage_percent', 'value': metrics['disk_usage_percent']},
  372. )
  373. async def cleanup_old_metrics(self, days: int = 30):
  374. """Delete metrics older than specified days."""
  375. cutoff = datetime.now(timezone.utc) - timedelta(days=days)
  376. async with async_session_maker() as session:
  377. await session.execute(
  378. delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
  379. )
  380. await session.commit()
  381. async def run_monitoring_loop(self):
  382. """Main monitoring loop - runs in background."""
  383. print("[HostMonitor] Starting host monitoring loop")
  384. self.running = True
  385. while self.running:
  386. try:
  387. # Collect metrics
  388. metrics = await self.collect_metrics()
  389. # Save latest metrics for dashboard
  390. self.latest_metrics = metrics
  391. # Store in database
  392. await self.store_metrics(metrics)
  393. # Check thresholds
  394. await self.check_thresholds(metrics)
  395. # Cleanup old data once per hour
  396. if datetime.now().minute == 0:
  397. await self.cleanup_old_metrics()
  398. # Wait 60 seconds before next collection
  399. await asyncio.sleep(60)
  400. except Exception as e:
  401. print(f"[HostMonitor] Error in monitoring loop: {e}")
  402. await asyncio.sleep(60)
    async def stop(self):
        """Stop monitoring loop.

        Clears the run flag; the loop exits after finishing its current
        cycle (it does not interrupt an in-progress collection or sleep).
        """
        print("[HostMonitor] Stopping host monitoring loop")
        self.running = False
# Global instance - module importers share this single HostMonitor.
host_monitor = HostMonitor()