"""Login endpoint that issues a signed JWT in an HTTP-only cookie."""

import hmac
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Annotated, AsyncIterator, Iterator, Optional

from fastapi import APIRouter, Depends, FastAPI, HTTPException, Response
from jose import JWTError, jwt
from sqlmodel import Session, select

from database import create_db_and_tables, engine
from models import Project, Tenant, User

# Key and algorithm used to sign and verify JWTs.
# NOTE(review): this is a placeholder secret committed in source; it should be
# supplied from secure configuration before deployment.
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"

# Token lifetime applied when the caller does not pass ``expires_delta``.
_DEFAULT_TOKEN_LIFETIME = timedelta(minutes=15)


# The lifespan handler below replaces the older ``@app.on_event("startup")``
# hook that used to call ``create_db_and_tables()``.
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Call ``create_db_and_tables()`` before the application starts serving."""
    create_db_and_tables()
    yield


def get_session() -> Iterator[Session]:
    """Yield a ``Session`` bound to ``engine``; it is closed when the request ends."""
    with Session(engine) as session:
        yield session


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, not mutated.
        expires_delta: Lifetime of the token. Defaults to 15 minutes when
            ``None``. An explicit ``timedelta(0)`` is honoured (the token
            expires immediately) rather than silently falling back to the
            default.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta is not None else _DEFAULT_TOKEN_LIFETIME
    to_encode = data.copy()
    # Timezone-aware "now": ``datetime.utcnow()`` is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    to_encode["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def _passwords_match(stored: object, supplied: object) -> bool:
    """Compare two password strings in constant time; non-strings never match."""
    if not isinstance(stored, str) or not isinstance(supplied, str):
        return False
    # Encode to bytes because ``compare_digest`` only accepts ASCII-only ``str``.
    return hmac.compare_digest(stored.encode("utf-8"), supplied.encode("utf-8"))


app = FastAPI(lifespan=lifespan)
SessionDep = Annotated[Session, Depends(get_session)]


# Login route.
@app.post("/api/s1/login")
async def login(response: Response, user_data: dict, session: SessionDep):
    """Authenticate a user and set a ``session_token`` cookie holding a JWT.

    Args:
        response: Outgoing response on which the cookie is set.
        user_data: Request body; expected to carry ``username`` and
            ``password`` string fields.
        session: Database session provided by ``get_session``.

    Returns:
        ``{"message": "Login successful"}`` on success.

    Raises:
        HTTPException: 401 when a field is missing or not a string, the user
            does not exist, or the password does not match.
    """
    username = user_data.get("username")
    password = user_data.get("password")
    # A malformed body is rejected like any other failed login instead of
    # surfacing a KeyError as a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        raise HTTPException(status_code=401, detail="Login failed")

    # Look up the user by username.
    user = session.exec(select(User).where(User.username == username)).first()

    # NOTE(review): the stored password is compared directly with the
    # submitted one, which suggests passwords are kept in plaintext — confirm,
    # and move to a salted hash if so.
    if user is None or not _passwords_match(user.password, password):
        raise HTTPException(status_code=401, detail="Login failed")

    # NOTE(review): the claim key is spelled "tanant_id" (sic). It is left
    # unchanged because token consumers may read this exact key; rename it
    # only together with them.
    token = create_access_token(
        data={"id": user.id, "role": user.role, "tanant_id": user.tenant.id}
    )

    # HTTP-only so that client-side scripts cannot read the token.
    response.set_cookie(key="session_token", value=token, httponly=True)

    # No explicit session.close(): the ``with`` block in get_session owns the
    # session's lifetime.
    return {"message": "Login successful"}