Using the Streaming Assistants API With WebSockets

I’m developing a FastAPI application that uses WebSockets to send text deltas from an Assistants API event handler to the client in real time. However, the tasks responsible for sending these deltas remain in the PENDING state and never run, so the client never receives the expected data.

Symptoms

• WebSocket connection opens and messages are received.
• Tasks created to handle WebSocket communication (sending text deltas) are stuck in the PENDING state.
• No deltas are sent over the WebSocket to the client.
• Debug logging confirms that tasks are not running and remain pending indefinitely.

import asyncio
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from logging_config import logger

class EventHandler(AssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        super().__init__()
        self.websocket = websocket
        logger.info("EventHandler initialized")

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        logger.info(f"Event received: {event.event}")
        self.introspect_tasks()

    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        logger.info(f"Received text delta.value: {delta.value}")
        self.introspect_tasks()
        asyncio.ensure_future(self.send_text_delta(delta.value))
        await asyncio.sleep(0)  # Yield control to ensure the task runs

    async def send_text_delta(self, value: str) -> None:
        try:
            logger.info(f"Attempting to send delta: {value}")
            await self.websocket.send_json({"type": "text_delta", "content": value})
            logger.info(f"Sent delta: {value}")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending delta")
        except Exception as e:
            logger.error(f"Error sending delta: {str(e)}")

    @override
    async def on_message_done(self, text: Text) -> None:
        logger.info("The message is now done....")
        self.introspect_tasks()
        asyncio.ensure_future(self.send_message_done())
        await asyncio.sleep(0)  # Yield control to ensure the task runs

    async def send_message_done(self) -> None:
        try:
            logger.info("Attempting to send message_done")
            await self.websocket.send_json({"type": "message_done"})
            logger.info("Sent message_done")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending message_done")
        except Exception as e:
            logger.error(f"Error sending message_done: {str(e)}")
        finally:
            await self.close_websocket()

    async def close_websocket(self) -> None:
        try:
            logger.info("Attempting to close WebSocket")
            await self.websocket.close()
            logger.info("Closed WebSocket")
        except Exception as e:
            logger.error(f"Error closing WebSocket: {str(e)}")

    def introspect_tasks(self):
        current_tasks = asyncio.all_tasks()
        for task in current_tasks:
            if not task.done():
                logger.info(f"Running task: {task.get_name()}, state: {task._state}")

Example logs:

2024-07-24 16:37:12,708 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,749 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,841 - logging_config - INFO - Event received: thread.message.completed
(the same seven tasks are logged as PENDING after every event; repeated blocks trimmed)
/********/envs/llms/lib/python3.10/site-packages/openai/lib/streaming/_assistants.py:337: RuntimeWarning: coroutine ‘EventHandler.on_message_done’ was never awaited
  self.on_message_done(event.data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2024-07-24 16:37:12,939 - logging_config - INFO - Event received: thread.run.step.completed
2024-07-24 16:37:13,028 - logging_config - INFO - Event received: thread.run.completed
2024-07-24 16:37:13,061 - logging_config - INFO - Finally close the web socket…
2024-07-24 16:37:13,062 - logging_config - INFO - Attempting to close W

What’s Been Tried

  1. Task Introspection: Added detailed introspection logging to confirm that tasks remain in the PENDING state.
  2. Ensure Future: Used asyncio.ensure_future to schedule coroutines.
  3. Yielding Control: Added await asyncio.sleep(0) to yield control and ensure tasks are run.
  4. Event Loop Issues: Ensured that there are no nested event loops causing issues.
  5. Task Scheduling: Verified that tasks are scheduled within an active event loop context.

My question is: how can I ensure that the tasks responsible for sending text deltas over the WebSocket actually run instead of remaining in the PENDING state? Is there some weird interaction with the OpenAI Assistants API via the event handler that I don’t know about?

I’ve been at this for longer than I’d care to admit at this point :smile: . Any insights or suggestions on what might be causing these tasks to remain pending and how to resolve it would be greatly appreciated!
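In case it helps, here’s a minimal, SDK-free sketch of what I suspect is happening, based on that RuntimeWarning in my logs. The `SyncDispatcher` and `Handler` classes below are toys of my own invention, not the OpenAI library: a synchronous dispatcher that calls an `async def` method without awaiting it just creates a coroutine object, and the method body never executes.

```python
import warnings

class SyncDispatcher:
    """Toy stand-in for a synchronous event dispatcher (not the OpenAI SDK)."""
    def dispatch(self, handler):
        # If the method is async, this returns a coroutine object that is
        # immediately discarded without ever being awaited.
        handler.on_text_delta("hi")

class Handler:
    def __init__(self):
        self.received = []

    async def on_text_delta(self, value):
        # This body never executes when called from sync code.
        self.received.append(value)

handler = Handler()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    SyncDispatcher().dispatch(handler)

print(handler.received)  # [] -- the delta was never recorded
# On CPython, discarding the coroutine triggers the familiar
# "coroutine ... was never awaited" RuntimeWarning.
print(any("never awaited" in str(w.message) for w in caught))
```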

Do you have some kind of flush() function in the stack you’re using? Like, if you need to send some text chunks to the client, you call the flush function.

I don’t think there’s a need to .flush() anything. Typically in a WebSocket implementation, you would ensure the event loop is running correctly and that you’re awaiting the WebSocket send operations directly to ensure they execute.

For instance, using await websocket.send_json(data) directly within my async functions should suffice.

Perhaps a better question here is, is there a better way for me to get the tokens to the front end other than websockets? My main constraint is that I don’t want to poll the thread directly in the client.

Why are you overriding the events in EventHandler?

As I understand it, the point of the EventHandler is to be used to adapt to custom application logic. Please feel free to correct me if I’m mistaken.

Rather than merely print the text deltas to the terminal or a log, I want to send them to my client. Currently, I’ve attempted to implement that using WebSockets. Therefore, the intent behind overriding those methods is to send my text deltas through the WebSocket to my client.

Perhaps it’s something to do with the type of client I’m using?

The folks at chainlit have created this:

So I’m inclined to believe it’s possible.

How might I leverage the async client AsyncOpenAI with what I’m trying to accomplish here?

I’ve confirmed that the issue is due to not using the async client.

I changed my event_handler to use AsyncAssistantEventHandler:

from openai import AsyncAssistantEventHandler

In my main.py file I added an async client, AsyncOpenAI, for the async streaming calls.

from openai import AsyncOpenAI

Now the event handler sends the stream through my websocket like it’s supposed to :+1:

This all makes perfect sense, but I wish it was documented better. The only way I found the AsyncAssistantEventHandler was because I did a perplexity pro search which pulled up the chainlit demo.

It needs to be way easier to find stuff like this, ideally through official channels like the OpenAI docs.


I’ll add that my question about overrides was misguided. The @override decorator threw me off. Apparently it’s new in Python 3.12 (and available earlier via typing_extensions), and it lets a type checker warn you if you slightly misspell something you are trying to override.
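A tiny illustration of that, with toy classes of my own (to my understanding, the decorator itself is just a marker at runtime; it’s a type checker like mypy or pyright that actually flags the mismatch):

```python
from typing_extensions import override

class Base:
    def on_text_delta(self, value): ...

class Child(Base):
    @override
    def on_text_delta(self, value):  # OK: matches a method on Base
        return value.upper()

class Typo(Base):
    @override
    def on_text_delt(self, value):  # a type checker flags this: nothing to override
        return value

# At runtime the decorator just sets a marker attribute (PEP 698):
print(Child.on_text_delta.__override__)  # True
```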

I’m going to try your async solution real quick before I move on. I had ended up solving my streaming problem differently, forgoing the EventHandler entirely:

def generate():
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
    ) as stream:
        for event in stream:
            # use event.data.object instead of event.type, their docs are bugged
            # https://github.com/openai/openai-python/issues/1334
            if event.data.object == 'thread.message.delta' and event.data.delta.content:
                text = event.data.delta.content[0].text.value
                yield text

But I am concerned that this approach may break in the future, as the sample code provided by OpenAI is incorrect: GitHub - openai/openai-python: The official Python library for the OpenAI API
