How to properly handle the function call in assistant streaming (node.js)>

Streaming was recently released for the Assistant, and I now have a hard time to understand how it works with function calling. The documentation doesn’t say almost anything about it and I have to search through the openai sources to get some information. Particularly for me it is unclear which event I must handle on a stream in order to get the function with arguments I want to call. I logged all the events coming in the stream: there a lots of thread.run.step.delta like this:

  "event": "thread.run.step.delta",
  "data": {
    "id": "step_G7HFPc9DnwOAzfwbei7KbaAh",
    "object": "thread.run.step.delta",
    "delta": {
      "step_details": {
        "type": "tool_calls",
        "tool_calls": [
          {
            "index": 0,
            "type": "function",
            "function": {
              "arguments": "ус"
            }
          }
        ]
      }
    }
  }
}

The only difference between all the chunks (deltas) is the “arguments” - with every chunk openAi gives next portion of arguments to fill. Well, OK, I don’t need that but where is the function name in the event?
And after the last delta I just get the

"event": "thread.run.requires_action",

which contains my function name and full arguments. Is this the event I need to listen to call the function? What is its name in the node.js SDK then? I could find only these events available in the source code:

  messageCreated: (message: Message) => void;
  messageDelta: (message: MessageDelta, snapshot: Message) => void;
  messageDone: (message: Message) => void;

  runStepCreated: (runStep: RunStep) => void;
  runStepDelta: (delta: RunStepDelta, snapshot: Runs.RunStep) => void;
  runStepDone: (runStep: Runs.RunStep, snapshot: Runs.RunStep) => void;

  toolCallCreated: (toolCall: ToolCall) => void;
  toolCallDelta: (delta: ToolCallDelta, snapshot: ToolCall) => void;
  toolCallDone: (toolCall: ToolCall) => void;

  textCreated: (content: Text) => void;
  textDelta: (delta: TextDelta, snapshot: Text) => void;
  textDone: (content: Text, snapshot: Message) => void;

I’m also working with nodejs as well as trying to get tool calls and streaming working. However, your approach seems to be very different from mine. With openai.beta.threads.runs.createAndStream(), I can’t seem to understand how to understand how to access the requires_action status, which I used for non-streaming. (see second code example).

As for getting the function arguments, you have to accumulate them since they stream in via toolCallDelta.

let accumulatedArguments = '';

run = await openai.beta.threads.runs.createAndStream(threadId, {
    assistant_id: assistantId
})
.on('toolCallDelta', (toolCallDelta, snapshot) => {
    console.log('toolCallDelta:', toolCallDelta);
    if (toolCallDelta.type === 'function') {
        if (toolCallDelta.type === 'function' && toolCallDelta.function.arguments) {
            accumulatedArguments += toolCallDelta.function.arguments;
        }
    }
})

Where I’m stuck is figuring out when to trigger my other code to continue with the function calls and submitting the output. Again, unclear how to access the “requires_action” status in the run event. For non streaming, my function looks like below.

async function retrieveRun(threadId, runId) {
    let run;
    do {
        run = await openai.beta.threads.runs.retrieve(threadId, runId);

        if (run.status === 'requires_action') {
            const requiredAction = run.required_action;

            if (requiredAction.type === 'submit_tool_outputs') {
                const toolCalls = requiredAction.submit_tool_outputs.tool_calls;
                const toolOutputs = [];

                for (const toolCall of toolCalls) {
                    // Extract the function name from the tool call
                    const functionName = toolCall.function.name;

                    // Parse the function arguments from the tool call
                    const functionArgs = JSON.parse(toolCall.function.arguments);

                    // Define the available functions
                    const availableFunctions = {
                        get_google_directions: getGoogleDirections,
                        get_weather: getOpenWeatherData,
                        get_distance_and_time: getDistanceAndTime,
                    };
                    console.log("Function Name:", functionName);

                    // Look up the actual function to call based on the function name
                    const functionToCall = availableFunctions[functionName];

                    const functionResponse = await functionToCall(functionArgs);
                    console.log("Function Response:", functionResponse);

                    const outputString = JSON.stringify(functionResponse);

                    toolOutputs.push({
                        tool_call_id: toolCall.id,
                        output: outputString,
                    });
                }
                console.log("toolOutputs: ", toolOutputs);

                await openai.beta.threads.runs.submitToolOutputs(
                    threadId,
                    runId,
                    { tool_outputs: toolOutputs }
                );
            }
        }
        await new Promise(resolve => setTimeout(resolve, 500));
    } while (run.status !== 'completed');

    return {
        status: run.status,
        tools: run.tools,
        file_ids: run.file_ids,
        usage: run.usage
    };
}

Found that they added streaming info which shows all the events to the docs:
https://platform.openai.com/docs/api-reference/runs/createRun

1 Like

I was able to get function calls to work and I’m able to submit tool outputs with openai.beta.threads.runs.submitToolOutputs(). However after that, I’m not sure how to get the streaming response that includes the outputs. I can confirm that the response is happening because if I list the historical thread messages, it does include the new response. I just don’t know how to stream it in after submitToolOutputs() is triggered. Any insight would be appreciated.

you can get the text with:

                        with client.beta.threads.runs.submit_tool_outputs_stream(
                            thread_id=self.thread_id,
                            run_id=self.run_id,
                            tool_outputs=tool_outputs,
                        ) as stream:
                            for text in stream.text_deltas:

i’m still having issues, but i was able to get this far by checking the new updates they made to the SDK documentation

Yesterday I used the event…event and it was really helpful to see what it called to which I link to an exposed event (i.e. messageDelta).

run.on('event', (event) => {
  console.log('event', event);
})

This tells me after X thing - Y happens, so I can update the chain to use whatever is needed:

run.on('messageCreated', (message) => {
  console.log('messageCreated', message);
})

Why I keep getting this on Streamlit ‘Stream’ object has no attribute ‘status’?

Doesn’t streaming still work with run statuses, which means you can’t use streaming with the function calling tool?

1 Like