I am working with OpenAI’s Assistant API along with function calling and handling streaming events using Node.js. I have set up an EventHandler
class that extends EventEmitter
to process events from an OpenAI assistant, and I am trying to handle the thread.run.requires_action
event, which should trigger the calling of tools, processing their output, and submitting it back through streaming.
The issue arises when the event handling stops at thread.run.requires_action
. Despite the tool outputs being processed, the flow does not seem to proceed further as expected, and I am not receiving the event in the controller.
Code Overview:
eventHandler.js
onEvent
method: Listens for the eventthread.run.requires_action
and processes the required tool calls.submitToolOutputs
method: Submits the processed tool outputs using a stream and emits the event.- Issue: Although the streaming messages are printed in the console (
process.stdout.write
), the event is not emitted properly to continue the flow.
const EventEmitter = require('events');
const assistantController = require('./assistant.controller');
class EventHandler extends EventEmitter {
constructor(client, project, dbURL, userID, projectID, res) {
super();
this.client = client;
this.project = project;
this.dbURL = dbURL;
this.userID = userID;
this.res = res;
}
async onEvent(event) {
try {
// Handle 'requires_action' events with tool_calls
if (event.event === "thread.run.requires_action") {
await this.handleRequiresAction(
event.data,
event.data.id,
event.data.thread_id,
);
}
} catch (error) {
console.error("Error handling event:", error);
}
}
async handleRequiresAction(data, runId, threadId) {
try {
let toolCalls = data.required_action.submit_tool_outputs.tool_calls;
const toolOutputs = await Promise.all(toolCalls.map(toolCall => {
return assistantController.processToolCalls(
toolCall,
this.project,
this.dbURL,
this.userID,
this.projectID
);
}));
// Submit tool outputs
await this.submitToolOutputs(toolOutputs, runId, threadId);
} catch (error) {
console.error("Error processing required action:", error);
}
}
async submitToolOutputs(toolOutputs, runId, threadId) {
console.log("Submitting tool outputs");
let finalMessage = '';
try {
const stream = this.client.beta.threads.runs.submitToolOutputsStream(
threadId,
runId,
{ tool_outputs: toolOutputs }
);
for await (const event of stream) {
const message = event.data?.delta?.content[0]?.text?.value;
process.stdout.write(message); //<-- messages are getting printed in console
this.emit("event", event); //<-- But this line supposed to emit the event while accessing from controller file
}
} catch (error) {
console.error("Error submitting tool outputs:", error);
}
}
}
module.exports = EventHandler;
controller.js
- The
EventHandler
is instantiated and set up to listen for the"event"
and handle it through theonEvent
method. - Issue: The flow stops at
thread.run.requires_action
, and even though events are emitted insidesubmitToolOutputs
, they don’t seem to trigger further event handling in the controller.
const eventHandler = new EventHandler(openai, project, dbURL, userID, projectID);
eventHandler.on("event", eventHandler.onEvent.bind(eventHandler));
const stream = await openai.beta.threads.runs.createAndStream(
thread,
{ assistant_id: assistant },
eventHandler,
);
for await (const event of stream) {
writeLog(`\n${event.event} : ${event.data.id}`);
eventHandler.emit("event", event);
console.log(event.event); // <-- Stopped at thread.run.requires_action and not proceeding further
}
Expected Behavior:
- I expect the flow to continue and the tool outputs to be processed and submitted after handling the
thread.run.requires_action
event. - The event should be emitted and handled properly in the controller file.
Actual Behavior:
- The event handling stops at
thread.run.requires_action
. - The tool outputs are processed, and messages are logged to the console, but the event is not emitted as expected.
Any insights into why the event isn’t being handled further or how I can ensure that the event gets emitted and handled properly would be greatly appreciated!