Multiple Tools in Assistant Streaming API

It seems that calling multiple tools is not supported in the new streaming feature of the Assistants API. I am using the code below, but the "toolCallDone" event fires only for the first tool and never for any subsequent ones ("toolCallDelta" is still emitted for the other tools, although after the first tool the snapshot value is "undefined"). Can someone look into this?

I am using the same code as @paul.grimshaw from his thread (Issue with submitting tool outputs in assistants stream - #3 by paul.grimshaw)

let runId
const args: Record<string, string[]> = {}   // accumulates tool-call argument deltas by tool-call id
const resultTextStream: string[] = []       // accumulates streamed text deltas

return new Promise(resolve => {
    client.beta.threads.runs
        .createAndStream(threadId, {
            assistant_id: assistantId
        })
        .on("runStepCreated", runStep => {
            runId = runStep.run_id
        })
        .on("runStepDone", () => {
            console.log("runStepDone")
            resolve({ threadId })
        })
        .on("textCreated", text => console.log(`\nassistant > ${text.value}\n\n`))
        .on("textDelta", textDelta => {
            resultTextStream.push(textDelta.value)
            console.log(resultTextStream.join(""))
        })
        .on("toolCallCreated", toolCall => console.log(`\nassistant > ${toolCall.type}\n\n`))
        .on("toolCallDone", async (toolCall: FunctionToolCall) => {
            const argP = JSON.parse(args[toolCall.id].join(""))
            const { result } = await callFn(toolCall.function.name, argP, fnRegistry)

            const stream = await client.beta.threads.runs.submitToolOutputs(threadId, runId, {
                stream: true,
                tool_outputs: [
                    {
                        tool_call_id: toolCall.id,
                        output: JSON.stringify(result)
                    }
                ]
            })

            for await (const event of stream) {
                console.log(event)
            }
        })
        .on("toolCallDelta", (toolCallDelta, snapshot) => {
            if (toolCallDelta.type === "function") {
                args[snapshot.id] = [...(args[snapshot.id] ?? []), toolCallDelta.function.arguments]
            }
        })
})

Thank you!


I found a solution that worked for me.

  1. I found a post on Medium from HawkFlow.ai that started me on the right track by using the event handler.
  2. Refactored his code to suit my setup and how I have my functions.
  3. Created a list variable to store the function calls from on_tool_call_created.
  4. Looped through the tool calls, submitted them to my functions, and added each response to a tool_outputs list.
  5. Submitted that list in submit_tool_outputs.stream.

It sounds convoluted when written out, but here's my code:

class EventHandler(AssistantEventHandler):

def __init__(self, request, thread_id, assistant_id):
    super().__init__()
    self.output = None
    self.request = request
    self.tool_id = None
    self.function_arguments = None
    self.thread_id = thread_id
    self.assistant_id = assistant_id
    self.run_id = None
    self.run_step = None
    self.function_name = ""
    self.arguments = ""
    self.tool_calls = []

@override
def on_text_created(self, text) -> None:
    print(f"\nassistant on_text_created > ", end="", flush=True)

@override
def on_text_delta(self, delta, snapshot):
    print(f"{delta.value}")

@override
def on_end(self, ):
    print(f"\n end assistant > ",self.current_run_step_snapshot, end="", flush=True)

@override
def on_exception(self, exception: Exception) -> None:
    print(f"\nassistant > {exception}\n", end="", flush=True)

@override
def on_message_created(self, message: Message) -> None:
    print(f"\nassistant on_message_created > {message}\n", end="", flush=True)

@override
def on_message_done(self, message: Message) -> None:
    print(f"\nassistant on_message_done > {message}\n", end="", flush=True)

@override
def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
    pass

def on_tool_call_created(self, tool_call):
    print(f"\nassistant on_tool_call_created > {tool_call}")
    self.function_name = tool_call.function.name
    self.function_arguments = tool_call.function.arguments  # Capture the arguments
    self.tool_id = tool_call.id
    print(f"\non_tool_call_created > run_step.status > {self.run_step.status}")
    print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)

    keep_retrieving_run = client.beta.threads.runs.retrieve(
        thread_id=self.thread_id,
        run_id=self.run_id
    )

    while keep_retrieving_run.status in ["queued", "in_progress"]:
        keep_retrieving_run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id,
            run_id=self.run_id
        )
        print(f"\nSTATUS: {keep_retrieving_run.status}")

    # Once the run reaches requires_action, the requested calls are listed
    # under required_action.submit_tool_outputs.tool_calls
    if keep_retrieving_run.status == "requires_action":
        for tool_call in keep_retrieving_run.required_action.submit_tool_outputs.tool_calls:
            self.tool_calls.append(tool_call)

@override
def on_tool_call_done(self, tool_call: ToolCall) -> None:       
    keep_retrieving_run = client.beta.threads.runs.retrieve(
        thread_id=self.thread_id,
        run_id=self.run_id
    )

    print(f"\nDONE STATUS: {keep_retrieving_run.status}")

    if keep_retrieving_run.status == "completed":
        all_messages = client.beta.threads.messages.list(
            thread_id=self.thread_id
        )

        print(all_messages.data[0].content[0].text.value, "", "")
        return

    elif keep_retrieving_run.status == "requires_action":
        print("here you would call your function")

        run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id,
            run_id=self.run_id
        )
        tool_outputs = []
        for tool_call in self.tool_calls:
            # process tool_call
            tool_call_result = ai_call(tool_call, self.request)
            tool_call_id = tool_call_result['tool_call_id']
            output = tool_call_result['output']
            output_json = json.dumps(output)
            formatted_output = {
                "tool_call_id": tool_call_id,
                "output": output_json
            }
            tool_outputs.append(formatted_output)

        # Submit the collected outputs list directly; wrapping it in a single
        # output under one tool_call_id would defeat the multi-tool handling
        with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.thread_id,
            run_id=self.run_id,
            tool_outputs=tool_outputs,
            event_handler=EventHandler(self.request, self.thread_id, self.assistant_id)
        ) as stream:
            stream.until_done()
    else:
        print(f"\nassistant on_tool_call_done > {tool_call}\n", end="", flush=True)

@override
def on_run_step_created(self, run_step: RunStep) -> None:
    print(f"on_run_step_created")
    self.run_id = run_step.run_id
    self.run_step = run_step
    print("The type of run_step is ", type(run_step), flush=True)
    print(f"\n run step created assistant > {run_step}\n", flush=True)

@override
def on_run_step_done(self, run_step: RunStep) -> None:
    print(f"\n run step done assistant > {run_step}\n", flush=True)

def on_tool_call_delta(self, delta, snapshot): 
    if delta.type == 'function':
        print(delta.function.arguments, end="", flush=True)
        self.arguments += delta.function.arguments
    elif delta.type == 'code_interpreter':
        print(f"on_tool_call_delta > code_interpreter")
        if delta.code_interpreter.input:
            print(delta.code_interpreter.input, end="", flush=True)
        if delta.code_interpreter.outputs:
            print(f"\n\noutput >", flush=True)
            for output in delta.code_interpreter.outputs:
                if output.type == "logs":
                    print(f"\n{output.logs}", flush=True)
    else:
        print("ELSE")
        print(delta, end="", flush=True)

@override
def on_event(self, event: AssistantStreamEvent) -> None:
    if event.event == "thread.run.requires_action":
        print("\nthread.run.requires_action > submit tool call")
        print(f"ARGS: {self.arguments}")

I know it's been a while since onbekend commented on this, but I felt it was worth posting what I think is the root cause and a cleaner solution.

To learn chat, I started with the Chat Completions APIs, both blocking and streaming. Then I tried the Assistants API. Clearly, the Assistant abstraction is built on the underlying API (or the concepts of the API) present in Chat Completions.

Namely: the ChatCompletionMessage returned by the Completions API populates its tool_calls list to indicate that one or more tool calls need to be made. As a constraint, the next message(s) following a chat completion requiring tool_calls must be ChatCompletionToolMessageParam objects containing the in-order execution of the functions, with tool_call_id set to the IDs of the tool_calls requested in the ChatCompletionMessage.

Or, summarized: the AI requests tool_calls, each with an ID, and the message returns as complete. Our code invokes the requested functions, posts the results along with the supplied IDs as new messages, and invokes chat completions again.
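That round trip can be sketched as below; the call_fn dispatcher, tools list, and model name are placeholder assumptions for illustration, not anything from this thread:

```python
import json

def tool_result_message(tool_call_id, result):
    """Format one function result as the follow-up 'tool' message."""
    return {
        "role": "tool",
        "tool_call_id": tool_call_id,
        "content": json.dumps(result),
    }

def run_with_tools(client, messages, tools, call_fn, model="gpt-4o"):
    """One blocking round trip: if the model requests tool calls,
    execute them all, append the results, and ask again."""
    response = client.chat.completions.create(
        model=model, messages=messages, tools=tools
    )
    message = response.choices[0].message
    if message.tool_calls:  # the AI requested one or more tools
        messages.append(message)
        for tc in message.tool_calls:
            result = call_fn(tc.function.name, json.loads(tc.function.arguments))
            messages.append(tool_result_message(tc.id, result))
        response = client.chat.completions.create(
            model=model, messages=messages, tools=tools
        )
    return response
```

Note that every requested tool_call_id gets a matching "tool" message before the next completion call; skipping one is what produces the ordering errors the constraint describes.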

Now enter the Assistants API and the streaming event abstraction of AssistantEventHandler. I really like its power and the cleanliness of the overall thread, assistant, run, and run-state model. Overall, a huge step up from Chat Completions in conceptual clarity, and it does quite a bit of state tracking for all of us.

For streaming runs with multiple tool_calls, I think the AssistantEventHandler either is not doing the right thing OR makes it really hard for all of us to infer the proper usage pattern.

Clearly, your solution works: you construct your own state of tools to be called using the on_tool_call_XXX events. But I think it's telling that the OpenAI quickstart for functions uses a different pattern. Instead of using the on_tool_call_XXX events, it hooks the lower-level, general on_event.

The event of interest is not even surfaced as a hook in the more specific handlers of AssistantEventHandler. So for me, the critical red herring is an event titled on_tool_call_done that isn't actually what we really want, which would be on_tool_calls_done (plural). The key event in the quickstart is:

thread.run.requires_action

This exactly matches the status you check for before invoking tool calls in the polling version of the Assistants API (also included in the quickstart), and it is analogous to a chat completion finishing with tool_calls populated.
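For comparison, that polling check can be sketched as below; wait_for_action is my own illustrative name, not an SDK call:

```python
import time

def wait_for_action(client, thread_id, run_id, interval=0.5):
    """Poll until the run either finishes or asks for tool outputs
    (the non-streaming analogue of the requires_action event)."""
    while True:
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run.status not in ("queued", "in_progress"):
            return run  # e.g. "requires_action" or "completed"
        time.sleep(interval)
```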

The other key piece of info from the quick start is the contents of the

event.data.required_action.submit_tool_outputs.tool_calls

It correctly contains the multiple tool calls that need to be made and is the right thing to iterate over before finally posting the tool call results in another run of the AI.

So, you could hook on_event like the quickstart and follow that pattern, and I think things would get a lot cleaner in your event handler.

This is what a simplified view of the multi-tool calling function ended up looking like:
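Here is a minimal sketch of that pattern; the TOOLS dispatcher and build_tool_outputs helper are my own illustrative names, and a real handler would subclass openai.AssistantEventHandler and do the submit inside on_event:

```python
import json

# Hypothetical dispatcher mapping function names to your implementations.
TOOLS = {"get_weather": lambda args: {"temp_c": 21}}

def build_tool_outputs(event):
    """Collect every requested call from a thread.run.requires_action
    event and format the outputs for submit_tool_outputs_stream."""
    tool_calls = event.data.required_action.submit_tool_outputs.tool_calls
    return [
        {
            "tool_call_id": tc.id,
            "output": json.dumps(TOOLS[tc.function.name](json.loads(tc.function.arguments))),
        }
        for tc in tool_calls
    ]

# Inside the handler (subclassing openai.AssistantEventHandler) you would do:
#
# def on_event(self, event):
#     if event.event == "thread.run.requires_action":
#         with client.beta.threads.runs.submit_tool_outputs_stream(
#             thread_id=event.data.thread_id,
#             run_id=event.data.id,
#             tool_outputs=build_tool_outputs(event),
#             event_handler=MyHandler(),
#         ) as stream:
#             stream.until_done()
```

Because every requested call is formatted in one pass, all the tool outputs go back in a single submit, which is what the multi-tool case needs.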

Separately, I think within your on_tool_call_done implementation, something like this would leverage the underlying events better too:

        elif self.current_run.status == "requires_action":
            for tool_call in self.current_event.data.required_action.submit_tool_outputs.tool_calls:
                # Call your tools here...
                pass

And then you would post the results the same way as you do via submit_tool_outputs_stream.
