Help with Streaming Function Calls

Hey guys, I have a uni project I am working on and I can't get streaming to work properly with the tools/functions provided. I have no idea why at this point; I went through the docs over and over and simply don't understand how to actually work with assistant streaming. Maybe it's just me, but the documentation provided isn't really helpful, and GPT-4 also returns pure nonsense.

Basically, what I am trying to do is create a Quart app that can receive a user message from a front end and reply accordingly, e.g. using the sample weather function.

This is what I got so far. I suspect the error lies somewhere in the submit tools output function, or the run_stream one.

import openai
from openai.lib.streaming import AsyncAssistantEventHandler
from typing_extensions import override
from quart import Quart, websocket, request
import asyncio
import json
import logging

# Configure logging at DEBUG level so the logging.debug() calls in this file are emitted.
logging.basicConfig(level=logging.DEBUG)

# The Quart application object that the route and websocket decorators attach to.
app = Quart(__name__)

# Simulated weather API function
async def get_weather(location, unit='c', delay=1.0):
    """Return canned weather data for *location* after a simulated delay.

    Args:
        location: Place name; echoed back unchanged in the result.
        unit: ``'c'`` selects the Celsius reading; any other value selects
            the Fahrenheit reading.
        delay: Seconds to sleep to imitate the latency of a real API call.
            Defaults to the original one second; pass ``0`` to skip the wait.

    Returns:
        A dict with the keys ``location``, ``temperature`` and ``condition``.
    """
    await asyncio.sleep(delay)  # simulated latency
    logging.debug(f"Getting weather for {location} in {unit} units")
    weather_data = {
        "location": location,
        "temperature": "22°C" if unit == 'c' else "71.6°F",
        "condition": "Sunny",
    }
    return weather_data

# Initialize the asynchronous OpenAI client

client = openai.AsyncOpenAI()

# Asynchronous event handler for the assistant stream
class AsyncWeatherEventHandler(AsyncAssistantEventHandler):
    """Forward assistant stream events to a websocket and answer tool calls.

    Text events are relayed to the websocket as JSON messages. When the run
    reports ``thread.run.requires_action``, the requested functions are
    executed and their outputs are submitted on a new stream that is handled
    by a fresh instance of this class.

    Args:
        websocket: Object with an awaitable ``send_json`` method.
        thread_id: ID of the thread the run belongs to.
        assistant_id: ID of the assistant; stored and passed on to the
            handler created for the tool-output stream.
    """

    def __init__(self, websocket, thread_id, assistant_id):
        super().__init__()
        self.websocket = websocket
        self.thread_id = thread_id
        self.assistant_id = assistant_id

    @override
    async def on_text_created(self, text) -> None:
        """Relay the start of a new text block to the websocket."""
        logging.debug(f"Text created: {text.value}")
        await self.websocket.send_json({'type': 'text_created', 'data': text.value})

    @override
    async def on_text_delta(self, delta, snapshot):
        """Relay one incremental piece of text to the websocket."""
        logging.debug(f"Text delta: {delta.value}")
        await self.websocket.send_json({'type': 'text_delta', 'data': delta.value})

    @override
    async def on_event(self, event):
        """Log every raw event and react to ``thread.run.requires_action``."""
        logging.debug(f"Received event: {event.event}")
        if event.event == 'thread.run.requires_action':
            run_id = event.data.id  # Retrieve the run ID from the event data
            await self.handle_requires_action(event.data, run_id)

    async def handle_requires_action(self, data, run_id):
        """Execute every requested tool call and submit all outputs at once.

        Args:
            data: The run object carried by the ``requires_action`` event.
            run_id: ID of the run that is waiting for tool outputs.
        """
        logging.debug(f"Handling requires action for run {run_id}")

        # Every tool call gets an output (an error payload if need be):
        # the run stays in ``requires_action`` until all calls are answered.
        tool_outputs = [
            {"tool_call_id": tool.id, "output": await self._call_tool(tool)}
            for tool in data.required_action.submit_tool_outputs.tool_calls
        ]

        await self.submit_tool_outputs(tool_outputs, run_id)

    async def _call_tool(self, tool):
        """Run one requested function call and return its output string."""
        name = tool.function.name
        # The call arguments arrive as a JSON-encoded string in
        # ``tool.function.arguments``; tool calls have no ``parameters``.
        try:
            arguments = json.loads(tool.function.arguments or "{}")
        except json.JSONDecodeError:
            logging.warning(f"Tool call {name} had invalid JSON arguments")
            return json.dumps({"error": "arguments were not valid JSON"})
        logging.debug(f"Tool call: {name} with parameters: {arguments}")

        if name == "get_weather":
            if not isinstance(arguments, dict) or 'location' not in arguments:
                return json.dumps({"error": "missing required argument 'location'"})
            temperature = await get_weather(arguments['location'], arguments.get('unit', 'c'))
            logging.debug(f"Temperature result: {temperature}")
            return temperature['temperature']

        logging.warning(f"Unknown tool requested: {name}")
        return json.dumps({"error": f"unknown function {name}"})

    async def submit_tool_outputs(self, tool_outputs, run_id):
        """Submit tool outputs and stream the continuation of the run.

        Args:
            tool_outputs: List of ``{"tool_call_id", "output"}`` dicts.
            run_id: ID of the run that requested the tool calls.
        """
        logging.debug(f"Submitting tool outputs: {tool_outputs}")
        async with client.beta.threads.runs.submit_tool_outputs_stream(
                thread_id=self.thread_id,
                run_id=run_id,
                tool_outputs=tool_outputs,
                event_handler=AsyncWeatherEventHandler(self.websocket, self.thread_id, self.assistant_id)
        ) as stream:
            # The nested handler already forwards text via on_text_delta, so
            # the stream only needs draining here. Iterating text_deltas as
            # well would send each delta a second time.
            await stream.until_done()

