| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- """
- Device registration endpoint.
- """
- import secrets
- from base64 import b64encode
- from datetime import datetime, timezone
- from typing import Annotated
- from fastapi import APIRouter, Depends, HTTPException, status
- from pydantic import BaseModel
- from sqlalchemy import select, update
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import get_db
- from app.models.device import Device
- from app.models.settings import Settings
- router = APIRouter()
- class RegistrationRequest(BaseModel):
- """Device registration request."""
- device_id: str # MAC address
- ssh_public_key: str | None = None
- class RegistrationResponse(BaseModel):
- """Device registration response."""
- device_token: str
- device_password: str
- def _generate_token() -> str:
- """Generate 32-byte base64 token."""
- return b64encode(secrets.token_bytes(32)).decode("ascii")
- def _generate_password() -> str:
- """Generate 8-digit password."""
- n = secrets.randbelow(10**8)
- return f"{n:08d}"
- @router.post("/registration", response_model=RegistrationResponse, status_code=201)
- async def register_device(
- data: RegistrationRequest,
- db: Annotated[AsyncSession, Depends(get_db)],
- ):
- """
- Register new device or return existing credentials.
- Requires auto_registration to be enabled in settings.
- """
- mac_address = data.device_id.lower().strip()
- if not mac_address:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="Missing device_id",
- )
- # Check if device already exists
- result = await db.execute(select(Device).where(Device.mac_address == mac_address))
- device = result.scalar_one_or_none()
- if device:
- # Return existing credentials
- if not device.device_token or not device.device_password:
- # Re-generate if missing
- device.device_token = _generate_token()
- device.device_password = _generate_password()
- await db.commit()
- return RegistrationResponse(
- device_token=device.device_token,
- device_password=device.device_password,
- )
- # Check auto-registration setting
- settings_result = await db.execute(
- select(Settings).where(Settings.key == "auto_registration")
- )
- auto_reg_setting = settings_result.scalar_one_or_none()
- if not auto_reg_setting or not auto_reg_setting.value.get("enabled", False):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Registration disabled. Contact administrator.",
- )
- # Create new device
- device = Device(
- mac_address=mac_address,
- organization_id=None, # Unassigned
- status="online",
- config={"ssh_public_key": data.ssh_public_key} if data.ssh_public_key else {},
- device_token=_generate_token(),
- device_password=_generate_password(),
- )
- db.add(device)
- await db.flush()
- # Update last_device_at
- auto_reg_setting.value["last_device_at"] = datetime.now(timezone.utc).isoformat()
- await db.commit()
- await db.refresh(device)
- print(f"[REGISTRATION] device={mac_address} simple_id={device.simple_id}")
- return RegistrationResponse(
- device_token=device.device_token,
- device_password=device.device_password,
- )
|