Introduction
I’m working with the OpenAI API’s tool calling feature and facing challenges when handling tool calls in streaming mode with parallel_tool_calls=True. My goal is to capture each complete tool call (function name and arguments) in one piece. Currently, I can collect function names and arguments successfully in streaming mode. However, enabling parallel_tool_calls=True makes it nearly impossible to gather everything consistently, because chunks from different calls can be interleaved in the stream.
I’m considering whether it might be better to handle tool calls differently: not streaming the response when a tool_call is detected, but returning it as a complete data object instead. This might offer a more reliable way to capture calls accurately, without piecing together information from multiple stream chunks.
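For illustration, here is a minimal sketch of the non-streamed variant I have in mind (client, conversation_history and tools are placeholders standing in for the equivalents used in my code below). Without stream=True, each tool call comes back as a complete object on the response message, so nothing has to be reassembled:

import json

# Non-streamed request: the response arrives as one object instead of chunks
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=conversation_history,
    tools=tools,
    parallel_tool_calls=True,
)

message = response.choices[0].message
if message.tool_calls:
    for tool_call in message.tool_calls:
        name = tool_call.function.name                         # complete function name
        arguments = json.loads(tool_call.function.arguments)   # complete JSON arguments
        print(f"{name}: {arguments}")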
Technical Challenges
Here are the specific challenges I’m encountering:
- Chunk-based Transmission: The function name and arguments arrive in separate chunks, so collecting all parts of a tool call requires careful reassembly. I can capture function names and arguments successfully for a single call, but parallel_tool_calls=True adds complexity by potentially interweaving data from different calls, making consistent gathering difficult (see the sketch after this list).
- Consideration for Non-Streamed Tool Call Results: Given this complexity, I’m exploring whether receiving tool calls as complete data objects instead of streams would be a feasible solution, especially when parallel_tool_calls=True is set. This would remove the need to reassemble parts from multiple chunks and should improve reliability.
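To make the interleaving problem concrete, here is the rough direction I have been experimenting with for parallel_tool_calls=True: each streamed delta entry carries an index, so the chunks can in principle be bucketed per call and the argument fragments concatenated per bucket. This is only a sketch (stream stands in for the streaming response created in my code below), and it is exactly the kind of reassembly I would like to avoid if there is a cleaner pattern:

import json

tool_calls = {}  # index -> {"id": ..., "name": ..., "arguments": ""}

for chunk in stream:
    delta = chunk.choices[0].delta
    if not delta.tool_calls:
        continue
    for tc in delta.tool_calls:
        entry = tool_calls.setdefault(
            tc.index, {"id": None, "name": None, "arguments": ""}
        )
        if tc.id:
            entry["id"] = tc.id                   # id arrives once per call
        if tc.function and tc.function.name:
            entry["name"] = tc.function.name      # name arrives once per call
        if tc.function and tc.function.arguments:
            entry["arguments"] += tc.function.arguments  # argument JSON arrives in fragments

for index, call in sorted(tool_calls.items()):
    print(index, call["name"], json.loads(call["arguments"]))

For reference, this is the code I am currently using (an excerpt from my client class):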
def ask_chat_gpt(
    self, user_input: str, conversation_history: List[Dict[str, str]]
) -> Any:
    """
    Sends user input to the OpenAI ChatGPT model and processes the streaming response.

    Args:
        user_input (str): The user's input message.
        conversation_history (List[Dict[str, str]]): The conversation history to
            maintain context.

    Returns:
        Any: A streaming response from ChatGPT, which can either be normal text or
            a function call result.
    """
    self.logger.info(f"Sending user input to GPT: {user_input}")
    conversation_history.append({"role": "user", "content": user_input})

    # Stream GPT response
    stream = self.openai_connector.client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation_history,
        stream=True,
        parallel_tool_calls=False,
        tools=[
            executor.get_executor_definition() for executor in self.executors
        ],
    )

    # Split the stream for inspection
    splitter = StreamSplitter(stream)
    splitter.start()

    # Initialize variables for function call handling
    function_call_name = None
    function_call_arguments = ""

    first_chunk = next(splitter.get())
    choice = first_chunk.choices[0].delta

    # Check if it's a function call
    if hasattr(choice, "tool_calls") and choice.tool_calls is not None:
        self.logger.info(
            f"Function call detected: {choice.tool_calls[0].function.name}"
        )
        for chunk in splitter.get():
            choice = chunk.choices[0].delta
            # Get the function call name from the first chunk
            if (
                hasattr(choice, "tool_calls")
                and choice.tool_calls is not None
                and choice.tool_calls[0].function is not None
            ):
                if function_call_name is None:
                    function_call_name = (
                        choice.tool_calls[0].function.name
                    )  # Store the function name
                if choice.tool_calls[0].function.arguments:
                    # Collect arguments
                    function_call_arguments += choice.tool_calls[0].function.arguments

        # Process the function call if detected
        if function_call_name:
            self.logger.info(
                f"Executing function: {function_call_name} with "
                f"arguments: {function_call_arguments}"
            )
            arguments = json.loads(function_call_arguments)
            result = self.handle_function_call(function_call_name, arguments)

            # Fetch the appropriate executor
            executor = next(
                (
                    e
                    for e in self.executors
                    if e.get_executor_definition()["function"]["name"]
                    == function_call_name
                ),
                None,
            )
            if not executor:
                self.logger.error(
                    f"No executor found for function: {function_call_name}"
                )
                raise Exception(
                    f"No Executor found for function: {function_call_name}"
                )

            # Create the interpretation request for GPT
            conversation_history.append({"role": "system", "content": result})
            # Maybe too much....
            conversation_history.append(
                {
                    "role": "system",
                    "content": executor.get_result_interpreter_instructions(
                        user_language=self.user_language
                    ),
                }
            )
            interpretation_request = {
                "model": "gpt-4o-mini",
                "messages": conversation_history,
            }

            # Return the interpreted executor result stream
            interpreted_stream = self.openai_connector.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=interpretation_request["messages"],
                stream=True,
            )
            return interpreted_stream
    else:
        # Normal content stream
        self.logger.info("Returning normal content stream.")
        return splitter.get()
def handle_function_call(
    self, function_name: str, arguments: Dict[str, Any]
) -> str:
    """
    Executes the corresponding function based on the function name provided by GPT.

    Args:
        function_name (str): The name of the function to be executed.
        arguments (Dict[str, Any]): The arguments provided by GPT for the function
            execution.

    Returns:
        str: The result of the function execution or an error message if no
            executor is found.
    """
    print(
        Fore.MAGENTA + Style.BRIGHT + f"Function call: {function_name} with "
        f"arguments: {arguments}" + Style.RESET_ALL
    )
    self.logger.info(
        f"Handling function call: {function_name} with arguments: {arguments}"
    )
    for executor in self.executors:
        if executor.get_executor_definition()["function"]["name"] == function_name:
            return executor.exec(arguments)

    self.logger.error(f"Function {function_name} not found.")
    return f"Function {function_name} not found."
Questions for the Community
- Handling Chunked Data with Parallel Streams: Has anyone found an effective way to reliably gather the function name and arguments when using parallel_tool_calls=True? With data potentially interwoven from different calls, achieving a consistent collection process is challenging.
- Stream Handling Alternatives: Is there a better approach that avoids streaming tool call results entirely? For instance, does it make sense to return tool calls as complete data instead of streaming them when parallel_tool_calls=True is enabled?
- General Tips for Tool Calls: Are there any recommendations for ensuring that tool calls are collected and processed accurately during streaming?
Any advice or experiences you can share would be incredibly helpful!