""" FastAPI dependencies for authentication and authorization. """ from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db from app.core.security import verify_token from app.models.device import Device from app.models.user import User # HTTP Bearer security scheme security = HTTPBearer() async def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], db: Annotated[AsyncSession, Depends(get_db)], ) -> User: """ Get current authenticated user from JWT token. Args: credentials: HTTP Bearer token from Authorization header db: Database session Returns: User model instance Raises: HTTPException: If token is invalid or user not found """ # Verify token token = credentials.credentials payload = verify_token(token, expected_type="access") if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) # Extract user_id from token (sub is stored as string in JWT) user_id_str: str = payload.get("sub") if user_id_str is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload", ) try: user_id = int(user_id_str) except (ValueError, TypeError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid user ID in token", ) # Get user from database result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", ) if user.status != "active": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"User account is {user.status}", ) return user async def get_current_superadmin( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """ Verify that current user is a superadmin. Args: current_user: Current authenticated user Returns: User model instance Raises: HTTPException: If user is not a superadmin """ if not current_user.is_superadmin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Superadmin access required", ) return current_user async def get_current_owner( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """ Verify that current user is an organization owner. Args: current_user: Current authenticated user Returns: User model instance Raises: HTTPException: If user is not an owner """ if not current_user.is_owner and not current_user.is_superadmin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Owner access required", ) return current_user async def get_current_active_user( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """ Verify that current user is active and email verified. Args: current_user: Current authenticated user Returns: User model instance Raises: HTTPException: If user is not active or email not verified """ if not current_user.email_verified: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Email not verified", ) return current_user async def get_current_device( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], db: Annotated[AsyncSession, Depends(get_db)], ) -> Device: """ Get current authenticated device from JWT token. Args: credentials: HTTP Bearer token from Authorization header db: Database session Returns: Device model instance Raises: HTTPException: If token is invalid or device not found """ # Verify token (don't check type yet, will verify it's "device" below) token = credentials.credentials payload = verify_token(token, expected_type=None) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) # Verify it's a device token token_type = payload.get("type") if token_type != "device": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type, device token required", ) # Extract device_id from token (sub is stored as string in JWT) device_id_str: str = payload.get("sub") if device_id_str is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload", ) try: device_id = int(device_id_str) except (ValueError, TypeError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid device ID in token", ) # Get device from database result = await db.execute(select(Device).where(Device.id == device_id)) device = result.scalar_one_or_none() if device is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Device not found", ) if device.status == "inactive": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Device is inactive", ) return device