Streaming was recently released for the Assistant, and I now have a hard time to understand how it works with function calling. The documentation doesn’t say almost anything about it and I have to search through the openai sources to get some information. Particularly for me it is unclear which event I must handle on a stream in order to get the function with arguments I want to call. I logged all the events coming in the stream: there a lots of thread.run.step.delta like this:
The only difference between all the chunks (deltas) is the “arguments” - with every chunk openAi gives next portion of arguments to fill. Well, OK, I don’t need that but where is the function name in the event?
And after the last delta I just get the
"event": "thread.run.requires_action",
which contains my function name and full arguments. Is this the event I need to listen to call the function? What is its name in the node.js SDK then? I could find only these events available in the source code:
I’m also working with nodejs as well as trying to get tool calls and streaming working. However, your approach seems to be very different from mine. With openai.beta.threads.runs.createAndStream(), I can’t seem to understand how to understand how to access the requires_action status, which I used for non-streaming. (see second code example).
As for getting the function arguments, you have to accumulate them since they stream in via toolCallDelta.
let accumulatedArguments = '';
run = await openai.beta.threads.runs.createAndStream(threadId, {
assistant_id: assistantId
})
.on('toolCallDelta', (toolCallDelta, snapshot) => {
console.log('toolCallDelta:', toolCallDelta);
if (toolCallDelta.type === 'function') {
if (toolCallDelta.type === 'function' && toolCallDelta.function.arguments) {
accumulatedArguments += toolCallDelta.function.arguments;
}
}
})
Where I’m stuck is figuring out when to trigger my other code to continue with the function calls and submitting the output. Again, unclear how to access the “requires_action” status in the run event. For non streaming, my function looks like below.
async function retrieveRun(threadId, runId) {
let run;
do {
run = await openai.beta.threads.runs.retrieve(threadId, runId);
if (run.status === 'requires_action') {
const requiredAction = run.required_action;
if (requiredAction.type === 'submit_tool_outputs') {
const toolCalls = requiredAction.submit_tool_outputs.tool_calls;
const toolOutputs = [];
for (const toolCall of toolCalls) {
// Extract the function name from the tool call
const functionName = toolCall.function.name;
// Parse the function arguments from the tool call
const functionArgs = JSON.parse(toolCall.function.arguments);
// Define the available functions
const availableFunctions = {
get_google_directions: getGoogleDirections,
get_weather: getOpenWeatherData,
get_distance_and_time: getDistanceAndTime,
};
console.log("Function Name:", functionName);
// Look up the actual function to call based on the function name
const functionToCall = availableFunctions[functionName];
const functionResponse = await functionToCall(functionArgs);
console.log("Function Response:", functionResponse);
const outputString = JSON.stringify(functionResponse);
toolOutputs.push({
tool_call_id: toolCall.id,
output: outputString,
});
}
console.log("toolOutputs: ", toolOutputs);
await openai.beta.threads.runs.submitToolOutputs(
threadId,
runId,
{ tool_outputs: toolOutputs }
);
}
}
await new Promise(resolve => setTimeout(resolve, 500));
} while (run.status !== 'completed');
return {
status: run.status,
tools: run.tools,
file_ids: run.file_ids,
usage: run.usage
};
}
I was able to get function calls to work and I’m able to submit tool outputs with openai.beta.threads.runs.submitToolOutputs(). However after that, I’m not sure how to get the streaming response that includes the outputs. I can confirm that the response is happening because if I list the historical thread messages, it does include the new response. I just don’t know how to stream it in after submitToolOutputs() is triggered. Any insight would be appreciated.
with client.beta.threads.runs.submit_tool_outputs_stream(
thread_id=self.thread_id,
run_id=self.run_id,
tool_outputs=tool_outputs,
) as stream:
for text in stream.text_deltas:
i’m still having issues, but i was able to get this far by checking the new updates they made to the SDK documentation