# tunnels.py 20 KB
  1. """
  2. Superadmin tunnel management API endpoints.
  3. """
  4. from typing import Annotated, Optional
  5. from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect, Request
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from pydantic import BaseModel
  8. from sqlalchemy import select
  9. from starlette.responses import StreamingResponse, Response
  10. import httpx
  11. import asyncio
  12. from app.api.deps import get_current_superadmin
  13. from app.core.database import get_db
  14. from app.models.device import Device
  15. from app.models.user import User
  16. from app.services.tunnel_service import tunnel_service
  17. import socket
# Router for the superadmin tunnel endpoints; every route registered on it
# is served under the "/tunnels" prefix and tagged "superadmin-tunnels".
router = APIRouter(prefix="/tunnels", tags=["superadmin-tunnels"])
  19. async def _allocate_tunnel_port(
  20. db: AsyncSession,
  21. device: Device,
  22. tunnel_type: str,
  23. port_range: tuple[int, int]
  24. ) -> int:
  25. """
  26. Allocate a free port for tunnel.
  27. Strategy: Use device's simple_id to calculate deterministic port offset.
  28. """
  29. # Calculate port based on device simple_id for deterministic allocation
  30. start_port, end_port = port_range
  31. port_offset = device.simple_id % (end_port - start_port + 1)
  32. allocated_port = start_port + port_offset
  33. # Verify port is free on the system
  34. def is_port_free(port: int) -> bool:
  35. try:
  36. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  37. s.bind(('', port))
  38. return True
  39. except OSError:
  40. return False
  41. # If calculated port is taken, find next free port
  42. attempts = 0
  43. while not is_port_free(allocated_port) and attempts < 1000:
  44. allocated_port += 1
  45. if allocated_port > end_port:
  46. allocated_port = start_port
  47. attempts += 1
  48. if attempts >= 1000:
  49. raise HTTPException(
  50. status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
  51. detail="No free ports available for tunnel"
  52. )
  53. return allocated_port
class TunnelEnableResponse(BaseModel):
    """Response body returned by ``enable_tunnel`` after a tunnel session is created."""
    # UUID of the newly created tunnel session; the client polls status with it.
    session_uuid: str
    # Device identifier; enable_tunnel fills this with the device's MAC address.
    device_id: str
    # Requested tunnel kind: "ssh" or "dashboard".
    tunnel_type: str
    # Session state at creation time; enable_tunnel always reports "waiting".
    status: str
