http_metrics.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """
  2. HTTP metrics tracking middleware.
  3. Collects RPS, response time, error rate, active requests.
  4. """
  5. import time
  6. from collections import deque
  7. from datetime import datetime, timezone
  8. from threading import Lock
  9. from typing import Deque, Tuple
  10. from starlette.middleware.base import BaseHTTPMiddleware
  11. from starlette.requests import Request
  12. from starlette.responses import Response
  13. class HTTPMetricsCollector:
  14. """Thread-safe collector for HTTP metrics."""
  15. def __init__(self, window_seconds: int = 60):
  16. self.window_seconds = window_seconds
  17. self.requests: Deque[Tuple[float, int, float]] = deque() # (timestamp, status_code, duration_ms)
  18. self.active_requests = 0
  19. self.lock = Lock()
  20. def record_request(self, status_code: int, duration_ms: float):
  21. """Record a completed request."""
  22. now = time.time()
  23. with self.lock:
  24. self.requests.append((now, status_code, duration_ms))
  25. self._cleanup_old_requests(now)
  26. def increment_active(self):
  27. """Increment active request counter."""
  28. with self.lock:
  29. self.active_requests += 1
  30. def decrement_active(self):
  31. """Decrement active request counter."""
  32. with self.lock:
  33. self.active_requests = max(0, self.active_requests - 1)
  34. def _cleanup_old_requests(self, now: float):
  35. """Remove requests older than window."""
  36. cutoff = now - self.window_seconds
  37. while self.requests and self.requests[0][0] < cutoff:
  38. self.requests.popleft()
  39. def get_metrics(self) -> dict:
  40. """Get current metrics snapshot."""
  41. now = time.time()
  42. with self.lock:
  43. self._cleanup_old_requests(now)
  44. if not self.requests:
  45. return {
  46. "requests_per_sec": 0,
  47. "avg_response_time_ms": 0,
  48. "error_rate": 0,
  49. "active_requests": self.active_requests,
  50. }
  51. total_requests = len(self.requests)
  52. error_requests = sum(1 for _, status, _ in self.requests if status >= 400)
  53. total_duration = sum(duration for _, _, duration in self.requests)
  54. return {
  55. "requests_per_sec": int(total_requests / self.window_seconds),
  56. "avg_response_time_ms": total_duration / total_requests if total_requests > 0 else 0,
  57. "error_rate": (error_requests / total_requests * 100) if total_requests > 0 else 0,
  58. "active_requests": self.active_requests,
  59. }
  60. # Global collector instance
  61. http_metrics_collector = HTTPMetricsCollector(window_seconds=60)
  62. class HTTPMetricsMiddleware(BaseHTTPMiddleware):
  63. """Middleware to track HTTP request metrics."""
  64. async def dispatch(self, request: Request, call_next):
  65. # Skip metrics endpoints to avoid recursive counting
  66. if request.url.path.startswith("/api/v1/superadmin/monitoring"):
  67. return await call_next(request)
  68. http_metrics_collector.increment_active()
  69. start_time = time.time()
  70. try:
  71. response: Response = await call_next(request)
  72. duration_ms = (time.time() - start_time) * 1000
  73. http_metrics_collector.record_request(response.status_code, duration_ms)
  74. return response
  75. except Exception as e:
  76. duration_ms = (time.time() - start_time) * 1000
  77. http_metrics_collector.record_request(500, duration_ms)
  78. raise
  79. finally:
  80. http_metrics_collector.decrement_active()