  1. """
  2. Host monitoring service for collecting system metrics.
  3. """
  4. import asyncio
  5. import time
  6. from datetime import datetime, timedelta, timezone
  7. import psutil
  8. from sqlalchemy import delete, select, text
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from app.core.database import async_session_maker
  11. from app.models.host_metrics import HostMetrics
  12. from app.services.alert_service import alert_service
  13. class HostMonitor:
  14. """Collect and store host system metrics."""
  15. def __init__(self):
  16. self.previous_disk_io = None
  17. self.previous_net_io = None
  18. self.previous_cpu_stats = None
  19. self.previous_timestamp = None
  20. self.previous_pg_stats = None
  21. self.previous_ch_stats = None
  22. self.running = False
  23. self.latest_metrics = {} # Latest collected metrics for dashboard
  24. async def collect_postgresql_metrics(self) -> dict:
  25. """Collect PostgreSQL database metrics."""
  26. try:
  27. async with async_session_maker() as session:
  28. # Active connections
  29. result = await session.execute(text("""
  30. SELECT count(*) as active,
  31. (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max
  32. FROM pg_stat_activity
  33. WHERE state = 'active'
  34. """))
  35. row = result.fetchone()
  36. active_connections = row[0] if row else 0
  37. total_connections = row[1] if row else 100
  38. # Database size
  39. from app.config import settings
  40. db_name = settings.DATABASE_URL.split('/')[-1].split('?')[0]
  41. result = await session.execute(text(f"""
  42. SELECT pg_database_size('{db_name}')
  43. """))
  44. db_size = int(result.scalar() or 0)
  45. # Cache hit ratio
  46. result = await session.execute(text("""
  47. SELECT
  48. sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit) + sum(blks_read), 0) as cache_hit_ratio
  49. FROM pg_stat_database
  50. """))
  51. cache_hit_ratio = float(result.scalar() or 0)
  52. # Transactions per second (delta-based)
  53. result = await session.execute(text("""
  54. SELECT sum(xact_commit + xact_rollback) as total_xacts,
  55. sum(deadlocks) as deadlocks,
  56. sum(temp_files) as temp_files
  57. FROM pg_stat_database
  58. """))
  59. row = result.fetchone()
  60. total_xacts = int(row[0] or 0)
  61. deadlocks = int(row[1] or 0)
  62. temp_files = int(row[2] or 0)
  63. # Calculate TPS
  64. tps = 0
  65. if self.previous_pg_stats and self.previous_timestamp:
  66. time_delta = time.time() - self.previous_timestamp
  67. if time_delta > 0:
  68. tps = int((total_xacts - self.previous_pg_stats['xacts']) / time_delta)
  69. self.previous_pg_stats = {'xacts': total_xacts}
  70. return {
  71. 'pg_active_connections': active_connections,
  72. 'pg_total_connections': total_connections,
  73. 'pg_database_size_bytes': db_size,
  74. 'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
  75. 'pg_transactions_per_sec': max(0, tps),
  76. 'pg_deadlocks': deadlocks,
  77. 'pg_temp_files': temp_files,
  78. }
  79. except Exception as e:
  80. print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
  81. return {
  82. 'pg_active_connections': 0,
  83. 'pg_total_connections': 0,
  84. 'pg_database_size_bytes': 0,
  85. 'pg_cache_hit_ratio': 0,
  86. 'pg_transactions_per_sec': 0,
  87. 'pg_deadlocks': 0,
  88. 'pg_temp_files': 0,
  89. }
  90. async def collect_clickhouse_metrics(self) -> dict:
  91. """Collect ClickHouse database metrics."""
  92. try:
  93. # Check if clickhouse_connect module is available
  94. try:
  95. import clickhouse_connect
  96. except ImportError:
  97. return {
  98. 'ch_active_queries': 0,
  99. 'ch_database_size_bytes': 0,
  100. 'ch_queries_per_sec': 0,
  101. 'ch_rows_read_per_sec': 0,
  102. 'ch_memory_usage_bytes': 0,
  103. }
  104. from app.config import settings
  105. # Check if ClickHouse is configured
  106. if not hasattr(settings, 'CLICKHOUSE_HOST'):
  107. return {
  108. 'ch_active_queries': 0,
  109. 'ch_database_size_bytes': 0,
  110. 'ch_queries_per_sec': 0,
  111. 'ch_rows_read_per_sec': 0,
  112. 'ch_memory_usage_bytes': 0,
  113. }
  114. # Connect to ClickHouse
  115. client = clickhouse_connect.get_client(
  116. host=settings.CLICKHOUSE_HOST,
  117. port=settings.CLICKHOUSE_PORT,
  118. username=settings.CLICKHOUSE_USER,
  119. password=settings.CLICKHOUSE_PASSWORD,
  120. )
  121. # Active queries
  122. result = client.query("SELECT count() FROM system.processes")
  123. active_queries = result.result_rows[0][0] if result.result_rows else 0
  124. # Database size
  125. result = client.query("""
  126. SELECT sum(bytes) FROM system.parts
  127. WHERE active AND database NOT IN ('system', 'information_schema')
  128. """)
  129. db_size = result.result_rows[0][0] if result.result_rows else 0
  130. # Query stats (delta-based)
  131. result = client.query("""
  132. SELECT
  133. sum(query_count) as queries,
  134. sum(read_rows) as rows_read,
  135. sum(memory_usage) as memory
  136. FROM system.query_log
  137. WHERE event_time > now() - INTERVAL 60 SECOND
  138. """)
  139. row = result.result_rows[0] if result.result_rows else (0, 0, 0)
  140. queries = row[0] or 0
  141. rows_read = row[1] or 0
  142. memory_usage = row[2] or 0
  143. # Calculate QPS
  144. qps = 0
  145. rows_per_sec = 0
  146. if self.previous_ch_stats and self.previous_timestamp:
  147. time_delta = time.time() - self.previous_timestamp
  148. if time_delta > 0:
  149. qps = int((queries - self.previous_ch_stats['queries']) / time_delta)
  150. rows_per_sec = int((rows_read - self.previous_ch_stats['rows']) / time_delta)
  151. self.previous_ch_stats = {'queries': queries, 'rows': rows_read}
  152. return {
  153. 'ch_active_queries': active_queries,
  154. 'ch_database_size_bytes': db_size or 0,
  155. 'ch_queries_per_sec': max(0, qps),
  156. 'ch_rows_read_per_sec': max(0, rows_per_sec),
  157. 'ch_memory_usage_bytes': memory_usage or 0,
  158. }
  159. except Exception as e:
  160. print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
  161. return {
  162. 'ch_active_queries': 0,
  163. 'ch_database_size_bytes': 0,
  164. 'ch_queries_per_sec': 0,
  165. 'ch_rows_read_per_sec': 0,
  166. 'ch_memory_usage_bytes': 0,
  167. }
  168. async def collect_metrics(self) -> dict:
  169. """Collect comprehensive system metrics."""
  170. current_timestamp = time.time()
  171. # CPU - detailed
  172. cpu_percent = psutil.cpu_percent(interval=1)
  173. cpu_count = psutil.cpu_count()
  174. cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
  175. cpu_times = psutil.cpu_times()
  176. cpu_stats = psutil.cpu_stats()
  177. # Context switches and interrupts (delta)
  178. context_switches = cpu_stats.ctx_switches
  179. interrupts = cpu_stats.interrupts
  180. ctx_switches_per_sec = 0
  181. interrupts_per_sec = 0
  182. if self.previous_cpu_stats:
  183. time_delta = current_timestamp - self.previous_timestamp
  184. if time_delta > 0:
  185. ctx_switches_per_sec = (context_switches - self.previous_cpu_stats.ctx_switches) / time_delta
  186. interrupts_per_sec = (interrupts - self.previous_cpu_stats.interrupts) / time_delta
  187. self.previous_cpu_stats = cpu_stats
  188. # Memory - detailed
  189. mem = psutil.virtual_memory()
  190. swap = psutil.swap_memory()
  191. # Load Average
  192. load_avg = psutil.getloadavg()
  193. load_1, load_5, load_15 = load_avg
  194. # Disk I/O - with IOPS and throughput
  195. disk_io = psutil.disk_io_counters()
  196. disk_usage = psutil.disk_usage('/')
  197. # Calculate disk deltas (IOPS, throughput)
  198. disk_read_iops = 0
  199. disk_write_iops = 0
  200. disk_read_mbps = 0
  201. disk_write_mbps = 0
  202. if self.previous_disk_io and self.previous_timestamp:
  203. time_delta = current_timestamp - self.previous_timestamp
  204. if time_delta > 0:
  205. disk_read_iops = (disk_io.read_count - self.previous_disk_io.read_count) / time_delta
  206. disk_write_iops = (disk_io.write_count - self.previous_disk_io.write_count) / time_delta
  207. disk_read_mbps = ((disk_io.read_bytes - self.previous_disk_io.read_bytes) / time_delta) / (1024 * 1024)
  208. disk_write_mbps = ((disk_io.write_bytes - self.previous_disk_io.write_bytes) / time_delta) / (1024 * 1024)
  209. self.previous_disk_io = disk_io
  210. # Network - with packets and throughput
  211. net_io = psutil.net_io_counters()
  212. # Calculate network deltas
  213. net_in_mbps = 0
  214. net_out_mbps = 0
  215. net_packets_in_per_sec = 0
  216. net_packets_out_per_sec = 0
  217. if self.previous_net_io and self.previous_timestamp:
  218. time_delta = current_timestamp - self.previous_timestamp
  219. if time_delta > 0:
  220. net_in_mbps = ((net_io.bytes_recv - self.previous_net_io.bytes_recv) / time_delta) / (1024 * 1024)
  221. net_out_mbps = ((net_io.bytes_sent - self.previous_net_io.bytes_sent) / time_delta) / (1024 * 1024)
  222. net_packets_in_per_sec = (net_io.packets_recv - self.previous_net_io.packets_recv) / time_delta
  223. net_packets_out_per_sec = (net_io.packets_sent - self.previous_net_io.packets_sent) / time_delta
  224. self.previous_net_io = net_io
  225. self.previous_timestamp = current_timestamp
  226. # Processes - top CPU and memory consumers
  227. processes = []
  228. for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
  229. try:
  230. processes.append(proc.info)
  231. except (psutil.NoSuchProcess, psutil.AccessDenied):
  232. pass
  233. top_cpu = sorted(processes, key=lambda p: p.get('cpu_percent', 0), reverse=True)[:5]
  234. top_mem = sorted(processes, key=lambda p: p.get('memory_percent', 0), reverse=True)[:5]
  235. # Clean up process info
  236. top_cpu_clean = [
  237. {'pid': p['pid'], 'name': p['name'], 'cpu': round(p.get('cpu_percent', 0), 1)}
  238. for p in top_cpu if p.get('cpu_percent', 0) > 0
  239. ]
  240. top_mem_clean = [
  241. {'pid': p['pid'], 'name': p['name'], 'mem': round(p.get('memory_percent', 0), 1)}
  242. for p in top_mem if p.get('memory_percent', 0) > 0
  243. ]
  244. metrics = {
  245. 'timestamp': datetime.now(timezone.utc),
  246. # CPU
  247. 'cpu_percent': cpu_percent,
  248. 'cpu_count': cpu_count,
  249. 'cpu_per_core': cpu_per_core,
  250. 'cpu_steal': getattr(cpu_times, 'steal', 0), # VM steal time
  251. 'context_switches_per_sec': int(ctx_switches_per_sec),
  252. 'interrupts_per_sec': int(interrupts_per_sec),
  253. # Memory
  254. 'memory_total': mem.total,
  255. 'memory_used': mem.used,
  256. 'memory_percent': mem.percent,
  257. 'memory_available': mem.available,
  258. 'memory_buffers': getattr(mem, 'buffers', 0),
  259. 'memory_cached': getattr(mem, 'cached', 0),
  260. 'swap_total': swap.total,
  261. 'swap_used': swap.used,
  262. 'swap_percent': swap.percent,
  263. # Load
  264. 'load_1': load_1,
  265. 'load_5': load_5,
  266. 'load_15': load_15,
  267. # Disk I/O
  268. 'disk_read_bytes': disk_io.read_bytes,
  269. 'disk_write_bytes': disk_io.write_bytes,
  270. 'disk_read_iops': int(disk_read_iops),
  271. 'disk_write_iops': int(disk_write_iops),
  272. 'disk_read_mbps': round(disk_read_mbps, 2),
  273. 'disk_write_mbps': round(disk_write_mbps, 2),
  274. 'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
  275. 'disk_usage_percent': disk_usage.percent,
  276. # Network
  277. 'net_sent_bytes': net_io.bytes_sent,
  278. 'net_recv_bytes': net_io.bytes_recv,
  279. 'net_in_mbps': round(net_in_mbps, 2),
  280. 'net_out_mbps': round(net_out_mbps, 2),
  281. 'net_packets_in_per_sec': int(net_packets_in_per_sec),
  282. 'net_packets_out_per_sec': int(net_packets_out_per_sec),
  283. 'net_errors_in': net_io.errin,
  284. 'net_errors_out': net_io.errout,
  285. 'net_drops_in': net_io.dropin,
  286. 'net_drops_out': net_io.dropout,
  287. # Processes
  288. 'process_count': len(psutil.pids()),
  289. 'thread_count': sum(p.num_threads() for p in psutil.process_iter() if p.is_running()),
  290. 'top_cpu_processes': top_cpu_clean,
  291. 'top_mem_processes': top_mem_clean,
  292. }
  293. # Collect database metrics
  294. pg_metrics = await self.collect_postgresql_metrics()
  295. ch_metrics = await self.collect_clickhouse_metrics()
  296. # Collect HTTP metrics
  297. from app.core.http_metrics import http_metrics_collector
  298. http_metrics = http_metrics_collector.get_metrics()
  299. # Merge all metrics
  300. return {
  301. **metrics,
  302. **pg_metrics,
  303. **ch_metrics,
  304. 'http_requests_per_sec': http_metrics['requests_per_sec'],
  305. 'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
  306. 'http_error_rate': round(http_metrics['error_rate'], 2),
  307. 'http_active_requests': http_metrics['active_requests'],
  308. }
  309. async def store_metrics(self, metrics: dict):
  310. """Store metrics in database."""
  311. async with async_session_maker() as session:
  312. metric = HostMetrics(**metrics)
  313. session.add(metric)
  314. await session.commit()
  315. async def check_thresholds(self, metrics: dict):
  316. """Check if metrics exceed configured thresholds and create alerts."""
  317. # Get thresholds from settings
  318. async with async_session_maker() as session:
  319. from app.models.settings import Settings
  320. result = await session.execute(
  321. select(Settings).where(Settings.key == "host_monitoring")
  322. )
  323. settings = result.scalar_one_or_none()
  324. if not settings:
  325. return
  326. thresholds = settings.value
  327. # Check CPU
  328. if metrics['cpu_percent'] > thresholds.get('cpu_threshold', 90):
  329. await alert_service.create_alert(
  330. alert_type='host_metrics',
  331. severity='warning' if metrics['cpu_percent'] < 95 else 'critical',
  332. title=f'High CPU Usage: {metrics["cpu_percent"]:.1f}%',
  333. message=f'CPU usage is at {metrics["cpu_percent"]:.1f}%, threshold is {thresholds.get("cpu_threshold", 90)}%',
  334. alert_metadata={'metric': 'cpu_percent', 'value': metrics['cpu_percent']},
  335. )
  336. # Check Memory
  337. if metrics['memory_percent'] > thresholds.get('memory_threshold', 90):
  338. await alert_service.create_alert(
  339. alert_type='host_metrics',
  340. severity='warning' if metrics['memory_percent'] < 95 else 'critical',
  341. title=f'High Memory Usage: {metrics["memory_percent"]:.1f}%',
  342. message=f'Memory usage is at {metrics["memory_percent"]:.1f}%, threshold is {thresholds.get("memory_threshold", 90)}%',
  343. alert_metadata={'metric': 'memory_percent', 'value': metrics['memory_percent']},
  344. )
  345. # Check Load Average (relative to CPU count)
  346. load_threshold = thresholds.get('load_threshold', 2.0) * metrics['cpu_count']
  347. if metrics['load_1'] > load_threshold:
  348. await alert_service.create_alert(
  349. alert_type='host_metrics',
  350. severity='warning',
  351. title=f'High Load Average: {metrics["load_1"]:.2f}',
  352. message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
  353. alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
  354. )
  355. # Check Disk Usage
  356. if metrics['disk_usage_percent'] > thresholds.get('disk_threshold', 90):
  357. await alert_service.create_alert(
  358. alert_type='host_metrics',
  359. severity='warning' if metrics['disk_usage_percent'] < 95 else 'critical',
  360. title=f'High Disk Usage: {metrics["disk_usage_percent"]:.1f}%',
  361. message=f'Disk usage is at {metrics["disk_usage_percent"]:.1f}%, threshold is {thresholds.get("disk_threshold", 90)}%',
  362. alert_metadata={'metric': 'disk_usage_percent', 'value': metrics['disk_usage_percent']},
  363. )
  364. async def cleanup_old_metrics(self, days: int = 30):
  365. """Delete metrics older than specified days."""
  366. cutoff = datetime.now(timezone.utc) - timedelta(days=days)
  367. async with async_session_maker() as session:
  368. await session.execute(
  369. delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
  370. )
  371. await session.commit()
  372. async def run_monitoring_loop(self):
  373. """Main monitoring loop - runs in background."""
  374. print("[HostMonitor] Starting host monitoring loop")
  375. self.running = True
  376. while self.running:
  377. try:
  378. # Collect metrics
  379. metrics = await self.collect_metrics()
  380. # Save latest metrics for dashboard
  381. self.latest_metrics = metrics
  382. # Store in database
  383. await self.store_metrics(metrics)
  384. # Check thresholds
  385. await self.check_thresholds(metrics)
  386. # Cleanup old data once per hour
  387. if datetime.now().minute == 0:
  388. await self.cleanup_old_metrics()
  389. # Wait 60 seconds before next collection
  390. await asyncio.sleep(60)
  391. except Exception as e:
  392. print(f"[HostMonitor] Error in monitoring loop: {e}")
  393. await asyncio.sleep(60)
  394. async def stop(self):
  395. """Stop monitoring loop."""
  396. print("[HostMonitor] Stopping host monitoring loop")
  397. self.running = False
  398. # Global instance
  399. host_monitor = HostMonitor()