| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- """
- Host monitoring service for collecting system metrics.
- """
- import asyncio
- import time
- from datetime import datetime, timedelta, timezone
- import psutil
- from sqlalchemy import delete, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import async_session_maker
- from app.models.host_metrics import HostMetrics
- from app.services.alert_service import alert_service
class HostMonitor:
    """Collect host system metrics, persist them, and raise threshold alerts.

    Rate metrics (IOPS, throughput, context switches, ...) are derived from
    the difference between two consecutive samples, so the first sample after
    construction reports every rate as zero.
    """

    # Seconds slept between two samples in the monitoring loop.
    _SAMPLE_INTERVAL_SECONDS = 60
    # Minimum seconds between two retention sweeps.
    _CLEANUP_INTERVAL_SECONDS = 3600
    # Divisor used to express byte rates in the ``*_mbps`` fields.
    _BYTES_PER_MIB = 1024 * 1024

    # Class-level default so instances created without __init__ still work.
    _last_cleanup = None

    def __init__(self):
        # Counter snapshots and wall-clock time of the previous sample; they
        # stay None until the first call to collect_metrics().
        self.previous_disk_io = None
        self.previous_net_io = None
        self.previous_cpu_stats = None
        self.previous_timestamp = None
        self.running = False
        # time.monotonic() value of the last successful retention sweep.
        self._last_cleanup = None

    @staticmethod
    def _counter_rate(current, previous, field, elapsed):
        """Return the per-second increase of ``field`` between two snapshots.

        Returns 0.0 when either snapshot is missing, when no positive time
        has elapsed, or when the counter went backwards (e.g. it was reset),
        so callers never see a negative rate.
        """
        if current is None or previous is None:
            return 0.0
        if elapsed is None or elapsed <= 0:
            return 0.0
        delta = getattr(current, field) - getattr(previous, field)
        return max(delta, 0) / elapsed

    @staticmethod
    def _top_processes(processes, metric, label, limit=5):
        """Return the ``limit`` highest consumers of ``metric`` as small dicts.

        ``processes`` is a list of process-info dicts. A value of ``None``
        (psutil stores that when access to the attribute is denied) or a
        missing key is treated as 0. Entries whose usage is not above 0 are
        dropped after the top ``limit`` have been selected. The usage is
        stored under ``label`` rounded to one decimal place.
        """
        def usage(info):
            return info.get(metric) or 0

        ranked = sorted(processes, key=usage, reverse=True)[:limit]
        return [
            {'pid': info['pid'], 'name': info['name'], label: round(usage(info), 1)}
            for info in ranked
            if usage(info) > 0
        ]

    @staticmethod
    def _build_threshold_alerts(metrics, thresholds):
        """Return keyword-argument dicts for every alert ``metrics`` triggers.

        Alerts are produced in a fixed order: CPU, memory, load average,
        disk usage. Percentage thresholds default to 90; the load threshold
        defaults to 2.0 per CPU. Nothing is sent from here; the caller
        decides what to do with the returned dicts.
        """
        thresholds = thresholds or {}
        alerts = []

        def check_percent(metric, label, threshold_key):
            value = metrics[metric]
            threshold = thresholds.get(threshold_key, 90)
            if value > threshold:
                alerts.append({
                    'alert_type': 'host_metrics',
                    'severity': 'warning' if value < 95 else 'critical',
                    'title': f'High {label} Usage: {value:.1f}%',
                    'message': f'{label} usage is at {value:.1f}%, threshold is {threshold}%',
                    'alert_metadata': {'metric': metric, 'value': value},
                })

        check_percent('cpu_percent', 'CPU', 'cpu_threshold')
        check_percent('memory_percent', 'Memory', 'memory_threshold')

        # The load threshold scales with the CPU count; fall back to one CPU
        # when the count could not be determined.
        load_threshold = thresholds.get('load_threshold', 2.0) * (metrics['cpu_count'] or 1)
        load_1 = metrics['load_1']
        if load_1 > load_threshold:
            alerts.append({
                'alert_type': 'host_metrics',
                'severity': 'warning',
                'title': f'High Load Average: {load_1:.2f}',
                'message': f'1-minute load average is {load_1:.2f}, threshold is {load_threshold:.2f}',
                'alert_metadata': {'metric': 'load_1', 'value': load_1},
            })

        check_percent('disk_usage_percent', 'Disk', 'disk_threshold')
        return alerts

    def _cleanup_due(self, now):
        """Return True when no sweep has run yet or the last one is an hour old.

        ``now`` is a ``time.monotonic()`` reading.
        """
        if self._last_cleanup is None:
            return True
        return now - self._last_cleanup >= self._CLEANUP_INTERVAL_SECONDS

    async def collect_metrics(self) -> dict:
        """Collect a snapshot of CPU, memory, load, disk, network and process metrics.

        Returns:
            A dict whose keys are passed unchanged to ``HostMetrics(**metrics)``
            by ``store_metrics``. Throughput fields (``*_mbps``) are bytes per
            second divided by 1024 * 1024; ``*_per_sec`` and ``*_iops`` fields
            are truncated to integers.
        """
        # cpu_percent(interval=1) sleeps for a second, so run it in the default
        # executor instead of stalling every other task on the event loop.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(
            None, lambda: psutil.cpu_percent(interval=1)
        )

        # Everything below runs without awaiting, so the timestamp and all
        # counter snapshots belong to the same instant and the stored
        # "previous" state is always updated as one unit.
        current_timestamp = time.time()
        elapsed = None
        if self.previous_timestamp is not None:
            elapsed = current_timestamp - self.previous_timestamp

        # CPU - detailed
        cpu_count = psutil.cpu_count()
        cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
        cpu_times = psutil.cpu_times()
        cpu_stats = psutil.cpu_stats()
        ctx_switches_per_sec = self._counter_rate(
            cpu_stats, self.previous_cpu_stats, 'ctx_switches', elapsed
        )
        interrupts_per_sec = self._counter_rate(
            cpu_stats, self.previous_cpu_stats, 'interrupts', elapsed
        )

        # Memory and load average
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        load_1, load_5, load_15 = psutil.getloadavg()

        # Disk I/O - disk_io may be None when the host exposes no disk counters
        disk_io = psutil.disk_io_counters()
        disk_usage = psutil.disk_usage('/')
        previous_disk = self.previous_disk_io
        disk_read_iops = self._counter_rate(disk_io, previous_disk, 'read_count', elapsed)
        disk_write_iops = self._counter_rate(disk_io, previous_disk, 'write_count', elapsed)
        disk_read_mbps = (
            self._counter_rate(disk_io, previous_disk, 'read_bytes', elapsed)
            / self._BYTES_PER_MIB
        )
        disk_write_mbps = (
            self._counter_rate(disk_io, previous_disk, 'write_bytes', elapsed)
            / self._BYTES_PER_MIB
        )

        # Network - net_io may be None when no interface counters are available
        net_io = psutil.net_io_counters()
        previous_net = self.previous_net_io
        net_in_mbps = (
            self._counter_rate(net_io, previous_net, 'bytes_recv', elapsed)
            / self._BYTES_PER_MIB
        )
        net_out_mbps = (
            self._counter_rate(net_io, previous_net, 'bytes_sent', elapsed)
            / self._BYTES_PER_MIB
        )
        net_packets_in_per_sec = self._counter_rate(
            net_io, previous_net, 'packets_recv', elapsed
        )
        net_packets_out_per_sec = self._counter_rate(
            net_io, previous_net, 'packets_sent', elapsed
        )

        # Remember this sample as the baseline for the next one.
        self.previous_cpu_stats = cpu_stats
        self.previous_disk_io = disk_io
        self.previous_net_io = net_io
        self.previous_timestamp = current_timestamp

        # Processes - one pass fetches everything needed, including the thread
        # count, so a process that exits mid-scan cannot abort the sample.
        processes = [
            proc.info
            for proc in psutil.process_iter(
                ['pid', 'name', 'cpu_percent', 'memory_percent', 'num_threads']
            )
        ]
        thread_count = sum(info.get('num_threads') or 0 for info in processes)

        return {
            'timestamp': datetime.now(timezone.utc),
            # CPU
            'cpu_percent': cpu_percent,
            'cpu_count': cpu_count,
            'cpu_per_core': cpu_per_core,
            'cpu_steal': getattr(cpu_times, 'steal', 0),  # not present on every platform
            'context_switches_per_sec': int(ctx_switches_per_sec),
            'interrupts_per_sec': int(interrupts_per_sec),
            # Memory
            'memory_total': mem.total,
            'memory_used': mem.used,
            'memory_percent': mem.percent,
            'memory_available': mem.available,
            'memory_buffers': getattr(mem, 'buffers', 0),
            'memory_cached': getattr(mem, 'cached', 0),
            'swap_total': swap.total,
            'swap_used': swap.used,
            'swap_percent': swap.percent,
            # Load
            'load_1': load_1,
            'load_5': load_5,
            'load_15': load_15,
            # Disk I/O (getattr defaults also cover disk_io being None)
            'disk_read_bytes': getattr(disk_io, 'read_bytes', 0),
            'disk_write_bytes': getattr(disk_io, 'write_bytes', 0),
            'disk_read_iops': int(disk_read_iops),
            'disk_write_iops': int(disk_write_iops),
            'disk_read_mbps': round(disk_read_mbps, 2),
            'disk_write_mbps': round(disk_write_mbps, 2),
            'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
            'disk_usage_percent': disk_usage.percent,
            # Network (getattr defaults also cover net_io being None)
            'net_sent_bytes': getattr(net_io, 'bytes_sent', 0),
            'net_recv_bytes': getattr(net_io, 'bytes_recv', 0),
            'net_in_mbps': round(net_in_mbps, 2),
            'net_out_mbps': round(net_out_mbps, 2),
            'net_packets_in_per_sec': int(net_packets_in_per_sec),
            'net_packets_out_per_sec': int(net_packets_out_per_sec),
            'net_errors_in': getattr(net_io, 'errin', 0),
            'net_errors_out': getattr(net_io, 'errout', 0),
            'net_drops_in': getattr(net_io, 'dropin', 0),
            'net_drops_out': getattr(net_io, 'dropout', 0),
            # Processes
            'process_count': len(psutil.pids()),
            'thread_count': thread_count,
            'top_cpu_processes': self._top_processes(processes, 'cpu_percent', 'cpu'),
            'top_mem_processes': self._top_processes(processes, 'memory_percent', 'mem'),
        }

    async def store_metrics(self, metrics: dict):
        """Persist one metrics snapshot as a ``HostMetrics`` row."""
        async with async_session_maker() as session:
            session.add(HostMetrics(**metrics))
            await session.commit()

    async def check_thresholds(self, metrics: dict):
        """Create an alert for every metric above its configured threshold.

        Thresholds are read from the ``Settings`` row whose key is
        ``"host_monitoring"``; when that row does not exist no alerts are
        created. Missing individual thresholds fall back to defaults.
        """
        async with async_session_maker() as session:
            from app.models.settings import Settings

            result = await session.execute(
                select(Settings).where(Settings.key == "host_monitoring")
            )
            settings = result.scalar_one_or_none()
            if not settings:
                return
            # Read the value while the session is still open; an empty or
            # missing value means "use the defaults".
            thresholds = settings.value or {}

        for alert in self._build_threshold_alerts(metrics, thresholds):
            await alert_service.create_alert(**alert)

    async def cleanup_old_metrics(self, days: int = 30):
        """Delete ``HostMetrics`` rows whose timestamp is older than ``days`` days."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        async with async_session_maker() as session:
            await session.execute(
                delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
            )
            await session.commit()

    async def run_monitoring_loop(self):
        """Sample, store and evaluate metrics until ``stop()`` is called.

        Any exception raised by one iteration is printed and the loop carries
        on with the next sample after the usual pause.
        """
        print("[HostMonitor] Starting host monitoring loop")
        self.running = True
        while self.running:
            try:
                metrics = await self.collect_metrics()
                await self.store_metrics(metrics)
                await self.check_thresholds(metrics)
                # Sweep old rows at most once per hour, measured as elapsed
                # time. The timestamp is only recorded after a successful
                # sweep, so a failed one is retried on the next iteration.
                if self._cleanup_due(time.monotonic()):
                    await self.cleanup_old_metrics()
                    self._last_cleanup = time.monotonic()
            except Exception as e:
                print(f"[HostMonitor] Error in monitoring loop: {e}")
            await asyncio.sleep(self._SAMPLE_INTERVAL_SECONDS)

    async def stop(self):
        """Ask the monitoring loop to exit after its current iteration."""
        print("[HostMonitor] Stopping host monitoring loop")
        self.running = False
# Module-level singleton: the one HostMonitor instance importers share.
host_monitor = HostMonitor()
|