"""
SSH keys management utilities.
"""
import re
import subprocess

from sqlalchemy import select

from app.core.database import async_session_maker
from app.models.device import Device

# File that receives one restricted entry per registered device key.
_AUTHORIZED_KEYS_PATH = "/home/tunnel/.ssh/authorized_keys"

# Security restrictions for tunnel keys:
# - no-pty: no terminal allocation
# - no-X11-forwarding: no X11
# - no-agent-forwarding: no SSH agent
# - permitopen: only allow forwarding to localhost ports 22 and 80
# NOTE(review): per sshd(8), no-pty only denies TTY allocation; it does not by
# itself stop a client from requesting command execution. Confirm the tunnel
# account's login shell (or add command=/restrict) if that matters here.
_KEY_RESTRICTIONS = (
    'no-pty,'
    'no-X11-forwarding,'
    'no-agent-forwarding,'
    'permitopen="localhost:22",'
    'permitopen="localhost:80"'
)

# Control characters (tab excepted) must never reach authorized_keys: a
# newline inside a device-supplied value would start a new, unrestricted line.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]")
# Key type token, e.g. "ssh-ed25519" or "sk-ssh-ed25519@openssh.com".
_KEY_TYPE = re.compile(r"[A-Za-z0-9@._+-]+")
# Base64-encoded key blob.
_KEY_BLOB = re.compile(r"[A-Za-z0-9+/]+={0,3}")


def _format_authorized_key(ssh_key, mac_address):
    """
    Build one authorized_keys line for a device, or return None if unsafe.

    The key is device-supplied, so it is accepted only when it is a string
    of the form ``<type> <base64> [comment]`` with no control characters.
    The same control-character check is applied to the trailing
    ``# device:<mac>`` comment.

    Args:
        ssh_key: Raw value stored under ``config['ssh_public_key']``.
        mac_address: Device identifier appended as the line comment.

    Returns:
        The formatted line (without trailing newline), or None when the key
        or the comment fails validation.
    """
    if not isinstance(ssh_key, str):
        return None

    ssh_key = ssh_key.strip()
    comment = f"# device:{mac_address}"
    if _CONTROL_CHARS.search(ssh_key) or _CONTROL_CHARS.search(comment):
        return None

    fields = ssh_key.split(None, 2)
    if len(fields) < 2:
        return None
    if not _KEY_TYPE.fullmatch(fields[0]) or not _KEY_BLOB.fullmatch(fields[1]):
        return None

    # Format: restrictions key comment
    return f"{_KEY_RESTRICTIONS} {ssh_key} {comment}"


async def sync_authorized_keys():
    """
    Sync all device SSH keys to /home/tunnel/.ssh/authorized_keys

    This should be called:
    - After device registration (to add new key)
    - After device deletion (to remove key)

    The file is rewritten from scratch on every call. Devices whose stored
    key is missing or empty are skipped silently; devices whose key fails
    validation are skipped with a warning so that one bad record cannot
    inject extra lines or block the sync for everyone else.

    Failures (database or sudo) are reported on stdout and not re-raised:
    the sync is best-effort.
    """
    try:
        async with async_session_maker() as session:
            result = await session.execute(select(Device))
            devices = result.scalars().all()

        keys = []
        for device in devices:
            config = device.config
            if not config or 'ssh_public_key' not in config:
                continue

            raw_key = config['ssh_public_key']
            if isinstance(raw_key, str) and not raw_key.strip():
                # Empty key: nothing to install for this device.
                continue

            line = _format_authorized_key(raw_key, device.mac_address)
            if line is None:
                print(
                    "[SSH] Skipping invalid SSH key for device "
                    f"{device.mac_address!r}"
                )
                continue
            keys.append(line)

        authorized_keys_content = "\n".join(keys) + "\n" if keys else ""

        # NOTE(review): these calls block the event loop while sudo runs.
        # They are kept synchronous on purpose: moving them to a worker
        # thread would let two concurrent syncs write the file at once.

        # Write using sudo
        subprocess.run(
            ["sudo", "tee", _AUTHORIZED_KEYS_PATH],
            input=authorized_keys_content.encode(),
            stdout=subprocess.DEVNULL,
            check=True
        )
        subprocess.run(
            ["sudo", "chmod", "600", _AUTHORIZED_KEYS_PATH],
            check=True
        )
        subprocess.run(
            ["sudo", "chown", "tunnel:tunnel", _AUTHORIZED_KEYS_PATH],
            check=True
        )

        print(f"[SSH] Synced {len(keys)} keys to authorized_keys")
    except Exception as e:
        # Best-effort: report and continue rather than fail the caller.
        print(f"[SSH] Failed to sync authorized_keys: {e}")