host_monitor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. """
  2. Host monitoring service for collecting system metrics.
  3. """
  4. import asyncio
  5. import time
  6. from datetime import datetime, timedelta, timezone
  7. import psutil
  8. from sqlalchemy import delete, select
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from app.core.database import async_session_maker
  11. from app.models.host_metrics import HostMetrics
  12. from app.services.alert_service import alert_service
  13. class HostMonitor:
  14. """Collect and store host system metrics."""
  15. def __init__(self):
  16. self.previous_disk_io = None
  17. self.previous_net_io = None
  18. self.previous_cpu_stats = None
  19. self.previous_timestamp = None
  20. self.running = False
  21. async def collect_metrics(self) -> dict:
  22. """Collect comprehensive system metrics."""
  23. current_timestamp = time.time()
  24. # CPU - detailed
  25. cpu_percent = psutil.cpu_percent(interval=1)
  26. cpu_count = psutil.cpu_count()
  27. cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
  28. cpu_times = psutil.cpu_times()
  29. cpu_stats = psutil.cpu_stats()
  30. # Context switches and interrupts (delta)
  31. context_switches = cpu_stats.ctx_switches
  32. interrupts = cpu_stats.interrupts
  33. ctx_switches_per_sec = 0
  34. interrupts_per_sec = 0
  35. if self.previous_cpu_stats:
  36. time_delta = current_timestamp - self.previous_timestamp
  37. if time_delta > 0:
  38. ctx_switches_per_sec = (context_switches - self.previous_cpu_stats.ctx_switches) / time_delta
  39. interrupts_per_sec = (interrupts - self.previous_cpu_stats.interrupts) / time_delta
  40. self.previous_cpu_stats = cpu_stats
  41. # Memory - detailed
  42. mem = psutil.virtual_memory()
  43. swap = psutil.swap_memory()
  44. # Load Average
  45. load_avg = psutil.getloadavg()
  46. load_1, load_5, load_15 = load_avg
  47. # Disk I/O - with IOPS and throughput
  48. disk_io = psutil.disk_io_counters()
  49. disk_usage = psutil.disk_usage('/')
  50. # Calculate disk deltas (IOPS, throughput)
  51. disk_read_iops = 0
  52. disk_write_iops = 0
  53. disk_read_mbps = 0
  54. disk_write_mbps = 0
  55. if self.previous_disk_io and self.previous_timestamp:
  56. time_delta = current_timestamp - self.previous_timestamp
  57. if time_delta > 0:
  58. disk_read_iops = (disk_io.read_count - self.previous_disk_io.read_count) / time_delta
  59. disk_write_iops = (disk_io.write_count - self.previous_disk_io.write_count) / time_delta
  60. disk_read_mbps = ((disk_io.read_bytes - self.previous_disk_io.read_bytes) / time_delta) / (1024 * 1024)
  61. disk_write_mbps = ((disk_io.write_bytes - self.previous_disk_io.write_bytes) / time_delta) / (1024 * 1024)
  62. self.previous_disk_io = disk_io
  63. # Network - with packets and throughput
  64. net_io = psutil.net_io_counters()
  65. # Calculate network deltas
  66. net_in_mbps = 0
  67. net_out_mbps = 0
  68. net_packets_in_per_sec = 0
  69. net_packets_out_per_sec = 0
  70. if self.previous_net_io and self.previous_timestamp:
  71. time_delta = current_timestamp - self.previous_timestamp
  72. if time_delta > 0:
  73. net_in_mbps = ((net_io.bytes_recv - self.previous_net_io.bytes_recv) / time_delta) / (1024 * 1024)
  74. net_out_mbps = ((net_io.bytes_sent - self.previous_net_io.bytes_sent) / time_delta) / (1024 * 1024)
  75. net_packets_in_per_sec = (net_io.packets_recv - self.previous_net_io.packets_recv) / time_delta
  76. net_packets_out_per_sec = (net_io.packets_sent - self.previous_net_io.packets_sent) / time_delta
  77. self.previous_net_io = net_io
  78. self.previous_timestamp = current_timestamp
  79. # Processes - top CPU and memory consumers
  80. processes = []
  81. for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
  82. try:
  83. processes.append(proc.info)
  84. except (psutil.NoSuchProcess, psutil.AccessDenied):
  85. pass
  86. top_cpu = sorted(processes, key=lambda p: p.get('cpu_percent', 0), reverse=True)[:5]
  87. top_mem = sorted(processes, key=lambda p: p.get('memory_percent', 0), reverse=True)[:5]
  88. # Clean up process info
  89. top_cpu_clean = [
  90. {'pid': p['pid'], 'name': p['name'], 'cpu': round(p.get('cpu_percent', 0), 1)}
  91. for p in top_cpu if p.get('cpu_percent', 0) > 0
  92. ]
  93. top_mem_clean = [
  94. {'pid': p['pid'], 'name': p['name'], 'mem': round(p.get('memory_percent', 0), 1)}
  95. for p in top_mem if p.get('memory_percent', 0) > 0
  96. ]
  97. return {
  98. 'timestamp': datetime.now(timezone.utc),
  99. # CPU
  100. 'cpu_percent': cpu_percent,
  101. 'cpu_count': cpu_count,
  102. 'cpu_per_core': cpu_per_core,
  103. 'cpu_steal': getattr(cpu_times, 'steal', 0), # VM steal time
  104. 'context_switches_per_sec': int(ctx_switches_per_sec),
  105. 'interrupts_per_sec': int(interrupts_per_sec),
  106. # Memory
  107. 'memory_total': mem.total,
  108. 'memory_used': mem.used,
  109. 'memory_percent': mem.percent,
  110. 'memory_available': mem.available,
  111. 'memory_buffers': getattr(mem, 'buffers', 0),
  112. 'memory_cached': getattr(mem, 'cached', 0),
  113. 'swap_total': swap.total,
  114. 'swap_used': swap.used,
  115. 'swap_percent': swap.percent,
  116. # Load
  117. 'load_1': load_1,
  118. 'load_5': load_5,
  119. 'load_15': load_15,
  120. # Disk I/O
  121. 'disk_read_bytes': disk_io.read_bytes,
  122. 'disk_write_bytes': disk_io.write_bytes,
  123. 'disk_read_iops': int(disk_read_iops),
  124. 'disk_write_iops': int(disk_write_iops),
  125. 'disk_read_mbps': round(disk_read_mbps, 2),
  126. 'disk_write_mbps': round(disk_write_mbps, 2),
  127. 'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
  128. 'disk_usage_percent': disk_usage.percent,
  129. # Network
  130. 'net_sent_bytes': net_io.bytes_sent,
  131. 'net_recv_bytes': net_io.bytes_recv,
  132. 'net_in_mbps': round(net_in_mbps, 2),
  133. 'net_out_mbps': round(net_out_mbps, 2),
  134. 'net_packets_in_per_sec': int(net_packets_in_per_sec),
  135. 'net_packets_out_per_sec': int(net_packets_out_per_sec),
  136. 'net_errors_in': net_io.errin,
  137. 'net_errors_out': net_io.errout,
  138. 'net_drops_in': net_io.dropin,
  139. 'net_drops_out': net_io.dropout,
  140. # Processes
  141. 'process_count': len(psutil.pids()),
  142. 'thread_count': sum(p.num_threads() for p in psutil.process_iter() if p.is_running()),
  143. 'top_cpu_processes': top_cpu_clean,
  144. 'top_mem_processes': top_mem_clean,
  145. }
  146. async def store_metrics(self, metrics: dict):
  147. """Store metrics in database."""
  148. async with async_session_maker() as session:
  149. metric = HostMetrics(**metrics)
  150. session.add(metric)
  151. await session.commit()
  152. async def check_thresholds(self, metrics: dict):
  153. """Check if metrics exceed configured thresholds and create alerts."""
  154. # Get thresholds from settings
  155. async with async_session_maker() as session:
  156. from app.models.settings import Settings
  157. result = await session.execute(
  158. select(Settings).where(Settings.key == "host_monitoring")
  159. )
  160. settings = result.scalar_one_or_none()
  161. if not settings:
  162. return
  163. thresholds = settings.value
  164. # Check CPU
  165. if metrics['cpu_percent'] > thresholds.get('cpu_threshold', 90):
  166. await alert_service.create_alert(
  167. alert_type='host_metrics',
  168. severity='warning' if metrics['cpu_percent'] < 95 else 'critical',
  169. title=f'High CPU Usage: {metrics["cpu_percent"]:.1f}%',
  170. message=f'CPU usage is at {metrics["cpu_percent"]:.1f}%, threshold is {thresholds.get("cpu_threshold", 90)}%',
  171. alert_metadata={'metric': 'cpu_percent', 'value': metrics['cpu_percent']},
  172. )
  173. # Check Memory
  174. if metrics['memory_percent'] > thresholds.get('memory_threshold', 90):
  175. await alert_service.create_alert(
  176. alert_type='host_metrics',
  177. severity='warning' if metrics['memory_percent'] < 95 else 'critical',
  178. title=f'High Memory Usage: {metrics["memory_percent"]:.1f}%',
  179. message=f'Memory usage is at {metrics["memory_percent"]:.1f}%, threshold is {thresholds.get("memory_threshold", 90)}%',
  180. alert_metadata={'metric': 'memory_percent', 'value': metrics['memory_percent']},
  181. )
  182. # Check Load Average (relative to CPU count)
  183. load_threshold = thresholds.get('load_threshold', 2.0) * metrics['cpu_count']
  184. if metrics['load_1'] > load_threshold:
  185. await alert_service.create_alert(
  186. alert_type='host_metrics',
  187. severity='warning',
  188. title=f'High Load Average: {metrics["load_1"]:.2f}',
  189. message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
  190. alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
  191. )
  192. # Check Disk Usage
  193. if metrics['disk_usage_percent'] > thresholds.get('disk_threshold', 90):
  194. await alert_service.create_alert(
  195. alert_type='host_metrics',
  196. severity='warning' if metrics['disk_usage_percent'] < 95 else 'critical',
  197. title=f'High Disk Usage: {metrics["disk_usage_percent"]:.1f}%',
  198. message=f'Disk usage is at {metrics["disk_usage_percent"]:.1f}%, threshold is {thresholds.get("disk_threshold", 90)}%',
  199. alert_metadata={'metric': 'disk_usage_percent', 'value': metrics['disk_usage_percent']},
  200. )
  201. async def cleanup_old_metrics(self, days: int = 30):
  202. """Delete metrics older than specified days."""
  203. cutoff = datetime.now(timezone.utc) - timedelta(days=days)
  204. async with async_session_maker() as session:
  205. await session.execute(
  206. delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
  207. )
  208. await session.commit()
  209. async def run_monitoring_loop(self):
  210. """Main monitoring loop - runs in background."""
  211. print("[HostMonitor] Starting host monitoring loop")
  212. self.running = True
  213. while self.running:
  214. try:
  215. # Collect metrics
  216. metrics = await self.collect_metrics()
  217. # Store in database
  218. await self.store_metrics(metrics)
  219. # Check thresholds
  220. await self.check_thresholds(metrics)
  221. # Cleanup old data once per hour
  222. if datetime.now().minute == 0:
  223. await self.cleanup_old_metrics()
  224. # Wait 60 seconds before next collection
  225. await asyncio.sleep(60)
  226. except Exception as e:
  227. print(f"[HostMonitor] Error in monitoring loop: {e}")
  228. await asyncio.sleep(60)
  229. async def stop(self):
  230. """Stop monitoring loop."""
  231. print("[HostMonitor] Stopping host monitoring loop")
  232. self.running = False
# Module-level singleton shared by the app (other modules import `host_monitor`).
host_monitor = HostMonitor()