| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- """
- Alert service for creating and dispatching system notifications.
- """
- from datetime import datetime, timezone
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import async_session_maker
- from app.models.alert import Alert
- from app.models.settings import Settings
- class AlertService:
- """Manage system alerts and notifications."""
- async def create_alert(
- self,
- alert_type: str,
- severity: str,
- title: str,
- message: str,
- alert_metadata: dict | None = None,
- ) -> Alert:
- """Create a new alert and dispatch to configured channels."""
- async with async_session_maker() as session:
- # Check if similar alert already exists (prevent spam)
- existing = await self._find_similar_alert(session, alert_type, title)
- if existing:
- print(f"[AlertService] Similar alert already exists, skipping: {title}")
- return existing
- # Create alert
- alert = Alert(
- timestamp=datetime.now(timezone.utc),
- alert_type=alert_type,
- severity=severity,
- title=title,
- message=message,
- alert_metadata=alert_metadata or {},
- sent_dashboard=True, # Always show in dashboard
- )
- session.add(alert)
- await session.commit()
- await session.refresh(alert)
- print(f"[AlertService] Created alert: [{severity}] {title}")
- # Dispatch to configured channels
- await self._dispatch_alert(session, alert)
- return alert
- async def _find_similar_alert(
- self, session: AsyncSession, alert_type: str, title: str
- ) -> Alert | None:
- """Find recent similar alert to prevent spam."""
- # Check if alert with same type and title was created in last 5 minutes
- from datetime import timedelta
- cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
- result = await session.execute(
- select(Alert)
- .where(Alert.alert_type == alert_type)
- .where(Alert.title == title)
- .where(Alert.timestamp > cutoff)
- .where(Alert.dismissed == False)
- )
- return result.scalar_one_or_none()
- async def _dispatch_alert(self, session: AsyncSession, alert: Alert):
- """Dispatch alert to configured channels (Telegram, Email, etc)."""
- # Get alert channels configuration
- result = await session.execute(
- select(Settings).where(Settings.key == "alert_channels")
- )
- settings = result.scalar_one_or_none()
- if not settings:
- return
- channels = settings.value
- # Telegram
- if channels.get("telegram", {}).get("enabled"):
- try:
- await self._send_telegram(alert, channels["telegram"])
- alert.sent_telegram = True
- except Exception as e:
- print(f"[AlertService] Failed to send Telegram: {e}")
- # Email
- if channels.get("email", {}).get("enabled"):
- try:
- await self._send_email(alert, channels["email"])
- alert.sent_email = True
- except Exception as e:
- print(f"[AlertService] Failed to send Email: {e}")
- await session.commit()
- async def _send_telegram(self, alert: Alert, config: dict):
- """Send alert via Telegram bot."""
- # TODO: Implement Telegram bot integration
- bot_token = config.get("bot_token")
- chat_ids = config.get("chat_ids", [])
- if not bot_token or not chat_ids:
- return
- # Format message
- severity_emoji = {
- "info": "ℹ️",
- "warning": "⚠️",
- "error": "❌",
- "critical": "🚨",
- }
- emoji = severity_emoji.get(alert.severity, "📢")
- text = f"{emoji} **{alert.title}**\n\n{alert.message}"
- print(f"[AlertService] Would send Telegram to {len(chat_ids)} chats: {text[:50]}...")
- # Import httpx and send message to Telegram API
- # await httpx.post(f"https://api.telegram.org/bot{bot_token}/sendMessage", ...)
- async def _send_email(self, alert: Alert, config: dict):
- """Send alert via Email."""
- # TODO: Implement Email SMTP integration
- smtp_server = config.get("smtp_server")
- recipients = config.get("recipients", [])
- if not smtp_server or not recipients:
- return
- print(f"[AlertService] Would send Email to {len(recipients)} recipients: {alert.title}")
- # Import smtplib and send email
- # ...
- async def get_active_alerts(self, session: AsyncSession) -> list[Alert]:
- """Get all active (non-dismissed) alerts."""
- result = await session.execute(
- select(Alert)
- .where(Alert.dismissed == False)
- .order_by(Alert.timestamp.desc())
- )
- return list(result.scalars().all())
- async def acknowledge_alert(
- self, session: AsyncSession, alert_id: int, user_id: int
- ):
- """Mark alert as acknowledged."""
- result = await session.execute(select(Alert).where(Alert.id == alert_id))
- alert = result.scalar_one_or_none()
- if alert:
- alert.acknowledged = True
- alert.acknowledged_at = datetime.now(timezone.utc)
- alert.acknowledged_by = user_id
- await session.commit()
- async def dismiss_alert(self, session: AsyncSession, alert_id: int):
- """Mark alert as dismissed."""
- result = await session.execute(select(Alert).where(Alert.id == alert_id))
- alert = result.scalar_one_or_none()
- if alert:
- alert.dismissed = True
- alert.dismissed_at = datetime.now(timezone.utc)
- await session.commit()
- # Global instance
- alert_service = AlertService()
|