Hey guys, I'm working on a uni project and I can't get streaming to work properly with the tools/functions provided. I have no idea why at this point; I've gone through the docs over and over and simply don't understand how to actually work with assistant streaming. Maybe it's just me, but the documentation isn't really helpful and GPT-4 also returns pure nonsense.
Basically, what I'm trying to do is build a Quart app that receives a user message from a front end and replies accordingly, using the sample weather function as an example.
This is what I've got so far. I suspect the error lies somewhere in the submit_tool_outputs function or in run_stream.
import openai
from openai.lib.streaming import AsyncAssistantEventHandler
from typing_extensions import override
from quart import Quart, websocket, request
import asyncio
import json
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG)
app = Quart(__name__)
# Simulated weather API function
async def get_weather(location, unit='c'):
    await asyncio.sleep(1)  # Simulated delay
    logging.debug(f"Getting weather for {location} in {unit} units")
    weather_data = {
        "location": location,
        "temperature": "22°C" if unit == 'c' else "71.6°F",
        "condition": "Sunny"
    }
    return weather_data
# Initialize the async OpenAI client
client = openai.AsyncOpenAI()
# Define the async event handler for streaming
class AsyncWeatherEventHandler(AsyncAssistantEventHandler):
    def __init__(self, websocket, thread_id, assistant_id):
        super().__init__()
        self.websocket = websocket
        self.thread_id = thread_id
        self.assistant_id = assistant_id

    @override
    async def on_text_created(self, text) -> None:
        logging.debug(f"Text created: {text.value}")
        await self.websocket.send_json({'type': 'text_created', 'data': text.value})

    @override
    async def on_text_delta(self, delta, snapshot):
        logging.debug(f"Text delta: {delta.value}")
        await self.websocket.send_json({'type': 'text_delta', 'data': delta.value})

    @override
    async def on_event(self, event):
        logging.debug(f"Received event: {event.event}")
        if event.event == 'thread.run.requires_action':
            run_id = event.data.id  # Retrieve the run ID from the event data
            await self.handle_requires_action(event.data, run_id)
    async def handle_requires_action(self, data, run_id):
        tool_outputs = []
        logging.debug(f"Handling requires action for run {run_id}")
        for tool in data.required_action.submit_tool_outputs.tool_calls:
            # The function arguments arrive as a JSON string on tool.function.arguments
            arguments = json.loads(tool.function.arguments)
            logging.debug(f"Tool call: {tool.function.name} with arguments: {arguments}")
            if tool.function.name == "get_weather":
                weather = await get_weather(arguments['location'], arguments.get('unit', 'c'))
                logging.debug(f"Weather result: {weather}")
                tool_outputs.append({"tool_call_id": tool.id, "output": weather['temperature']})
        await self.submit_tool_outputs(tool_outputs, run_id)
    async def submit_tool_outputs(self, tool_outputs, run_id):
        logging.debug(f"Submitting tool outputs: {tool_outputs}")
        async with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.thread_id,
            run_id=run_id,
            tool_outputs=tool_outputs,
            event_handler=AsyncWeatherEventHandler(self.websocket, self.thread_id, self.assistant_id)
        ) as stream:
            # The nested event handler already forwards text deltas to the websocket,
            # so just wait for the stream to finish rather than iterating it a second time
            await stream.until_done()
@app.route('/start_stream', methods=['POST'])
async def start_stream():
    user_message = (await request.get_json()).get('message', 'What is the weather in New York, NY?')
    logging.debug(f"Received user message: {user_message}")
    # Create a new thread for the conversation
    thread = await client.beta.threads.create()
    logging.debug(f"Created new thread: {thread.id}")
    # Add the user message to the thread
    await client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_message
    )
    logging.debug(f"Added message to thread {thread.id}")
    return {'status': 'stream ready', 'thread_id': thread.id, 'assistant_id': 'asst_YN2wdPk8BQd1Z8SU0VE5jFX1'}
@app.websocket('/ws')
async def ws():
    logging.debug("WebSocket connected")
    await websocket.send_json({'message': 'WebSocket connected'})
    while True:
        message = await websocket.receive()
        data = json.loads(message)
        logging.debug(f"Received WebSocket message: {data}")
        if 'thread_id' in data and 'assistant_id' in data:
            thread_id = data['thread_id']
            assistant_id = data['assistant_id']

            async def run_stream():
                logging.debug(f"Starting stream for thread {thread_id} with assistant {assistant_id}")
                async with client.beta.threads.runs.stream(
                    thread_id=thread_id,
                    assistant_id=assistant_id,
                    event_handler=AsyncWeatherEventHandler(websocket, thread_id, assistant_id)
                ) as stream:
                    await stream.until_done()

            await run_stream()
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8282)
And it crashes with this error:
…/chat_backend/venv/lib/python3.12/site-packages/openai/_base_client.py", line 1584, in _request
    raise self._make_status_error_from_response(err.response) from None
openai.BadRequestError: Error code: 400 - {'error': {'message': 'Thread thread_XYZXYZXYZ already has an active run run_XYZXYZXYZ.', 'type': 'invalid_request_error', 'param': None, 'code': None}}
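For reference, this is the bare-bones flow I think I'm supposed to follow, stripped of the Quart/websocket parts. It's just my reading of the SDK streaming examples, so treat it as a sketch, not something I know to be correct; the assistant ID and the get_weather tool are the ones from my own setup, and the hard-coded weather string is a placeholder.

import asyncio
import json

import openai
from openai.lib.streaming import AsyncAssistantEventHandler
from typing_extensions import override

client = openai.AsyncOpenAI()
ASSISTANT_ID = "asst_YN2wdPk8BQd1Z8SU0VE5jFX1"  # my assistant, has the get_weather tool attached

class Handler(AsyncAssistantEventHandler):
    @override
    async def on_text_delta(self, delta, snapshot):
        print(delta.value, end="", flush=True)

    @override
    async def on_event(self, event):
        if event.event == "thread.run.requires_action":
            run = event.data
            outputs = []
            for call in run.required_action.submit_tool_outputs.tool_calls:
                args = json.loads(call.function.arguments)  # arguments come in as a JSON string
                outputs.append({"tool_call_id": call.id, "output": f"22°C in {args['location']}"})  # placeholder output
            # Resume the SAME run by submitting the outputs; a fresh handler continues the stream
            async with client.beta.threads.runs.submit_tool_outputs_stream(
                thread_id=run.thread_id,
                run_id=run.id,
                tool_outputs=outputs,
                event_handler=Handler(),
            ) as stream:
                await stream.until_done()

async def main():
    thread = await client.beta.threads.create()
    await client.beta.threads.messages.create(
        thread_id=thread.id, role="user", content="What is the weather in New York, NY?"
    )
    # A thread can only have one active run at a time, so this is started exactly once
    async with client.beta.threads.runs.stream(
        thread_id=thread.id, assistant_id=ASSISTANT_ID, event_handler=Handler()
    ) as stream:
        await stream.until_done()

asyncio.run(main())

If this flow is right, my best guess is that my app somehow starts a second run on the same thread while the first one is still active (the websocket loop runs forever, so a repeated message from the front end would do exactly that), but I'd appreciate confirmation.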