Streaming with recursive function / tool calling

Hello,

Is function calling incompatible with streaming? I have an example without streaming and it works well, but when I set stream: true the function calls stop working. I am trying to build a drive-thru app that makes recursive calls depending on whether the order is a single item or multiple items. Can someone help me with how to do recursive calling with streaming?

import OpenAI from "openai";
import { OpenAIStream, StreamingTextResponse } from "ai";
import { functions } from './functions';
import type { ChatCompletionCreateParams } from 'openai/resources/chat';



// Optional, but recommended: run on the edge runtime.
// See https://vercel.com/docs/concepts/functions/edge-functions
export const runtime = "edge";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY!,
});



// Example tool handlers, keyed by function name (local logic only — these
// are not the JSON-schema tool definitions the API expects in `tools`)
const tools = {
  'order-single-item': (args: Record<string, unknown>) => {
    // Logic to handle a single item order
    console.log('Ordering Single Item:', args);
    // Implement the ordering logic here
  },
  'order-menu-item': (args: Record<string, unknown>) => {
    // Logic to handle a menu item order
    console.log('Ordering Menu Item:', args);
    // Implement the ordering logic here
  },
  'order-drink-item': (args: Record<string, unknown>) => {
    // Logic to handle a drink item order
    console.log('Ordering Drink Item:', args);
    // Implement the ordering logic here
  },
};

export async function POST(req: Request) {
  // Extract the `messages` from the body of the request
  const { messages } = await req.json();
  const start = Date.now();
  
  // Request the OpenAI API for the response based on the prompt
  try {
    const response = await openai.chat.completions.create({
      model: "gpt-4",
      stream: true,
      messages: messages,
      //tools: tools,
      //tool_choice:"auto",
    });
    //console.log("messages ====>", messages);
    //console.log("response ===>", response);

    const stream = OpenAIStream(response);
    let toolInvocation: { name: keyof typeof tools | null; arguments: string } = {
      name: null,
      arguments: "",
    };

    // Example pseudo-logic for tool invocation based on accumulated data.
    // NOTE: nothing here ever populates toolInvocation from the stream,
    // so this block never runs — the tool-call deltas would have to be
    // accumulated while reading the stream first.
    if (toolInvocation.name && typeof tools[toolInvocation.name] === "function") {
      try {
        // Assuming toolInvocation.arguments is stringified JSON, parse it;
        // if it's not in JSON format, adjust this part accordingly
        const args = JSON.parse(toolInvocation.arguments);
        tools[toolInvocation.name](args); // Invoke the tool with parsed arguments
      } catch (error) {
        console.error("Error invoking tool or parsing arguments:", error);
        // Handle errors appropriately
      }
    }
    
    const responsestream  = new StreamingTextResponse(stream, {
      headers: {
        "X-LLM-Start": `${start}`,
        "X-LLM-Response": `${Date.now()}`,
      },
    });
    //console.log("resstream ===>",responsestream);

    return responsestream;
   
  } catch (error) {
    console.error("Error: ", error);
    // error is typed unknown in a catch clause, so narrow it first
    const message = error instanceof Error ? error.message : String(error);
    return new Response(JSON.stringify({ error: message }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' },
    });
  }
}




I believe function calling is not supported with streaming.

Function calling with chat completions and streaming is supported. Here is example code showing how to do it with the OpenAI SDK for Python; unfortunately, I do not know if there are examples in JS.
azureai-assistant-tool/sdk/azure-ai-assistant/azure/ai/assistant/management/chat_assistant_client.py at main · Azure-Samples/azureai-assistant-tool (github.com)

@jhakulin I think Vercel has managed to do something - ai/examples/next-openai/app/api/chat-with-tools/route.ts at main · vercel/ai (github.com)


This is working; I have one issue though. In the response, instead of friendly text I am getting the raw streamed response. Please find the attached images from the console vs. the UI. My code is here:

// (fragment of the POST handler: messages, model, start, and tools are
// defined earlier, as in the first post)
const response = await openai.chat.completions.create({
  model: "gpt-3.5-turbo-0613",
  stream: true,
  messages,
  tools,
  tool_choice: "auto",
});

const data = new experimental_StreamData();
const stream = OpenAIStream(response, {
  experimental_onToolCall: async (
    call: ToolCallPayload,
    appendToolCallMessage,
  ) => {
    for (const toolCall of call.tools) {
      console.log("tool call ===>", toolCall);
      // Note: this is a very simple example of a tool call handler
      // that only supports a single tool call function.
      if (toolCall.func.name === 'order-single-item') {
        const parsedArguments = JSON.parse(toolCall.func.arguments);
        const functionResult = `Added to the order: ${JSON.stringify(parsedArguments)}`;
        // Call the ordering backend here

        const newMessages = appendToolCallMessage({
          tool_call_id: toolCall.id,
          function_name: 'order-single-item',
          tool_call_result: functionResult,
        });

        return openai.chat.completions.create({
          messages: [...messages, ...newMessages],
          model,
          stream: true,
          tools,
          tool_choice: 'auto',
        });
      }
    }
  },
  onCompletion(completion) {
    console.log('completion', completion);
  },
  onFinal(completion) {
    data.close();
  },
  experimental_streamData: true,
});

data.append({
  text: 'Hello, how are you?',
});

const responsestream = new StreamingTextResponse(stream, {
  headers: {
    "X-LLM-Start": `${start}`,
    "X-LLM-Response": `${Date.now()}`,
  },
});
//console.log("resstream ===>", responsestream);

return responsestream;
   


Oh @parunkumar82, I managed to fix it!

You have to remove the data handling completely. Turn off experimental_streamData: true and remove all references to data; we don’t actually need it.
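Concretely, the stream setup then reduces to something like the sketch below (same imports, tools, and messages as in your snippet; the only change is that the data pieces are gone):

const response = await openai.chat.completions.create({
  model: "gpt-3.5-turbo-0613",
  stream: true,
  messages,
  tools,
  tool_choice: "auto",
});

const stream = OpenAIStream(response, {
  experimental_onToolCall: async (
    call: ToolCallPayload,
    appendToolCallMessage,
  ) => {
    for (const toolCall of call.tools) {
      if (toolCall.func.name === 'order-single-item') {
        const parsedArguments = JSON.parse(toolCall.func.arguments);
        // Record the tool result, then stream a follow-up completion
        const newMessages = appendToolCallMessage({
          tool_call_id: toolCall.id,
          function_name: 'order-single-item',
          tool_call_result: `Added to the order: ${JSON.stringify(parsedArguments)}`,
        });
        return openai.chat.completions.create({
          messages: [...messages, ...newMessages],
          model: 'gpt-3.5-turbo-0613',
          stream: true,
          tools,
          tool_choice: 'auto',
        });
      }
    }
  },
});

return new StreamingTextResponse(stream);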

Also btw, do you know if this enables looped function calling? I think probably not (based on my testing so far), but any idea how to implement a loop?

Hope this helps!


Thank you @raivat1, this works. If I understand looped function calling correctly, in my case you are talking about ordering multiple items, right? If yes, that’s my next test, and I will keep you posted on the results. Please correct me if my understanding is wrong.


I’m glad it helped! Yes, your understanding is correct! I want to check whether the current code supports looped function calling (i.e. calling the same function multiple times, in case one of the tries doesn’t work or it needs to be called several times). Thanks for trying it out 🙂


@raivat1 I used a recursive function to call multiple times; hope this helps. However, when there are multiple items in the order I am only successful in calling order-single-item multiple times; I couldn’t get the order-menu function to be called.

export async function POST(req: Request) {
  // Extract the `messages` from the body of the request
  const { messages } = await req.json();
  const start = Date.now();
  const model = 'gpt-3.5-turbo-0613';

  async function handleToolCalls(messages, model) {
    try {
      const response = await openai.chat.completions.create({
        model,
        stream: true,
        messages,
        tools,
        tool_choice: "auto",
      });

      let newMessages = [];

      // Function to process each tool call and append the result to `newMessages`
      const processToolCall = async (toolCall) => {
        const functionName = toolCall.func.name;
        // note: the tool handlers earlier in the thread were named
        // 'order-menu-item', not 'order-menu'; a name mismatch like this
        // would explain the 'order-menu' branch never matching
        if (functionName === 'order-single-item' ||
            functionName === 'order-menu' ||
            functionName === 'order-drink-item') {
          const functionArguments = JSON.parse(toolCall.func.arguments);
          const functionResult = `Added to the order: ${JSON.stringify(functionArguments)}`;

          newMessages.push({
            role: 'system',
            content: functionResult,
          });

          return {
            tool_call_id: toolCall.id,
            function_name: functionName,
            tool_call_result: functionResult,
          };
        }
      };

      const stream = OpenAIStream(response, {
        experimental_onToolCall: async (call, appendToolCallMessage) => {
          for (const toolCall of call.tools) {
            await processToolCall(toolCall);
          }
        },
        onCompletion(completion) {
          console.log('completion', completion);
        },
        onFinal(completion) {
          // Handle final completion if needed
        },
      });

      if (newMessages.length > 0) {
        // If there were tool calls, recursively handle additional tool calls
        // with the new messages.
        // NOTE: this check runs before the stream has been consumed, so
        // newMessages is still empty at this point; tool results are only
        // pushed once the client starts reading the stream.
        return handleToolCalls([...messages, ...newMessages], model);
      } else {
        // If no tool calls were collected, return the response stream as is
        return stream;
      }
    } catch (error) {
      console.error("Error: ", error);
      // error is unknown in a catch clause; re-throw it as an Error
      throw error instanceof Error ? error : new Error(String(error));
    }
  }

  // Initial call to handle tool calls with the initial set of messages
  try {
    const responseStream = await handleToolCalls(messages, model);

    return new StreamingTextResponse(responseStream, {
      headers: {
        "X-LLM-Start": `${start}`,
        "X-LLM-Response": `${Date.now()}`,
      }
    });
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return new Response(JSON.stringify({ error: message }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' },
    });
  }
}

Hello,

I am also seeing unfriendly text; here is an example:

0:" was"
0:" the"
0:" grandfather"
0:" of"

What can be done to fix this? I have removed all references to data and DataStream.

It seems you are almost there!

When streaming a response, you just need to display the content of those chunks in the user interface as they are received, appending each one to what was previously received in the stream.

When added interactively to the chat display, the user sees the response take shape before their eyes.

You’ll also need a handler that can re-assemble the contents of tool calls and function calls, which are also received in chunks that are placed within repeating containers.
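
To illustrate the re-assembly part, here is a rough sketch (not from this thread, and collectStream is just an illustrative name) that uses the openai Node SDK directly: tool-call deltas arrive tagged with an index, so you accumulate the id, name, and argument strings per index until the stream ends.

import OpenAI from 'openai';

const openai = new OpenAI();

async function collectStream(messages: OpenAI.Chat.ChatCompletionMessageParam[]) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo-0613',
    stream: true,
    messages,
  });

  let text = '';
  const toolCalls: { id: string; name: string; arguments: string }[] = [];

  for await (const chunk of stream) {
    const delta = chunk.choices[0]?.delta;
    if (!delta) continue;

    // Plain content: append each chunk to the running text for display.
    if (delta.content) text += delta.content;

    // Tool-call fragments: merge each delta into the slot for its index.
    for (const tc of delta.tool_calls ?? []) {
      toolCalls[tc.index] ??= { id: '', name: '', arguments: '' };
      if (tc.id) toolCalls[tc.index].id += tc.id;
      if (tc.function?.name) toolCalls[tc.index].name += tc.function.name;
      if (tc.function?.arguments) toolCalls[tc.index].arguments += tc.function.arguments;
    }
  }

  // toolCalls now holds complete function names and JSON argument strings,
  // ready to be parsed and dispatched.
  return { text, toolCalls };
}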

I actually used the following function to process the chunks, and it’s working nicely, although in the past using StreamingTextResponse would return each token nicely:

function transformText(originalText: string): string {
    // Regular expression to match the pattern 0:"<content>"
    const linePattern = /0:"([^"]*)"/g
    let match
    const tokens = []

    // Using regex to match and capture each line token
    while ((match = linePattern.exec(originalText)) !== null) {
      // Replace escaped newline characters with actual newlines
      tokens.push(match[1].replace(/\\n/g, "\n"))
    }

    // Join all tokens to form the final text
    const finalText = tokens.join("")
    return finalText
}
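
For example, fed the raw lines quoted in the earlier post, joined into one string:

const raw = '0:" was"\n0:" the"\n0:" grandfather"\n0:" of"'
console.log(transformText(raw)) // -> " was the grandfather of"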

I wrote a Python script; hope it helps.

"""
This example shows how to use the streaming feature with multiple function calls.
"""

import json
import sys
import time
from collections.abc import Generator
from typing import Any, Dict, List

from openai import OpenAI


client = OpenAI(api_key="YOUR_API_KEY")


tools = [
    {
        "type": "function",
        "function": {
            "name": "count_string",
            "description": "Counts the number of characters in a string.",
            "parameters": {
                "type": "object",
                "properties": {
                    "string_to_count": {
                        "type": "string",
                        "description": "The string whose characters you want to count.",
                    }
                },
                "required": ["string_to_count"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "count_character_in_string",
            "description": "Counts the number of a specific character in a string.",
            "parameters": {
                "type": "object",
                "properties": {
                    "string_to_count": {
                        "type": "string",
                        "description": "The string to count the character in.",
                    },
                    "character_to_count": {
                        "type": "string",
                        "description": "The character you want to count.",
                    },
                },
                "required": ["string_to_count", "character_to_count"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    }
]

FUNCTIONS = {}
for tool in tools:
    FUNCTIONS[tool["function"]["name"]] = tool["function"]


def count_string(string_to_count: str) -> str:
    """Counts the number of characters in a string."""
    print("count string called, string_to_count:", string_to_count)
    return str(len(string_to_count))


def count_character_in_string(string_to_count: str, character_to_count: str) -> str:
    """Counts the number of a specific character in a string."""
    print("count character in string called, string_to_count:", string_to_count, "character_to_count:", character_to_count)
    return str(string_to_count.count(character_to_count))


def call_function(function_name: str, function_arguments: str) -> str:
    """Calls a function and returns the result."""

    # Ensure the function is defined
    if function_name not in FUNCTIONS:
        return "Function not defined."

    # Convert the function arguments from a string to a dict
    function_arguments_dict = json.loads(function_arguments)

    # Ensure the function arguments are valid
    function_parameters = FUNCTIONS[function_name]["parameters"]["properties"]
    for argument in function_arguments_dict:
        if argument not in function_parameters:
            return f"{argument} not defined."

    # Call the function and return the result
    return globals()[function_name](**function_arguments_dict)


def get_response(messages: List[Dict[str, Any]]) -> Generator[str, None, None]:
    """Gets the response from OpenAI, updates the messages array, yields
    content, and calls functions as needed."""
    
    print(messages)
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto",
            stream=True,
        )
    except Exception as e:
        yield f"Sorry, there was an error: {e}"
        return

    # Define variables to hold the streaming content and function call
    streaming_content = ""
    tool_calls = []
    for chunk in response:
        if not chunk.choices:
            messages.append(
                {
                    "role": "assistant",
                    "content": "Sorry, there was an error. Please try again.",
                }
            )
            yield "Sorry, there was an error. Please try again."
            break

        if chunk.choices[0].delta.tool_calls:
            for tcchunk in chunk.choices[0].delta.tool_calls:
                index = tcchunk.index
                if len(tool_calls) <= index:
                    tool_calls.append(
                        {
                            "id": "",
                            "type": "function",
                            "function": {"name": "", "arguments": ""},
                        }
                    )
                tc = tool_calls[index]
                if tcchunk.id:
                    tc["id"] += tcchunk.id
                if tcchunk.function.name:
                    tc["function"]["name"] += tcchunk.function.name
                if tcchunk.function.arguments:
                    tc["function"]["arguments"] += tcchunk.function.arguments
        elif chunk.choices[0].delta.content:
            streaming_content += chunk.choices[0].delta.content
            yield chunk.choices[0].delta.content

        if chunk.choices[0].finish_reason == "stop":
            messages.append(
                {
                    "role": "assistant",
                    "content": streaming_content,
                }
            )
        elif chunk.choices[0].finish_reason == "tool_calls":
            # print(tool_calls)
            messages.append(
                {
                    "tool_calls": tool_calls,
                    "role": "assistant",
                }
            )
            for tc in tool_calls:
                name = tc["function"]["name"]
                arguments = tc["function"]["arguments"]
                function_output = call_function(name, arguments)
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": function_output,
                    }
                )
            yield from get_response(messages)


if __name__ == "__main__":
    while True:
        messages = [
            {
                "role": "system",
                "content": "You are demonstrating streaming with function calls.",
            }
        ]
        # what is the length of "strawberry"
        # how many 'r' in "strawberry"
        # how many a's in "banana" and what is the length of it?
        user_content = input("You: ")
        messages.append({"role": "user", "content": user_content})
        sys.stdout.write("Assistant: ")
        for content in get_response(messages):
            sys.stdout.write(content)
            sys.stdout.flush()
            time.sleep(0.1)
        sys.stdout.write("\n")

Nice little demo.

I saw the first issue, and simulated it: there are indeed chunks that don’t have “choices”.

        stream=True,
        stream_options={"include_usage": True},

With include_usage enabled, the final chunk of the stream carries only usage data and an empty choices list, so the “not chunk.choices” error branch fires even though the response completed fine; the error message gets appended to the conversation, and now the AI would read all the news about your error instead. (Skipping empty-choices chunks with a continue, rather than treating them as an error, avoids this.)