| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588 |
- """
- Superadmin tunnel management API endpoints.
- """
- from typing import Annotated, Optional
- from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, Request
- from sqlalchemy.ext.asyncio import AsyncSession
- from pydantic import BaseModel
- from sqlalchemy import select
- from starlette.responses import StreamingResponse, Response
- import httpx
- import asyncio
- from app.api.deps import get_current_superadmin
- from app.core.database import get_db
- from app.models.device import Device
- from app.models.user import User
- from app.services.tunnel_service import tunnel_service
- import socket
- router = APIRouter(prefix="/tunnels", tags=["superadmin-tunnels"])
- async def _allocate_tunnel_port(
- db: AsyncSession,
- device: Device,
- tunnel_type: str,
- port_range: tuple[int, int]
- ) -> int:
- """
- Allocate a free port for tunnel.
- Strategy: Use device's simple_id to calculate deterministic port offset.
- """
- # Calculate port based on device simple_id for deterministic allocation
- start_port, end_port = port_range
- port_offset = device.simple_id % (end_port - start_port + 1)
- allocated_port = start_port + port_offset
- # Verify port is free on the system
- def is_port_free(port: int) -> bool:
- try:
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.bind(('', port))
- return True
- except OSError:
- return False
- # If calculated port is taken, find next free port
- attempts = 0
- while not is_port_free(allocated_port) and attempts < 1000:
- allocated_port += 1
- if allocated_port > end_port:
- allocated_port = start_port
- attempts += 1
- if attempts >= 1000:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="No free ports available for tunnel"
- )
- return allocated_port
- class TunnelEnableResponse(BaseModel):
- """Response when enabling tunnel"""
- session_uuid: str
- device_id: str
- tunnel_type: str
- status: str
- class TunnelStatusResponse(BaseModel):
- """Tunnel session status response"""
- session_uuid: str
- status: str # "waiting" | "ready" | "failed"
- device_tunnel_port: Optional[int] = None
- ttyd_port: Optional[int] = None
- tunnel_url: Optional[str] = None
- @router.get("/sessions")
- async def list_all_sessions(
- current_user: Annotated[User, Depends(get_current_superadmin)]
- ):
- """
- Debug endpoint: List all active tunnel sessions in memory.
- """
- sessions = []
- for uuid, session in tunnel_service.sessions.items():
- sessions.append({
- "uuid": uuid,
- "device_id": session.device_id,
- "tunnel_type": session.tunnel_type,
- "status": session.status,
- "created_at": session.created_at.isoformat(),
- "expires_at": session.expires_at.isoformat(),
- "last_heartbeat": session.last_heartbeat.isoformat() if session.last_heartbeat else None,
- "ttyd_port": session.ttyd_port,
- "ttyd_pid": session.ttyd_pid,
- "device_tunnel_port": session.device_tunnel_port
- })
- return {
- "total": len(sessions),
- "sessions": sessions
- }
- @router.post("/devices/{device_id}/{tunnel_type}")
- async def enable_tunnel(
- device_id: int,
- tunnel_type: str,
- db: Annotated[AsyncSession, Depends(get_db)],
- current_user: Annotated[User, Depends(get_current_superadmin)]
- ) -> TunnelEnableResponse:
- """
- Enable SSH or Dashboard tunnel for device.
- Creates session and triggers device to connect tunnel.
- Args:
- device_id: Device numeric ID
- tunnel_type: "ssh" or "dashboard"
- Returns:
- session_uuid: UUID for polling status
- """
- if tunnel_type not in ["ssh", "dashboard"]:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="tunnel_type must be 'ssh' or 'dashboard'"
- )
- # Get device
- result = await db.execute(
- select(Device).where(Device.id == device_id)
- )
- device = result.scalar_one_or_none()
- if not device:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Device not found"
- )
- # Allocate port for tunnel
- if tunnel_type == "ssh":
- # SSH tunnel ports: 50000-59999
- port_range = (50000, 59999)
- else:
- # Dashboard tunnel ports: 60000-65535
- port_range = (60000, 65535)
- # Find free port (simple sequential allocation)
- allocated_port = await _allocate_tunnel_port(db, device, tunnel_type, port_range)
- # Create tunnel session
- session = tunnel_service.create_session(
- device_id=device.mac_address,
- admin_user=current_user.email,
- tunnel_type=tunnel_type
- )
- # Update device config to enable tunnel with allocated port
- if device.config is None:
- device.config = {}
- tunnel_key = f"{tunnel_type}_tunnel"
- if tunnel_key not in device.config:
- device.config[tunnel_key] = {}
- device.config[tunnel_key]["enabled"] = True
- device.config[tunnel_key]["remote_port"] = allocated_port
- device.config[tunnel_key]["timeout_minutes"] = 120 # Tunnel will auto-stop after 120 minutes
- # Copy other tunnel settings from default
- if "server" not in device.config[tunnel_key]:
- device.config[tunnel_key]["server"] = "ms.e-bash.ru"
- device.config[tunnel_key]["port"] = 2223
- device.config[tunnel_key]["user"] = "tunnel"
- device.config[tunnel_key]["keepalive_interval"] = 30
- # Mark as modified (SQLAlchemy JSON field)
- from sqlalchemy.orm import attributes
- attributes.flag_modified(device, "config")
- await db.commit()
- print(f"[tunnel] Enabled {tunnel_type} tunnel for {device.mac_address} on port {allocated_port}")
- return TunnelEnableResponse(
- session_uuid=session.uuid,
- device_id=device.mac_address,
- tunnel_type=tunnel_type,
- status="waiting"
- )
- @router.get("/sessions/{session_uuid}/status")
- async def get_tunnel_status(
- session_uuid: str,
- current_user: Annotated[User, Depends(get_current_superadmin)]
- ) -> TunnelStatusResponse:
- """
- Poll tunnel session status.
- Returns:
- - waiting: Device not yet connected
- - ready: Tunnel connected, ttyd spawned
- - failed: Session expired or failed
- """
- session = tunnel_service.get_session(session_uuid)
- if not session:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Session not found or expired"
- )
- # Build response
- response = TunnelStatusResponse(
- session_uuid=session.uuid,
- status=session.status,
- device_tunnel_port=session.device_tunnel_port,
- ttyd_port=session.ttyd_port
- )
- # If ready, return appropriate URL based on tunnel type
- if session.status == "ready":
- if session.tunnel_type == "ssh" and session.ttyd_port:
- response.tunnel_url = f"/admin/ssh/{session.uuid}"
- elif session.tunnel_type == "dashboard" and session.device_tunnel_port:
- response.tunnel_url = f"/admin/dashboard/{session.uuid}"
- return response
- @router.post("/sessions/{session_uuid}/heartbeat")
- async def session_heartbeat(
- session_uuid: str,
- current_user: Annotated[User, Depends(get_current_superadmin)]
- ):
- """
- Browser sends heartbeat every 30 seconds to keep session alive.
- """
- success = tunnel_service.update_heartbeat(session_uuid)
- if not success:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Session not found"
- )
- return {"success": True}
- @router.websocket("/sessions/{session_uuid}/terminal/ws")
- async def tunnel_terminal_websocket(
- websocket: WebSocket,
- session_uuid: str
- ):
- """
- WebSocket proxy to ttyd on localhost.
- """
- session = tunnel_service.get_session(session_uuid)
- if not session or not session.ttyd_port:
- print(f"[tunnel] WS: Session not found or no ttyd port: {session_uuid}")
- await websocket.close(code=1008, reason="Session not found")
- return
- print(f"[tunnel] WS: Accepting connection for session {session_uuid}, ttyd port {session.ttyd_port}")
- # ttyd uses "tty" WebSocket subprotocol
- await websocket.accept(subprotocol="tty")
- # Connect to ttyd WebSocket
- import websockets
- try:
- ttyd_url = f'ws://localhost:{session.ttyd_port}/ws'
- print(f"[tunnel] WS: Connecting to ttyd at {ttyd_url}")
- # Connect to ttyd with "tty" subprotocol
- async with websockets.connect(ttyd_url, subprotocols=["tty"]) as ttyd_ws:
- print(f"[tunnel] WS: Connected to ttyd successfully")
- # Proxy messages both ways
- async def forward_to_ttyd():
- try:
- while True:
- message = await websocket.receive()
- print(f"[tunnel] WS: Received from browser: {message.keys()}")
- if 'bytes' in message:
- data = message['bytes']
- print(f"[tunnel] WS: Browser -> ttyd ({len(data)} bytes)")
- await ttyd_ws.send(data)
- elif 'text' in message:
- data = message['text']
- print(f"[tunnel] WS: Browser -> ttyd ({len(data)} chars text)")
- await ttyd_ws.send(data)
- elif 'websocket.disconnect' in message:
- print(f"[tunnel] WS: Browser sent disconnect")
- break
- except WebSocketDisconnect:
- print(f"[tunnel] WS: Browser disconnected (exception)")
- except Exception as e:
- print(f"[tunnel] WS: Error forwarding to ttyd: {e}")
- import traceback
- traceback.print_exc()
- async def forward_from_ttyd():
- try:
- async for message in ttyd_ws:
- if isinstance(message, bytes):
- print(f"[tunnel] WS: ttyd -> Browser ({len(message)} bytes)")
- await websocket.send_bytes(message)
- else:
- print(f"[tunnel] WS: ttyd -> Browser ({len(message)} chars text)")
- await websocket.send_text(message)
- except Exception as e:
- print(f"[tunnel] WS: Error forwarding from ttyd: {e}")
- import traceback
- traceback.print_exc()
- await asyncio.gather(forward_to_ttyd(), forward_from_ttyd(), return_exceptions=True)
- print(f"[tunnel] WS: Proxy loop ended")
- except Exception as e:
- print(f"[tunnel] WS: Proxy error: {e}")
- import traceback
- traceback.print_exc()
- try:
- await websocket.close(code=1011, reason="Proxy error")
- except:
- pass
- @router.websocket("/sessions/{session_uuid}/dashboard/api/ws")
- async def tunnel_dashboard_websocket(
- websocket: WebSocket,
- session_uuid: str
- ):
- """
- WebSocket proxy to device dashboard WebSocket.
- """
- session = tunnel_service.get_session(session_uuid)
- if not session or not session.device_tunnel_port:
- print(f"[tunnel] Dashboard WS: Session not found: {session_uuid}")
- await websocket.close(code=1008, reason="Session not found")
- return
- print(f"[tunnel] Dashboard WS: Accepting connection for session {session_uuid}")
- await websocket.accept()
- # Connect to device dashboard WebSocket
- import websockets
- try:
- device_ws_url = f'ws://localhost:{session.device_tunnel_port}/api/ws'
- print(f"[tunnel] Dashboard WS: Connecting to {device_ws_url}")
- async with websockets.connect(device_ws_url) as device_ws:
- print(f"[tunnel] Dashboard WS: Connected successfully")
- # Proxy messages both ways
- async def forward_to_device():
- try:
- while True:
- message = await websocket.receive()
- if 'bytes' in message:
- await device_ws.send(message['bytes'])
- elif 'text' in message:
- await device_ws.send(message['text'])
- elif 'websocket.disconnect' in message:
- break
- except WebSocketDisconnect:
- print(f"[tunnel] Dashboard WS: Browser disconnected")
- except Exception as e:
- print(f"[tunnel] Dashboard WS: Error forwarding to device: {e}")
- async def forward_from_device():
- try:
- async for message in device_ws:
- if isinstance(message, bytes):
- await websocket.send_bytes(message)
- else:
- await websocket.send_text(message)
- except Exception as e:
- print(f"[tunnel] Dashboard WS: Error forwarding from device: {e}")
- await asyncio.gather(forward_to_device(), forward_from_device(), return_exceptions=True)
- print(f"[tunnel] Dashboard WS: Proxy ended")
- except Exception as e:
- print(f"[tunnel] Dashboard WS: Proxy error: {e}")
- try:
- await websocket.close(code=1011, reason="Proxy error")
- except:
- pass
- @router.api_route("/sessions/{session_uuid}/terminal/{path:path}", methods=["GET", "POST"])
- async def tunnel_terminal_proxy(
- session_uuid: str,
- path: str,
- request: Request
- ):
- """
- Full HTTP proxy to ttyd (including token, ws, etc).
- Security: Session was created by superadmin, UUID is secret.
- Proxies /terminal/{path} to http://localhost:ttyd_port/{path}
- """
- session = tunnel_service.get_session(session_uuid)
- if not session or not session.ttyd_port:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Terminal session not found"
- )
- # Proxy request to ttyd
- url = f'http://localhost:{session.ttyd_port}/{path}'
- print(f"[tunnel] HTTP proxy: {request.url.path} -> {url}")
- async with httpx.AsyncClient() as client:
- resp = await client.request(
- method=request.method,
- url=url,
- headers=dict(request.headers),
- params=request.query_params,
- )
- return Response(
- content=resp.content,
- status_code=resp.status_code,
- headers=dict(resp.headers)
- )
- @router.api_route("/sessions/{session_uuid}/dashboard/{path:path}", methods=["GET", "POST"])
- async def tunnel_dashboard_proxy(
- session_uuid: str,
- path: str,
- request: Request
- ):
- """
- Proxy Dashboard HTTP/WebSocket to device via tunnel.
- Security: Session was created by superadmin, UUID is secret.
- """
- session = tunnel_service.get_session(session_uuid)
- if not session or not session.device_tunnel_port:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Dashboard session not found"
- )
- # Proxy to device dashboard through tunnel
- # Normalize path - avoid double slashes
- clean_path = path if path and path != '/' else ''
- url = f'http://localhost:{session.device_tunnel_port}/{clean_path}'
- print(f"[tunnel] Dashboard proxy: {request.url.path} -> {url}")
- async with httpx.AsyncClient() as client:
- resp = await client.request(
- method=request.method,
- url=url,
- headers={k: v for k, v in request.headers.items() if k.lower() not in ('host',)},
- params=request.query_params,
- content=await request.body()
- )
- # Replace paths in HTML/JS/CSS files
- content_type = resp.headers.get('content-type', '')
- should_replace = (
- (not path or path == '/') and 'text/html' in content_type
- ) or (
- 'javascript' in content_type or 'css' in content_type
- )
- if should_replace:
- try:
- content = resp.text
- base_url = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/dashboard/'
- # Replace absolute paths with proxied paths
- import re
- # First: Replace API paths (in JS/HTML)
- # Replace /api/* paths but avoid double-replacing
- # Use a unique marker to avoid replacing already-replaced paths
- marker = f'__DASHBOARD__{session_uuid}__'
- # Replace all /api/ with marker first
- content = re.sub(r'(?<!/v1/superadmin/tunnels)/api/', rf'{marker}/api/', content)
- # Now replace marker with full path
- content = content.replace(marker, base_url.rstrip('/'))
- # Second: Replace href/src="/..." -> href/src="/api/v1/.../dashboard/..." (HTML only)
- if 'text/html' in content_type:
- content = re.sub(r'(href|src)="/', rf'\1="{base_url}', content)
- headers = {
- k: v for k, v in resp.headers.items()
- if k.lower() not in ('content-length', 'content-encoding')
- }
- if 'text/html' in content_type:
- from starlette.responses import HTMLResponse
- return HTMLResponse(content=content, headers=headers)
- else:
- return Response(content=content.encode('utf-8'), headers=headers)
- except Exception as e:
- print(f"[tunnel] Dashboard path replacement error: {e}")
- # Fall through to return original response
- return Response(
- content=resp.content,
- status_code=resp.status_code,
- headers={k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding')}
- )
- @router.get("/sessions/{session_uuid}/terminal")
- async def tunnel_terminal_html(
- session_uuid: str
- ):
- """
- Return ttyd HTML with modified URLs.
- Security: Session was created by superadmin, UUID is secret.
- """
- session = tunnel_service.get_session(session_uuid)
- if not session or not session.ttyd_port:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="Terminal session not found"
- )
- # Fetch ttyd HTML and modify URLs to work through our proxy
- async with httpx.AsyncClient() as client:
- try:
- resp = await client.get(f'http://localhost:{session.ttyd_port}/')
- print(f"[tunnel] HTML: ttyd returned status {resp.status_code}")
- if resp.status_code != 200:
- print(f"[tunnel] HTML: ttyd error: {resp.text[:200]}")
- raise HTTPException(
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
- detail="Terminal not ready"
- )
- html = resp.text
- # Prefix for all ttyd resources
- prefix = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/terminal'
- # Replace relative URLs with proxied URLs
- html = html.replace('src="/', f'src="{prefix}/')
- html = html.replace('href="/', f'href="{prefix}/')
- # Replace WebSocket URL
- # ttyd uses: new WebSocket(url) where url is like "ws://host/ws"
- # We need to replace it to go through our proxy
- import re
- # Find WebSocket URL construction in JavaScript
- ws_before = html.count('new WebSocket')
- # Use wss:// for HTTPS, ws:// for HTTP - construct from request would be ideal
- # For now, replace with relative WebSocket that browser will resolve
- html = re.sub(
- r'(new WebSocket\([\'"])(ws[s]?://[^/]+)(/ws[\'"])',
- rf"\1' + (location.protocol === 'https:' ? 'wss://' : 'ws://') + location.host + '{prefix}\3",
- html
- )
- ws_after = html.count("location.host")
- print(f"[tunnel] HTML: replaced {ws_after} WebSocket URLs (found {ws_before} total)")
- from starlette.responses import HTMLResponse
- # Don't copy Content-Length or Content-Encoding since we modified the HTML
- # (ttyd returns compressed, we return uncompressed modified HTML)
- headers = {
- k: v for k, v in resp.headers.items()
- if k.lower() not in ('content-length', 'content-encoding')
- }
- return HTMLResponse(content=html, headers=headers)
- except Exception as e:
- print(f"[tunnel] HTML: exception: {e}")
- raise HTTPException(
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
- detail=f"Failed to fetch terminal: {str(e)}"
- )
|