[Python] OpenAI streaming ends after submitting 2 or more tool outputs

Hello OpenAI Community!

We are developing an internal chatbot and use the Assistants API for this purpose.

Currently we use a straightforward approach to send the OpenAI messages to our client: we create and submit a run, iterate over the ThreadMessageDelta events in a simple for loop, assemble the complete message, and only then send it back to the client.
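For context, the old flow looks roughly like this (a minimal sketch, assuming a sync OpenAI client named client, with error handling omitted):

from openai.types.beta.assistant_stream_event import ThreadMessageDelta

with client.beta.threads.runs.stream(thread_id=thread_id, assistant_id=assistant_id) as stream:
    parts = []
    for event in stream:
        # Accumulate every text delta instead of forwarding it immediately
        if isinstance(event, ThreadMessageDelta):
            for content in event.data.delta.content:
                if content.type == 'text':
                    parts.append(content.text.value)
complete_message = ''.join(parts)  # sent to the client only after the whole run finishes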

We found that this approach performs poorly and can take ~20 seconds to complete. Our goal is to switch entirely to the Assistants streaming API.

To that end, I implemented a new approach that lets us stream the OpenAI response through an AsyncGenerator in Python. While testing the code, I ran into the following problem:

When OpenAI asks us to submit tool outputs (in my case it requested two functions) and I submit them, the stream ends immediately: no ThreadMessageDelta events are produced and the stream fires the on_end() event. In other words, the observed sequence is thread.run.requires_action (with two tool calls), then submit_tool_outputs_stream(), then on_end(), with no thread.message.delta events in between. Important note: the issue does not occur when OpenAI requests only one tool (function in my case). In that case I submit my single tool output, the stream continues, and ThreadMessageDelta events are generated as expected. So the issue only appears when submitting more than one tool output.

Any hints or suggestions about what could be wrong here? If you can share code snippets showing how to resolve this, it would be much appreciated.

Below are my code snippets for the controller, service, and stream handler:

assistant_streaming_controller.py

async def streamChat():
    ...
    return AssistantStreamingService.get_instance().handle_user_prompt(
        thread_id, user_message, auth_header, user_id
    ), {"Content-Type": response_format}
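Returning the async generator plus a headers dict relies on our web framework streaming it to the client. For testing in isolation, the generator can also be consumed directly; a minimal sketch (the thread ID and message are illustrative):

import asyncio

async def main():
    service = AssistantStreamingService.get_instance()
    async for token in service.handle_user_prompt("thread_abc123", "Hello!"):
        print(token, end="", flush=True)

asyncio.run(main())
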
assistant_streaming_service.py

def handle_user_prompt(self, thread_id, message, auth_header: str = None, user_id: str = None):
    ...  # augmented_query is built here (elided)
    response = self.run_assistant(thread_id, augmented_query or message, auth_header, user_id)
    return response

async def run_assistant(self, thread_id, message, auth_header: str = None, user_id: str = None):
    ...
        with self.get_openai().beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=self.assistant.id,
            model=config.get('openai.model_id', 'gpt-4o-mini'),
            temperature=config.get("openai.preset.temperature", 0.2),
            top_p=config.get("openai.preset.top_p", 0.8),
            event_handler=StreamHandler(message)
        ) as stream:
            for chunk in stream:
                if isinstance(chunk, ThreadMessageDelta):
                    contents = chunk.data.delta.content
                    for content in contents:
                        if content.type == 'text':
                            yield content.text.value
                if isinstance(chunk, ThreadRunRequiresAction):
                    run = chunk.data
                    logging.info(f"Run {run.id} requires action. Processing required functions...")
                    tool_outputs = await self.retrieve_requested_functions(run, auth_header, user_id)
                    stream_manager = self.__submit_tool_output_and_stream(tool_outputs, thread_id, run)
                    if stream_manager:
                        with stream_manager as new_stream:
                            # Note: a second requires_action inside this inner stream is not handled here
                            for new_chunk in new_stream:
                                if isinstance(new_chunk, ThreadMessageDelta):
                                    contents = new_chunk.data.delta.content
                                    for content in contents:
                                        if content.type == 'text':
                                            yield content.text.value

def __submit_tool_output_and_stream(self, tool_outputs: list, thread_id: str, run: Run):
    stream = None
    if tool_outputs:
        try:
            # submit_tool_outputs_stream returns a stream manager; the request is sent when it is entered
            stream = self.get_openai().beta.threads.runs.submit_tool_outputs_stream(
                thread_id=thread_id,
                run_id=run.id,
                tool_outputs=tool_outputs,
                event_handler=StreamHandler()
            )
            logging.debug("Tool outputs submitted successfully. Returning new stream.")
        except Exception as e:
            self.get_openai().beta.threads.runs.cancel(run_id=run.id, thread_id=thread_id)
            logging.error(f"Error submitting tool outputs: {e}")

    return stream
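
For completeness, retrieve_requested_functions is not shown above; it builds the tool_outputs list in the shape the API expects, roughly like this (an illustrative sketch; call_internal_function stands in for our real dispatch logic):

import json

async def retrieve_requested_functions(self, run, auth_header=None, user_id=None):
    tool_outputs = []
    for tool_call in run.required_action.submit_tool_outputs.tool_calls:
        args = json.loads(tool_call.function.arguments)
        # call_internal_function is a hypothetical stand-in for our internal dispatch
        result = await self.call_internal_function(tool_call.function.name, args, auth_header, user_id)
        tool_outputs.append({"tool_call_id": tool_call.id, "output": json.dumps(result)})
    return tool_outputs
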
stream_handler.py

import logging

from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.assistant_stream_event import (
    ThreadRunCreated, ThreadRunQueued, ThreadRunInProgress, ThreadRunFailed,
    ThreadRunExpired, ThreadRunIncomplete, ThreadRunCancelled, ThreadRunCompleted,
)


class StreamHandler(AssistantEventHandler):

    def __init__(self, user_prompt: str = None):
        super().__init__()
        self.user_prompt = user_prompt

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        match event:
            case ThreadRunCreated():
                if self.user_prompt:
                    run = event.data
                    # Our internal logic
            case ThreadRunQueued() | ThreadRunInProgress():
                run = event.data
                logging.debug(f"StreamHandler: Run {run.id} has status {run.status}")
            case ThreadRunFailed():
                run = event.data
                logging.warning(f"StreamHandler: Run {run.id} failed. Error: {run.last_error}")
            case ThreadRunExpired():
                run = event.data
                logging.warning(f"StreamHandler: Run {run.id} has expired.")
            case ThreadRunIncomplete():
                run = event.data
                stop_reason = run.incomplete_details.reason if run.incomplete_details else 'unknown'
                logging.warning(f"StreamHandler: Run {run.id} is incomplete. Reason: {stop_reason}")
            case ThreadRunCancelled():
                run = event.data
                logging.warning(f"StreamHandler: Run {run.id} has been cancelled.")
            case ThreadRunCompleted():
                run = event.data
                logging.debug(f"StreamHandler: Run {run.id} completed successfully with status: {run.status}")

    @override
    def on_end(self) -> None:
        # current_run and current_message_snapshot are convenience properties of AssistantEventHandler
        final_run = self.current_run
        if final_run.required_action:
            logging.debug(f"We processed REQUIRES_ACTION for the current run {final_run.id}. No further action is needed at this point.")
        elif final_run.last_error:
            logging.debug(f"Run {final_run.id} completed with errors. Last error message: {final_run.last_error.message}")
        else:
            logging.info(f"Run {final_run.id} completed successfully.")
            final_assistant_message = self.current_message_snapshot
            # Our internal logic