security.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. Security utilities: JWT tokens, password hashing.
  3. """
  4. from datetime import datetime, timedelta, timezone
  5. from typing import Any
  6. import bcrypt
  7. from jose import JWTError, jwt
  8. from app.config import settings
  9. def create_access_token(data: dict[str, Any]) -> str:
  10. """
  11. Create JWT access token.
  12. Args:
  13. data: Payload to encode (typically {"sub": user_id})
  14. Returns:
  15. Encoded JWT token string
  16. """
  17. to_encode = data.copy()
  18. # Convert sub to string if it's an int (JWT standard requires string)
  19. if "sub" in to_encode and isinstance(to_encode["sub"], int):
  20. to_encode["sub"] = str(to_encode["sub"])
  21. expire = datetime.now(timezone.utc) + timedelta(
  22. minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
  23. )
  24. # Don't override type if already provided (e.g., "device" token)
  25. if "type" not in to_encode:
  26. to_encode["type"] = "access"
  27. to_encode["exp"] = expire
  28. return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  29. def create_refresh_token(data: dict[str, Any]) -> str:
  30. """
  31. Create JWT refresh token.
  32. Args:
  33. data: Payload to encode (typically {"sub": user_id})
  34. Returns:
  35. Encoded JWT token string
  36. """
  37. to_encode = data.copy()
  38. # Convert sub to string if it's an int (JWT standard requires string)
  39. if "sub" in to_encode and isinstance(to_encode["sub"], int):
  40. to_encode["sub"] = str(to_encode["sub"])
  41. expire = datetime.now(timezone.utc) + timedelta(
  42. days=settings.REFRESH_TOKEN_EXPIRE_DAYS
  43. )
  44. to_encode.update({"exp": expire, "type": "refresh"})
  45. return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  46. def verify_token(token: str, expected_type: str | None = "access") -> dict[str, Any] | None:
  47. """
  48. Verify and decode JWT token.
  49. Args:
  50. token: JWT token string
  51. expected_type: Expected token type ("access", "refresh", "device", etc.) or None to skip type check
  52. Returns:
  53. Decoded payload if valid, None otherwise
  54. """
  55. try:
  56. payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  57. # Only check type if expected_type is specified
  58. if expected_type is not None:
  59. token_type: str = payload.get("type")
  60. if token_type != expected_type:
  61. return None
  62. return payload
  63. except JWTError:
  64. return None
  65. def verify_password(plain_password: str, hashed_password: str) -> bool:
  66. """
  67. Verify a plain password against a hashed password.
  68. Args:
  69. plain_password: Plain text password
  70. hashed_password: Hashed password from database
  71. Returns:
  72. True if password matches, False otherwise
  73. """
  74. return bcrypt.checkpw(
  75. plain_password.encode("utf-8"), hashed_password.encode("utf-8")
  76. )
  77. def hash_password(password: str) -> str:
  78. """
  79. Hash a password using bcrypt.
  80. Args:
  81. password: Plain text password
  82. Returns:
  83. Hashed password
  84. """
  85. salt = bcrypt.gensalt()
  86. hashed = bcrypt.hashpw(password.encode("utf-8"), salt)
  87. return hashed.decode("utf-8")