- """
- Host monitoring service for collecting system metrics.
- """
- import asyncio
- import time
- from datetime import datetime, timedelta, timezone
- import psutil
- from sqlalchemy import delete, select, text
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import async_session_maker
- from app.models.host_metrics import HostMetrics
- from app.services.alert_service import alert_service

class HostMonitor:
    """Collect and store host system metrics."""

    def __init__(self):
        # Previous counter snapshots for delta-based rate calculations
        self.previous_disk_io = None
        self.previous_net_io = None
        self.previous_cpu_stats = None
        self.previous_timestamp = None
        self.previous_pg_stats = None
        self.running = False
        self.latest_metrics = {}  # Latest collected metrics for the dashboard
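
    # All *_per_sec and *_mbps values are derived from monotonically
    # increasing counters with the usual delta pattern:
    #     rate = (counter_now - counter_prev) / (t_now - t_prev)
    # The first cycle after startup has no previous snapshot, so those
    # rates report 0 until the second collection.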

    async def collect_postgresql_metrics(self) -> dict:
        """Collect PostgreSQL database metrics."""
        try:
            async with async_session_maker() as session:
                # Active connections vs. the configured connection limit
                result = await session.execute(text("""
                    SELECT count(*) AS active,
                           (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') AS max
                    FROM pg_stat_activity
                    WHERE state = 'active'
                """))
                row = result.fetchone()
                active_connections = row[0] if row else 0
                total_connections = row[1] if row else 100

                # Size of the database we are connected to. current_database()
                # avoids interpolating a parsed database name into the SQL.
                result = await session.execute(text(
                    "SELECT pg_database_size(current_database())"
                ))
                db_size = int(result.scalar() or 0)

                # Cache hit ratio across all databases
                result = await session.execute(text("""
                    SELECT sum(blks_hit) * 100.0
                           / NULLIF(sum(blks_hit) + sum(blks_read), 0) AS cache_hit_ratio
                    FROM pg_stat_database
                """))
                cache_hit_ratio = float(result.scalar() or 0)

                # Cumulative transaction counters (TPS is delta-based below)
                result = await session.execute(text("""
                    SELECT sum(xact_commit + xact_rollback) AS total_xacts,
                           sum(deadlocks) AS deadlocks,
                           sum(temp_files) AS temp_files
                    FROM pg_stat_database
                """))
                row = result.fetchone()
                total_xacts = int(row[0] or 0)
                deadlocks = int(row[1] or 0)
                temp_files = int(row[2] or 0)

                # Calculate TPS against the previous PostgreSQL sample. A
                # dedicated timestamp is stored here because
                # self.previous_timestamp is updated mid-cycle by
                # collect_metrics() and would give a far-too-short interval.
                now = time.time()
                tps = 0
                if self.previous_pg_stats:
                    time_delta = now - self.previous_pg_stats['ts']
                    if time_delta > 0:
                        tps = int((total_xacts - self.previous_pg_stats['xacts']) / time_delta)
                self.previous_pg_stats = {'xacts': total_xacts, 'ts': now}

                return {
                    'pg_active_connections': active_connections,
                    'pg_total_connections': total_connections,
                    'pg_database_size_bytes': db_size,
                    'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
                    'pg_transactions_per_sec': max(0, tps),
                    'pg_deadlocks': deadlocks,
                    'pg_temp_files': temp_files,
                }
        except Exception as e:
            print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
            return {
                'pg_active_connections': 0,
                'pg_total_connections': 0,
                'pg_database_size_bytes': 0,
                'pg_cache_hit_ratio': 0,
                'pg_transactions_per_sec': 0,
                'pg_deadlocks': 0,
                'pg_temp_files': 0,
            }
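
    # Note: pg_stat_activity hides the state of other users' backends from
    # non-privileged roles, so the active-connection count above can
    # undercount unless the monitoring role is granted pg_monitor (or
    # pg_read_all_stats).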

    async def collect_clickhouse_metrics(self) -> dict:
        """Collect ClickHouse database metrics."""
        empty = {
            'ch_active_queries': 0,
            'ch_database_size_bytes': 0,
            'ch_queries_per_sec': 0,
            'ch_rows_read_per_sec': 0,
            'ch_memory_usage_bytes': 0,
        }
        try:
            # clickhouse_connect is an optional dependency
            try:
                import clickhouse_connect
            except ImportError:
                return empty

            from app.config import settings

            # Skip collection entirely if ClickHouse is not configured
            if not hasattr(settings, 'CLICKHOUSE_HOST'):
                return empty

            client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                username=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
            )
            try:
                # Currently executing queries
                result = client.query("SELECT count() FROM system.processes")
                active_queries = result.result_rows[0][0] if result.result_rows else 0

                # Total size of user databases (active parts only)
                result = client.query("""
                    SELECT sum(bytes) FROM system.parts
                    WHERE active AND database NOT IN ('system', 'information_schema')
                """)
                db_size = result.result_rows[0][0] if result.result_rows else 0

                # Query stats over a fixed 60-second window. Filtering on
                # QueryFinish counts each query once; query_log also records
                # QueryStart and exception events.
                result = client.query("""
                    SELECT
                        count() AS queries,
                        sum(read_rows) AS rows_read,
                        sum(memory_usage) AS memory
                    FROM system.query_log
                    WHERE event_time > now() - INTERVAL 60 SECOND
                      AND type = 'QueryFinish'
                """)
                row = result.result_rows[0] if result.result_rows else (0, 0, 0)
                queries = int(row[0] or 0)
                rows_read = int(row[1] or 0)
                # Aggregate memory reported by queries finished in the window
                memory_usage = int(row[2] or 0)
            finally:
                client.close()

            # The window above is already 60 seconds wide, so the rates are
            # simply the window totals divided by the window length. (Window
            # counts are not monotonic counters, so a snapshot-delta here
            # would be meaningless.)
            qps = int(queries / 60)
            rows_per_sec = int(rows_read / 60)

            return {
                'ch_active_queries': active_queries,
                'ch_database_size_bytes': db_size or 0,
                'ch_queries_per_sec': max(0, qps),
                'ch_rows_read_per_sec': max(0, rows_per_sec),
                'ch_memory_usage_bytes': memory_usage or 0,
            }
        except Exception as e:
            print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
            return empty
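
    # Note: clickhouse_connect's client is synchronous, so the queries above
    # briefly block the event loop. If that matters, one option (a sketch,
    # not part of the current code) is to move the work into a thread:
    #     ch_metrics = await asyncio.to_thread(self._collect_ch_sync)
    # where _collect_ch_sync would be a hypothetical synchronous variant
    # of this method.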

    async def collect_metrics(self) -> dict:
        """Collect comprehensive system metrics."""
        current_timestamp = time.time()

        # CPU - detailed. The 1-second sampling interval runs in a worker
        # thread so it does not block the event loop.
        cpu_percent = await asyncio.to_thread(psutil.cpu_percent, 1)
        cpu_count = psutil.cpu_count()
        cpu_per_core = psutil.cpu_percent(interval=0, percpu=True)
        cpu_times = psutil.cpu_times()
        cpu_stats = psutil.cpu_stats()

        # Context switches and interrupts (delta-based rates)
        context_switches = cpu_stats.ctx_switches
        interrupts = cpu_stats.interrupts
        ctx_switches_per_sec = 0
        interrupts_per_sec = 0
        if self.previous_cpu_stats and self.previous_timestamp:
            time_delta = current_timestamp - self.previous_timestamp
            if time_delta > 0:
                ctx_switches_per_sec = (context_switches - self.previous_cpu_stats.ctx_switches) / time_delta
                interrupts_per_sec = (interrupts - self.previous_cpu_stats.interrupts) / time_delta
        self.previous_cpu_stats = cpu_stats

        # Memory - detailed
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()

        # Load average (1/5/15 minutes)
        load_1, load_5, load_15 = psutil.getloadavg()

        # Disk I/O - with IOPS and throughput
        disk_io = psutil.disk_io_counters()
        disk_usage = psutil.disk_usage('/')

        # Calculate disk deltas (IOPS, throughput)
        disk_read_iops = 0
        disk_write_iops = 0
        disk_read_mbps = 0
        disk_write_mbps = 0
        if self.previous_disk_io and self.previous_timestamp:
            time_delta = current_timestamp - self.previous_timestamp
            if time_delta > 0:
                disk_read_iops = (disk_io.read_count - self.previous_disk_io.read_count) / time_delta
                disk_write_iops = (disk_io.write_count - self.previous_disk_io.write_count) / time_delta
                disk_read_mbps = ((disk_io.read_bytes - self.previous_disk_io.read_bytes) / time_delta) / (1024 * 1024)
                disk_write_mbps = ((disk_io.write_bytes - self.previous_disk_io.write_bytes) / time_delta) / (1024 * 1024)
        self.previous_disk_io = disk_io

        # Network - with packet rates and throughput
        net_io = psutil.net_io_counters()
        net_in_mbps = 0
        net_out_mbps = 0
        net_packets_in_per_sec = 0
        net_packets_out_per_sec = 0
        if self.previous_net_io and self.previous_timestamp:
            time_delta = current_timestamp - self.previous_timestamp
            if time_delta > 0:
                net_in_mbps = ((net_io.bytes_recv - self.previous_net_io.bytes_recv) / time_delta) / (1024 * 1024)
                net_out_mbps = ((net_io.bytes_sent - self.previous_net_io.bytes_sent) / time_delta) / (1024 * 1024)
                net_packets_in_per_sec = (net_io.packets_recv - self.previous_net_io.packets_recv) / time_delta
                net_packets_out_per_sec = (net_io.packets_sent - self.previous_net_io.packets_sent) / time_delta
        self.previous_net_io = net_io
        self.previous_timestamp = current_timestamp

        # Processes - top CPU and memory consumers. Values may be None for
        # processes we cannot inspect, hence the `or 0` fallbacks.
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        top_cpu = sorted(processes, key=lambda p: p.get('cpu_percent') or 0, reverse=True)[:5]
        top_mem = sorted(processes, key=lambda p: p.get('memory_percent') or 0, reverse=True)[:5]

        # Keep only processes that actually consume resources
        top_cpu_clean = [
            {'pid': p['pid'], 'name': p['name'], 'cpu': round(p.get('cpu_percent') or 0, 1)}
            for p in top_cpu if (p.get('cpu_percent') or 0) > 0
        ]
        top_mem_clean = [
            {'pid': p['pid'], 'name': p['name'], 'mem': round(p.get('memory_percent') or 0, 1)}
            for p in top_mem if (p.get('memory_percent') or 0) > 0
        ]

        # Total thread count. process_iter() with an attrs list swallows
        # processes that exit mid-iteration, unlike calling num_threads()
        # on each Process object directly.
        thread_count = sum(
            proc.info.get('num_threads') or 0
            for proc in psutil.process_iter(['num_threads'])
        )

        metrics = {
            'timestamp': datetime.now(timezone.utc),
            # CPU
            'cpu_percent': cpu_percent,
            'cpu_count': cpu_count,
            'cpu_per_core': cpu_per_core,
            'cpu_steal': getattr(cpu_times, 'steal', 0),  # cumulative VM steal time (Linux only)
            'context_switches_per_sec': int(ctx_switches_per_sec),
            'interrupts_per_sec': int(interrupts_per_sec),
            # Memory
            'memory_total': mem.total,
            'memory_used': mem.used,
            'memory_percent': mem.percent,
            'memory_available': mem.available,
            'memory_buffers': getattr(mem, 'buffers', 0),
            'memory_cached': getattr(mem, 'cached', 0),
            'swap_total': swap.total,
            'swap_used': swap.used,
            'swap_percent': swap.percent,
            # Load
            'load_1': load_1,
            'load_5': load_5,
            'load_15': load_15,
            # Disk I/O
            'disk_read_bytes': disk_io.read_bytes,
            'disk_write_bytes': disk_io.write_bytes,
            'disk_read_iops': int(disk_read_iops),
            'disk_write_iops': int(disk_write_iops),
            'disk_read_mbps': round(disk_read_mbps, 2),
            'disk_write_mbps': round(disk_write_mbps, 2),
            'disk_io_time_ms': getattr(disk_io, 'read_time', 0) + getattr(disk_io, 'write_time', 0),
            'disk_usage_percent': disk_usage.percent,
            # Network
            'net_sent_bytes': net_io.bytes_sent,
            'net_recv_bytes': net_io.bytes_recv,
            'net_in_mbps': round(net_in_mbps, 2),
            'net_out_mbps': round(net_out_mbps, 2),
            'net_packets_in_per_sec': int(net_packets_in_per_sec),
            'net_packets_out_per_sec': int(net_packets_out_per_sec),
            'net_errors_in': net_io.errin,
            'net_errors_out': net_io.errout,
            'net_drops_in': net_io.dropin,
            'net_drops_out': net_io.dropout,
            # Processes
            'process_count': len(psutil.pids()),
            'thread_count': thread_count,
            'top_cpu_processes': top_cpu_clean,
            'top_mem_processes': top_mem_clean,
        }

        # Collect database metrics
        pg_metrics = await self.collect_postgresql_metrics()
        ch_metrics = await self.collect_clickhouse_metrics()

        # Collect HTTP metrics
        from app.core.http_metrics import http_metrics_collector
        http_metrics = http_metrics_collector.get_metrics()

        # Merge all metrics
        return {
            **metrics,
            **pg_metrics,
            **ch_metrics,
            'http_requests_per_sec': http_metrics['requests_per_sec'],
            'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
            'http_error_rate': round(http_metrics['error_rate'], 2),
            'http_active_requests': http_metrics['active_requests'],
        }

    async def store_metrics(self, metrics: dict):
        """Store metrics in the database."""
        async with async_session_maker() as session:
            metric = HostMetrics(**metrics)
            session.add(metric)
            await session.commit()
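
    # HostMetrics is assumed to define a column for every key produced by
    # collect_metrics(), including JSON columns for list-valued fields such
    # as cpu_per_core and top_cpu_processes; an unexpected key would raise
    # a TypeError in the model constructor.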

    async def check_thresholds(self, metrics: dict):
        """Check whether metrics exceed configured thresholds and create alerts."""
        # Load thresholds from the settings table
        async with async_session_maker() as session:
            from app.models.settings import Settings
            result = await session.execute(
                select(Settings).where(Settings.key == "host_monitoring")
            )
            monitoring_settings = result.scalar_one_or_none()
            if not monitoring_settings:
                return
            thresholds = monitoring_settings.value

        # CPU
        if metrics['cpu_percent'] > thresholds.get('cpu_threshold', 90):
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning' if metrics['cpu_percent'] < 95 else 'critical',
                title=f'High CPU Usage: {metrics["cpu_percent"]:.1f}%',
                message=f'CPU usage is at {metrics["cpu_percent"]:.1f}%, threshold is {thresholds.get("cpu_threshold", 90)}%',
                alert_metadata={'metric': 'cpu_percent', 'value': metrics['cpu_percent']},
            )

        # Memory
        if metrics['memory_percent'] > thresholds.get('memory_threshold', 90):
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning' if metrics['memory_percent'] < 95 else 'critical',
                title=f'High Memory Usage: {metrics["memory_percent"]:.1f}%',
                message=f'Memory usage is at {metrics["memory_percent"]:.1f}%, threshold is {thresholds.get("memory_threshold", 90)}%',
                alert_metadata={'metric': 'memory_percent', 'value': metrics['memory_percent']},
            )

        # Load average, relative to CPU count
        load_threshold = thresholds.get('load_threshold', 2.0) * metrics['cpu_count']
        if metrics['load_1'] > load_threshold:
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning',
                title=f'High Load Average: {metrics["load_1"]:.2f}',
                message=f'1-minute load average is {metrics["load_1"]:.2f}, threshold is {load_threshold:.2f}',
                alert_metadata={'metric': 'load_1', 'value': metrics['load_1']},
            )

        # Disk usage
        if metrics['disk_usage_percent'] > thresholds.get('disk_threshold', 90):
            await alert_service.create_alert(
                alert_type='host_metrics',
                severity='warning' if metrics['disk_usage_percent'] < 95 else 'critical',
                title=f'High Disk Usage: {metrics["disk_usage_percent"]:.1f}%',
                message=f'Disk usage is at {metrics["disk_usage_percent"]:.1f}%, threshold is {thresholds.get("disk_threshold", 90)}%',
                alert_metadata={'metric': 'disk_usage_percent', 'value': metrics['disk_usage_percent']},
            )
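
    # Note: the checks above fire on every collection cycle while a metric
    # stays over its threshold; any de-duplication or cooldown is assumed
    # to happen inside alert_service.create_alert().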

    async def cleanup_old_metrics(self, days: int = 30):
        """Delete metrics older than the given number of days."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        async with async_session_maker() as session:
            await session.execute(
                delete(HostMetrics).where(HostMetrics.timestamp < cutoff)
            )
            await session.commit()

    async def run_monitoring_loop(self):
        """Main monitoring loop - runs in the background."""
        print("[HostMonitor] Starting host monitoring loop")
        self.running = True
        last_cleanup = 0.0
        while self.running:
            try:
                # Collect metrics
                metrics = await self.collect_metrics()

                # Keep the latest metrics in memory for the dashboard
                self.latest_metrics = metrics

                # Store in the database
                await self.store_metrics(metrics)

                # Check thresholds
                await self.check_thresholds(metrics)

                # Clean up old data roughly once per hour, tracked by
                # elapsed time; the loop period is slightly over a minute,
                # so a wall-clock minute check is unreliable.
                if time.time() - last_cleanup >= 3600:
                    await self.cleanup_old_metrics()
                    last_cleanup = time.time()

                # Wait 60 seconds before the next collection
                await asyncio.sleep(60)
            except Exception as e:
                print(f"[HostMonitor] Error in monitoring loop: {e}")
                await asyncio.sleep(60)

    async def stop(self):
        """Stop the monitoring loop."""
        print("[HostMonitor] Stopping host monitoring loop")
        self.running = False

# Global instance
host_monitor = HostMonitor()
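
# Minimal standalone sketch: run one collection cycle and print a few
# headline values. This assumes the app.* modules import cleanly; if
# PostgreSQL/ClickHouse are unreachable, the DB collectors fall back to
# their zeroed dictionaries, so the demo still runs.
if __name__ == "__main__":
    async def _demo():
        metrics = await host_monitor.collect_metrics()
        for key in ("cpu_percent", "memory_percent", "load_1", "pg_transactions_per_sec"):
            print(f"{key}: {metrics.get(key)}")

    asyncio.run(_demo())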