Streaming Stopped at `thread.run.requires_action` When Handling OpenAI Assistants Function Calling

I am working with OpenAI’s Assistants API in Node.js, using function calling and handling streaming events. I have set up an EventHandler class that extends EventEmitter to process events from the assistant, and I am trying to handle the thread.run.requires_action event, which should trigger calling the tools, processing their output, and submitting it back over the stream.

The problem is that event handling stops at thread.run.requires_action. The tool outputs are processed, but the flow does not continue as expected, and I never receive the follow-up events in the controller.

Code Overview:

eventHandler.js

  • onEvent method: listens for the thread.run.requires_action event and processes the required tool calls.
  • submitToolOutputs method: submits the processed tool outputs via a stream and re-emits each streamed event.
  • Issue: although the streamed messages are printed to the console (process.stdout.write), the emitted events never seem to continue the flow.
const EventEmitter = require('events');
const assistantController = require('./assistant.controller');

class EventHandler extends EventEmitter {
    constructor(client, project, dbURL, userID, projectID, res) {
        super();
        this.client = client;
        this.project = project;
        this.dbURL = dbURL;
        this.userID = userID;
        this.projectID = projectID;
        this.res = res;
    }

    async onEvent(event) {
        try {
            // Handle 'requires_action' events with tool_calls
            if (event.event === "thread.run.requires_action") {
                await this.handleRequiresAction(
                    event.data,
                    event.data.id,
                    event.data.thread_id,
                );
            }
        } catch (error) {
            console.error("Error handling event:", error);
        }
    }

    async handleRequiresAction(data, runId, threadId) {
        try {
            let toolCalls = data.required_action.submit_tool_outputs.tool_calls;
            const toolOutputs = await Promise.all(toolCalls.map(toolCall => {
                return assistantController.processToolCalls(
                    toolCall,
                    this.project,
                    this.dbURL,
                    this.userID,
                    this.projectID
                );
            }));

            // Submit tool outputs
            await this.submitToolOutputs(toolOutputs, runId, threadId);
        } catch (error) {
            console.error("Error processing required action:", error);
        }
    }

    async submitToolOutputs(toolOutputs, runId, threadId) {
        console.log("Submitting tool outputs");
        let finalMessage = '';
        try {
            const stream = this.client.beta.threads.runs.submitToolOutputsStream(
                threadId,
                runId,
                { tool_outputs: toolOutputs }
            );

            for await (const event of stream) {
                const message = event.data?.delta?.content?.[0]?.text?.value;
                if (message) process.stdout.write(message);  // <-- delta messages do get printed to the console
                this.emit("event", event);  // <-- but this emit never seems to be handled back in the controller
            }
        } catch (error) {
            console.error("Error submitting tool outputs:", error);
        }
    }
}

module.exports = EventHandler;

controller.js

  • The EventHandler is instantiated and set up to listen for "event", handling each one through the onEvent method.
  • Issue: the flow stops at thread.run.requires_action; even though events are emitted inside submitToolOutputs, they never trigger any further handling in the controller.
const eventHandler = new EventHandler(openai, project, dbURL, userID, projectID);
eventHandler.on("event", eventHandler.onEvent.bind(eventHandler));

const stream = await openai.beta.threads.runs.createAndStream(
    thread,
    { assistant_id: assistant },
    eventHandler,
);

for await (const event of stream) {
    writeLog(`\n${event.event}  : ${event.data.id}`);
    eventHandler.emit("event", event);
    console.log(event.event);     // <-- stops at thread.run.requires_action and never proceeds further
}

Expected Behavior:

  • I expect the flow to continue: after the thread.run.requires_action event is handled, the tool outputs should be processed and submitted.
  • The continuation events should then be emitted and handled in the controller file.

Actual Behavior:

  • Event handling stops at thread.run.requires_action.
  • The tool outputs are processed and the streamed messages are logged to the console, but the emitted events never reach any further handling.

Any insights into why the event isn’t being handled further or how I can ensure that the event gets emitted and handled properly would be greatly appreciated!
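
For reference, the wiring I think I need looks roughly like this (a minimal sketch only, reusing the same objects as above; the second listener is purely illustrative to show where the re-emitted events would have to be consumed, since the for await loop only iterates the original stream, which ends once the run pauses at requires_action):

const eventHandler = new EventHandler(openai, project, dbURL, userID, projectID);

// Existing listener: only reacts to thread.run.requires_action (see onEvent above).
eventHandler.on("event", eventHandler.onEvent.bind(eventHandler));

// Illustrative second listener: the events re-emitted from submitToolOutputs would
// have to be consumed somewhere like this, otherwise they are silently dropped.
eventHandler.on("event", (event) => {
    if (event.event === "thread.message.delta") {
        // e.g. forward the delta to the HTTP response / socket here
    }
});

const stream = await openai.beta.threads.runs.createAndStream(
    thread,
    { assistant_id: assistant },
);

for await (const event of stream) {
    eventHandler.emit("event", event);
}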


Have you had any luck with this? I’m having the same issue! The OpenAI docs example is really not helpful…

Hi @Zakisb97, no, but I came up with something like the code below.

{
            await openai.beta.threads.messages.create(thread, {
                role: "user",
                content: message,
            });

            let run_id;
            let tool_outputs = [];
            let stream = openai.beta.threads.runs.stream(thread, { assistant_id: assistant });

            let masterResponse = '';
            for await (const event of stream) {
                if (event.event === "thread.run.created") {
                    process.stdout.write('\nassistant > ');
                    socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: '', info: 'Message Initiated', thread: thread, state: "created" });
                } else if (event.event === "thread.message.delta") {
                    let assistantResponse = event.data.delta.content[0].text.value;
                    masterResponse += assistantResponse;
                    process.stdout.write(assistantResponse);
                    socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: assistantResponse, info: 'Message Active', thread: thread, state: "queued" });
                } else if (event.event === "thread.run.requires_action") {
                    if (event.data.required_action && event.data.required_action.type === 'submit_tool_outputs') {
                        run_id = event.data.id;

                        let toolCalls = event.data.required_action.submit_tool_outputs.tool_calls;
                        tool_outputs = await Promise.all(
                            toolCalls.map(async (tool) => {
                                let toolOutput = await this.processToolCalls(tool, project, dbURL, userID, projectID);
                                return toolOutput;
                            }));

                        process.stdout.write('\nProcessing ..\n');
                        socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: '', info: 'Message Processing', thread: thread, state: "inprogress" });
                        break;
                    }

                } else if (event.event === "thread.run.completed") {
                    conversationDetails._id = new ObjectId();
                    conversationDetails.createdOn = new Date();
                    conversationDetails.lastUpdatedOn = new Date();
                    conversationDetails.timestamp = new Date().getTime();
                    conversationDetails.role = 'assistant';
                    conversationDetails.message = masterResponse;
                    assistantModel.addConversationHistory(project, dbURL, conversationDetails);
                    socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: masterResponse, info: 'Message Completed', thread: thread, state: "completed" });
                    return res.status(200).json({ status: true, message: masterResponse, info: 'Message Completed', thread: thread, state: "completed" });
                }
            }

            //Submit tool outputs
            stream = openai.beta.threads.runs.submitToolOutputsStream(thread, run_id, {
                tool_outputs,
            });

            let masterToolResponse = '';
            for await (const event of stream) {
                if (event.event === 'thread.message.delta') {
                    let assistantResponse = event.data.delta.content[0].text.value;
                    masterToolResponse += assistantResponse;
                    process.stdout.write(assistantResponse);
                    socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: assistantResponse, info: 'Message Active', thread: thread, state: "queued" });
                } else if (event.event === "thread.run.completed") {
                    conversationDetails._id = new ObjectId();
                    conversationDetails.createdOn = new Date();
                    conversationDetails.lastUpdatedOn = new Date();
                    conversationDetails.timestamp = new Date().getTime();
                    conversationDetails.role = 'assistant';
                    conversationDetails.message = masterToolResponse;
                    assistantModel.addConversationHistory(project, dbURL, conversationDetails);
                    socketEmitController.sendDataToAngularClient([userID], 'assistant', { status: true, message: masterToolResponse, info: 'Message Completed', thread: thread, state: "completed" });
                    return res.status(200).json({ status: true, message: masterToolResponse, info: 'Message Completed', thread: thread, state: "completed" });
                }
            }
        }
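
One caveat with this approach: it assumes the run only asks for tools once. If the continuation run raises another thread.run.requires_action, the whole thing would need to be wrapped in a loop, roughly like this (just a sketch, reusing processToolCalls and the same variables as above, and assuming it runs inside the same async function):

// Sketch: keep submitting tool outputs until the run stops asking for them.
let stream = openai.beta.threads.runs.stream(thread, { assistant_id: assistant });

while (true) {
    let runId = null;
    let pendingOutputs = null;

    for await (const event of stream) {
        if (event.event === "thread.message.delta") {
            process.stdout.write(event.data.delta.content[0].text.value);
        } else if (event.event === "thread.run.requires_action") {
            runId = event.data.id;
            const toolCalls = event.data.required_action.submit_tool_outputs.tool_calls;
            pendingOutputs = await Promise.all(
                toolCalls.map((tool) => this.processToolCalls(tool, project, dbURL, userID, projectID))
            );
            break; // leave this stream and open the continuation stream below
        } else if (event.event === "thread.run.completed") {
            return; // finished, nothing left to submit
        }
    }

    if (!pendingOutputs) return; // the stream ended without asking for tools

    // Continue the same run with the outputs and go round the loop again.
    stream = openai.beta.threads.runs.submitToolOutputsStream(thread, runId, {
        tool_outputs: pendingOutputs,
    });
}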

I got the same error. I didn’t find any solution.

I will listen for the step.delta event to understand the tool calls.
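
If it helps, the full event name should be thread.run.step.delta, and function-call arguments arrive there as partial JSON fragments. Something along these lines (a sketch based on my reading of the run step delta payloads, so double-check the field names against what you actually receive):

// Assumes `stream` is the run stream you are already iterating.
for await (const event of stream) {
    if (event.event === "thread.run.step.delta") {
        const stepDetails = event.data.delta.step_details;
        if (stepDetails?.type === "tool_calls") {
            for (const toolCall of stepDetails.tool_calls ?? []) {
                // Function-call arguments stream in as partial JSON strings.
                if (toolCall.function?.arguments) {
                    process.stdout.write(toolCall.function.arguments);
                }
            }
        }
    }
}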

I have a similar error that happens very rarely, at a specific moment: when the user asks to generate a graph on a certain subject or from certain data.

Basically, it fires thread.run.requires_action and I return the data to it. After that, it continues generating the Python code to create the graph; however, instead of moving on to the thread.message.delta event and producing a message saying it created the graph or giving some explanation, it simply ends there.
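
When that happens, it might be worth logging the terminal events from the tool-output stream to see how the run actually ended. Roughly like this (a diagnostic sketch only; threadId, runId, and tool_outputs are placeholders for whatever you already have):

// Diagnostic sketch: threadId / runId / tool_outputs stand in for your own values.
const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runId, {
    tool_outputs,
});

for await (const event of stream) {
    if (event.event === "thread.message.delta") {
        process.stdout.write(event.data.delta.content[0].text.value);
    } else if (
        event.event === "thread.run.failed" ||
        event.event === "thread.run.cancelled" ||
        event.event === "thread.run.expired"
    ) {
        // last_error usually explains why no closing message was produced.
        console.error(event.event, event.data.last_error);
    }
}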