Efficiently Collecting Tool Calls with parallel_tool_calls=True During Streaming

Introduction

I’m working with the OpenAI API’s tool calling feature and facing challenges when handling tool calls in streaming mode with parallel_tool_calls=True. My goal is to efficiently capture the entire tool call (function name and arguments) in one piece. Currently, I can collect function names and arguments successfully in streaming mode. However, enabling parallel_tool_calls=True introduces complexities that make it nearly impossible to gather everything consistently due to overlapping streams.

I’m considering whether it might be beneficial to handle tool calls differently: when a tool_call is detected, stop streaming and return it as a complete data object instead. This might offer a more reliable way to capture calls accurately without piecing together information from multiple stream chunks.

Technical Challenges

Here are the specific challenges I’m encountering:

  1. Chunk-based Transmission: With the function name and arguments arriving in separate chunks, collecting all parts of a tool call becomes challenging. Though I can capture function names and arguments successfully, parallel_tool_calls=True adds complexity by potentially interweaving data, making consistent gathering difficult.
  2. Consideration for Non-Streamed Tool Call Results: Given the complexity, I’m exploring whether receiving tool calls as complete data objects instead of streams would be a feasible solution, especially when parallel_tool_calls=True is set. This could reduce the need to reassemble parts from multiple chunks and improve reliability (a sketch of this non-streaming approach follows my current code below).
    def ask_chat_gpt(
        self, user_input: str, conversation_history: List[Dict[str, str]]
    ) -> Any:
        """
        Sends user input to the OpenAI ChatGPT model and processes the streaming response.

        Args:
            user_input (str): The user's input message.
            conversation_history (List[Dict[str, str]]): The conversation history to
                                                            maintain context.

        Returns:
            Any: A streaming response from ChatGPT, which can either be normal text or
                    a function call result.
        """
        self.logger.info(f"Sending user input to GPT: {user_input}")
        conversation_history.append({"role": "user", "content": user_input})

        # Stream GPT response
        stream = self.openai_connector.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=conversation_history,
            stream=True,
            parallel_tool_calls=False,
            tools=[
                executor.get_executor_definition() for executor in self.executors
            ],
        )

        # Split the stream for inspection
        splitter = StreamSplitter(stream)
        splitter.start()

        # Initialize variables for function call handling
        function_call_name = None
        function_call_arguments = ""

        first_chunk = next(splitter.get())
        choice = first_chunk.choices[0].delta

        # Check if it's a function call
        if hasattr(choice, "tool_calls") and choice.tool_calls is not None:
            self.logger.info(f"Function call detected: {choice.tool_calls[0].function.name}")

            for chunk in splitter.get():
                choice = chunk.choices[0].delta

                # Get the function call name from the first chunk
                if (
                    hasattr(choice, "tool_calls")
                    and choice.tool_calls is not None
                    and choice.tool_calls[0].function is not None
                ):
                    if function_call_name is None:
                        function_call_name = (
                            choice.tool_calls[0].function.name
                        )  # Store the function name
                    if choice.tool_calls[0].function.arguments:
                        # Collect arguments
                        function_call_arguments += choice.tool_calls[0].function.arguments

            # Process the function call if detected
            if function_call_name:
                self.logger.info(
                    f"Executing function: {function_call_name} with "
                    f"arguments: {function_call_arguments}"
                )
                arguments = json.loads(function_call_arguments)
                result = self.handle_function_call(function_call_name, arguments)

                # Fetch the appropriate executor
                executor = next(
                    (
                        e
                        for e in self.executors
                        if e.get_executor_definition()["function"]["name"] == function_call_name
                    ),
                    None,
                )
                if not executor:
                    self.logger.error(
                        f"No executor found for function: {function_call_name}"
                    )
                    raise Exception(
                        f"No Executor found for function: {function_call_name}"
                    )

                # Create the interpretation request for GPT
                conversation_history.append({"role": "system", "content": result})

                # Maybe too much....
                conversation_history.append(
                    {
                        "role": "system",
                        "content": executor.get_result_interpreter_instructions(
                            user_language=self.user_language
                        ),
                    }
                )

                interpretation_request = {
                    "model": "gpt-4o-mini",
                    "messages": conversation_history,
                }

                # Return the interpreted executor result stream
                interpreted_stream = (
                    self.openai_connector.client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=interpretation_request["messages"],
                        stream=True,
                    )
                )
                return interpreted_stream

        else:
            # Normal content stream
            self.logger.info("Returning normal content stream.")
            return splitter.get()

    def handle_function_call(
        self, function_name: str, arguments: Dict[str, Any]
    ) -> str:
        """
        Executes the corresponding function based on the function name provided by GPT.

        Args:
            function_name (str): The name of the function to be executed.
            arguments (Dict[str, Any]): The arguments provided by GPT for the function execution.

        Returns:
            str: The result of the function execution or an error message if no executor is found.
        """
        print(
            Fore.MAGENTA + Style.BRIGHT + f"Function call: {function_name} with "
            f"arguments: {arguments}" + Style.RESET_ALL
        )

        self.logger.info(
            f"Handling function call: {function_name} with arguments: {arguments}"
        )

        for executor in self.executors:
            if executor.get_executor_definition()["function"]["name"] == function_name:
                return executor.exec(arguments)

        self.logger.error(f"Function {function_name} not found.")
        return f"Function {function_name} not found."
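
For reference, here is a minimal sketch of the non-streaming alternative I am considering in point 2. It assumes a `client` equivalent to `self.openai_connector.client` above and a `tools` list built from the same executor definitions; my understanding is that with stream=False every tool call arrives as a complete object, so nothing has to be reassembled:

    # Hypothetical sketch of the non-streaming approach (point 2 above).
    # `client` stands in for self.openai_connector.client, and `tools` for the
    # executor definitions already assembled in ask_chat_gpt.
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation_history,
        tools=tools,
        parallel_tool_calls=True,
        stream=False,
    )

    message = response.choices[0].message
    if message.tool_calls:
        for tool_call in message.tool_calls:
            # Each call arrives whole: an id, a function name, and a complete
            # JSON string of arguments; no chunk reassembly required.
            print(tool_call.id, tool_call.function.name, tool_call.function.arguments)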

Questions for the Community

  1. Handling Chunked Data with Parallel Streams: Has anyone found an effective way to reliably gather the function name and arguments when using parallel_tool_calls=True? With data potentially interwoven from different calls, achieving a consistent collection process is challenging.
  2. Stream Handling Alternatives: Is there a better approach that avoids streaming tool call results entirely? For instance, does it make sense to return tool calls as complete data instead of streaming them when parallel_tool_calls=True is enabled?
  3. General Tips for Tool Calls: Are there any recommendations for ensuring that tool calls are collected and processed accurately during streaming?

Any advice or experiences you can share would be incredibly helpful!

There really is no concern with “data potentially interwoven”.

Each tool call has its own ID, and can be executed independently and in parallel.

The same goes for returning results. You must return each tool result with its respective ID so it can be matched back up with the call. You do not “stream” to return output; you make another API call. However, you can again set stream:true in that API call containing the function results, so the AI can produce a low-latency output for the user.
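
A rough sketch of what that follow-up request can look like, assuming the tool calls have already been reassembled into a list of dicts with id, name, and arguments (`collected_calls`, `run_tool`, `client`, and `messages` are placeholders here, not names from the code above):

    import json

    # Echo the assistant's tool calls back into the conversation first.
    messages.append({
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {
                "id": call["id"],
                "type": "function",
                "function": {"name": call["name"], "arguments": call["arguments"]},
            }
            for call in collected_calls
        ],
    })

    # Then return one "tool" message per call, matched up by tool_call_id.
    for call in collected_calls:
        result = run_tool(call["name"], json.loads(call["arguments"]))
        messages.append(
            {"role": "tool", "tool_call_id": call["id"], "content": str(result)}
        )

    # The follow-up call may itself stream, so the user still gets low latency.
    final_stream = client.chat.completions.create(
        model="gpt-4o-mini", messages=messages, stream=True
    )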

The AI should not be emitting dependent tool calls in one multi-tool-use output, and in practice the parallel calls tend to all be to the same function. Imagine the AI requesting two cities’ weather from an API function that otherwise accepts only one.
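
Purely illustrative, here is what such a multi-call output looks like once assembled: two calls to the same function, each with its own id (the function name and cities are made up for the example):

    tool_calls = [
        {"id": "call_abc", "type": "function",
         "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}},
        {"id": "call_def", "type": "function",
         "function": {"name": "get_weather", "arguments": '{"city": "Berlin"}'}},
    ]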

You can gather the tool call object from the stream, and once reassembled it is exactly the same as what you would obtain from a non-streaming generation. No action needs to be taken until the stream is finished and the final JSON, collected from either tool_calls or function_call, can be parsed.

You only need a somewhat deeper inspection of the tool_calls object in each chunk (deeper than simply joining together the language of “content”), because a redundant container structure is transmitted in every delta. So yes, there are those who have made effective use of it.
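
For example, a minimal accumulator keyed on the `index` field that each streamed tool-call delta carries; this is a sketch against the standard chunk layout, not the poster’s StreamSplitter setup:

    def collect_tool_calls(stream):
        """Reassemble complete tool calls from a streamed chat completion."""
        calls = {}  # index -> {"id": ..., "name": ..., "arguments": ...}
        for chunk in stream:
            if not chunk.choices:
                continue  # e.g. a trailing usage-only chunk
            delta = chunk.choices[0].delta
            if not delta.tool_calls:
                continue
            for tc in delta.tool_calls:
                entry = calls.setdefault(
                    tc.index, {"id": None, "name": None, "arguments": ""}
                )
                if tc.id:                              # sent once, in the first delta of each call
                    entry["id"] = tc.id
                if tc.function and tc.function.name:   # also only in the first delta
                    entry["name"] = tc.function.name
                if tc.function and tc.function.arguments:
                    entry["arguments"] += tc.function.arguments  # JSON arrives in fragments
        # Arguments are complete JSON strings once the stream is exhausted.
        return [calls[i] for i in sorted(calls)]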


There is a parameter to disable parallel function calling and the special internal tool wrapper that is presented for the AI’s use. Leaving it enabled can reduce cost by removing extra iterations of sending the functions, but the tokens of the wrapper’s own description cost you even if it is never used. Frankly, it was written a year ago with higher-quality AI in mind than what OpenAI currently presents as its flagship cheapest GPT-4 ever, which has resulted in many forum topics being opened about complications from AI misuse and misunderstanding of this internal tool.