34 lines
1.0 KiB
Python
34 lines
1.0 KiB
Python
import unittest
|
|
import asyncio
|
|
import websockets
|
|
import requests
|
|
from time import sleep
|
|
|
|
def send_post_request():
|
|
url = "http://localhost:8000/example"
|
|
data = {"content": "Hello, WebSocket!"}
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
response = requests.post(url, json=data, headers=headers)
|
|
print(f"Status Code: {response.status_code}")
|
|
print(f"Response: {response.json()}")
|
|
class MyTestCase(unittest.TestCase):
|
|
def test_something(self):
|
|
async def test_client():
|
|
uri = "ws://localhost:8000/event"
|
|
async with websockets.connect(uri) as websocket:
|
|
while True:
|
|
try:
|
|
message = await websocket.recv()
|
|
self.assertGreater(len(message), 0)
|
|
except websockets.exceptions.ConnectionClosed:
|
|
raise Exception("Connection closed")
|
|
|
|
asyncio.run(test_client())
|
|
sleep(1)
|
|
send_post_request()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|