Add PostgreSQL, ClickHouse and HTTP/API metrics collection

Backend implementation:
- Expanded HostMetrics model with 16 new fields:
  - PostgreSQL: active connections, max_connections limit, DB size, cache hit ratio, TPS, deadlocks, temp files
  - ClickHouse: active queries, DB size, QPS, rows/sec, memory usage
  - HTTP/API: RPS, avg response time, error rate, active requests

- Created HTTPMetricsMiddleware for real-time RPS tracking:
  - 60-second rolling window for accurate rates
  - Tracks requests/sec, response time, error rate
  - Thread-safe collector with deque for efficient cleanup

- Added database metrics collectors:
  - PostgreSQL: queries pg_stat_activity, pg_stat_database, pg_database_size()
  - ClickHouse: queries system.processes, system.parts, system.events, system.metrics
  - Delta-based calculations for TPS, QPS

- Created migration 652fb7324044 adding 16 columns to host_metrics table
- Updated API endpoint to return all new metrics

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
commit 66112901d5 (root, 4 weeks ago)

+ 66 - 0
backend/alembic/versions/20251229_0221_652fb7324044_add_database_and_http_metrics.py

@@ -0,0 +1,66 @@
+"""add_database_and_http_metrics
+
+Revision ID: 652fb7324044
+Revises: a68acea9f536
+Create Date: 2025-12-29 02:21:37.916052+00:00
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = '652fb7324044'
+down_revision: Union[str, None] = 'a68acea9f536'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # PostgreSQL metrics
+    op.add_column('host_metrics', sa.Column('pg_active_connections', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_total_connections', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_database_size_bytes', sa.BigInteger(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_cache_hit_ratio', sa.Float(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_transactions_per_sec', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_deadlocks', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('pg_temp_files', sa.Integer(), nullable=False, server_default='0'))
+
+    # ClickHouse metrics
+    op.add_column('host_metrics', sa.Column('ch_active_queries', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('ch_database_size_bytes', sa.BigInteger(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('ch_queries_per_sec', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('ch_rows_read_per_sec', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('ch_memory_usage_bytes', sa.BigInteger(), nullable=False, server_default='0'))
+
+    # HTTP/API metrics
+    op.add_column('host_metrics', sa.Column('http_requests_per_sec', sa.Integer(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('http_avg_response_time_ms', sa.Float(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('http_error_rate', sa.Float(), nullable=False, server_default='0'))
+    op.add_column('host_metrics', sa.Column('http_active_requests', sa.Integer(), nullable=False, server_default='0'))
+
+
+def downgrade() -> None:
+    # Remove HTTP metrics
+    op.drop_column('host_metrics', 'http_active_requests')
+    op.drop_column('host_metrics', 'http_error_rate')
+    op.drop_column('host_metrics', 'http_avg_response_time_ms')
+    op.drop_column('host_metrics', 'http_requests_per_sec')
+
+    # Remove ClickHouse metrics
+    op.drop_column('host_metrics', 'ch_memory_usage_bytes')
+    op.drop_column('host_metrics', 'ch_rows_read_per_sec')
+    op.drop_column('host_metrics', 'ch_queries_per_sec')
+    op.drop_column('host_metrics', 'ch_database_size_bytes')
+    op.drop_column('host_metrics', 'ch_active_queries')
+
+    # Remove PostgreSQL metrics
+    op.drop_column('host_metrics', 'pg_temp_files')
+    op.drop_column('host_metrics', 'pg_deadlocks')
+    op.drop_column('host_metrics', 'pg_transactions_per_sec')
+    op.drop_column('host_metrics', 'pg_cache_hit_ratio')
+    op.drop_column('host_metrics', 'pg_database_size_bytes')
+    op.drop_column('host_metrics', 'pg_total_connections')
+    op.drop_column('host_metrics', 'pg_active_connections')

+ 19 - 0
backend/app/api/v1/superadmin/monitoring.py

@@ -76,6 +76,25 @@ async def get_recent_host_metrics(
                 "thread_count": m.thread_count,
                 "thread_count": m.thread_count,
                 "top_cpu_processes": m.top_cpu_processes,
                 "top_cpu_processes": m.top_cpu_processes,
                 "top_mem_processes": m.top_mem_processes,
                 "top_mem_processes": m.top_mem_processes,
+                # PostgreSQL
+                "pg_active_connections": m.pg_active_connections,
+                "pg_total_connections": m.pg_total_connections,
+                "pg_database_size_bytes": m.pg_database_size_bytes,
+                "pg_cache_hit_ratio": m.pg_cache_hit_ratio,
+                "pg_transactions_per_sec": m.pg_transactions_per_sec,
+                "pg_deadlocks": m.pg_deadlocks,
+                "pg_temp_files": m.pg_temp_files,
+                # ClickHouse
+                "ch_active_queries": m.ch_active_queries,
+                "ch_database_size_bytes": m.ch_database_size_bytes,
+                "ch_queries_per_sec": m.ch_queries_per_sec,
+                "ch_rows_read_per_sec": m.ch_rows_read_per_sec,
+                "ch_memory_usage_bytes": m.ch_memory_usage_bytes,
+                # HTTP/API
+                "http_requests_per_sec": m.http_requests_per_sec,
+                "http_avg_response_time_ms": m.http_avg_response_time_ms,
+                "http_error_rate": m.http_error_rate,
+                "http_active_requests": m.http_active_requests,
             }
             for m in reversed(metrics)
         ]

+ 100 - 0
backend/app/core/http_metrics.py

@@ -0,0 +1,100 @@
+"""
+HTTP metrics tracking middleware.
+Collects RPS, response time, error rate, active requests.
+"""
+
+import time
+from collections import deque
+from threading import Lock
+from typing import Deque, Tuple
+
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.requests import Request
+from starlette.responses import Response
+
+
+class HTTPMetricsCollector:
+    """Thread-safe collector for HTTP metrics."""
+
+    def __init__(self, window_seconds: int = 60):
+        self.window_seconds = window_seconds
+        self.requests: Deque[Tuple[float, int, float]] = deque()  # (timestamp, status_code, duration_ms)
+        self.active_requests = 0
+        self.lock = Lock()
+
+    def record_request(self, status_code: int, duration_ms: float):
+        """Record a completed request."""
+        now = time.time()
+        with self.lock:
+            self.requests.append((now, status_code, duration_ms))
+            self._cleanup_old_requests(now)
+
+    def increment_active(self):
+        """Increment active request counter."""
+        with self.lock:
+            self.active_requests += 1
+
+    def decrement_active(self):
+        """Decrement active request counter."""
+        with self.lock:
+            self.active_requests = max(0, self.active_requests - 1)
+
+    def _cleanup_old_requests(self, now: float):
+        """Remove requests older than window."""
+        cutoff = now - self.window_seconds
+        while self.requests and self.requests[0][0] < cutoff:
+            self.requests.popleft()
+
+    def get_metrics(self) -> dict:
+        """Get current metrics snapshot."""
+        now = time.time()
+        with self.lock:
+            self._cleanup_old_requests(now)
+
+            if not self.requests:
+                return {
+                    "requests_per_sec": 0,
+                    "avg_response_time_ms": 0,
+                    "error_rate": 0,
+                    "active_requests": self.active_requests,
+                }
+
+            total_requests = len(self.requests)
+            error_requests = sum(1 for _, status, _ in self.requests if status >= 400)
+            total_duration = sum(duration for _, _, duration in self.requests)
+
+            return {
+                "requests_per_sec": int(total_requests / self.window_seconds),
+                "avg_response_time_ms": total_duration / total_requests if total_requests > 0 else 0,
+                "error_rate": (error_requests / total_requests * 100) if total_requests > 0 else 0,
+                "active_requests": self.active_requests,
+            }
+
+
+# Global collector instance
+http_metrics_collector = HTTPMetricsCollector(window_seconds=60)
+
+
+class HTTPMetricsMiddleware(BaseHTTPMiddleware):
+    """Middleware to track HTTP request metrics."""
+
+    async def dispatch(self, request: Request, call_next):
+        # Skip metrics endpoints to avoid recursive counting
+        if request.url.path.startswith("/api/v1/superadmin/monitoring"):
+            return await call_next(request)
+
+        http_metrics_collector.increment_active()
+        start_time = time.time()
+
+        try:
+            response: Response = await call_next(request)
+            duration_ms = (time.time() - start_time) * 1000
+            http_metrics_collector.record_request(response.status_code, duration_ms)
+            return response
+        except Exception:
+            duration_ms = (time.time() - start_time) * 1000
+            http_metrics_collector.record_request(500, duration_ms)
+            raise
+        finally:
+            http_metrics_collector.decrement_active()

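For reference, a quick sketch of the collector in isolation (hypothetical inputs; in production only the middleware feeds it). Note that requests_per_sec is integer-truncated over the full 60 s window, so sparse traffic reads as 0:

    from app.core.http_metrics import HTTPMetricsCollector

    collector = HTTPMetricsCollector(window_seconds=60)
    collector.record_request(200, 12.5)   # OK response, 12.5 ms
    collector.record_request(503, 80.0)   # 5xx counts toward error_rate
    print(collector.get_metrics())
    # {'requests_per_sec': 0, 'avg_response_time_ms': 46.25,
    #  'error_rate': 50.0, 'active_requests': 0}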
+ 4 - 0
backend/app/main.py

@@ -6,6 +6,7 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
 from app.config import settings
+from app.core.http_metrics import HTTPMetricsMiddleware
 
 # Create FastAPI app
 app = FastAPI(
@@ -25,6 +26,9 @@ app.add_middleware(
     allow_headers=["*"],
     allow_headers=["*"],
 )
 )
 
 
+# Add HTTP metrics middleware
+app.add_middleware(HTTPMetricsMiddleware)
+
 
 
 @app.get("/")
 @app.get("/")
 async def root():
 async def root():

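One ordering note: because Starlette's add_middleware pushes each newly registered middleware to the outside of the stack, registering HTTPMetricsMiddleware after CORSMiddleware makes it the outermost layer, so CORS preflight (OPTIONS) responses are counted in the HTTP metrics as well.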
+ 22 - 0
backend/app/models/host_metrics.py

@@ -69,3 +69,25 @@ class HostMetrics(Base):
     thread_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
     top_cpu_processes: Mapped[list | None] = mapped_column(JSON)  # Top 5 by CPU
     top_mem_processes: Mapped[list | None] = mapped_column(JSON)  # Top 5 by memory
+
+    # PostgreSQL
+    pg_active_connections: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    pg_total_connections: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    pg_database_size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
+    pg_cache_hit_ratio: Mapped[float] = mapped_column(Float, nullable=False, default=0)  # %
+    pg_transactions_per_sec: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    pg_deadlocks: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    pg_temp_files: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+
+    # ClickHouse
+    ch_active_queries: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    ch_database_size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
+    ch_queries_per_sec: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    ch_rows_read_per_sec: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    ch_memory_usage_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
+
+    # HTTP/API metrics
+    http_requests_per_sec: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    http_avg_response_time_ms: Mapped[float] = mapped_column(Float, nullable=False, default=0)
+    http_error_rate: Mapped[float] = mapped_column(Float, nullable=False, default=0)  # %
+    http_active_requests: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

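Because collect_metrics returns a merged dict whose keys match these column names one-to-one, persistence can stay generic. A minimal sketch of that pattern (store_metrics in host_monitor.py presumably does something similar; save_snapshot is a hypothetical name):

    from app.core.database import async_session_maker
    from app.models.host_metrics import HostMetrics

    async def save_snapshot(metrics: dict) -> None:
        # Dict keys mirror the HostMetrics columns, so kwargs expansion suffices.
        async with async_session_maker() as session:
            session.add(HostMetrics(**metrics))
            await session.commit()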
+ 173 - 2
backend/app/services/host_monitor.py

@@ -7,7 +7,7 @@ import time
 from datetime import datetime, timedelta, timezone
 
 import psutil
-from sqlalchemy import delete, select
+from sqlalchemy import delete, select, text
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.core.database import async_session_maker
@@ -23,8 +23,160 @@ class HostMonitor:
         self.previous_net_io = None
         self.previous_cpu_stats = None
         self.previous_timestamp = None
+        self.previous_pg_stats = None
+        self.previous_ch_stats = None
         self.running = False
 
+    async def collect_postgresql_metrics(self) -> dict:
+        """Collect PostgreSQL database metrics."""
+        try:
+            async with async_session_maker() as session:
+                # Active connections plus the configured max_connections limit
+                result = await session.execute(text("""
+                    SELECT count(*) as active,
+                           (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max
+                    FROM pg_stat_activity
+                    WHERE state = 'active'
+                """))
+                row = result.fetchone()
+                active_connections = row[0] if row else 0
+                total_connections = row[1] if row else 100
+
+                # Database size; current_database() avoids parsing DATABASE_URL
+                # and interpolating the name into the SQL string
+                result = await session.execute(text(
+                    "SELECT pg_database_size(current_database())"
+                ))
+                db_size = result.scalar() or 0
+
+                # Cache hit ratio
+                result = await session.execute(text("""
+                    SELECT
+                        sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit) + sum(blks_read), 0) as cache_hit_ratio
+                    FROM pg_stat_database
+                """))
+                cache_hit_ratio = result.scalar() or 0
+
+                # Transactions per second (delta-based)
+                result = await session.execute(text("""
+                    SELECT sum(xact_commit + xact_rollback) as total_xacts,
+                           sum(deadlocks) as deadlocks,
+                           sum(temp_files) as temp_files
+                    FROM pg_stat_database
+                """))
+                row = result.fetchone()
+                total_xacts = row[0] or 0
+                deadlocks = row[1] or 0
+                temp_files = row[2] or 0
+
+                # Calculate TPS
+                tps = 0
+                if self.previous_pg_stats and self.previous_timestamp:
+                    time_delta = time.time() - self.previous_timestamp
+                    if time_delta > 0:
+                        tps = int((total_xacts - self.previous_pg_stats['xacts']) / time_delta)
+
+                self.previous_pg_stats = {'xacts': total_xacts}
+
+                return {
+                    'pg_active_connections': active_connections,
+                    'pg_total_connections': total_connections,
+                    'pg_database_size_bytes': db_size,
+                    'pg_cache_hit_ratio': round(cache_hit_ratio, 2),
+                    'pg_transactions_per_sec': max(0, tps),
+                    'pg_deadlocks': deadlocks,
+                    'pg_temp_files': temp_files,
+                }
+        except Exception as e:
+            print(f"[HostMonitor] Error collecting PostgreSQL metrics: {e}")
+            return {
+                'pg_active_connections': 0,
+                'pg_total_connections': 0,
+                'pg_database_size_bytes': 0,
+                'pg_cache_hit_ratio': 0,
+                'pg_transactions_per_sec': 0,
+                'pg_deadlocks': 0,
+                'pg_temp_files': 0,
+            }
+
+    async def collect_clickhouse_metrics(self) -> dict:
+        """Collect ClickHouse database metrics."""
+        try:
+            import clickhouse_connect
+            from app.config import settings
+
+            # Check if ClickHouse is configured
+            if not getattr(settings, 'CLICKHOUSE_HOST', None):
+                return {
+                    'ch_active_queries': 0,
+                    'ch_database_size_bytes': 0,
+                    'ch_queries_per_sec': 0,
+                    'ch_rows_read_per_sec': 0,
+                    'ch_memory_usage_bytes': 0,
+                }
+
+            # Connect to ClickHouse
+            client = clickhouse_connect.get_client(
+                host=settings.CLICKHOUSE_HOST,
+                port=settings.CLICKHOUSE_PORT,
+                username=settings.CLICKHOUSE_USER,
+                password=settings.CLICKHOUSE_PASSWORD,
+            )
+
+            # Active queries
+            result = client.query("SELECT count() FROM system.processes")
+            active_queries = result.result_rows[0][0] if result.result_rows else 0
+
+            # Database size
+            result = client.query("""
+                SELECT sum(bytes) FROM system.parts
+                WHERE active AND database NOT IN ('system', 'information_schema')
+            """)
+            db_size = result.result_rows[0][0] if result.result_rows else 0
+
+            # Cumulative counters since server start; monotonically increasing
+            # values suit the delta-based rate calculation below
+            result = client.query("""
+                SELECT
+                    sumIf(value, event = 'Query') AS queries,
+                    sumIf(value, event = 'SelectedRows') AS rows_read
+                FROM system.events
+            """)
+            row = result.result_rows[0] if result.result_rows else (0, 0)
+            queries = row[0] or 0
+            rows_read = row[1] or 0
+
+            # Current server-wide memory usage
+            result = client.query(
+                "SELECT value FROM system.metrics WHERE metric = 'MemoryTracking'"
+            )
+            memory_usage = result.result_rows[0][0] if result.result_rows else 0
+
+            # Calculate QPS
+            qps = 0
+            rows_per_sec = 0
+            if self.previous_ch_stats and self.previous_timestamp:
+                time_delta = time.time() - self.previous_timestamp
+                if time_delta > 0:
+                    qps = int((queries - self.previous_ch_stats['queries']) / time_delta)
+                    rows_per_sec = int((rows_read - self.previous_ch_stats['rows']) / time_delta)
+
+            self.previous_ch_stats = {'queries': queries, 'rows': rows_read}
+            client.close()
+
+            return {
+                'ch_active_queries': active_queries,
+                'ch_database_size_bytes': db_size or 0,
+                'ch_queries_per_sec': max(0, qps),
+                'ch_rows_read_per_sec': max(0, rows_per_sec),
+                'ch_memory_usage_bytes': memory_usage or 0,
+            }
+        except Exception as e:
+            print(f"[HostMonitor] Error collecting ClickHouse metrics: {e}")
+            return {
+                'ch_active_queries': 0,
+                'ch_database_size_bytes': 0,
+                'ch_queries_per_sec': 0,
+                'ch_rows_read_per_sec': 0,
+                'ch_memory_usage_bytes': 0,
+            }
+
     async def collect_metrics(self) -> dict:
         """Collect comprehensive system metrics."""
         current_timestamp = time.time()
@@ -166,7 +318,26 @@ class HostMonitor:
             'process_count': len(psutil.pids()),
             'thread_count': sum(p.num_threads() for p in psutil.process_iter() if p.is_running()),
             'top_cpu_processes': top_cpu_clean,
             'top_mem_processes': top_mem_clean,
+        }
+
+        # Collect database metrics
+        pg_metrics = await self.collect_postgresql_metrics()
+        ch_metrics = await self.collect_clickhouse_metrics()
+
+        # Collect HTTP metrics
+        from app.core.http_metrics import http_metrics_collector
+        http_metrics = http_metrics_collector.get_metrics()
+
+        # Merge all metrics
+        return {
+            **metrics,
+            **pg_metrics,
+            **ch_metrics,
+            'http_requests_per_sec': http_metrics['requests_per_sec'],
+            'http_avg_response_time_ms': round(http_metrics['avg_response_time_ms'], 2),
+            'http_error_rate': round(http_metrics['error_rate'], 2),
+            'http_active_requests': http_metrics['active_requests'],
         }
 
     async def store_metrics(self, metrics: dict):
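
The delta-based TPS/QPS logic in both collectors is the same rate-from-counter pattern; isolated, it looks like this (RateTracker is a hypothetical helper for illustration, not part of this commit):

    import time

    class RateTracker:
        """Turn a monotonically increasing counter into a per-second rate."""

        def __init__(self):
            self.prev_value = None
            self.prev_ts = None

        def update(self, value: int) -> int:
            now = time.time()
            rate = 0
            if self.prev_value is not None and now > self.prev_ts:
                rate = int((value - self.prev_value) / (now - self.prev_ts))
            self.prev_value, self.prev_ts = value, now
            # Counter resets (e.g. a server restart) would go negative;
            # clamp to 0, just as the collectors do with max(0, ...).
            return max(0, rate)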