"""
Host monitoring service for collecting system metrics.
"""
import asyncio
import time
from datetime import datetime, timedelta, timezone

import psutil
from sqlalchemy import delete, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import async_session_maker
from app.models.host_metrics import HostMetrics
from app.services.alert_service import alert_service

# Divisor used for the "*_mbps" metrics (so they are MiB/s, not megabits/s).
_BYTES_PER_MB = 1024 * 1024

# Length of the system.query_log window used for the ClickHouse rate metrics.
_CH_QUERY_LOG_WINDOW_SECONDS = 60

# Minimum time between two retention cleanups in run_monitoring_loop().
_CLEANUP_INTERVAL_SECONDS = 3600


class HostMonitor:
    """Collect and store host system metrics.

    Cumulative OS/database counters are turned into per-second rates by
    diffing against the previous sample kept on the instance.
    """

    def __init__(self):
        # Previous raw psutil samples, used to derive per-second rates.
        self.previous_disk_io = None
        self.previous_net_io = None
        self.previous_cpu_stats = None
        # time.monotonic() value of the previous psutil sample.
        self.previous_timestamp = None
        # {'xacts': int, 'ts': float} from the previous PostgreSQL sample.
        self.previous_pg_stats = None
        # Raw query_log window totals from the last ClickHouse sample.
        self.previous_ch_stats = None
        self.running = False
        self.latest_metrics = {}  # Latest collected metrics for dashboard
        # time.monotonic() value of the last successful retention cleanup.
        self._last_cleanup = None

    # ------------------------------------------------------------------
    # PostgreSQL
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_postgresql_metrics() -> dict:
        """Return every PostgreSQL metric key with a zero value (failure fallback)."""
        return {
            'pg_active_connections': 0,
            'pg_total_connections': 0,
            'pg_database_size_bytes': 0,
            'pg_cache_hit_ratio': 0,
            'pg_transactions_per_sec': 0,
            'pg_deadlocks': 0,
            'pg_temp_files': 0,
        }

    async def collect_postgresql_metrics(self) -> dict:
        """Collect PostgreSQL database metrics.

        Returns:
            A dict of ``pg_*`` metrics. On any error the error is printed and
            all values are zero.
        """
        try:
            from app.config import settings

            async with async_session_maker() as session:
                # Active connections vs. the server's max_connections setting
                result = await session.execute(text("""
                    SELECT count(*) as active,
                           (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max
                    FROM pg_stat_activity
                    WHERE state = 'active'
                """))
                row = result.fetchone()
                active_connections = row[0] if row else 0
                total_connections = row[1] if row else 100

                # Database size. The name is passed as a bound parameter rather
                # than interpolated into the SQL; CAST picks the (name) overload.
                db_name = settings.DATABASE_URL.split('/')[-1].split('?')[0]
                result = await session.execute(
                    text("SELECT pg_database_size(CAST(:db_name AS name))"),
                    {'db_name': db_name},
                )
                db_size = int(result.scalar() or 0)

                # Cache hit ratio (percent) across all databases
                result = await session.execute(text("""
                    SELECT
                        sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit) + sum(blks_read), 0) as cache_hit_ratio
                    FROM pg_stat_database
                """))
                cache_hit_ratio = float(result.scalar() or 0)

                # Cumulative transaction / deadlock / temp-file counters
                result = await session.execute(text("""
                    SELECT
                        sum(xact_commit + xact_rollback) as total_xacts,
                        sum(deadlocks) as deadlocks,
                        sum(temp_files) as temp_files
                    FROM pg_stat_database
                """))
                row = result.fetchone()
                total_xacts = int(row[0] or 0)
                deadlocks = int(row[1] or 0)
                temp_files = int(row[2] or 0)

            # TPS is diffed against this method's own previous sample and
            # timestamp. self.previous_timestamp cannot be used: collect_metrics()
            # refreshes it right before calling this method.
            now = time.monotonic()
            tps = 0
            previous = self.previous_pg_stats
            if previous and previous.get('ts') is not None:
                elapsed = now - previous['ts']
                if elapsed > 0:
                    tps = int((total_xacts - previous['xacts']) / elapsed)
            self.previous_pg_stats = {'xacts': total_xacts, 'ts': now}

            return {
                'pg_active_connections': active_connections,
                'pg_total_connections': total_connections,
                'pg_database_size_bytes': db_size,
                'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
                # Clamped: counters drop after a stats reset / server restart.
                'pg_transactions_per_sec': max(0, tps),
                'pg_deadlocks': deadlocks,
                'pg_temp_files': temp_files,
            }
        except Exception as e:
            print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
            return self._empty_postgresql_metrics()

    # ------------------------------------------------------------------
    # ClickHouse
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_clickhouse_metrics() -> dict:
        """Return every ClickHouse metric key with a zero value (fallback)."""
        return {
            'ch_active_queries': 0,
            'ch_database_size_bytes': 0,
            'ch_queries_per_sec': 0,
            'ch_rows_read_per_sec': 0,
            'ch_memory_usage_bytes': 0,
        }

    def _query_clickhouse(self, clickhouse_connect, settings) -> dict:
        """Run the blocking ClickHouse queries; called from a worker thread.

        The client is always closed, even when a query fails.
        """
        client = clickhouse_connect.get_client(
            host=settings.CLICKHOUSE_HOST,
            port=settings.CLICKHOUSE_PORT,
            username=settings.CLICKHOUSE_USER,
            password=settings.CLICKHOUSE_PASSWORD,
        )
        try:
            # Active queries
            result = client.query("SELECT count() FROM system.processes")
            active_queries = result.result_rows[0][0] if result.result_rows else 0

            # Size of active parts in user databases
            result = client.query("""
                SELECT sum(bytes) FROM system.parts
                WHERE active AND database NOT IN ('system', 'information_schema')
            """)
            db_size = result.result_rows[0][0] if result.result_rows else 0

            # Totals over a sliding window. QueryStart rows are excluded so each
            # query is counted once (its end is logged as a separate row).
            result = client.query(f"""
                SELECT
                    count() as queries,
                    sum(read_rows) as rows_read,
                    sum(memory_usage) as memory
                FROM system.query_log
                WHERE event_time > now() - INTERVAL {_CH_QUERY_LOG_WINDOW_SECONDS} SECOND
                  AND type != 'QueryStart'
            """)
            row = result.result_rows[0] if result.result_rows else (0, 0, 0)
        finally:
            client.close()

        queries = int(row[0] or 0)
        rows_read = int(row[1] or 0)
        memory_usage = int(row[2] or 0)
        self.previous_ch_stats = {'queries': queries, 'rows': rows_read}

        # The query already counts a fixed window, so the rate is total / window.
        # Diffing two overlapping window counts (as before) is not a rate.
        window = _CH_QUERY_LOG_WINDOW_SECONDS
        return {
            'ch_active_queries': active_queries,
            'ch_database_size_bytes': db_size or 0,
            'ch_queries_per_sec': int(queries / window),
            'ch_rows_read_per_sec': int(rows_read / window),
            # Sum of per-query memory_usage over the window, not server RSS.
            'ch_memory_usage_bytes': memory_usage,
        }

    async def collect_clickhouse_metrics(self) -> dict:
        """Collect ClickHouse database metrics.

        Returns zeros when ``clickhouse_connect`` is not installed, when
        ``CLICKHOUSE_HOST`` is missing or empty, or when any query fails (the
        error is printed).
        """
        try:
            try:
                import clickhouse_connect
            except ImportError:
                return self._empty_clickhouse_metrics()

            from app.config import settings

            # Skip when ClickHouse is not configured (missing or empty host)
            if not getattr(settings, 'CLICKHOUSE_HOST', None):
                return self._empty_clickhouse_metrics()

            # clickhouse_connect is synchronous; keep it off the event loop.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, self._query_clickhouse, clickhouse_connect, settings
            )
        except Exception as e:
            print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
            return self._empty_clickhouse_metrics()

    # ------------------------------------------------------------------
    # Host (psutil)
    # ------------------------------------------------------------------

    @staticmethod
    def _per_sec(current, previous, field: str, elapsed: float) -> float:
        """Return the per-second change of counter *field* between two samples.

        Returns 0.0 when either sample is missing (first run, or psutil
        returned None) or when *elapsed* is not positive.
        """
        if current is None or previous is None or elapsed <= 0:
            return 0.0
        return (getattr(current, field) - getattr(previous, field)) / elapsed

    @staticmethod
    def _counter(counters, field: str):
        """Return cumulative counter *field*, or 0 when psutil returned None."""
        return getattr(counters, field) if counters is not None else 0

    def _collect_system_metrics(self) -> dict:
        """Sample CPU, memory, load, disk, network and process metrics via psutil.

        This call blocks: about 1 s for the CPU sample plus a full process scan.
        For that reason collect_metrics() runs it in a worker thread. It also
        updates the previous_* attributes used for the rate calculations.
        """
        # Blocks for 1 s to measure system-wide CPU usage over that interval.
        cpu_percent = psutil.cpu_percent(interval=1)
        # Non-blocking per-core usage since the previous per-core call, i.e. the
        # previous collection (psutil documents the first value as meaningless).
        cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
        cpu_count = psutil.cpu_count()
        cpu_times = psutil.cpu_times()
        cpu_stats = psutil.cpu_stats()
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        load_1, load_5, load_15 = psutil.getloadavg()
        disk_io = psutil.disk_io_counters()  # may be None (no disks visible)
        disk_usage = psutil.disk_usage('/')
        net_io = psutil.net_io_counters()  # may be None (no NICs visible)

        now = time.monotonic()
        elapsed = now - self.previous_timestamp if self.previous_timestamp is not None else 0.0
        rate = self._per_sec

        ctx_switches_per_sec = rate(cpu_stats, self.previous_cpu_stats, 'ctx_switches', elapsed)
        interrupts_per_sec = rate(cpu_stats, self.previous_cpu_stats, 'interrupts', elapsed)

        prev_disk = self.previous_disk_io
        disk_read_iops = rate(disk_io, prev_disk, 'read_count', elapsed)
        disk_write_iops = rate(disk_io, prev_disk, 'write_count', elapsed)
        disk_read_mbps = rate(disk_io, prev_disk, 'read_bytes', elapsed) / _BYTES_PER_MB
        disk_write_mbps = rate(disk_io, prev_disk, 'write_bytes', elapsed) / _BYTES_PER_MB

        prev_net = self.previous_net_io
        net_in_mbps = rate(net_io, prev_net, 'bytes_recv', elapsed) / _BYTES_PER_MB
        net_out_mbps = rate(net_io, prev_net, 'bytes_sent', elapsed) / _BYTES_PER_MB
        net_packets_in_per_sec = rate(net_io, prev_net, 'packets_recv', elapsed)
        net_packets_out_per_sec = rate(net_io, prev_net, 'packets_sent', elapsed)

        self.previous_cpu_stats = cpu_stats
        self.previous_disk_io = disk_io
        self.previous_net_io = net_io
        self.previous_timestamp = now

        # Processes - top CPU and memory consumers
        processes = []
        total_threads = 0
        try:
            for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'num_threads']):
                # Attributes psutil could not read (AccessDenied, zombies) come
                # back as None; normalize so the sums/sorts below cannot raise.
                info = dict(proc.info)
                info['cpu_percent'] = info.get('cpu_percent') or 0
                info['memory_percent'] = info.get('memory_percent') or 0
                total_threads += info.get('num_threads') or 0
                processes.append(info)
        except Exception as e:
            print(f"[HostMonitor] Error collecting process list: {e}")

        top_cpu = sorted(processes, key=lambda p: p['cpu_percent'], reverse=True)[:5]
        top_mem = sorted(processes, key=lambda p: p['memory_percent'], reverse=True)[:5]
        top_cpu_clean = [
            {'pid': p['pid'], 'name': p['name'], 'cpu': round(p['cpu_percent'], 1)}
            for p in top_cpu if p['cpu_percent'] > 0
        ]
        top_mem_clean = [
            {'pid': p['pid'], 'name': p['name'], 'mem': round(p['memory_percent'], 1)}
            for p in top_mem if p['memory_percent'] > 0
        ]

        counter = self._counter
        return {
            'timestamp': datetime.now(timezone.utc),
            # CPU
            'cpu_percent': cpu_percent,
            'cpu_count': cpu_count,
            'cpu_per_core': cpu_per_core,
            # Cumulative steal time reported by psutil; 0 where not reported.
            'cpu_steal': getattr(cpu_times, 'steal', 0),
            'context_switches_per_sec': int(ctx_switches_per_sec),
            'interrupts_per_sec': int(interrupts_per_sec),
            # Memory (buffers/cached only exist on some platforms)
            'memory_total': mem.total,
            'memory_used': mem.used,
            'memory_percent': mem.percent,
            'memory_available': mem.available,
            'memory_buffers': getattr(mem, 'buffers', 0),
            'memory_cached': getattr(mem, 'cached', 0),
            'swap_total': swap.total,
            'swap_used': swap.used,
            'swap_percent': swap.percent,
            # Load
            'load_1': load_1,
            'load_5': load_5,
            'load_15': load_15,
            # Disk I/O (totals are cumulative counters)
            'disk_read_bytes': counter(disk_io, 'read_bytes'),
            'disk_write_bytes': counter(disk_io, 'write_bytes'),
            'disk_read_iops': int(disk_read_iops),
            'disk_write_iops': int(disk_write_iops),
            'disk_read_mbps': round(disk_read_mbps, 2),
            'disk_write_mbps': round(disk_write_mbps, 2),
            'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
            'disk_usage_percent': disk_usage.percent,
            # Network (totals are cumulative counters)
            'net_sent_bytes': counter(net_io, 'bytes_sent'),
            'net_recv_bytes': counter(net_io, 'bytes_recv'),
            'net_in_mbps': round(net_in_mbps, 2),
            'net_out_mbps': round(net_out_mbps, 2),
            'net_packets_in_per_sec': int(net_packets_in_per_sec),
            'net_packets_out_per_sec': int(net_packets_out_per_sec),
            'net_errors_in': counter(net_io, 'errin'),
            'net_errors_out': counter(net_io, 'errout'),
            'net_drops_in': counter(net_io, 'dropin'),
            'net_drops_out': counter(net_io, 'dropout'),
            # Processes
            'process_count': len(processes),
            'thread_count': total_threads,
            'top_cpu_processes': top_cpu_clean,
            'top_mem_processes': top_mem_clean,
        }

    async def collect_metrics(self) -> dict:
        """Collect comprehensive system, database and HTTP metrics.

        Returns:
            One flat dict: host metrics, ``pg_*``, ``ch_*`` and ``http_*`` keys.
        """
        # psutil sampling blocks (~1 s CPU sample plus a process scan); run it
        # in the default executor so the event loop keeps serving requests.
        loop = asyncio.get_running_loop()
        metrics = await loop.run_in_executor(None, self._collect_system_metrics)

        # Collect database metrics
        pg_metrics = await self.collect_postgresql_metrics()
        ch_metrics = await self.collect_clickhouse_metrics()

        # Collect HTTP metrics
        from app.core.http_metrics import http_metrics_collector
        http_metrics = http_metrics_collector.get_metrics()

        # Merge all metrics
        return {
            **metrics,
            **pg_metrics,
            **ch_metrics,
            'http_requests_per_sec': http_metrics['requests_per_sec'],
            'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
            'http_error_rate': round(http_metrics['error_rate'], 2),
            'http_active_requests': http_metrics['active_requests'],
        }

    # ------------------------------------------------------------------
    # Persistence, alerting, loop
    # ------------------------------------------------------------------

    async def store_metrics(self, metrics: dict):
        """Store metrics in database as one HostMetrics row."""
        async with async_session_maker() as session:
            metric = HostMetrics(**metrics)
            session.add(metric)
            await session.commit()

    @staticmethod
    async def _alert_if_above_percent(
        metrics: dict, thresholds: dict, metric: str, threshold_key: str, label: str
    ):
        """Create a percentage alert for *metric* when it exceeds its threshold.

        Severity is 'warning' below 95 % and 'critical' at 95 % or above.
        The default threshold is 90 when *threshold_key* is not configured.
        """
        value = metrics[metric]
        threshold = thresholds.get(threshold_key, 90)
        if value > threshold:
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning' if value < 95 else 'critical',
                title=f'High {label} Usage: {value:.1f}%',
                message=f'{label} usage is at {value:.1f}%, threshold is {threshold}%',
                alert_metadata={'metric': metric, 'value': value},
            )

    async def check_thresholds(self, metrics: dict):
        """Check if metrics exceed configured thresholds and create alerts.

        Thresholds come from the ``host_monitoring`` Settings row. Nothing is
        checked when that row does not exist.
        """
        from app.models.settings import Settings

        async with async_session_maker() as session:
            result = await session.execute(
                select(Settings).where(Settings.key == "host_monitoring")
            )
            settings_row = result.scalar_one_or_none()
            if not settings_row:
                return
            thresholds = settings_row.value or {}

        await self._alert_if_above_percent(metrics, thresholds, 'cpu_percent', 'cpu_threshold', 'CPU')
        await self._alert_if_above_percent(metrics, thresholds, 'memory_percent', 'memory_threshold', 'Memory')

        # Load average threshold is per CPU; psutil.cpu_count() may be None.
        cpu_count = metrics['cpu_count'] or 1
        load_threshold = thresholds.get('load_threshold', 2.0) * cpu_count
        if metrics['load_1'] > load_threshold:
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning',
                title=f'High Load Average: {metrics["load_1"]:.2f}',
                message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
                alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
            )

        await self._alert_if_above_percent(metrics, thresholds, 'disk_usage_percent', 'disk_threshold', 'Disk')

    async def cleanup_old_metrics(self, days: int = 30):
        """Delete metrics older than specified days."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        async with async_session_maker() as session:
            await session.execute(
                delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
            )
            await session.commit()

    async def run_monitoring_loop(self, interval: float = 60.0):
        """Main monitoring loop - runs in background.

        Args:
            interval: Target seconds between collection starts (default 60).
        """
        print("[HostMonitor] Starting host monitoring loop")
        self.running = True

        while self.running:
            started = time.monotonic()
            try:
                metrics = await self.collect_metrics()

                # Save latest metrics for dashboard
                self.latest_metrics = metrics

                await self.store_metrics(metrics)
                await self.check_thresholds(metrics)

                # Hourly retention cleanup, scheduled by elapsed time rather
                # than wall-clock "minute == 0", which an iteration straddling
                # the top of the hour would miss.
                now = time.monotonic()
                if self._last_cleanup is None or now - self._last_cleanup >= _CLEANUP_INTERVAL_SECONDS:
                    await self.cleanup_old_metrics()
                    self._last_cleanup = now
            except Exception as e:
                print(f"[HostMonitor] Error in monitoring loop: {e}")

            # Sleep only the rest of the interval so collection time doesn't
            # stretch the sampling period.
            await asyncio.sleep(max(0.0, interval - (time.monotonic() - started)))

    async def stop(self):
        """Stop monitoring loop (takes effect after the current sleep)."""
        print("[HostMonitor] Stopping host monitoring loop")
        self.running = False


# Global instance
host_monitor = HostMonitor()