""" Superadmin tunnel management API endpoints. """ from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, Request from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel from sqlalchemy import select from starlette.responses import StreamingResponse, Response import httpx import asyncio from app.api.deps import get_current_superadmin from app.core.database import get_db from app.models.device import Device from app.models.user import User from app.services.tunnel_service import tunnel_service import socket router = APIRouter(prefix="/tunnels", tags=["superadmin-tunnels"]) async def _allocate_tunnel_port( db: AsyncSession, device: Device, tunnel_type: str, port_range: tuple[int, int] ) -> int: """ Allocate a free port for tunnel. Strategy: Use device's simple_id to calculate deterministic port offset. """ # Calculate port based on device simple_id for deterministic allocation start_port, end_port = port_range port_offset = device.simple_id % (end_port - start_port + 1) allocated_port = start_port + port_offset # Verify port is free on the system def is_port_free(port: int) -> bool: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', port)) return True except OSError: return False # If calculated port is taken, find next free port attempts = 0 while not is_port_free(allocated_port) and attempts < 1000: allocated_port += 1 if allocated_port > end_port: allocated_port = start_port attempts += 1 if attempts >= 1000: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="No free ports available for tunnel" ) return allocated_port class TunnelEnableResponse(BaseModel): """Response when enabling tunnel""" session_uuid: str device_id: str tunnel_type: str status: str class TunnelStatusResponse(BaseModel): """Tunnel session status response""" session_uuid: str status: str # "waiting" | "ready" | "failed" device_tunnel_port: Optional[int] = None ttyd_port: Optional[int] = None tunnel_url: Optional[str] = None @router.get("/sessions") async def list_all_sessions( current_user: Annotated[User, Depends(get_current_superadmin)] ): """ Debug endpoint: List all active tunnel sessions in memory. """ sessions = [] for uuid, session in tunnel_service.sessions.items(): sessions.append({ "uuid": uuid, "device_id": session.device_id, "tunnel_type": session.tunnel_type, "status": session.status, "created_at": session.created_at.isoformat(), "expires_at": session.expires_at.isoformat(), "last_heartbeat": session.last_heartbeat.isoformat() if session.last_heartbeat else None, "ttyd_port": session.ttyd_port, "ttyd_pid": session.ttyd_pid, "device_tunnel_port": session.device_tunnel_port }) return { "total": len(sessions), "sessions": sessions } @router.post("/devices/{device_id}/{tunnel_type}") async def enable_tunnel( device_id: int, tunnel_type: str, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_superadmin)] ) -> TunnelEnableResponse: """ Enable SSH or Dashboard tunnel for device. Creates session and triggers device to connect tunnel. Args: device_id: Device numeric ID tunnel_type: "ssh" or "dashboard" Returns: session_uuid: UUID for polling status """ if tunnel_type not in ["ssh", "dashboard"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="tunnel_type must be 'ssh' or 'dashboard'" ) # Get device result = await db.execute( select(Device).where(Device.id == device_id) ) device = result.scalar_one_or_none() if not device: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Device not found" ) # Allocate port for tunnel if tunnel_type == "ssh": # SSH tunnel ports: 50000-59999 port_range = (50000, 59999) else: # Dashboard tunnel ports: 60000-65535 port_range = (60000, 65535) # Find free port (simple sequential allocation) allocated_port = await _allocate_tunnel_port(db, device, tunnel_type, port_range) # Create tunnel session session = tunnel_service.create_session( device_id=device.mac_address, admin_user=current_user.email, tunnel_type=tunnel_type ) # Update device config to enable tunnel with allocated port if device.config is None: device.config = {} tunnel_key = f"{tunnel_type}_tunnel" if tunnel_key not in device.config: device.config[tunnel_key] = {} device.config[tunnel_key]["enabled"] = True device.config[tunnel_key]["remote_port"] = allocated_port device.config[tunnel_key]["timeout_minutes"] = 120 # Tunnel will auto-stop after 120 minutes # Copy other tunnel settings from default if "server" not in device.config[tunnel_key]: device.config[tunnel_key]["server"] = "ms.e-bash.ru" device.config[tunnel_key]["port"] = 2223 device.config[tunnel_key]["user"] = "tunnel" device.config[tunnel_key]["keepalive_interval"] = 30 # Mark as modified (SQLAlchemy JSON field) from sqlalchemy.orm import attributes attributes.flag_modified(device, "config") await db.commit() print(f"[tunnel] Enabled {tunnel_type} tunnel for {device.mac_address} on port {allocated_port}") return TunnelEnableResponse( session_uuid=session.uuid, device_id=device.mac_address, tunnel_type=tunnel_type, status="waiting" ) @router.get("/sessions/{session_uuid}/status") async def get_tunnel_status( session_uuid: str, current_user: Annotated[User, Depends(get_current_superadmin)] ) -> TunnelStatusResponse: """ Poll tunnel session status. Returns: - waiting: Device not yet connected - ready: Tunnel connected, ttyd spawned - failed: Session expired or failed """ session = tunnel_service.get_session(session_uuid) if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found or expired" ) # Build response response = TunnelStatusResponse( session_uuid=session.uuid, status=session.status, device_tunnel_port=session.device_tunnel_port, ttyd_port=session.ttyd_port ) # If ready, return appropriate URL based on tunnel type if session.status == "ready": if session.tunnel_type == "ssh" and session.ttyd_port: response.tunnel_url = f"/admin/ssh/{session.uuid}" elif session.tunnel_type == "dashboard" and session.device_tunnel_port: response.tunnel_url = f"/admin/dashboard/{session.uuid}" return response @router.post("/sessions/{session_uuid}/heartbeat") async def session_heartbeat( session_uuid: str, current_user: Annotated[User, Depends(get_current_superadmin)] ): """ Browser sends heartbeat every 30 seconds to keep session alive. """ success = tunnel_service.update_heartbeat(session_uuid) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) return {"success": True} @router.websocket("/sessions/{session_uuid}/terminal/ws") async def tunnel_terminal_websocket( websocket: WebSocket, session_uuid: str ): """ WebSocket proxy to ttyd on localhost. """ session = tunnel_service.get_session(session_uuid) if not session or not session.ttyd_port: print(f"[tunnel] WS: Session not found or no ttyd port: {session_uuid}") await websocket.close(code=1008, reason="Session not found") return print(f"[tunnel] WS: Accepting connection for session {session_uuid}, ttyd port {session.ttyd_port}") # ttyd uses "tty" WebSocket subprotocol await websocket.accept(subprotocol="tty") # Connect to ttyd WebSocket import websockets try: ttyd_url = f'ws://localhost:{session.ttyd_port}/ws' print(f"[tunnel] WS: Connecting to ttyd at {ttyd_url}") # Connect to ttyd with "tty" subprotocol async with websockets.connect(ttyd_url, subprotocols=["tty"]) as ttyd_ws: print(f"[tunnel] WS: Connected to ttyd successfully") # Proxy messages both ways async def forward_to_ttyd(): try: while True: message = await websocket.receive() print(f"[tunnel] WS: Received from browser: {message.keys()}") if 'bytes' in message: data = message['bytes'] print(f"[tunnel] WS: Browser -> ttyd ({len(data)} bytes)") await ttyd_ws.send(data) elif 'text' in message: data = message['text'] print(f"[tunnel] WS: Browser -> ttyd ({len(data)} chars text)") await ttyd_ws.send(data) elif 'websocket.disconnect' in message: print(f"[tunnel] WS: Browser sent disconnect") break except WebSocketDisconnect: print(f"[tunnel] WS: Browser disconnected (exception)") except Exception as e: print(f"[tunnel] WS: Error forwarding to ttyd: {e}") import traceback traceback.print_exc() async def forward_from_ttyd(): try: async for message in ttyd_ws: if isinstance(message, bytes): print(f"[tunnel] WS: ttyd -> Browser ({len(message)} bytes)") await websocket.send_bytes(message) else: print(f"[tunnel] WS: ttyd -> Browser ({len(message)} chars text)") await websocket.send_text(message) except Exception as e: print(f"[tunnel] WS: Error forwarding from ttyd: {e}") import traceback traceback.print_exc() await asyncio.gather(forward_to_ttyd(), forward_from_ttyd(), return_exceptions=True) print(f"[tunnel] WS: Proxy loop ended") except Exception as e: print(f"[tunnel] WS: Proxy error: {e}") import traceback traceback.print_exc() try: await websocket.close(code=1011, reason="Proxy error") except: pass @router.websocket("/sessions/{session_uuid}/dashboard/api/ws") async def tunnel_dashboard_websocket( websocket: WebSocket, session_uuid: str ): """ WebSocket proxy to device dashboard WebSocket. """ session = tunnel_service.get_session(session_uuid) if not session or not session.device_tunnel_port: print(f"[tunnel] Dashboard WS: Session not found: {session_uuid}") await websocket.close(code=1008, reason="Session not found") return print(f"[tunnel] Dashboard WS: Accepting connection for session {session_uuid}") await websocket.accept() # Connect to device dashboard WebSocket import websockets try: device_ws_url = f'ws://localhost:{session.device_tunnel_port}/api/ws' print(f"[tunnel] Dashboard WS: Connecting to {device_ws_url}") async with websockets.connect(device_ws_url) as device_ws: print(f"[tunnel] Dashboard WS: Connected successfully") # Proxy messages both ways async def forward_to_device(): try: while True: message = await websocket.receive() if 'bytes' in message: await device_ws.send(message['bytes']) elif 'text' in message: await device_ws.send(message['text']) elif 'websocket.disconnect' in message: break except WebSocketDisconnect: print(f"[tunnel] Dashboard WS: Browser disconnected") except Exception as e: print(f"[tunnel] Dashboard WS: Error forwarding to device: {e}") async def forward_from_device(): try: async for message in device_ws: if isinstance(message, bytes): await websocket.send_bytes(message) else: await websocket.send_text(message) except Exception as e: print(f"[tunnel] Dashboard WS: Error forwarding from device: {e}") await asyncio.gather(forward_to_device(), forward_from_device(), return_exceptions=True) print(f"[tunnel] Dashboard WS: Proxy ended") except Exception as e: print(f"[tunnel] Dashboard WS: Proxy error: {e}") try: await websocket.close(code=1011, reason="Proxy error") except: pass @router.api_route("/sessions/{session_uuid}/terminal/{path:path}", methods=["GET", "POST"]) async def tunnel_terminal_proxy( session_uuid: str, path: str, request: Request ): """ Full HTTP proxy to ttyd (including token, ws, etc). Security: Session was created by superadmin, UUID is secret. Proxies /terminal/{path} to http://localhost:ttyd_port/{path} """ session = tunnel_service.get_session(session_uuid) if not session or not session.ttyd_port: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Terminal session not found" ) # Proxy request to ttyd url = f'http://localhost:{session.ttyd_port}/{path}' print(f"[tunnel] HTTP proxy: {request.url.path} -> {url}") async with httpx.AsyncClient() as client: resp = await client.request( method=request.method, url=url, headers=dict(request.headers), params=request.query_params, ) return Response( content=resp.content, status_code=resp.status_code, headers=dict(resp.headers) ) @router.api_route("/sessions/{session_uuid}/dashboard/{path:path}", methods=["GET", "POST"]) async def tunnel_dashboard_proxy( session_uuid: str, path: str, request: Request ): """ Proxy Dashboard HTTP/WebSocket to device via tunnel. Security: Session was created by superadmin, UUID is secret. """ session = tunnel_service.get_session(session_uuid) if not session or not session.device_tunnel_port: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Dashboard session not found" ) # Proxy to device dashboard through tunnel # Normalize path - avoid double slashes clean_path = path if path and path != '/' else '' url = f'http://localhost:{session.device_tunnel_port}/{clean_path}' print(f"[tunnel] Dashboard proxy: {request.url.path} -> {url}") async with httpx.AsyncClient() as client: resp = await client.request( method=request.method, url=url, headers={k: v for k, v in request.headers.items() if k.lower() not in ('host',)}, params=request.query_params, content=await request.body() ) # Replace paths in HTML/JS/CSS files content_type = resp.headers.get('content-type', '') should_replace = ( (not path or path == '/') and 'text/html' in content_type ) or ( 'javascript' in content_type or 'css' in content_type ) if should_replace: try: content = resp.text base_url = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/dashboard/' # Replace absolute paths with proxied paths import re # First: Replace API paths (in JS/HTML) # Replace /api/* paths but avoid double-replacing # Use a unique marker to avoid replacing already-replaced paths marker = f'__DASHBOARD__{session_uuid}__' # Replace all /api/ with marker first content = re.sub(r'(? href/src="/api/v1/.../dashboard/..." (HTML only) if 'text/html' in content_type: content = re.sub(r'(href|src)="/', rf'\1="{base_url}', content) headers = { k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding') } if 'text/html' in content_type: from starlette.responses import HTMLResponse return HTMLResponse(content=content, headers=headers) else: return Response(content=content.encode('utf-8'), headers=headers) except Exception as e: print(f"[tunnel] Dashboard path replacement error: {e}") # Fall through to return original response return Response( content=resp.content, status_code=resp.status_code, headers={k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding')} ) @router.get("/sessions/{session_uuid}/terminal") async def tunnel_terminal_html( session_uuid: str ): """ Return ttyd HTML with modified URLs. Security: Session was created by superadmin, UUID is secret. """ session = tunnel_service.get_session(session_uuid) if not session or not session.ttyd_port: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Terminal session not found" ) # Fetch ttyd HTML and modify URLs to work through our proxy async with httpx.AsyncClient() as client: try: resp = await client.get(f'http://localhost:{session.ttyd_port}/') print(f"[tunnel] HTML: ttyd returned status {resp.status_code}") if resp.status_code != 200: print(f"[tunnel] HTML: ttyd error: {resp.text[:200]}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Terminal not ready" ) html = resp.text # Prefix for all ttyd resources prefix = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/terminal' # Replace relative URLs with proxied URLs html = html.replace('src="/', f'src="{prefix}/') html = html.replace('href="/', f'href="{prefix}/') # Replace WebSocket URL # ttyd uses: new WebSocket(url) where url is like "ws://host/ws" # We need to replace it to go through our proxy import re # Find WebSocket URL construction in JavaScript ws_before = html.count('new WebSocket') # Use wss:// for HTTPS, ws:// for HTTP - construct from request would be ideal # For now, replace with relative WebSocket that browser will resolve html = re.sub( r'(new WebSocket\([\'"])(ws[s]?://[^/]+)(/ws[\'"])', rf"\1' + (location.protocol === 'https:' ? 'wss://' : 'ws://') + location.host + '{prefix}\3", html ) ws_after = html.count("location.host") print(f"[tunnel] HTML: replaced {ws_after} WebSocket URLs (found {ws_before} total)") from starlette.responses import HTMLResponse # Don't copy Content-Length or Content-Encoding since we modified the HTML # (ttyd returns compressed, we return uncompressed modified HTML) headers = { k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding') } return HTMLResponse(content=html, headers=headers) except Exception as e: print(f"[tunnel] HTML: exception: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Failed to fetch terminal: {str(e)}" )