@app.route('/start_stream', methods=['POST'])
async def start_stream():
    """Create a thread containing the user's message and return its IDs.

    Reads an optional ``message`` field from the JSON request body; a
    default weather question is used when the field or the body is missing.

    Returns:
        A dict with ``status``, the new ``thread_id`` and the ``assistant_id``
        the client should send over the websocket to start the run.
    """
    payload = await request.get_json()
    if not isinstance(payload, dict):
        # A missing or non-object body would make .get() raise; use the default instead.
        payload = {}
    user_message = payload.get('message', 'What is the weather in New York, NY?')
    logging.debug(f"Received user message: {user_message}")

    # Create a new thread for the conversation
    thread = await client.beta.threads.create()
    logging.debug(f"Created new thread: {thread.id}")

    # Add the user's message to the thread
    await client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_message
    )
    logging.debug(f"Added message to thread {thread.id}")

    return {'status': 'stream ready', 'thread_id': thread.id, 'assistant_id': 'asst_YN2wdPk8BQd1Z8SU0VE5jFX1'}

@app.websocket('/ws')
async def ws():
    """Run an assistant stream for each valid request received on the socket.

    Each incoming message is expected to be a JSON object with ``thread_id``
    and ``assistant_id``. For every such message a run is started on that
    thread and streamed to completion before the next message is read.
    Messages lacking either key are ignored; messages that are not valid
    JSON are answered with an error message.
    """
    logging.debug("WebSocket connected")
    await websocket.send_json({'message': 'WebSocket connected'})
    while True:
        message = await websocket.receive()
        try:
            data = json.loads(message)
        except ValueError:
            # Covers malformed JSON (and undecodable bytes); keep the socket alive.
            logging.warning("Ignoring WebSocket message that is not valid JSON")
            await websocket.send_json({'type': 'error', 'data': 'Message must be valid JSON'})
            continue
        logging.debug(f"Received WebSocket message: {data}")

        if not isinstance(data, dict) or 'thread_id' not in data or 'assistant_id' not in data:
            continue

        thread_id = data['thread_id']
        assistant_id = data['assistant_id']
        logging.debug(f"Starting stream for thread {thread_id} with assistant {assistant_id}")
        # The streaming helper is `runs.stream`; `runs.s` does not exist.
        async with client.beta.threads.runs.stream(
                thread_id=thread_id,
                assistant_id=assistant_id,
                event_handler=AsyncWeatherEventHandler(websocket, thread_id, assistant_id)
        ) as stream:
            await stream.until_done()

# Start the app only when this file is executed as a script;
# host 0.0.0.0 listens on all network interfaces, on port 8282.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8282)

And it crashes with this error:
…/chat_backend/venv/lib/python3.12/site-packages/openai/_base_client.py", line 1584, in _request
raise self._make_status_error_from_response(err.response) from None
openai.BadRequestError: Error code: 400 - {'error': {'message': 'Thread thread_XYZXYZXYZ already has an active run run_XYZXYZXYZ.', 'type': 'invalid_request_error', 'param': None, 'code': None}}

It looks like you’ve dug too deep into the openai library, and are using methods meant for openai’s own helpers, as a start.

Preparing the client for use is as simple as:

import openai
import asyncio
# Create the asynchronous OpenAI client.
aclient = openai.AsyncOpenAI()

Without fully digging into your code, I think you need to decide clearly between using the openai library and forming your own API requests for websockets. But we’ll assume it’s working up to a point.

It would seem from the error received that you are not in fact sending to the submit_tool_outputs path when required, as sending the run ID there doesn’t get you an “already active” error.

The basic assistants flow is:

  • send input
  • while (init_run or tool_outputs in response) and not crazy_assistant():
    – capture stream
    – display text deltas, assemble and run functions
    – submit tool outputs

I’ve gone into urllib3 before and made it log everything that gets sent, so you can see exactly what goes over the API interface if the logging isn’t telling you what’s going on. You could do the same at the network exit point, before HTTPS/SSL.

1 Like