""" User management service. """ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import hash_password from app.models.user import User from app.schemas.user import UserCreate, UserUpdate async def create_user( db: AsyncSession, data: UserCreate, ) -> User: """ Create a new user. Args: db: Database session data: User creation data Returns: Created user """ # Hash password hashed_password = hash_password(data.password) user = User( email=data.email, hashed_password=hashed_password, full_name=data.full_name, phone=data.phone, role=data.role, organization_id=data.organization_id, status="active", # Superadmin creates active users email_verified=True, # Auto-verify for superadmin-created users ) db.add(user) await db.commit() await db.refresh(user) return user async def get_user(db: AsyncSession, user_id: int) -> User | None: """ Get user by ID. Args: db: Database session user_id: User ID Returns: User or None """ result = await db.execute(select(User).where(User.id == user_id)) return result.scalar_one_or_none() async def get_user_by_email(db: AsyncSession, email: str) -> User | None: """ Get user by email. Args: db: Database session email: User email Returns: User or None """ result = await db.execute(select(User).where(User.email == email)) return result.scalar_one_or_none() async def list_users( db: AsyncSession, skip: int = 0, limit: int = 100, organization_id: int | None = None, role: str | None = None, status: str | None = None, ) -> tuple[list[User], int]: """ List users with pagination and filters. Args: db: Database session skip: Number of records to skip limit: Maximum number of records to return organization_id: Filter by organization (optional) role: Filter by role (optional) status: Filter by status (optional) Returns: Tuple of (users list, total count) """ # Build query query = select(User) if organization_id is not None: query = query.where(User.organization_id == organization_id) if role: query = query.where(User.role == role) if status: query = query.where(User.status == status) # Get total count count_query = select(func.count()).select_from(User) if organization_id is not None: count_query = count_query.where(User.organization_id == organization_id) if role: count_query = count_query.where(User.role == role) if status: count_query = count_query.where(User.status == status) total_result = await db.execute(count_query) total = total_result.scalar_one() # Get paginated results query = query.offset(skip).limit(limit).order_by(User.created_at.desc()) result = await db.execute(query) users = list(result.scalars().all()) return users, total async def update_user( db: AsyncSession, user_id: int, data: UserUpdate, ) -> User | None: """ Update user. Args: db: Database session user_id: User ID data: Update data Returns: Updated user or None if not found """ result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: return None # Update fields update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(user, field, value) await db.commit() await db.refresh(user) return user async def delete_user( db: AsyncSession, user_id: int, ) -> bool: """ Delete user. Args: db: Database session user_id: User ID Returns: True if deleted, False if not found """ result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: return False await db.delete(user) await db.commit() return True async def change_user_password( db: AsyncSession, user_id: int, new_password: str, ) -> User | None: """ Change user password. Args: db: Database session user_id: User ID new_password: New password (plain text) Returns: Updated user or None if not found """ result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: return None user.hashed_password = hash_password(new_password) await db.commit() await db.refresh(user) return user