# -*- coding: utf-8 -*- # @Time : 2024/11/19 下午8:05 # @Author : 河瞬 # @FileName: manage_user.py # @Software: PyCharm from fastapi import HTTPException, APIRouter, Depends, Request from sqlmodel import select from dependencies import SessionDep, get_current_user from models import User router = APIRouter() # 枚举成员 @router.get("/api/s1/user") async def list_users(request: Request, session: SessionDep, current_user: User = Depends(get_current_user)): if current_user.role != 1: raise HTTPException(status_code=403, detail="Only admin users can list users") users = session.exec(select(User).where(User.tenant_id == current_user.tenant_id)).all() user_list = [{"username": user.username, "role": user.role} for user in users] return user_list # 新增和修改成员 @router.post("/api/s1/user") async def add_or_update_user(data: dict, session: SessionDep, current_user: User = Depends(get_current_user)): if current_user.role != 1: raise HTTPException(status_code=403, detail="Only admin users can add or update users") username = data.get("username") password = data.get("password") role = data.get("role") if role not in ["auditor", "estimator"]: raise HTTPException(status_code=400, detail="Invalid role") role = 2 if role == "estimator" else 3 if not username or not role: raise HTTPException(status_code=400, detail="Username and role are required") user = session.exec(select(User).where(User.username == username, User.tenant_id == current_user.tenant_id)).first() if user: if password and password != "": user.password = password user.role = role session.add(user) session.commit() return {"detail": "User updated successfully"} else: if password == "": raise HTTPException(status_code=400, detail="Password is required for new user") new_user = User(username=username, password=password, role=role, tenant_id=current_user.tenant_id) session.add(new_user) session.commit() return {"detail": "User added successfully"} # 删除成员 @router.delete("/api/s1/user") async def delete_user(username: str, session: SessionDep, current_user: User = Depends(get_current_user)): if current_user.role != 1: raise HTTPException(status_code=403, detail="Only admin users can delete users") # username = data.get("username") if not username: raise HTTPException(status_code=422, detail="Username is required") user = session.exec(select(User).where(User.username == username, User.tenant_id == current_user.tenant_id)).first() if not user: raise HTTPException(status_code=404, detail="User not found") session.delete(user) session.commit() return {"detail": "User deleted successfully"}