I’m developing a FastAPI application where I’m using WebSockets to send text deltas from an Assistant’s API event handler in real-time. However, the tasks responsible for sending these deltas remain in the PENDING state and never run. As a result, the client doesn’t receive the expected data.
Symptoms
• WebSocket connection opens and messages are received.
• Tasks created to handle WebSocket communication (sending text deltas) are stuck in the PENDING state.
• No deltas are sent over the WebSocket to the client.
• Debug logging confirms that tasks are not running and remain pending indefinitely.
import asyncio
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from logging_config import logger
class EventHandler(AssistantEventHandler):
def __init__(self, websocket: WebSocket):
super().__init__()
self.websocket = websocket
logger.info("EventHandler initialized")
@override
def on_event(self, event: AssistantStreamEvent) -> None:
logger.info(f"Event received: {event.event}")
self.introspect_tasks()
@override
async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
logger.info(f"Received text delta.value: {delta.value}")
self.introspect_tasks()
asyncio.ensure_future(self.send_text_delta(delta.value))
await asyncio.sleep(0) # Yield control to ensure the task runs
async def send_text_delta(self, value: str) -> None:
try:
logger.info(f"Attempting to send delta: {value}")
await self.websocket.send_json({"type": "text_delta", "content": value})
logger.info(f"Sent delta: {value}")
except WebSocketDisconnect:
logger.warning("WebSocket disconnected while sending delta")
except Exception as e:
logger.error(f"Error sending delta: {str(e)}")
@override
async def on_message_done(self, text: Text) -> None:
logger.info("The message is now done....")
self.introspect_tasks()
asyncio.ensure_future(self.send_message_done())
await asyncio.sleep(0) # Yield control to ensure the task runs
async def send_message_done(self) -> None:
try:
logger.info("Attempting to send message_done")
await self.websocket.send_json({"type": "message_done"})
logger.info("Sent message_done")
except WebSocketDisconnect:
logger.warning("WebSocket disconnected while sending message_done")
except Exception as e:
logger.error(f"Error sending message_done: {str(e)}")
finally:
await self.close_websocket()
async def close_websocket(self) -> None:
try:
logger.info("Attempting to close WebSocket")
await self.websocket.close()
logger.info("Closed WebSocket")
except Exception as e:
logger.error(f"Error closing WebSocket: {str(e)}")
def introspect_tasks(self):
current_tasks = asyncio.all_tasks()
for task in current_tasks:
if not task.done():
logger.info(f"Running task: {task.get_name()}, state: {task._state}")
Example logs:
024-07-24 16:37:12,708 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,749 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,841 - logging_config - INFO - Event received: thread.message.completed
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-9, state: PENDING
/********/envs/llms/lib/python3.10/site-packages/openai/lib/streaming/_assistants.py:337: RuntimeWarning: coroutine ‘EventHandler.on_message_done’ was never awaited
self.on_message_done(event.data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2024-07-24 16:37:12,939 - logging_config - INFO - Event received: thread.run.step.completed
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Event received: thread.run.completed
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,061 - logging_config - INFO - Finally close the web socket…
2024-07-24 16:37:13,062 - logging_config - INFO - Attempting to close W
What’s Been Tried
- Task Introspection: Added detailed introspection logging to confirm that tasks remain in the PENDING state.
- Ensure Future: Used asyncio.ensure_future to schedule coroutines.
- Yielding Control: Added await asyncio.sleep(0) to yield control and ensure tasks are run.
- Event Loop Issues: Ensured that there are no nested event loops causing issues.
- Task Scheduling: Verified that tasks are scheduled within an active event loop context.
My question is: how can I ensure that the tasks responsible for sending text deltas over the WebSocket are executed correctly and do not remain in the PENDING state? Is there some weird interaction to the OpenAI Assistant’s API via the event handler that I don’t know about?
I’ve been at this for longer than I’d care to admit at this point . Any insights or suggestions on what might be causing these tasks to remain pending and how to resolve it would be greatly appreciated!