Assistants API Streaming ends stream on tool output submission

Hey all,

I’m working on an assistant that I expect to call my functions. I had this working before streaming, and now I’m trying to integrate it with streaming.

My current source code looks like this, with all of the possible events from the AssistantStream logged for the sake of testing:

            async function runAssistant(threadId, assistantId, req, res) {
                
                let textOut = "";
                let toolChangedLast = false;
                let functionName = "";
                let toolCallArgs = "";
                let toolCallId = "";

                const run = client.beta.threads.runs.createAndStream(threadId, {
                    assistant_id: assistantId,
                    instructions: "...",
                });

                run.on('textCreated', (text) => console.log(`textCreated: ${JSON.stringify(text)}`))
                    .on('textDelta', (textDelta, snapshot) => {
                        textOut += textDelta.value;
                        console.log(textOut);
                        toolChangedLast = false;
                    })
                    .on('toolCallCreated', (toolCall) => {
                        console.log("TOOL CALL CREATED", JSON.stringify(toolCall));
                        // reset accumulated state for the new tool call
                        toolCallArgs = "";
                        toolCallId = toolCall.id;
                        functionName = toolCall.function.name;
                        toolChangedLast = true;
                    })
                    .on('toolCallDelta', async (toolCallDelta, snapshot) => {
                        console.log("TOOL CALL DELTA", JSON.stringify(toolCallDelta));
                        toolChangedLast = true;

                        toolCallArgs += toolCallDelta.function.arguments;
                        const completeArgs = checkToolCallArgsComplete(toolCallArgs);
                        if (completeArgs !== null) {

                            const output = [my source code tool call];
                            let runs = await client.beta.threads.runs.list(threadId);

                            const curr_run = runs.data.find(run => run.status === "requires_action");
                            if (curr_run) {
                                let submittedToolOutputs = await client.beta.threads.runs.submitToolOutputs(
                                    threadId,
                                    curr_run.id,
                                    {
                                        tool_outputs: [
                                            {
                                                tool_call_id: toolCallId,
                                                output: JSON.stringify(output.data)
                                            }
                                        ]
                                    }
                                );
                            } else {
                                console.log("NO RUN FOUND");
                                console.log(`runs are ${JSON.stringify(runs)}`);
                            }
                        }
                    })
                    .on('messageCreated', (message) => {
                        console.log("messageCreated:", JSON.stringify(message));
                    })
                    .on('messageDelta', (messageDelta, snapshot) => {
                        console.log("messageDelta:", JSON.stringify(messageDelta));
                    })
                    .on('messageDone', (message) => {
                        console.log("messageDone:", JSON.stringify(message));
                    })
                    .on('runStepCreated', (runStep) => {
                        console.log("runStepCreated:", JSON.stringify(runStep));
                    })
                    .on('runStepDelta', (delta, snapshot) => {
                        console.log("runStepDelta:", JSON.stringify(delta));
                    })
                    .on('runStepDone', (runStep, snapshot) => {
                        console.log("runStepDone:", JSON.stringify(runStep));
                    })
                    .on('toolCallDone', (toolCall) => {
                        console.log("toolCallDone:", JSON.stringify(toolCall));
                        toolChangedLast = true;
                    })
                    .on('textCreated', (content) => {
                        console.log("textCreated:", JSON.stringify(content));
                    })
                    .on('textDelta', (delta, snapshot) => {
                        console.log("textDelta:", JSON.stringify(delta));
                        toolChangedLast = false;
                    })
                    .on('textDone', (content, snapshot) => {
                        console.log("textDone:", JSON.stringify(content));
                        toolChangedLast = false;
                    })
                    .on('imageFileDone', (content, snapshot) => {
                        console.log("imageFileDone:", JSON.stringify(content));
                    })
                    .on('end', async () => {
                        console.log("end event");
                        res.end(); // Close the request
                    });
            }

When I ran this, I noticed that submitting the tool output would succeed, but the stream would then emit an end event. If I triggered another run after the stream had ended, the text generation steps would proceed as I would have expected them to after the tool output was received.

...sequence of tool call delta steps, ultimately resulting in tool submission....
2024-03-14 23:28:00 toolCallDone: {"index":0,"id":"call_873ulzpT4aR3GBZFxdBuaYI2","type":"function","function":{"name":"myTool","arguments":"","output":null}}
2024-03-14 23:28:00 end event
2024-03-14 23:28:00 Connection closed

Perhaps I’m submitting the tool output the wrong way for the streaming paradigm, but the docs don’t suggest an alternative. Any insight here would be appreciated!

4 Likes

I’m facing the same issue in the Python library. I don’t even need to submit tool output; the run just ends after the model makes the function call. I hope this is a bug rather than a feature!

1 Like

I checked the logs in the playground and could see that, after POSTing the tool outputs, the stream for the rest of the output arrives in the response to that POST.

It looks like there is a handoff… I did some searching in the SDK and found this in Python, which works for me:

with client.beta.threads.runs.submit_tool_outputs_stream(
    thread_id=thread_id,
    run_id=run_id,
    tool_outputs=tool_output_array,
    event_handler=EventHandler()
) as stream:
    stream.until_done()

I just tried something similar, and while the first couple of run steps seemed to register the updated tool output, the run simply went back to requiring the exact same tool output and then, weirdly, terminated again.

Just wanted to add that this is currently quite confusing and more docs would help. I use submitToolOutputsStream in Node, and the stream ends immediately after I call it. Multiple tool calls are also a question: does the run still need all of them passed in at once? If so, what is the best practice for identifying all the tool calls that need to happen before it can proceed?

1 Like

Okay, so I played some more with this and figured it out. There is a GitHub issue, which I cannot find right now, stating that streams will terminate on requires_action. So the way to think about this is that you create a method observeStream(stream: AssistantStream). This method holds the listeners for updates to your thread stream, something like:

// The "AssistantStream" class is not exposed on the OpenAI type namespace for some reason
private observeStream(stream: AssistantStream) {
  this.currentStream = stream
    .on('textDelta', () => {})
     // We use the end event to figure out if actions are required and what functions to call
    .on("end", () => {
      const currentRun = stream.currentRun();

      if (
        currentRun.status === "requires_action" &&
        currentRun.required_action.type === "submit_tool_outputs"
      ) {
        const toolCalls =
          currentRun.required_action.submit_tool_outputs.tool_calls;

        this.onRequiresActionEmitter.fire({
          runId: currentRun.id,
          toolCalls,
        });
      }
    })

  // Return waiting for the final messages (or finalRun)
  return stream.finalMessages()
}

Now, in your addMessage method, you do a runs.createAndStream and return it, something like:

async addMessage(instructions: string, text: string) {
   await this.openai.beta.threads.messages.create(this.thread.id, {
      role: "user",
      content: text,
    });

    const stream = this.openai.beta.threads.runs.createAndStream(
      this.thread.id,
      {
        instructions,
        assistant_id: this.assistantId,
      }
    );

  return this.observeStream(stream) 
}

And in your method for handling the tool outputs you do the same:

  async submitToolOutputs(
    runId: string,
    toolOutputs: OpenAI.Beta.Threads.RunSubmitToolOutputsParams.ToolOutput[]
  ) {
    const stream = this.openai.beta.threads.runs.submitToolOutputsStream(
      this.thread.id,
      runId,
      {
        tool_outputs: toolOutputs,
      }
    );

    return this.observeStream(stream);
  }
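
To close the loop, the listener for onRequiresActionEmitter runs the functions and feeds the results back into submitToolOutputs. A rough sketch (the emitter's event subscribe method and the callTool dispatcher here are my own helpers, not part of the OpenAI SDK):

// Rough sketch: consuming the requires-action event fired above.
// "event" (the emitter's subscribe method) and "callTool" are hypothetical helpers.
this.onRequiresActionEmitter.event(async ({ runId, toolCalls }) => {
  const toolOutputs = await Promise.all(
    toolCalls.map(async (toolCall) => ({
      tool_call_id: toolCall.id,
      // callTool dispatches to your own function implementations
      output: JSON.stringify(
        await callTool(toolCall.function.name, JSON.parse(toolCall.function.arguments))
      ),
    }))
  );

  // This kicks off a fresh stream, which observeStream picks up again
  await this.submitToolOutputs(runId, toolOutputs);
});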

I am not using the tool call deltas here, as they do not really fit with the UX… but it could be cool to show the AI writing out the params of the function call before it is called, if that is your jive 🙂

7 Likes

Ah! This got me to the solution I was looking for. I was confused by the stream ending after requires_action. I used createAndStream() and iterated over the new stream with for await (const event of stream) again. Thanks!
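
In case it helps, the loop ended up looking roughly like this (handleEvent and executeTool are placeholders for your own logic, not SDK functions):

// Rough sketch: iterate the stream, and after requires_action submit tool
// outputs via submitToolOutputsStream, which hands back a fresh stream.
let stream = client.beta.threads.runs.createAndStream(threadId, {
    assistant_id: assistantId,
});

while (stream) {
    // AssistantStream is async-iterable; each event has .event and .data
    for await (const event of stream) {
        handleEvent(event);
    }

    const run = stream.currentRun();
    if (run?.status === "requires_action" &&
        run.required_action?.type === "submit_tool_outputs") {
        const tool_outputs = run.required_action.submit_tool_outputs.tool_calls.map(
            (toolCall) => ({
                tool_call_id: toolCall.id,
                output: JSON.stringify(executeTool(toolCall)),
            })
        );
        stream = client.beta.threads.runs.submitToolOutputsStream(threadId, run.id, {
            tool_outputs,
        });
    } else {
        stream = null; // run finished; stop looping
    }
}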

I managed to get an example working and posted it on Medium here: https://medium.com/@hawkflow.ai/openai-streaming-assistants-example-77e53ca18fb4

1 Like

I ended up with a solution similar to yours, but made it recursive:

export const observeStream = (
    stream: AssistantStream,
    messageHandler: MessageHandler,
    opts: { client: OpenAI; threadId: string; functionRegistry: FunctionRegistry; logger: Loggable }
) => {
    const { client, threadId, functionRegistry, logger } = opts

    const checkAndRespondToToolCalls = async () => {
        const currentRun = stream.currentRun()
        if (currentRun?.status !== "requires_action") return

        const fnCallResults = await Promise.all(
            currentRun.required_action?.submit_tool_outputs?.tool_calls?.map(tc =>
                callFunction(tc.function.name, JSON.parse(tc.function.arguments), functionRegistry, tc.id, logger)
            ) ?? []
        )

        const newStream = client.beta.threads.runs.submitToolOutputsStream(threadId, currentRun.id, {
            tool_outputs: fnCallResults.map(({ callId, result }) => {
                return {
                    tool_call_id: callId,
                    output: result
                }
            })
        })
        observeStream(newStream, messageHandler, opts)
    }

    stream
        .on("textDelta", textDelta =>
            messageHandler.onMessageTextChunk ? messageHandler.onMessageTextChunk(textDelta.value) : null
        )
        .on("textDone", text =>
            messageHandler.onMessageComplete ? messageHandler.onMessageComplete(text.value) : null
        )
        .on("end", checkAndRespondToToolCalls)
        .on("error", err => {
            logger("error", err)
        })
}
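
Kicked off with something like this (messageHandler, functionRegistry and logger being whatever your app provides):

// Hypothetical call site: create the first stream and hand it to the observer
const stream = client.beta.threads.runs.createAndStream(threadId, {
    assistant_id: assistantId,
})
observeStream(stream, messageHandler, { client, threadId, functionRegistry, logger })
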
4 Likes