class TunnelStatusResponse(BaseModel):
    """Tunnel session status response returned while polling a session."""
    # UUID of the session being reported on.
    session_uuid: str
    status: str  # "waiting" | "ready" | "failed"
    # Port of the device tunnel as recorded on the session, if any.
    device_tunnel_port: Optional[int] = None
    # Port of the ttyd process as recorded on the session, if any.
    ttyd_port: Optional[int] = None
    # Frontend URL for the tunnel; only populated once the session is "ready".
    tunnel_url: Optional[str] = None
  67. @router.post("/devices/{device_id}/{tunnel_type}")
  68. async def enable_tunnel(
  69. device_id: int,
  70. tunnel_type: str,
  71. db: Annotated[AsyncSession, Depends(get_db)],
  72. current_user: Annotated[User, Depends(get_current_superadmin)]
  73. ) -> TunnelEnableResponse:
  74. """
  75. Enable SSH or Dashboard tunnel for device.
  76. Creates session and triggers device to connect tunnel.
  77. Args:
  78. device_id: Device numeric ID
  79. tunnel_type: "ssh" or "dashboard"
  80. Returns:
  81. session_uuid: UUID for polling status
  82. """
  83. if tunnel_type not in ["ssh", "dashboard"]:
  84. raise HTTPException(
  85. status_code=status.HTTP_400_BAD_REQUEST,
  86. detail="tunnel_type must be 'ssh' or 'dashboard'"
  87. )
  88. # Get device
  89. result = await db.execute(
  90. select(Device).where(Device.id == device_id)
  91. )
  92. device = result.scalar_one_or_none()
  93. if not device:
  94. raise HTTPException(
  95. status_code=status.HTTP_404_NOT_FOUND,
  96. detail="Device not found"
  97. )
  98. # Allocate port for tunnel
  99. if tunnel_type == "ssh":
  100. # SSH tunnel ports: 50000-59999
  101. port_range = (50000, 59999)
  102. else:
  103. # Dashboard tunnel ports: 60000-65535
  104. port_range = (60000, 65535)
  105. # Find free port (simple sequential allocation)
  106. allocated_port = await _allocate_tunnel_port(db, device, tunnel_type, port_range)
  107. # Create tunnel session
  108. session = tunnel_service.create_session(
  109. device_id=device.mac_address,
  110. admin_user=current_user.email,
  111. tunnel_type=tunnel_type
  112. )
  113. # Update device config to enable tunnel with allocated port
  114. if device.config is None:
  115. device.config = {}
  116. tunnel_key = f"{tunnel_type}_tunnel"
  117. if tunnel_key not in device.config:
  118. device.config[tunnel_key] = {}
  119. device.config[tunnel_key]["enabled"] = True
  120. device.config[tunnel_key]["remote_port"] = allocated_port
  121. # Copy other tunnel settings from default
  122. if "server" not in device.config[tunnel_key]:
  123. device.config[tunnel_key]["server"] = "192.168.5.4"
  124. device.config[tunnel_key]["port"] = 22
  125. device.config[tunnel_key]["user"] = "tunnel"
  126. device.config[tunnel_key]["keepalive_interval"] = 30
  127. # Mark as modified (SQLAlchemy JSON field)
  128. from sqlalchemy.orm import attributes
  129. attributes.flag_modified(device, "config")
  130. await db.commit()
  131. print(f"[tunnel] Enabled {tunnel_type} tunnel for {device.mac_address} on port {allocated_port}")
  132. return TunnelEnableResponse(
  133. session_uuid=session.uuid,
  134. device_id=device.mac_address,
  135. tunnel_type=tunnel_type,
  136. status="waiting"
  137. )
  138. @router.get("/sessions/{session_uuid}/status")
  139. async def get_tunnel_status(
  140. session_uuid: str,
  141. current_user: Annotated[User, Depends(get_current_superadmin)]
  142. ) -> TunnelStatusResponse:
  143. """
  144. Poll tunnel session status.
  145. Returns:
  146. - waiting: Device not yet connected
  147. - ready: Tunnel connected, ttyd spawned
  148. - failed: Session expired or failed
  149. """
  150. session = tunnel_service.get_session(session_uuid)
  151. if not session:
  152. raise HTTPException(
  153. status_code=status.HTTP_404_NOT_FOUND,
  154. detail="Session not found or expired"
  155. )
  156. # Build response
  157. response = TunnelStatusResponse(
  158. session_uuid=session.uuid,
  159. status=session.status,
  160. device_tunnel_port=session.device_tunnel_port,
  161. ttyd_port=session.ttyd_port
  162. )
  163. # If ready, return appropriate URL based on tunnel type
  164. if session.status == "ready":
  165. if session.tunnel_type == "ssh" and session.ttyd_port:
  166. response.tunnel_url = f"/admin/ssh/{session.uuid}"
  167. elif session.tunnel_type == "dashboard" and session.device_tunnel_port:
  168. response.tunnel_url = f"/admin/dashboard/{session.uuid}"
  169. return response
  170. @router.post("/sessions/{session_uuid}/heartbeat")
  171. async def session_heartbeat(
  172. session_uuid: str,
  173. current_user: Annotated[User, Depends(get_current_superadmin)]
  174. ):
  175. """
  176. Browser sends heartbeat every 30 seconds to keep session alive.
  177. """
  178. success = tunnel_service.update_heartbeat(session_uuid)
  179. if not success:
  180. raise HTTPException(
  181. status_code=status.HTTP_404_NOT_FOUND,
  182. detail="Session not found"
  183. )
  184. return {"success": True}
  185. @router.websocket("/sessions/{session_uuid}/terminal/ws")
  186. async def tunnel_terminal_websocket(
  187. websocket: WebSocket,
  188. session_uuid: str
  189. ):
  190. """
  191. WebSocket proxy to ttyd on localhost.
  192. """
  193. session = tunnel_service.get_session(session_uuid)
  194. if not session or not session.ttyd_port:
  195. print(f"[tunnel] WS: Session not found or no ttyd port: {session_uuid}")
  196. await websocket.close(code=1008, reason="Session not found")
  197. return
  198. print(f"[tunnel] WS: Accepting connection for session {session_uuid}, ttyd port {session.ttyd_port}")
  199. # ttyd uses "tty" WebSocket subprotocol
  200. await websocket.accept(subprotocol="tty")
  201. # Connect to ttyd WebSocket
  202. import websockets
  203. try:
  204. ttyd_url = f'ws://localhost:{session.ttyd_port}/ws'
  205. print(f"[tunnel] WS: Connecting to ttyd at {ttyd_url}")
  206. # Connect to ttyd with "tty" subprotocol
  207. async with websockets.connect(ttyd_url, subprotocols=["tty"]) as ttyd_ws:
  208. print(f"[tunnel] WS: Connected to ttyd successfully")
  209. # Proxy messages both ways
  210. async def forward_to_ttyd():
  211. try:
  212. while True:
  213. message = await websocket.receive()
  214. print(f"[tunnel] WS: Received from browser: {message.keys()}")
  215. if 'bytes' in message:
  216. data = message['bytes']
  217. print(f"[tunnel] WS: Browser -> ttyd ({len(data)} bytes)")
  218. await ttyd_ws.send(data)
  219. elif 'text' in message:
  220. data = message['text']
  221. print(f"[tunnel] WS: Browser -> ttyd ({len(data)} chars text)")
  222. await ttyd_ws.send(data)
  223. elif 'websocket.disconnect' in message:
  224. print(f"[tunnel] WS: Browser sent disconnect")
  225. break
  226. except WebSocketDisconnect:
  227. print(f"[tunnel] WS: Browser disconnected (exception)")
  228. except Exception as e:
  229. print(f"[tunnel] WS: Error forwarding to ttyd: {e}")
  230. import traceback
  231. traceback.print_exc()
  232. async def forward_from_ttyd():
  233. try:
  234. async for message in ttyd_ws:
  235. if isinstance(message, bytes):
  236. print(f"[tunnel] WS: ttyd -> Browser ({len(message)} bytes)")
  237. await websocket.send_bytes(message)
  238. else:
  239. print(f"[tunnel] WS: ttyd -> Browser ({len(message)} chars text)")
  240. await websocket.send_text(message)
  241. except Exception as e:
  242. print(f"[tunnel] WS: Error forwarding from ttyd: {e}")
  243. import traceback
  244. traceback.print_exc()
  245. await asyncio.gather(forward_to_ttyd(), forward_from_ttyd(), return_exceptions=True)
  246. print(f"[tunnel] WS: Proxy loop ended")
  247. except Exception as e:
  248. print(f"[tunnel] WS: Proxy error: {e}")
  249. import traceback
  250. traceback.print_exc()
  251. try:
  252. await websocket.close(code=1011, reason="Proxy error")
  253. except:
  254. pass
  255. @router.websocket("/sessions/{session_uuid}/dashboard/api/ws")
  256. async def tunnel_dashboard_websocket(
  257. websocket: WebSocket,
  258. session_uuid: str
  259. ):
  260. """
  261. WebSocket proxy to device dashboard WebSocket.
  262. """
  263. session = tunnel_service.get_session(session_uuid)
  264. if not session or not session.device_tunnel_port:
  265. print(f"[tunnel] Dashboard WS: Session not found: {session_uuid}")
  266. await websocket.close(code=1008, reason="Session not found")
  267. return
  268. print(f"[tunnel] Dashboard WS: Accepting connection for session {session_uuid}")
  269. await websocket.accept()
  270. # Connect to device dashboard WebSocket
  271. import websockets
  272. try:
  273. device_ws_url = f'ws://localhost:{session.device_tunnel_port}/api/ws'
  274. print(f"[tunnel] Dashboard WS: Connecting to {device_ws_url}")
  275. async with websockets.connect(device_ws_url) as device_ws:
  276. print(f"[tunnel] Dashboard WS: Connected successfully")
  277. # Proxy messages both ways
  278. async def forward_to_device():
  279. try:
  280. while True:
  281. message = await websocket.receive()
  282. if 'bytes' in message:
  283. await device_ws.send(message['bytes'])
  284. elif 'text' in message:
  285. await device_ws.send(message['text'])
  286. elif 'websocket.disconnect' in message:
  287. break
  288. except WebSocketDisconnect:
  289. print(f"[tunnel] Dashboard WS: Browser disconnected")
  290. except Exception as e:
  291. print(f"[tunnel] Dashboard WS: Error forwarding to device: {e}")
  292. async def forward_from_device():
  293. try:
  294. async for message in device_ws:
  295. if isinstance(message, bytes):
  296. await websocket.send_bytes(message)
  297. else:
  298. await websocket.send_text(message)
  299. except Exception as e:
  300. print(f"[tunnel] Dashboard WS: Error forwarding from device: {e}")
  301. await asyncio.gather(forward_to_device(), forward_from_device(), return_exceptions=True)
  302. print(f"[tunnel] Dashboard WS: Proxy ended")
  303. except Exception as e:
  304. print(f"[tunnel] Dashboard WS: Proxy error: {e}")
  305. try:
  306. await websocket.close(code=1011, reason="Proxy error")
  307. except:
  308. pass
  309. @router.api_route("/sessions/{session_uuid}/terminal/{path:path}", methods=["GET", "POST"])
  310. async def tunnel_terminal_proxy(
  311. session_uuid: str,
  312. path: str,
  313. request: Request
  314. ):
  315. """
  316. Full HTTP proxy to ttyd (including token, ws, etc).
  317. Security: Session was created by superadmin, UUID is secret.
  318. Proxies /terminal/{path} to http://localhost:ttyd_port/{path}
  319. """
  320. session = tunnel_service.get_session(session_uuid)
  321. if not session or not session.ttyd_port:
  322. raise HTTPException(
  323. status_code=status.HTTP_404_NOT_FOUND,
  324. detail="Terminal session not found"
  325. )
  326. # Proxy request to ttyd
  327. url = f'http://localhost:{session.ttyd_port}/{path}'
  328. print(f"[tunnel] HTTP proxy: {request.url.path} -> {url}")
  329. async with httpx.AsyncClient() as client:
  330. resp = await client.request(
  331. method=request.method,
  332. url=url,
  333. headers=dict(request.headers),
  334. params=request.query_params,
  335. )
  336. return Response(
  337. content=resp.content,
  338. status_code=resp.status_code,
  339. headers=dict(resp.headers)
  340. )
  341. @router.api_route("/sessions/{session_uuid}/dashboard/{path:path}", methods=["GET", "POST"])
  342. async def tunnel_dashboard_proxy(
  343. session_uuid: str,
  344. path: str,
  345. request: Request
  346. ):
  347. """
  348. Proxy Dashboard HTTP/WebSocket to device via tunnel.
  349. Security: Session was created by superadmin, UUID is secret.
  350. """
  351. session = tunnel_service.get_session(session_uuid)
  352. if not session or not session.device_tunnel_port:
  353. raise HTTPException(
  354. status_code=status.HTTP_404_NOT_FOUND,
  355. detail="Dashboard session not found"
  356. )
  357. # Proxy to device dashboard through tunnel
  358. # Normalize path - avoid double slashes
  359. clean_path = path if path and path != '/' else ''
  360. url = f'http://localhost:{session.device_tunnel_port}/{clean_path}'
  361. print(f"[tunnel] Dashboard proxy: {request.url.path} -> {url}")
  362. async with httpx.AsyncClient() as client:
  363. resp = await client.request(
  364. method=request.method,
  365. url=url,
  366. headers={k: v for k, v in request.headers.items() if k.lower() not in ('host',)},
  367. params=request.query_params,
  368. content=await request.body()
  369. )
  370. # Replace paths in HTML/JS/CSS files
  371. content_type = resp.headers.get('content-type', '')
  372. should_replace = (
  373. (not path or path == '/') and 'text/html' in content_type
  374. ) or (
  375. 'javascript' in content_type or 'css' in content_type
  376. )
  377. if should_replace:
  378. try:
  379. content = resp.text
  380. base_url = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/dashboard/'
  381. # Replace absolute paths with proxied paths
  382. import re
  383. # First: Replace API paths (in JS/HTML)
  384. # Replace /api/* paths but avoid double-replacing
  385. # Use a unique marker to avoid replacing already-replaced paths
  386. marker = f'__DASHBOARD__{session_uuid}__'
  387. # Replace all /api/ with marker first
  388. content = re.sub(r'(?<!/v1/superadmin/tunnels)/api/', rf'{marker}/api/', content)
  389. # Now replace marker with full path
  390. content = content.replace(marker, base_url.rstrip('/'))
  391. # Second: Replace href/src="/..." -> href/src="/api/v1/.../dashboard/..." (HTML only)
  392. if 'text/html' in content_type:
  393. content = re.sub(r'(href|src)="/', rf'\1="{base_url}', content)
  394. headers = {
  395. k: v for k, v in resp.headers.items()
  396. if k.lower() not in ('content-length', 'content-encoding')
  397. }
  398. if 'text/html' in content_type:
  399. from starlette.responses import HTMLResponse
  400. return HTMLResponse(content=content, headers=headers)
  401. else:
  402. return Response(content=content.encode('utf-8'), headers=headers)
  403. except Exception as e:
  404. print(f"[tunnel] Dashboard path replacement error: {e}")
  405. # Fall through to return original response
  406. return Response(
  407. content=resp.content,
  408. status_code=resp.status_code,
  409. headers={k: v for k, v in resp.headers.items() if k.lower() not in ('content-length', 'content-encoding')}
  410. )
  411. @router.get("/sessions/{session_uuid}/terminal")
  412. async def tunnel_terminal_html(
  413. session_uuid: str
  414. ):
  415. """
  416. Return ttyd HTML with modified URLs.
  417. Security: Session was created by superadmin, UUID is secret.
  418. """
  419. session = tunnel_service.get_session(session_uuid)
  420. if not session or not session.ttyd_port:
  421. raise HTTPException(
  422. status_code=status.HTTP_404_NOT_FOUND,
  423. detail="Terminal session not found"
  424. )
  425. # Fetch ttyd HTML and modify URLs to work through our proxy
  426. async with httpx.AsyncClient() as client:
  427. try:
  428. resp = await client.get(f'http://localhost:{session.ttyd_port}/')
  429. print(f"[tunnel] HTML: ttyd returned status {resp.status_code}")
  430. if resp.status_code != 200:
  431. print(f"[tunnel] HTML: ttyd error: {resp.text[:200]}")
  432. raise HTTPException(
  433. status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
  434. detail="Terminal not ready"
  435. )
  436. html = resp.text
  437. # Prefix for all ttyd resources
  438. prefix = f'/api/v1/superadmin/tunnels/sessions/{session_uuid}/terminal'
  439. # Replace relative URLs with proxied URLs
  440. html = html.replace('src="/', f'src="{prefix}/')
  441. html = html.replace('href="/', f'href="{prefix}/')
  442. # Replace WebSocket URL
  443. # ttyd uses: new WebSocket(url) where url is like "ws://host/ws"
  444. # We need to replace it to go through our proxy
  445. import re
  446. # Find WebSocket URL construction in JavaScript
  447. ws_before = html.count('new WebSocket')
  448. html = re.sub(
  449. r'(new WebSocket\([\'"])(ws[s]?://[^/]+)(/ws[\'"])',
  450. rf'\1ws://192.168.5.4:8000{prefix}\3',
  451. html
  452. )
  453. ws_after = html.count('ws://192.168.5.4:8000')
  454. print(f"[tunnel] HTML: replaced {ws_after} WebSocket URLs (found {ws_before} total)")
  455. from starlette.responses import HTMLResponse
  456. # Don't copy Content-Length or Content-Encoding since we modified the HTML
  457. # (ttyd returns compressed, we return uncompressed modified HTML)
  458. headers = {
  459. k: v for k, v in resp.headers.items()
  460. if k.lower() not in ('content-length', 'content-encoding')
  461. }
  462. return HTMLResponse(content=html, headers=headers)
  463. except Exception as e:
  464. print(f"[tunnel] HTML: exception: {e}")
  465. raise HTTPException(
  466. status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
  467. detail=f"Failed to fetch terminal: {str(e)}"
  468. )