""" Tunnel session management service for SSH and Dashboard tunnels. """ import asyncio import os import signal import subprocess import uuid as uuid_module from datetime import datetime, timedelta from typing import Dict, Optional from pydantic import BaseModel class TunnelSession(BaseModel): """Tunnel session model""" uuid: str device_id: str admin_user: str tunnel_type: str # "ssh" | "dashboard" created_at: datetime expires_at: datetime last_heartbeat: Optional[datetime] = None ttyd_port: Optional[int] = None ttyd_pid: Optional[int] = None device_tunnel_port: Optional[int] = None status: str = "waiting" # "waiting" | "ready" | "failed" class TunnelStatus(BaseModel): """Device tunnel status""" device_id: str tunnel_type: str # "ssh" | "dashboard" allocated_port: Optional[int] = None status: str # "connected" | "disconnected" connected_at: Optional[datetime] = None last_heartbeat: Optional[datetime] = None class TunnelService: """ Tunnel management service In-memory storage (можно заменить на Redis для multi-server) """ def __init__(self): self.sessions: Dict[str, TunnelSession] = {} self.tunnel_status: Dict[str, TunnelStatus] = {} self.cleanup_task = None def start_background_cleanup(self): """Start background task for cleanup inactive sessions""" if not self.cleanup_task: self.cleanup_task = asyncio.create_task(self._cleanup_loop()) async def _cleanup_loop(self): """Background cleanup loop""" while True: await asyncio.sleep(300) # Every 5 minutes await self.cleanup_inactive_sessions() async def cleanup_inactive_sessions(self): """ Kill ttyd processes with no heartbeat for 60 minutes Remove expired sessions """ now = datetime.now() inactive_threshold = now - timedelta(minutes=60) grace_period = now - timedelta(seconds=60) for session_uuid, session in list(self.sessions.items()): # Check expiration (hard limit: 1 hour) if now > session.expires_at: print(f"[tunnel] Session expired: {session_uuid}") self._kill_ttyd(session.ttyd_pid) del self.sessions[session_uuid] continue # Check inactivity (60 minutes without heartbeat) if session.last_heartbeat and session.last_heartbeat < inactive_threshold: print(f"[tunnel] Session inactive for 60 min: {session_uuid}") self._kill_ttyd(session.ttyd_pid) del self.sessions[session_uuid] continue # Grace period: if tab closed, wait 60 seconds before killing if session.last_heartbeat and session.last_heartbeat < grace_period: if session.ttyd_pid and not self._is_process_alive(session.ttyd_pid): print(f"[tunnel] ttyd process dead: {session_uuid}") del self.sessions[session_uuid] def create_session( self, device_id: str, admin_user: str, tunnel_type: str ) -> TunnelSession: """Create new tunnel session""" session_uuid = str(uuid_module.uuid4()) now = datetime.now() session = TunnelSession( uuid=session_uuid, device_id=device_id, admin_user=admin_user, tunnel_type=tunnel_type, created_at=now, expires_at=now + timedelta(hours=1), status="waiting" ) self.sessions[session_uuid] = session # Create tunnel status key status_key = f"{device_id}:{tunnel_type}" if status_key not in self.tunnel_status: self.tunnel_status[status_key] = TunnelStatus( device_id=device_id, tunnel_type=tunnel_type, status="disconnected" ) return session def get_session(self, session_uuid: str) -> Optional[TunnelSession]: """Get session by UUID""" return self.sessions.get(session_uuid) def update_heartbeat(self, session_uuid: str) -> bool: """Update session heartbeat""" session = self.sessions.get(session_uuid) if not session: return False session.last_heartbeat = datetime.now() return True def report_device_port( self, device_id: str, tunnel_type: str, port: Optional[int], status: str ): """Device reports tunnel port allocation""" status_key = f"{device_id}:{tunnel_type}" if status == "connected" and port: self.tunnel_status[status_key] = TunnelStatus( device_id=device_id, tunnel_type=tunnel_type, allocated_port=port, status="connected", connected_at=datetime.now(), last_heartbeat=datetime.now() ) # Update all waiting sessions for this device for session in self.sessions.values(): if (session.device_id == device_id and session.tunnel_type == tunnel_type and session.status == "waiting"): session.device_tunnel_port = port session.status = "ready" elif status == "disconnected": if status_key in self.tunnel_status: self.tunnel_status[status_key].status = "disconnected" self.tunnel_status[status_key].allocated_port = None def get_tunnel_status( self, device_id: str, tunnel_type: str ) -> Optional[TunnelStatus]: """Get tunnel status for device""" status_key = f"{device_id}:{tunnel_type}" return self.tunnel_status.get(status_key) def spawn_ttyd( self, session_uuid: str, device_tunnel_port: int, server_host: str = "localhost" ) -> int: """ Spawn ttyd process for terminal access Returns ttyd port """ session = self.sessions.get(session_uuid) if not session: raise ValueError(f"Session not found: {session_uuid}") # Find free port for ttyd (45000-49999) ttyd_port = self._find_free_port(45000, 49999) # Spawn ttyd process # ttyd connects to device via SSH through the tunnel port cmd = [ "ttyd", "--port", str(ttyd_port), "--once", # Single session "--writable", # Allow input "ssh", "-p", str(device_tunnel_port), "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", f"root@{server_host}" ] process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) session.ttyd_port = ttyd_port session.ttyd_pid = process.pid print(f"[tunnel] Spawned ttyd on port {ttyd_port} (pid={process.pid})") return ttyd_port def _find_free_port(self, start: int, end: int) -> int: """Find free port in range""" import socket for port in range(start, end + 1): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', port)) return port except OSError: continue raise RuntimeError(f"No free ports in range {start}-{end}") def _kill_ttyd(self, pid: Optional[int]): """Kill ttyd process gracefully""" if not pid: return try: os.kill(pid, signal.SIGTERM) print(f"[tunnel] Killed ttyd process {pid}") except ProcessLookupError: pass def _is_process_alive(self, pid: int) -> bool: """Check if process is running""" try: os.kill(pid, 0) # Signal 0 = check existence return True except ProcessLookupError: return False # Global tunnel service instance tunnel_service = TunnelService()