tunnels.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. """
  2. Superadmin tunnel management API endpoints.
  3. """
  4. from typing import Annotated, Optional
  5. from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, Request
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from pydantic import BaseModel
  8. from sqlalchemy import select
  9. from starlette.responses import StreamingResponse, Response
  10. import httpx
  11. import asyncio
  12. from app.api.deps import get_current_superadmin
  13. from app.core.database import get_db
  14. from app.models.device import Device
  15. from app.models.user import User
  16. from app.services.tunnel_service import tunnel_service
  17. import socket
  18. router = APIRouter(prefix="/tunnels", tags=["superadmin-tunnels"])
  19. async def _allocate_tunnel_port(
  20. db: AsyncSession,
  21. device: Device,
  22. tunnel_type: str,
  23. port_range: tuple[int, int]
  24. ) -> int:
  25. """
  26. Allocate a free port for tunnel.
  27. Strategy: Use device's simple_id to calculate deterministic port offset.
  28. """
  29. # Calculate port based on device simple_id for deterministic allocation
  30. start_port, end_port = port_range
  31. port_offset = device.simple_id % (end_port - start_port + 1)
  32. allocated_port = start_port + port_offset
  33. # Verify port is free on the system
  34. def is_port_free(port: int) -> bool:
  35. try:
  36. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  37. s.bind(('', port))
  38. return True
  39. except OSError:
  40. return False
  41. # If calculated port is taken, find next free port
  42. attempts = 0
  43. while not is_port_free(allocated_port) and attempts < 1000:
  44. allocated_port += 1
  45. if allocated_port > end_port:
  46. allocated_port = start_port
  47. attempts += 1
  48. if attempts >= 1000:
  49. raise HTTPException(
  50. status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
  51. detail="No free ports available for tunnel"
  52. )
  53. return allocated_port
  54. class TunnelEnableResponse(BaseModel):
  55. """Response when enabling tunnel"""
  56. session_uuid: str
  57. device_id: str
  58. tunnel_type: str
  59. status: str
  60. class TunnelStatusResponse(BaseModel):
  61. """Tunnel session status response"""
  62. session_uuid: str
  63. status: str # "waiting" | "ready" | "failed"
  64. device_tunnel_port: Optional[int] = None
  65. ttyd_port: Optional[int] = None
  66. tunnel_url: Optional[str] = None
  67. @router.get("/sessions")
  68. async def list_all_sessions(
  69. current_user: Annotated[User, Depends(get_current_superadmin)]
  70. ):
  71. """
  72. Debug endpoint: List all active tunnel sessions in memory.
  73. """
  74. sessions = []
  75. for uuid, session in tunnel_service.sessions.items():
  76. sessions.append({
  77. "uuid": uuid,
  78. "device_id": session.device_id,
  79. "tunnel_type": session.tunnel_type,
  80. "status": session.status,
  81. "created_at": session.created_at.isoformat(),
  82. "expires_at": session.expires_at.isoformat(),
  83. "last_heartbeat": session.last_heartbeat.isoformat() if session.last_heartbeat else None,
  84. "ttyd_port": session.ttyd_port,
  85. "ttyd_pid": session.ttyd_pid,
  86. "device_tunnel_port": session.device_tunnel_port
  87. })
  88. return {
  89. "total": len(sessions),
  90. "sessions": sessions
  91. }
  92. @router.post("/devices/{device_id}/{tunnel_type}")
  93. async def enable_tunnel(
  94. device_id: int,
  95. tunnel_type: str,
  96. db: Annotated[AsyncSession, Depends(get_db)],
  97. current_user: Annotated[User, Depends(get_current_superadmin)]
  98. ) -> TunnelEnableResponse:
  99. """
  100. Enable SSH or Dashboard tunnel for device.
  101. Creates session and triggers device to connect tunnel.
  102. Args:
  103. device_id: Device numeric ID
  104. tunnel_type: "ssh" or "dashboard"
  105. Returns:
  106. session_uuid: UUID for polling status
  107. """
  108. if tunnel_type not in ["ssh", "dashboard"]:
  109. raise HTTPException(
  110. status_code=status.HTTP_400_BAD_REQUEST,
  111. detail="tunnel_type must be 'ssh' or 'dashboard'"
  112. )
  113. # Get device
  114. result = await db.execute(
  115. select(Device).where(Device.id == device_id)
  116. )
  117. device = result.scalar_one_or_none()
  118. if not device:
  119. raise HTTPException(
  120. status_code=status.HTTP_404_NOT_FOUND,
  121. detail="Device not found"
  122. )
  123. # Allocate port for tunnel
  124. if tunnel_type == "ssh":
  125. # SSH tunnel ports: 50000-59999
  126. port_range = (50000, 59999)
  127. else:
  128. # Dashboard tunnel ports: 60000-65535
  129. port_range = (60000, 65535)
  130. # Find free port (simple sequential allocation)
  131. allocated_port = await _allocate_tunnel_port(db, device, tunnel_type, port_range)
  132. # Create tunnel session
  133. session = tunnel_service.create_session(
  134. device_id=device.mac_address,
  135. admin_user=current_user.email,
  136. tunnel_type=tunnel_type
  137. )
  138. # Update device config to enable tunnel with allocated port
  139. if device.config is None:
  140. device.config = {}
  141. tunnel_key = f"{tunnel_type}_tunnel"
  142. if tunnel_key not in device.config:
  143. device.config[tunnel_key] = {}
  144. device.config[tunnel_key]["enabled"] = True
  145. device.config[tunnel_key]["remote_port"] = allocated_port
  146. device.config[tunnel_key]["timeout_minutes"] = 120 # Tunnel will auto-stop after 120 minutes
  147. # Copy other tunnel settings from default
  148. if "server" not in device.config[tunnel_key]:
  149. device.config[tunnel_key]["server"] = "192.168.5.4"
  150. device.config[tunnel_key]["port"] = 22
  151. device.config[tunnel_key]["user"] = "tunnel"
  152. device.config[tunnel_key]["keepalive_interval"] = 30
  153. # Mark as modified (SQLAlchemy JSON field)
  154. from sqlalchemy.orm import attributes
  155. attributes.flag_modified(device, "config")
  156. await db.commit()
  157. print(f"[tunnel] Enabled {tunnel_type} tunnel for {device.mac_address} on port {allocated_port}")
  158. return TunnelEnableResponse(
  159. session_uuid=session.uuid,
  160. device_id=device.mac_address,
  161. tunnel_type=tunnel_type,
  162. status="waiting"
  163. )
  164. @router.get("/sessions/{session_uuid}/status")
  165. async def get_tunnel_status(
  166. session_uuid: str,
  167. current_user: Annotated[User, Depends(get_current_superadmin)]
  168. ) -> TunnelStatusResponse:
  169. """
  170. Poll tunnel session status.
  171. Returns:
  172. - waiting: Device not yet connected
  173. - ready: Tunnel connected, ttyd spawned
  174. - failed: Session expired or failed
  175. """
  176. session = tunnel_service.get_session(session_uuid)
  177. if not session:
  178. raise HTTPException(
  179. status_code=status.HTTP_404_NOT_FOUND,
  180. detail="Session not found or expired"
  181. )
  182. # Build response
  183. response = TunnelStatusResponse(
  184. session_uuid=session.uuid,
  185. status=session.status,
  186. device_tunnel_port=session.device_tunnel_port,
  187. ttyd_port=session.ttyd_port
  188. )
  189. # If ready, return appropriate URL based on tunnel type
  190. if session.status == "ready":
  191. if session.tunnel_type == "ssh" and session.ttyd_port:
  192. response.tunnel_url = f"/admin/ssh/{session.uuid}"
  193. elif session.tunnel_type == "dashboard" and session.device_tunnel_port:
  194. response.tunnel_url = f"/admin/dashboard/{session.uuid}"
  195. return response
  196. @router.post("/sessions/{session_uuid}/heartbeat")
  197. async def session_heartbeat(
  198. session_uuid: str,
  199. current_user: Annotated[User, Depends(get_current_superadmin)]
  200. ):
  201. """
  202. Browser sends heartbeat every 30 seconds to keep session alive.
  203. """
  204. success = tunnel_service.update_heartbeat(session_uuid)
  205. if not success:
  206. raise HTTPException(
  207. status_code=status.HTTP_404_NOT_FOUND,
  208. detail="Session not found"
  209. )
  210. return {"success": True}
  211. @router.websocket("/sessions/{session_uuid}/terminal/ws")
  212. async def tunnel_terminal_websocket(
  213. websocket: WebSocket,
  214. session_uuid: str
  215. ):
  216. """
  217. WebSocket proxy to ttyd on localhost.
  218. """
  219. session = tunnel_service.get_session(session_uuid)
  220. if not session or not session.ttyd_port:
  221. print(f"[tunnel] WS: Session not found or no ttyd port: {session_uuid}")
  222. await websocket.close(code=1008, reason="Session not found")
  223. return
  224. print(f"[tunnel] WS: Accepting connection for session {session_uuid}, ttyd port {session.ttyd_port}")
  225. # ttyd uses "tty" WebSocket subprotocol
  226. await websocket.accept(subprotocol="tty")
  227. # Connect to ttyd WebSocket
  228. import websockets
  229. try:
  230. ttyd_url = f'ws://localhost:{session.ttyd_port}/ws'
  231. print(f"[tunnel] WS: Connecting to ttyd at {ttyd_url}")
  232. # Connect to ttyd with "tty" subprotocol
  233. async with websockets.connect(ttyd_url, subprotocols=["tty"]) as ttyd_ws:
  234. print(f"[tunnel] WS: Connected to ttyd successfully")
  235. # Proxy messages both ways
  236. async def forward_to_ttyd():
  237. try:
  238. while True:
  239. message = await websocket.receive()
  240. print(f"[tunnel] WS: Received from browser: {message.keys()}")
  241. if 'bytes' in message:
  242. data = message['bytes']
  243. print(f"[tunnel] WS: Browser -> ttyd ({len(data)} bytes)")
  244. await ttyd_ws.send(data)
  245. elif 'text' in message:
  246. data = message['text']
  247. print(f"[tunnel] WS: Browser -> ttyd ({len(data)} chars text)")
  248. await ttyd_ws.send(data)
  249. elif 'websocket.disconnect' in message:
  250. print(f"[tunnel] WS: Browser sent disconnect")
  251. break
  252. except WebSocketDisconnect:
  253. print(f"[tunnel] WS: Browser disconnected (exception)")
  254. except Exception as e:
  255. print(f"[tunnel] WS: Error forwarding to ttyd: {e}")
  256. import traceback
  257. traceback.print_exc()
  258. async def forward_from_ttyd():
  259. try:
  260. async for message in ttyd_ws:
  261. if isinstance(message, bytes):
  262. print(f"[tunnel] WS: ttyd -> Browser ({len(message)} bytes)")
  263. await websocket.send_bytes(message)
  264. else:
  265. print(f"[tunnel] WS: ttyd -> Browser ({len(message)} chars text)")
  266. await websocket.send_text(message)
  267. except Exception as e:
  268. print(f"[tunnel] WS: Error forwarding from ttyd: {e}")
  269. import traceback
  270. traceback.print_exc()
  271. await asyncio.gather(forward_to_ttyd(), forward_from_ttyd(), return_exceptions=True)
  272. print(f"[tunnel] WS: Proxy loop ended")
  273. except Exception as e:
  274. print(f"[tunnel] WS: Proxy error: {e}")
  275. import traceback
  276. traceback.print_exc()
  277. try:
  278. await websocket.close(code=1011, reason="Proxy error")
  279. except:
  280. pass
  281. @router.websocket("/sessions/{session_uuid}/dashboard/api/ws")
  282. async def tunnel_dashboard_websocket(
  283. websocket: WebSocket,
  284. session_uuid: str
  285. ):
  286. """
  287. WebSocket proxy to device dashboard WebSocket.
  288. """
  289. session = tunnel_service.get_session(session_uuid)
  290. if not session or not session.device_tunnel_port:
  291. print(f"[tunnel] Dashboard WS: Session not found: {session_uuid}")
  292. await websocket.close(code=1008, reason="Session not found")
  293. return
  294. print(f"[tunnel] Dashboard WS: Accepting connection for session {session_uuid}")
  295. await websocket.accept()
  296. # Connect to device dashboard WebSocket
  297. import websockets
  298. try:
  299. device_ws_url = f'ws://localhost:{session.device_tunnel_port}/api/ws'
  300. print(f"[tunnel] Dashboard WS: Connecting to {device_ws_url}")
  301. async with websockets.connect(device_ws_url) as device_ws:
  302. print(f"[tunnel] Dashboard WS: Connected successfully")
  303. # Proxy messages both ways
  304. async def forward_to_device():
  305. try:
  306. while True:
  307. message = await websocket.receive()
  308. if 'bytes' in message:
  309. await device_ws.send(message['bytes'])
  310. elif 'text' in message:
  311. await device_ws.send(message['text'])
  312. elif 'websocket.disconnect' in message:
  313. break
  314. except WebSocketDisconnect:
  315. print(f"[tunnel] Dashboard WS: Browser disconnected")
  316. except Exception as e:
  317. print(f"[tunnel] Dashboard WS: Error forwarding to device: {e}")
  318. async def forward_from_device():
  319. try:
  320. async for message in device_ws:
  321. if isinstance(message, bytes):
  322. await websocket.send_bytes(message)
  323. else:
  324. await websocket.send_text(message)
  325. except Exception as e:
  326. print(f"[tunnel] Dashboard WS: Error forwarding from device: {e}")
  327. await asyncio.gather(forward_to_device(), forward_from_device(), return_exceptions=True)
  328. print(f"[tunnel] Dashboard WS: Proxy ended")
  329. except Exception as e:
  330. print(f"[tunnel] Dashboard WS: Proxy error: {e}")
  331. try:
  332. await websocket.close(code=1011, reason="Proxy error")
  333. except:
  334. pass
  335. @router.api_route("/sessions/{session_uuid}/terminal/{path:path}", methods=["GET", "POST"])
  336. async def tunnel_terminal_proxy(
  337. session_uuid: str,
  338. path: str,
  339. request: Request
  340. ):
  341. """
  342. Full HTTP proxy to ttyd (including token, ws, etc).
  343. Security: Session was created by superadmin, UUID is secret.
  344. Proxies /terminal/{path} to http://localhost:ttyd_port/{path}
  345. """
  346. session = tunnel_service.get_session(session_uuid)
  347. if not session or not session.ttyd_port:
  348. raise HTTPException(
  349. status_code=status.HTTP_404_NOT_FOUND,
  350. detail="Terminal session not found"
  351. )
  352. # Proxy request to ttyd
  353. url = f'http://localhost:{session.ttyd_port}/{path}'
  354. print(f"[tunnel] HTTP proxy: {request.url.path} -> {url}")
  355. async with httpx.AsyncClient() as client:
  356. resp = await client.request(
  357. method=request.method,
  358. url=url,
  359. headers=dict(request.headers),
  360. params=request.query_params,
  361. )
  362. return Response(
  363. content=resp.content,
  364. status_code=resp.status_code,
  365. headers=dict(resp.headers)
  366. )
  367. @router.api_route("/sessions/{session_uuid}/dashboard/{path:path}", methods=["GET", "POST"])
  368. async def tunnel_dashboard_proxy(
  369. session_uuid: str,
  370. path: str,
  371. request: Request
  372. ):
  373. """
  374. Proxy Dashboard HTTP/WebSocket to device via tunnel.
  375. Security: Session was created by superadmin, UUID is secret.
  376. """
  377. session = tunnel_service.get_session(session_uuid)
  378. if not session or not session.device_tunnel_port:
  379. raise HTTPException(
  380. status_code=status.HTTP_404_NOT_FOUND,
  381. detail="Dashboard session not found"
  382. )
  383. # Proxy to device dashboard through tunnel
  384. # Normalize path - avoid double slashes
  385. clean_path = path if path and path != '/' else ''
  386. url = f'http://localhost:{session.device_tunnel_port}/{clean_path}'
  387. print(f"[tunnel] Dashboard proxy: {request.url.path} -> {url}")
  388. async with httpx.AsyncClient() as client:
  389. resp = await client.request(
  390. method=request.method,
  391. url=url,
  392. headers={k: v for k, v in request.headers.items() if k.lower() not in ('host',)},
  393. params=request.query_params,
  394. content=await request.body()
  395. )
  396. # Replace paths in HTML/JS/CSS files
  397. content_type = resp.headers.get('content-type', '')
  398. should_replace = (
  399. (not path or path == '/') and 'text/html' in content_type
  400. ) or (
  401. 'javascript' in content_type or 'css' in content_type
  402. )
  403. if should_replace:
  404. try:
  405. content = resp.text
  406. base_url = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/dashboard/'
  407. # Replace absolute paths with proxied paths
  408. import re
  409. # First: Replace API paths (in JS/HTML)
  410. # Replace /api/* paths but avoid double-replacing
  411. # Use a unique marker to avoid replacing already-replaced paths
  412. marker = f'__DASHBOARD__{session_uuid}__'
  413. # Replace all /api/ with marker first
  414. content = re.sub(r'(?<!/v1/superadmin/tunnels)/api/', rf'{marker}/api/', content)
  415. # Now replace marker with full path
  416. content = content.replace(marker, base_url.rstrip('/'))
  417. # Second: Replace href/src="/..." -> href/src="/api/v1/.../dashboard/..." (HTML only)
  418. if 'text/html' in content_type:
  419. content = re.sub(r'(href|src)="/', rf'\1="{base_url}', content)
  420. headers = {
  421. k: v for k, v in resp.headers.items()
  422. if k.lower() not in ('content-length', 'content-encoding')
  423. }
  424. if 'text/html' in content_type:
  425. from starlette.responses import HTMLResponse
  426. return HTMLResponse(content=content, headers=headers)
  427. else:
  428. return Response(content=content.encode('utf-8'), headers=headers)
  429. except Exception as e:
  430. print(f"[tunnel] Dashboard path replacement error: {e}")
  431. # Fall through to return original response
  432. return Response(
  433. content=resp.content,
  434. status_code=resp.status_code,
  435. headers={k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding')}
  436. )
  437. @router.get("/sessions/{session_uuid}/terminal")
  438. async def tunnel_terminal_html(
  439. session_uuid: str
  440. ):
  441. """
  442. Return ttyd HTML with modified URLs.
  443. Security: Session was created by superadmin, UUID is secret.
  444. """
  445. session = tunnel_service.get_session(session_uuid)
  446. if not session or not session.ttyd_port:
  447. raise HTTPException(
  448. status_code=status.HTTP_404_NOT_FOUND,
  449. detail="Terminal session not found"
  450. )
  451. # Fetch ttyd HTML and modify URLs to work through our proxy
  452. async with httpx.AsyncClient() as client:
  453. try:
  454. resp = await client.get(f'http://localhost:{session.ttyd_port}/')
  455. print(f"[tunnel] HTML: ttyd returned status {resp.status_code}")
  456. if resp.status_code != 200:
  457. print(f"[tunnel] HTML: ttyd error: {resp.text[:200]}")
  458. raise HTTPException(
  459. status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
  460. detail="Terminal not ready"
  461. )
  462. html = resp.text
  463. # Prefix for all ttyd resources
  464. prefix = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/terminal'
  465. # Replace relative URLs with proxied URLs
  466. html = html.replace('src="/', f'src="{prefix}/')
  467. html = html.replace('href="/', f'href="{prefix}/')
  468. # Replace WebSocket URL
  469. # ttyd uses: new WebSocket(url) where url is like "ws://host/ws"
  470. # We need to replace it to go through our proxy
  471. import re
  472. # Find WebSocket URL construction in JavaScript
  473. ws_before = html.count('new WebSocket')
  474. html = re.sub(
  475. r'(new WebSocket\([\'"])(ws[s]?://[^/]+)(/ws[\'"])',
  476. rf'\1ws://192.168.5.4:8000{prefix}\3',
  477. html
  478. )
  479. ws_after = html.count('ws://192.168.5.4:8000')
  480. print(f"[tunnel] HTML: replaced {ws_after} WebSocket URLs (found {ws_before} total)")
  481. from starlette.responses import HTMLResponse
  482. # Don't copy Content-Length or Content-Encoding since we modified the HTML
  483. # (ttyd returns compressed, we return uncompressed modified HTML)
  484. headers = {
  485. k: v for k, v in resp.headers.items()
  486. if k.lower() not in ('content-length', 'content-encoding')
  487. }
  488. return HTMLResponse(content=html, headers=headers)
  489. except Exception as e:
  490. print(f"[tunnel] HTML: exception: {e}")
  491. raise HTTPException(
  492. status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
  493. detail=f"Failed to fetch terminal: {str(e)}"
  494. )