# -*- coding: utf-8 -*-
# @Time    : 2024/10/26 3:35 PM
# @Author  : 河瞬
# @FileName: ConnectionManager.py
# @Software: PyCharm
# @Github  :
from typing import List

from fastapi import WebSocket


# WebSocket connection manager
class ConnectionManager:
    """Keep track of accepted WebSocket connections and send messages to them."""

    def __init__(self) -> None:
        # Every websocket that went through connect() and has not yet been
        # passed to disconnect().
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and register it as an active connection.

        Args:
            websocket: The incoming connection to accept and track.
        """
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Remove *websocket* from the active connections.

        Calling this for a websocket that is not registered (for example a
        second disconnect of the same socket) is a no-op instead of raising
        ``ValueError``.

        Args:
            websocket: The connection to stop tracking.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Not registered (any more): nothing to clean up.
            pass

    @staticmethod
    async def send_personal_message(message: str, websocket: WebSocket) -> None:
        """Send the text *message* to a single *websocket*.

        Args:
            message: The text payload to send.
            websocket: The connection that receives the message.
        """
        await websocket.send_text(message)

    async def broadcast_json(self, data: dict) -> None:
        """Send *data* as JSON to every currently active connection.

        An exception raised by one connection's ``send_json`` propagates to
        the caller and stops the broadcast at that connection.

        Args:
            data: The payload passed to each connection's ``send_json``.
        """
        # Iterate over a snapshot: every ``await`` hands control back to the
        # event loop, where connect()/disconnect() may mutate the live list,
        # and mutating a list while iterating it can skip entries.
        for connection in list(self.active_connections):
            await connection.send_json(data)