| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- """
- Security utilities: JWT tokens, password hashing.
- """
- from datetime import datetime, timedelta, timezone
- from typing import Any
- import bcrypt
- from jose import JWTError, jwt
- from app.config import settings
- def create_access_token(data: dict[str, Any]) -> str:
- """
- Create JWT access token.
- Args:
- data: Payload to encode (typically {"sub": user_id})
- Returns:
- Encoded JWT token string
- """
- to_encode = data.copy()
- # Convert sub to string if it's an int (JWT standard requires string)
- if "sub" in to_encode and isinstance(to_encode["sub"], int):
- to_encode["sub"] = str(to_encode["sub"])
- expire = datetime.now(timezone.utc) + timedelta(
- minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
- )
- # Don't override type if already provided (e.g., "device" token)
- if "type" not in to_encode:
- to_encode["type"] = "access"
- to_encode["exp"] = expire
- return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
- def create_refresh_token(data: dict[str, Any]) -> str:
- """
- Create JWT refresh token.
- Args:
- data: Payload to encode (typically {"sub": user_id})
- Returns:
- Encoded JWT token string
- """
- to_encode = data.copy()
- # Convert sub to string if it's an int (JWT standard requires string)
- if "sub" in to_encode and isinstance(to_encode["sub"], int):
- to_encode["sub"] = str(to_encode["sub"])
- expire = datetime.now(timezone.utc) + timedelta(
- days=settings.REFRESH_TOKEN_EXPIRE_DAYS
- )
- to_encode.update({"exp": expire, "type": "refresh"})
- return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
- def verify_token(token: str, expected_type: str | None = "access") -> dict[str, Any] | None:
- """
- Verify and decode JWT token.
- Args:
- token: JWT token string
- expected_type: Expected token type ("access", "refresh", "device", etc.) or None to skip type check
- Returns:
- Decoded payload if valid, None otherwise
- """
- try:
- payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- # Only check type if expected_type is specified
- if expected_type is not None:
- token_type: str = payload.get("type")
- if token_type != expected_type:
- return None
- return payload
- except JWTError:
- return None
- def verify_password(plain_password: str, hashed_password: str) -> bool:
- """
- Verify a plain password against a hashed password.
- Args:
- plain_password: Plain text password
- hashed_password: Hashed password from database
- Returns:
- True if password matches, False otherwise
- """
- return bcrypt.checkpw(
- plain_password.encode("utf-8"), hashed_password.encode("utf-8")
- )
- def hash_password(password: str) -> str:
- """
- Hash a password using bcrypt.
- Args:
- password: Plain text password
- Returns:
- Hashed password
- """
- salt = bcrypt.gensalt()
- hashed = bcrypt.hashpw(password.encode("utf-8"), salt)
- return hashed.decode("utf-8")
|