| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- """
- FastAPI dependencies for authentication and authorization.
- """
- from typing import Annotated
- from fastapi import Depends, HTTPException, status
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import get_db
- from app.core.security import verify_token
- from app.models.device import Device
- from app.models.user import User
- # HTTP Bearer security scheme shared by the dependencies in this module:
- # get_current_user and get_current_device both receive their
- # HTTPAuthorizationCredentials through Depends(security).
- security = HTTPBearer()
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """
    Get current authenticated user from JWT token.

    Args:
        credentials: HTTP Bearer token from Authorization header
        db: Database session

    Returns:
        User model instance

    Raises:
        HTTPException: 401 if the token fails verification as an "access"
            token, carries no usable integer ``sub`` claim, or refers to a
            user that does not exist; 403 if the user's status is not
            "active"
    """
    # Every 401 response advertises the expected auth scheme, so all of the
    # unauthorized branches below share the same challenge header.
    auth_challenge = {"WWW-Authenticate": "Bearer"}

    # Verify token
    payload = verify_token(credentials.credentials, expected_type="access")
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers=auth_challenge,
        )

    # Extract user_id from token (sub is stored as string in JWT)
    user_id_str = payload.get("sub")
    if user_id_str is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers=auth_challenge,
        )

    try:
        user_id = int(user_id_str)
    except (ValueError, TypeError):
        # The conversion error adds nothing for the client; suppress the
        # implicit exception context so the 401 stands alone in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid user ID in token",
            headers=auth_challenge,
        ) from None

    # Get user from database
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers=auth_challenge,
        )

    # A valid token for a non-active account is an authorization failure
    # (403), not an authentication failure.
    if user.status != "active":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"User account is {user.status}",
        )

    return user
async def get_current_superadmin(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """
    Require that the authenticated user has superadmin rights.

    Args:
        current_user: User resolved by the get_current_user dependency

    Returns:
        The same User instance, unchanged, when it is a superadmin

    Raises:
        HTTPException: 403 when ``current_user.is_superadmin`` is falsy
    """
    # Happy path first: superadmins pass straight through.
    if current_user.is_superadmin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Superadmin access required",
    )
async def get_current_owner(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """
    Require that the authenticated user is an owner.

    Superadmins are accepted as well, so they pass this check even when
    they are not flagged as owners.

    Args:
        current_user: User resolved by the get_current_user dependency

    Returns:
        The same User instance, unchanged, when access is granted

    Raises:
        HTTPException: 403 when the user is neither an owner nor a
            superadmin
    """
    # Either flag is sufficient to grant owner-level access.
    if current_user.is_owner or current_user.is_superadmin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Owner access required",
    )
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """
    Require that the authenticated user has a verified email address.

    The "active" status check is already performed by get_current_user,
    which this dependency builds on; only the email verification flag is
    checked here.

    Args:
        current_user: User resolved by the get_current_user dependency

    Returns:
        The same User instance, unchanged, when the email is verified

    Raises:
        HTTPException: 403 when ``current_user.email_verified`` is falsy
    """
    if current_user.email_verified:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Email not verified",
    )
async def get_current_device(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Device:
    """
    Get current authenticated device from JWT token.

    Args:
        credentials: HTTP Bearer token from Authorization header
        db: Database session

    Returns:
        Device model instance

    Raises:
        HTTPException: 401 if the token fails verification, is not of type
            "device", carries no usable integer ``sub`` claim, or refers to
            a device that does not exist; 403 if the device's status is
            "inactive"
    """
    # Every 401 response advertises the expected auth scheme, so all of the
    # unauthorized branches below share the same challenge header.
    auth_challenge = {"WWW-Authenticate": "Bearer"}

    # Verify token (don't check type yet, will verify it's "device" below)
    payload = verify_token(credentials.credentials, expected_type=None)
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers=auth_challenge,
        )

    # Verify it's a device token
    if payload.get("type") != "device":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type, device token required",
            headers=auth_challenge,
        )

    # Extract device_id from token (sub is stored as string in JWT)
    device_id_str = payload.get("sub")
    if device_id_str is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers=auth_challenge,
        )

    try:
        device_id = int(device_id_str)
    except (ValueError, TypeError):
        # The conversion error adds nothing for the client; suppress the
        # implicit exception context so the 401 stands alone in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid device ID in token",
            headers=auth_challenge,
        ) from None

    # Get device from database
    result = await db.execute(select(Device).where(Device.id == device_id))
    device = result.scalar_one_or_none()
    if device is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Device not found",
            headers=auth_challenge,
        )

    # NOTE(review): only the literal "inactive" status is rejected here, so
    # any other status value is let through — confirm this is intended.
    if device.status == "inactive":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Device is inactive",
        )

    return device
|