77 lines
2.8 KiB
Python
77 lines
2.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# @Time : 2024/11/19 下午8:05
|
|
# @Author : 河瞬
|
|
# @FileName: manage_user.py
|
|
# @Software: PyCharm
|
|
from fastapi import HTTPException, APIRouter, Depends, Request
|
|
from sqlmodel import select
|
|
|
|
from dependencies import SessionDep, get_current_user
|
|
from models import User
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# 枚举成员
|
|
@router.get("/api/s1/user")
|
|
async def list_users(request: Request, session: SessionDep, current_user: User = Depends(get_current_user)):
|
|
if current_user.role != 1:
|
|
raise HTTPException(status_code=403, detail="Only admin users can list users")
|
|
|
|
users = session.exec(select(User).where(User.tenant_id == current_user.tenant_id)).all()
|
|
user_list = [{"username": user.username, "role": user.role} for user in users]
|
|
return user_list
|
|
|
|
|
|
# 新增和修改成员
|
|
@router.post("/api/s1/user")
|
|
async def add_or_update_user(data: dict, session: SessionDep, current_user: User = Depends(get_current_user)):
|
|
if current_user.role != 1:
|
|
raise HTTPException(status_code=403, detail="Only admin users can add or update users")
|
|
|
|
username = data.get("username")
|
|
password = data.get("password")
|
|
role = data.get("role")
|
|
if role not in ["auditor", "estimator"]:
|
|
raise HTTPException(status_code=400, detail="Invalid role")
|
|
role = 2 if role == "estimator" else 3
|
|
|
|
if not username or not role:
|
|
raise HTTPException(status_code=400, detail="Username and role are required")
|
|
|
|
user = session.exec(select(User).where(User.username == username, User.tenant_id == current_user.tenant_id)).first()
|
|
|
|
if user:
|
|
if password and password != "":
|
|
user.password = password
|
|
user.role = role
|
|
session.add(user)
|
|
session.commit()
|
|
return {"detail": "User updated successfully"}
|
|
else:
|
|
if password == "":
|
|
raise HTTPException(status_code=400, detail="Password is required for new user")
|
|
new_user = User(username=username, password=password, role=role, tenant_id=current_user.tenant_id)
|
|
session.add(new_user)
|
|
session.commit()
|
|
return {"detail": "User added successfully"}
|
|
|
|
|
|
# 删除成员
|
|
@router.delete("/api/s1/user")
|
|
async def delete_user(username: str, session: SessionDep, current_user: User = Depends(get_current_user)):
|
|
if current_user.role != 1:
|
|
raise HTTPException(status_code=403, detail="Only admin users can delete users")
|
|
|
|
# username = data.get("username")
|
|
if not username:
|
|
raise HTTPException(status_code=422, detail="Username is required")
|
|
|
|
user = session.exec(select(User).where(User.username == username, User.tenant_id == current_user.tenant_id)).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
session.delete(user)
|
|
session.commit()
|
|
return {"detail": "User deleted successfully"}
|