"""
Alert service for creating and dispatching system notifications.
"""

from datetime import datetime, timedelta, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import async_session_maker
from app.models.alert import Alert
from app.models.settings import Settings


class AlertService:
    """Manage system alerts and notifications.

    Alerts are persisted through SQLAlchemy async sessions. ``create_alert``
    opens its own session via ``async_session_maker``; the query/update
    methods (``get_active_alerts``, ``acknowledge_alert``, ``dismiss_alert``)
    operate on a session supplied by the caller.
    """

    # Alerts with the same type and title created within this window are
    # treated as duplicates by ``create_alert``.
    DEDUP_WINDOW = timedelta(minutes=5)

    # Prefix used in the Telegram message for each known severity.
    _SEVERITY_EMOJI = {
        "info": "ℹ️",
        "warning": "⚠️",
        "error": "❌",
        "critical": "🚨",
    }
    _DEFAULT_EMOJI = "📢"

    async def create_alert(
        self,
        alert_type: str,
        severity: str,
        title: str,
        message: str,
        alert_metadata: dict | None = None,
    ) -> Alert:
        """Create a new alert and dispatch it to the configured channels.

        If a non-dismissed alert with the same ``alert_type`` and ``title``
        was created within ``DEDUP_WINDOW``, that alert is returned instead
        and nothing new is stored or dispatched.

        Args:
            alert_type: Category of the alert; used for duplicate detection.
            severity: Severity label stored on the alert (the Telegram
                formatter recognises "info", "warning", "error", "critical").
            title: Short title; used for duplicate detection.
            message: Full alert text.
            alert_metadata: Optional extra data; stored as ``{}`` when omitted.

        Returns:
            The newly created alert, or the existing similar one.
        """
        async with async_session_maker() as session:
            # Check if a similar alert already exists (prevent spam).
            existing = await self._find_similar_alert(session, alert_type, title)
            if existing is not None:
                print(f"[AlertService] Similar alert already exists, skipping: {title}")
                return existing

            alert = Alert(
                timestamp=datetime.now(timezone.utc),
                alert_type=alert_type,
                severity=severity,
                title=title,
                message=message,
                alert_metadata=alert_metadata or {},
                sent_dashboard=True,  # Always show in dashboard
            )
            session.add(alert)
            await session.commit()
            # Reload so attributes populated by the database are available.
            await session.refresh(alert)

            print(f"[AlertService] Created alert: [{severity}] {title}")

            await self._dispatch_alert(session, alert)

            return alert

    async def _find_similar_alert(
        self, session: AsyncSession, alert_type: str, title: str
    ) -> Alert | None:
        """Return the most recent similar, non-dismissed alert, if any.

        "Similar" means same ``alert_type`` and ``title`` with a timestamp
        newer than ``now - DEDUP_WINDOW``.
        """
        cutoff = datetime.now(timezone.utc) - self.DEDUP_WINDOW

        result = await session.execute(
            select(Alert)
            .where(
                Alert.alert_type == alert_type,
                Alert.title == title,
                Alert.timestamp > cutoff,
                # ``== False`` is intentional: it builds a SQL expression.
                Alert.dismissed == False,  # noqa: E712
            )
            # More than one row can match (e.g. concurrent creators), which
            # would make scalar_one_or_none() raise; take the newest instead.
            .order_by(Alert.timestamp.desc())
            .limit(1)
        )
        return result.scalars().first()

    async def _dispatch_alert(self, session: AsyncSession, alert: Alert):
        """Dispatch alert to configured channels (Telegram, Email, etc).

        Reads the ``alert_channels`` settings row and, for every channel
        whose config has a truthy ``enabled`` entry, calls the matching
        sender. ``alert.sent_telegram`` / ``alert.sent_email`` are set only
        when the sender reports that it actually handled the alert. Sender
        exceptions are logged and do not abort the other channels.
        """
        result = await session.execute(
            select(Settings).where(Settings.key == "alert_channels")
        )
        settings = result.scalar_one_or_none()
        if not settings:
            return

        # Tolerate an empty/null settings value instead of failing on .get().
        channels = settings.value or {}
        if not channels:
            return

        # Telegram (``or {}`` guards against a null channel entry).
        telegram_config = channels.get("telegram") or {}
        if telegram_config.get("enabled"):
            try:
                if await self._send_telegram(alert, telegram_config):
                    alert.sent_telegram = True
                else:
                    print("[AlertService] Telegram enabled but not configured, skipping")
            except Exception as e:  # best-effort: one channel must not break the rest
                print(f"[AlertService] Failed to send Telegram: {e}")

        # Email
        email_config = channels.get("email") or {}
        if email_config.get("enabled"):
            try:
                if await self._send_email(alert, email_config):
                    alert.sent_email = True
                else:
                    print("[AlertService] Email enabled but not configured, skipping")
            except Exception as e:  # best-effort: one channel must not break the rest
                print(f"[AlertService] Failed to send Email: {e}")

        await session.commit()
        # The commit may expire loaded attributes (depends on the session's
        # expire_on_commit setting); reload so the alert handed back by
        # create_alert is still usable after the session closes.
        await session.refresh(alert)

    async def _send_telegram(self, alert: Alert, config: dict) -> bool:
        """Send alert via Telegram bot.

        Args:
            alert: Alert to send.
            config: Telegram channel config; reads ``bot_token`` and
                ``chat_ids``.

        Returns:
            False when ``bot_token`` or ``chat_ids`` is missing/empty (nothing
            is sent), True otherwise.
        """
        # TODO: Implement Telegram bot integration
        bot_token = config.get("bot_token")
        chat_ids = config.get("chat_ids", [])

        if not bot_token or not chat_ids:
            return False

        emoji = self._SEVERITY_EMOJI.get(alert.severity, self._DEFAULT_EMOJI)
        text = f"{emoji} **{alert.title}**\n\n{alert.message}"

        print(f"[AlertService] Would send Telegram to {len(chat_ids)} chats: {text[:50]}...")

        # TODO: send ``text`` to each chat id through the Telegram Bot API
        # ``sendMessage`` endpoint (e.g. with httpx).
        return True

    async def _send_email(self, alert: Alert, config: dict) -> bool:
        """Send alert via Email.

        Args:
            alert: Alert to send.
            config: Email channel config; reads ``smtp_server`` and
                ``recipients``.

        Returns:
            False when ``smtp_server`` or ``recipients`` is missing/empty
            (nothing is sent), True otherwise.
        """
        # TODO: Implement Email SMTP integration
        smtp_server = config.get("smtp_server")
        recipients = config.get("recipients", [])

        if not smtp_server or not recipients:
            return False

        print(f"[AlertService] Would send Email to {len(recipients)} recipients: {alert.title}")

        # TODO: build the message and send it with smtplib.
        return True

    async def get_active_alerts(self, session: AsyncSession) -> list[Alert]:
        """Get all active (non-dismissed) alerts, newest first."""
        result = await session.execute(
            select(Alert)
            # ``== False`` is intentional: it builds a SQL expression.
            .where(Alert.dismissed == False)  # noqa: E712
            .order_by(Alert.timestamp.desc())
        )
        return list(result.scalars().all())

    async def _get_alert(self, session: AsyncSession, alert_id: int) -> Alert | None:
        """Load a single alert by id, or None when no such alert exists."""
        result = await session.execute(select(Alert).where(Alert.id == alert_id))
        return result.scalar_one_or_none()

    async def acknowledge_alert(
        self, session: AsyncSession, alert_id: int, user_id: int
    ):
        """Mark alert as acknowledged.

        Records the acknowledging user and the current UTC time, then
        commits. Does nothing when ``alert_id`` does not exist.
        """
        alert = await self._get_alert(session, alert_id)
        if alert is None:
            return

        alert.acknowledged = True
        alert.acknowledged_at = datetime.now(timezone.utc)
        alert.acknowledged_by = user_id
        await session.commit()

    async def dismiss_alert(self, session: AsyncSession, alert_id: int):
        """Mark alert as dismissed.

        Records the current UTC time as ``dismissed_at``, then commits.
        Does nothing when ``alert_id`` does not exist.
        """
        alert = await self._get_alert(session, alert_id)
        if alert is None:
            return

        alert.dismissed = True
        alert.dismissed_at = datetime.now(timezone.utc)
        await session.commit()


# Global instance
alert_service = AlertService()