Help for function calls with streaming

I am unbelievably lost. I'm using a combination of so many different posts I've seen on this, and I cannot for the life of me figure out how to get function calling to work with streaming. So far, this is what I've accomplished:

Streaming with no function calls

Streaming completions that always have function calls

Back to back function calls without streaming

However, I am still lost on how to stream a completion, with or without a function call, and have the entire thing run in a loop until there are no more tool calls.

Basically, I am trying to get the effect in ChatGPT where it streams the result and can call functions as many times as it wants, with no limit.

Does anybody have an example that I can use that doesn’t have hardcoded function parameters in the tool call section?

This is the code I have currently. It always returns that there is no tool call, no matter what. I'm way past my expertise and ChatGPT can only help so far. Here's the code:

from openai import OpenAI
import openai
import json
import math

from apikey import api_key

openai.api_key = api_key
client = OpenAI(api_key=api_key)

def perform_math(operations, operands_sets):
    print("math function is running")

    if not isinstance(operations, list) or not isinstance(operands_sets, list):
        return json.dumps({"content": "Error: Both operations and operands_sets should be lists."})

    if len(operations) != len(operands_sets):
        return json.dumps({"content": "Error: Mismatch between number of operations and number of operand sets."})

    responses = []

    for operation, operands in zip(operations, operands_sets):
        if not operands or not all(isinstance(op, (int, float)) for op in operands):
            responses.append("Error: Invalid operands provided.")
            continue

        try:
            if operation == "add":
                result = sum(operands)
            elif operation == "subtract":
                result = operands[0] - sum(operands[1:])
            elif operation == "multiply":
                result = math.prod(operands)
            elif operation == "divide":
                result = operands[0]
                for op in operands[1:]:
                    result /= op
            elif operation == "power":
                result = math.pow(operands[0], operands[1])
            elif operation == "square_root":
                if operands[0] < 0:
                    raise ValueError("Cannot take the square root of a negative number.")
                result = math.sqrt(operands[0])
            else:
                raise ValueError("Invalid operation specified.")
        except (ArithmeticError, ValueError) as e:
            responses.append(f"Error in {operation}: {str(e)}")
            continue

        responses.append(f"{operation.capitalize()} result is {result}.")

    final_response = " ".join(responses)
    return json.dumps({"content": final_response})

def run_conversation():
    messages = [{"role": "user", "content": "What is 90^1.2 and who invented the lightbulb?"}]
    tools = [
    {
        "type": "function",
        "function": {
            "name": "perform_math",
            "description": "Perform multiple math operations. Specify the operations and the sets of numbers to perform them on.",
            "parameters": {
                "type": "object",
                "properties": {
                    "operations": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": ["add", "subtract", "multiply", "divide", "power", "square_root"]
                        },
                        "description": "The list of math operations to perform"
                    },
                    "operands_sets": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {
                                "type": "number"
                            }
                        },
                        "description": "The list of number sets to perform the operations on. Use decimals and whole numbers only."
                    }
                },
                "required": ["operations", "operands_sets"]
            }
        }
    }
    ]
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=messages,
        tools=tools,
        tool_choice="auto",  # auto is default, but we'll be explicit
        stream=True,
    )

    available_functions = {
        "perform_math": perform_math,
    }  # only one function in this example, but you can have multiple

    response_text = ""
    tool_calls = []

    for chunk in stream:
        delta = chunk.choices[0].delta

        if delta and delta.content:
            # content chunk -- send to browser and record for later saving
            print(delta.content)
            response_text += delta.content

        elif delta and delta.tool_calls:
            tcchunklist = delta.tool_calls
            for tcchunk in tcchunklist:
                if len(tool_calls) <= tcchunk.index:
                    tool_calls.append({"id": "", "type": "function", "function": {"name": "", "arguments": ""}})
                tc = tool_calls[tcchunk.index]

                if tcchunk.id:
                    tc["id"] += tcchunk.id
                if tcchunk.function.name:
                    tc["function"]["name"] += tcchunk.function.name
                if tcchunk.function.arguments:
                    tc["function"]["arguments"] += tcchunk.function.arguments

    # Process tool calls if any
    if tool_calls:
        # The assistant turn that requested the tool calls must be recorded before the tool results
        messages.append({"role": "assistant", "content": response_text or None, "tool_calls": tool_calls})

        for tool_call in tool_calls:
            # Extract function name for each tool call
            function_name = tool_call["function"]["name"]

            # Check if the function exists in the available functions; skip if not found
            if function_name in available_functions:
                function_to_call = available_functions[function_name]
                function_args = json.loads(tool_call["function"]["arguments"])

                # Attempt to call the function with the provided arguments
                try:
                    function_response = function_to_call(**function_args)
                    # Construct the response for the tool call
                    tool_response = {
                        "tool_call_id": tool_call["id"],
                        "role": "tool",
                        "name": function_name,
                        "content": function_response,
                    }
                    # Append the response to messages
                    messages.append(tool_response)
                except TypeError as e:
                    print(f"Error calling function {function_name} with args {function_args}: {e}")
    else:
        print("No tool calls to process.")
    
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=messages,
        stream=True,
    ) 

    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
      

run_conversation()

I would look at how the chunks actually come in for a tool-call response, and immediately branch to a path that simply assembles the rest of the AI response into a list of tool-call chunks when you get a tool_call. We don't have to actually "branch"; we just gather something different, silently, for those chunks.

display raw chunks

c = client.chat.completions.with_raw_response.create(**params)

Leaving this in pydantic model format:

next(c.parse())
ChatCompletionChunk(id='chatcmpl-8xxx', choices=[Choice(delta=ChoiceDelta(content=None, function_call=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1707962400, model='gpt-4-0125-preview', object='chat.completion.chunk', system_fingerprint='fp_f084bcfc79')

A typical "no use" first chunk (one where you don't yet see anything the AI actually produced) is what we see above.

Then, next, we get to tool_calls with a name. That is the point where we can stop printing content chunks and start the silent collection of the remainder of the response as a tool_call object.

next(c.parse())
ChatCompletionChunk(id='chatcmpl-8xxx', choices=[Choice(delta=ChoiceDelta(content=None, function_call=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id='call_TqhIpvAGcZOLG81yIhu59Kj0', function=ChoiceDeltaToolCallFunction(arguments='', name='get_random_float'), type='function')]), finish_reason=None, index=0, logprobs=None)], created=1707962400, model='gpt-4-0125-preview', object='chat.completion.chunk', system_fingerprint='fp_f084bcfc79')

Then come the argument contents of that particular function to assemble (the start of the JSON fragment shown below).

next(c.parse())
ChatCompletionChunk(id='chatcmpl-8xxx', choices=[Choice(delta=ChoiceDelta(content=None, function_call=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"ra', name=None), type=None)]), finish_reason=None, index=0, logprobs=None)], created=1707962400, model='gpt-4-0125-preview', object='chat.completion.chunk', system_fingerprint='fp_f084bcfc79')

I haven't yet seen content chunks mixed in with "tools" responses for what I've written to the API, but you should write your code expecting that any "content" chunks arriving before the finish reason need to be printed.

Implement

Let’s write some neat code:

c = client.chat.completions.with_raw_response.create(**params)
reply=""
tools=[]
for chunk in c.parse():
    print(chunk.choices[0].delta)
    if chunk.choices[0].delta.content:
        reply += chunk.choices[0].delta.content        # gather for chat history
        print(chunk.choices[0].delta.content, end="")  # your output method
    if chunk.choices[0].delta.tool_calls:
        tools += chunk.choices[0].delta.tool_calls     # gather ChoiceDeltaToolCall list chunks
tools_obj = tool_list_to_tool_obj(tools)
print(reply)
print(tools_obj)

It’s only neat because I gather messy tool deltas and turn them back into a typical non-stream object with a function:

from collections import defaultdict

def tool_list_to_tool_obj(tools):
    # Initialize a dictionary with default values
    tool_calls_dict = defaultdict(lambda: {"id": None, "function": {"arguments": "", "name": None}, "type": None})

    # Iterate over the tool calls
    for tool_call in tools:
        # If the id is not None, set it
        if tool_call.id is not None:
            tool_calls_dict[tool_call.index]["id"] = tool_call.id

        # If the function name is not None, set it
        if tool_call.function.name is not None:
            tool_calls_dict[tool_call.index]["function"]["name"] = tool_call.function.name

        # Append the arguments
        tool_calls_dict[tool_call.index]["function"]["arguments"] += tool_call.function.arguments

        # If the type is not None, set it
        if tool_call.type is not None:
            tool_calls_dict[tool_call.index]["type"] = tool_call.type

    # Convert the dictionary to a list
    tool_calls_list = list(tool_calls_dict.values())

    # Return the result
    return {"tool_calls": tool_calls_list}

Output of running the code with some tools and messages as parameters:

{'tool_calls': [{'id': 'call_44LLGv0lFEZeFAnRahnQx4H8', 'function': {'arguments': '{"range_start": 0, "range_end": 66}', 'name': 'get_random_float'}, 'type': 'function'}, {'id': 'call_vjhf3oHMYbK0FyZYUpyNWimR', 'function': {'arguments': '{"range_start": 1, "range_end": 33}', 'name': 'get_random_int'}, 'type': 'function'}]}


Okay, thanks for the help, but I do not understand how any of this works. In all the tests and examples I've tried, we do something like this (without streaming, using a function from my working app; try to ignore everything app-specific):

def ask(question):
    print("User:", question)
    print(" ")
    global conversation_history
    print("[Processing request...]")
    if not question:
        return "Sorry, I heard you but I couldn't make out any words, either talk louder or move to a quieter space."

    if conversation_history and conversation_history[0]['role'] == 'system':
        conversation_history[0]['content'] = system_prompt
    elif not conversation_history:
        conversation_history.append({"role": "system", "content": system_prompt})

    messages = conversation_history
    messages.append({"role": "user", "content": question})
    print("Messages before API call:")
    print(json.dumps(messages, indent=4))
    
    timeout_timer = threading.Timer(7.0, display_timeout_message)
    timeout_timer.start()
        
    tools = [
    {
        "type": "function",
        "function": {
            "name": "search_and_play_song",
            "description": "Search for a song on Spotify using a given name and play it. The song name can vary from the exact user input.",
            "parameters": {
                "type": "object",
                "properties": {
                    "song_name": {
                        "type": "string",
                        "description": "The name of the song to search for"
                    }
                },
                "required": ["song_name"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_datetime",
            "description": "Retrieve the current date and/or time. Options: date, time, or both.",
            "parameters": {
                "type": "object",
                "properties": {
                    "mode": {
                        "type": "string",
                        "enum": ["date", "time", "date & time"],
                        "description": "Choose whether to get date, time, or both"
                    }
                },
                "required": ["mode"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "perform_math",
            "description": "Perform multiple math operations. Specify the operations and the sets of numbers to perform them on.",
            "parameters": {
                "type": "object",
                "properties": {
                    "operations": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": ["add", "subtract", "multiply", "divide", "power", "square_root"]
                        },
                        "description": "The list of math operations to perform"
                    },
                    "operands_sets": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {
                                "type": "number"
                            }
                        },
                        "description": "The list of number sets to perform the operations on. Use decimals and whole numbers only."
                    }
                },
                "required": ["operations", "operands_sets"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "memory_manager",
            "description": "Store, retrieve, or clear data in a file. Be specific when storing data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "operation": {
                        "type": "string",
                        "enum": ["store", "retrieve", "clear"],
                        "description": "Operation to perform"
                    },
                    "data": {
                        "type": "string",
                        "description": "The data to store (required for 'store' operation)"
                    }
                },
                "required": ["operation"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Retrieve current weather and condition data for any location, defaulting to Clearwater, FL.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g., Clearwater, FL"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"]
                    }
                },
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "show_weather_message",
            "description": "Display a popup with the current weather on the user's screen.",
            "parameters": {
                "type": "object",
                "properties": {}
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "toggle_spotify_playback",
            "description": "Control Spotify playback: pause, unpause, or toggle between pause and unpause.",
            "parameters": {
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["pause", "unpause", "toggle"],
                        "description": "Action for Spotify playback: choose 'pause', 'unpause', or 'toggle'."
                    }
                },
                "required": ["action"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "switch_ai_model",
            "description": "Switch between OpenAI API models: 'gpt-4-0125-preview' or 'gpt-3.5-turbo-0125'. GPT-4-Turbo is more advanced and costly, while GPT-3.5-Turbo is less effective but 20 times cheaper.",
            "parameters": {
                "type": "object",
                "properties": {
                    "model_name": {
                        "type": "string",
                        "description": "Name of the OpenAI AI model to switch to"
                    }
                },
                "required": ["model_name"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_spotify_volume",
            "description": "Set Spotify playback volume. Specify volume as a percentage (0-100).",
            "parameters": {
                "type": "object",
                "properties": {
                    "volume_percent": {
                        "type": "number",
                        "description": "Volume level 0-100"
                    }
                },
                "required": ["volume_percent"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_system_volume",
            "description": "Set system volume, also your speaking volume. Default to this volume unless recently asked to play a song. Volume level range: 0-100.",
            "parameters": {
                "type": "object",
                "properties": {
                    "volume_level": {
                        "type": "number",
                        "description": "Volume level"
                    }
                },
                "required": ["volume_level"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "change_system_prompt",
            "description": "Change the system prompt to 'default', 'short_cheap', or 'custom'. For 'custom', provide a first-person prompt, like 'I am a southern cowboy'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt_type": {
                        "type": "string",
                        "enum": ["default", "short_cheap", "custom"],
                        "description": "Type of prompt to set. Options are 'default', 'short_cheap', 'custom'."
                    },
                    "custom_prompt": {
                        "type": "string",
                        "description": "The custom prompt to use. It must be in the first person and be written like the example. Never name yourself or include a section that gives you a name."
                    }
                },
                "required": ["prompt_type"]
            }
        }
    }
]


    response = openai.chat.completions.create(
        model=current_model,
        messages=messages,
        tools=tools,
        tool_choice="auto",
    )
    
    timeout_timer.cancel()
    timeout_timer_second = threading.Timer(12.0, display_timeout_message)
    timeout_timer_second.start()

    response_message = response.choices[0].message
    tool_calls = response_message.tool_calls
    if tool_calls:
        available_functions = {
            "search_and_play_song": search_and_play_song,
            "get_current_weather": get_current_weather,
            "get_current_datetime": get_current_datetime,
            "perform_math": perform_math,
            "memory_manager": memory_manager,
            "show_weather_message": show_weather_message,
            "toggle_spotify_playback": toggle_spotify_playback,
            "switch_ai_model": switch_ai_model,
            "set_spotify_volume": set_spotify_volume,
            "set_system_volume": set_system_volume,
            "change_system_prompt": change_system_prompt
        }

        for tool_call in tool_calls:
            function_name = tool_call.function.name
            function_args = json.loads(tool_call.function.arguments)
            function_to_call = available_functions.get(function_name)

            if function_to_call:
                function_response = function_to_call(**function_args)

                messages.append({
                    "tool_call_id": tool_call.id,
                    "role": "function",
                    "name": function_name,
                    "content": function_response,
                })

    final_response = openai.chat.completions.create(
        model=current_model,
        messages=messages,
        tools=tools,
        tool_choice="none"
    )
    
    timeout_timer_second.cancel()

    print(f"{response.json()}")
    final_response_message = final_response.choices[0].message.content
    conversation_history.append({"role": "assistant", "content": final_response_message})
    return final_response_message

def reply(question):
    response_content = ask(question)
    
    print("Miles:", response_content)
    print(" ")
    speak(response_content)
    print("Listening for 'Miles'...")
    
    return response_content

Sorry if these are dumb questions, but I am extremely new to this. Where does the tools array fit into your code, and how do I go about doing any of this easily?

I really need at least a rough draft that works in a similar format to my code so I can learn how it functions and implement it. In the format you provided, I am just not sure how any of it works, or how to even run it with a tools array and a function to test it, since it's so different from my tests and examples.

I'm also looking to loop the entire thing, IF that is even possible. It's possible without streaming enabled, which allows the model to choose functions back to back indefinitely before returning the full response, but I just can't grasp the concept with streaming enabled.

Thank you lots for the help in advance; I'm new to coding in general and have only been doing it for a year.

Using your code as reference

...
response = openai.chat.completions.create(
        model=current_model,
        messages=messages,
        tools=tools,
        tool_choice="auto",
        stream=True, # streaming
    )
    
    timeout_timer.cancel()
    timeout_timer_second = threading.Timer(12.0, display_timeout_message)
    timeout_timer_second.start()

    #################
    # Insert code here to read the chunks from streaming
    # If you receive values in "content", you can already send it to the client
    # If you receive tool calls, assemble the chunks until you get everything
    # When you completed the assembly, you can proceed to your code processing the tool calls just like before
    #################

    response_message = response.choices[0].message
    tool_calls = response_message.tool_calls
    if tool_calls:
        available_functions = {
            "search_and_play_song": search_and_play_song,
            "get_current_weather": get_current_weather,
            "get_current_datetime": get_current_datetime,
...

For the next API call, it should still be streaming and you do the same thing again. And this is where your loop comes in. As long as the API is spewing tool calls, you do the loop.
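To make that loop concrete, here is a minimal sketch under stated assumptions: stream_and_collect and execute_tool_calls are hypothetical placeholders for your own chunk-assembly and function-dispatch code shown elsewhere in this thread, and current_model, tools, and available_functions come from your existing code.

# Hypothetical outer loop: keep requesting until the model stops asking for tools.
while True:
    stream = client.chat.completions.create(
        model=current_model,
        messages=messages,
        tools=tools,
        tool_choice="auto",
        stream=True,
    )
    content, tool_calls = stream_and_collect(stream)   # print content chunks, assemble tool_calls
    if not tool_calls:
        break                                          # plain answer: the loop is done
    messages.append({"role": "assistant", "content": content or None, "tool_calls": tool_calls})
    for result_message in execute_tool_calls(tool_calls, available_functions):
        messages.append(result_message)                # one {"role": "tool", ...} message per call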

What I posted is full code for an API request to the openai Python library to get an AI response from a model. The params accepted by the chat.completions function are written in Python dictionary format (which looks like JSON key/value pairs).

Here is a linear example in another topic of tool-enabled code that builds the params input to send to the AI, which is like the chat history a chatbot loop would build to give the AI a memory of past user input, tool calls, and tool returns:


Here's an explanation of the code I gave (note that the function definition must come first in the .py file).


The code does the following:

c = client.chat.completions.with_raw_response.create(**params)

This line makes a request to the OpenAI API. **params is used to unpack the dictionary of parameters into the API request. They are the normal parameters like model="gpt-3.5-turbo", but because the input is a dictionary, the params dictionary looks more like a raw JSON API request, with "model": "gpt-3.5-turbo" (note the colon).
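For illustration only (the model name and message content here are made up, and tools is assumed to be your own list of tool definitions), such a params dictionary might look like this:

params = {
    "model": "gpt-3.5-turbo",
    "messages": [{"role": "user", "content": "Give me a random float and a random int."}],
    "tools": tools,          # your list of tool definitions
    "tool_choice": "auto",
    "stream": True,
}
c = client.chat.completions.with_raw_response.create(**params)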

The request is made to OpenAI's chat completions endpoint at client.chat.completions. The with_raw_response.create method indicates that the response comes from the httpx library within, includes additional information like headers, and is left in its raw, JSON-like format when using certain httpx methods on that return value (assigned to c here).

reply=""
tools=[]
for chunk in c.parse():

This initiates a loop through the response from the API call, parsing the raw response into a more usable Python object using the parse() function which returns an iterable (generator that emits the network chunks as they are received). The response from the API comes in “chunks” to allow the processing of data in a streaming manner.

print(chunk.choices[0].delta)

Just for diagnosis, so you can see more of what is being received over the network, this line prints out the first choice in each chunk that the streaming API sends ("choice" because you can request that the AI answer the same input multiple times, for multiple choices of responses, using n: 2 or more; rarely used). Each choice has an associated 'delta' object which contains the changes added between the previous chunks and the current chunk in the stream.

if chunk.choices[0].delta.content:
    reply += chunk.choices[0].delta.content      
    print(chunk.choices[0].delta.content, end="")  

If the delta has a content field (which includes the assistant’s reply almost token-by-token), it’s added to the reply string, and is also printed out.

if chunk.choices[0].delta.tool_calls:
    tools += chunk.choices[0].delta.tool_calls    

If there are any tool_calls in the delta (requests to invoke the tools you passed to the API), they're added to the tools list for later processing. Each chunk carries a complex object holding parts of a function call: only the first chunk of a given call has its ID, while continued chunks still carry a full object rather than just the additional argument text of the tool.

from collections import defaultdict

def tool_list_to_tool_obj(tools):
    ...

This function, whose def would appear earlier in the code, converts the list of stream objects extracted from the tool-call chunks into a single object representation. If a tool call sends part of its arguments in one chunk and more in subsequent chunks, they are all gathered and associated under the same tool index using a defaultdict. Once all chunks have been processed, it produces a dict of tool details (not unlike what would be returned from the non-streaming OpenAI API).

Finally,

tools_obj = tool_list_to_tool_obj(tools)
print(reply)
print(tools_obj)

We use the function to make the non-streaming version of the tool call object.

The reply string and the objectified tool calls dict are both printed as a demonstration of what information has been gathered. Remember: the AI "content" was already printed to the user token-by-token (and you substitute your own output method there, within the loop that receives chunks).

The variables that were set for reply and tools_obj are now available for use in your code as before: you use your existing parsing, now performing the functions (which can be multiple and parallel), and send each result back to the AI (see the earlier linked topic for doing this, too).
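A rough sketch of that step, assuming the tools_obj produced above, an available_functions mapping like the one elsewhere in this thread, and functions that return plain strings:

# Record the assistant turn that requested the tools, then one "tool" message per call
messages.append({"role": "assistant", "content": reply or None,
                 "tool_calls": tools_obj["tool_calls"]})
for call in tools_obj["tool_calls"]:
    function_response = available_functions[call["function"]["name"]](
        **json.loads(call["function"]["arguments"])
    )
    messages.append({
        "tool_call_id": call["id"],
        "role": "tool",
        "name": call["function"]["name"],
        "content": function_response,
    })
# ...then make the next (streaming) API call with the updated messages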

In summary, the script I provided is designed to communicate with the OpenAI API, receive responses in a streaming manner, and handle chunks of data that are parts of either dialogue (in the content) or system-level tool invocations (tool_calls). The chunks are pieced together appropriately to form complete dialogue or tool invocations.

Writing a chatbot (link with even more I wrote) can be a simple loop: take input, send it with history, parse the response, and send back the fulfilled tool return instead of asking the user a new question.

If you are still at an "I need programming lessons" stage after this, ChatGPT Plus is $20/mo and can answer questions once you have the expertise to know what to ask.

Using code from ChatGPT, I made an answer. The other replies in this thread are kind of confusing and hard to follow, since they're in a non-standard format, but this is the full code that works for streaming with function calls, with multiple function calls, and without function calls at all:

from openai import OpenAI
import openai
import json
from apikey import api_key  # Ensure this imports your API key correctly, or if you wanna do it my way, make a file named apikey.py and put api_key="REAL_API_KEY_HERE" in it, and put that in the same folder as this file.

openai.api_key = api_key
client = OpenAI(api_key=api_key)

# Hard-coded 'fake' weather API, in a real situation, this would be a real weather api, or anything else.
def get_current_weather(location, unit="fahrenheit"):
    """Get the current weather in a given location"""
    print(f"PRINT STATEMENT: Getting weather for {location} with unit {unit}") # Clarification that the function actually is running and the model isn't making stuff up.
    print() # empty print statement to add space and separate from API response
    if "tokyo" in location.lower():
        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit})
    elif "san francisco" in location.lower():
        return json.dumps({"location": "San Francisco", "temperature": "32", "unit": unit})
    elif "paris" in location.lower():
        return json.dumps({"location": "Paris", "temperature": "22", "unit": unit})
    else:
        return json.dumps({"location": location, "temperature": "unknown"})

    
def run_conversation():
    messages = [{
        "role": "user",
        "content": "first, define unicorn in 30 words. Then find the weather in Paris"
    }]

    # We define the tool array here instead of within the API call because it's just easier to look at and manage, just like with messages up there ^.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_current_weather",
                "description": "Get the current weather in a given location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {"type": "string", "description": "The city and state, e.g., San Francisco, CA"},
                        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "description": "The unit of measurement for the temperature", "default": "fahrenheit"}
                    },
                    "required": ["location"]
                },
            }
        }
    ]

    stream = client.chat.completions.create(
        model="gpt-4-0125-preview", # the model you wanna use, if this doesn't work, try using "gpt-3.5-turbo-0125"
        messages=messages, # define the context, usually this would be a thing that isn't static
        tools=tools, # the array of tools we defined up there ^
        tool_choice="auto", # pretty sure this is default, so you don't need it, but it's here just in case.
        stream=True, # enable streaming for the API
    )

    available_functions = {"get_current_weather": get_current_weather,
                        # "add_another_function_here": add_another_function_here,
                           }

    tool_call_accumulator = ""  # Accumulator for JSON fragments of tool call arguments
    tool_call_id = None  # Current tool call ID
    function_name = None  # Current tool call function name

    # This is where we print the chunks directly from the API if no function was called from the model
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True) # the extra stuff at the end makes it so it updates as fast as possible, and doesn't create new lines for each chunk it gets

        if chunk.choices[0].delta.tool_calls:
            for tc in chunk.choices[0].delta.tool_calls:
                if tc.id:  # New tool call detected here
                    tool_call_id = tc.id
                if tc.function.name:  # The function name only arrives in the first chunk of a call
                    function_name = tc.function.name
                tool_call_accumulator += tc.function.arguments if tc.function.arguments else ""

                # When the accumulated JSON string seems complete then:
                try:
                    func_args = json.loads(tool_call_accumulator)
                    # Call the corresponding function that we defined and matches what is in the available functions
                    func_response = json.dumps(available_functions[function_name](**func_args))
                    # Append the function response directly to messages
                    messages.append({
                        "tool_call_id": tool_call_id,
                        "role": "function",
                        "name": function_name,
                        "content": func_response,
                    })
                    tool_call_accumulator = ""  # Reset for the next tool call
                except json.JSONDecodeError:
                    # Incomplete JSON; continue accumulating
                    pass

    # Make a follow-up API call with the updated messages, including function call responses with tool id
    stream = client.chat.completions.create(
        model="gpt-4-0125-preview",
        messages=messages,
        stream=True,
    )

    # Prints each chunk as they come after the function is called and the result is available.
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)

run_conversation()

The only thing this code cannot do that the ChatGPT website can do is the behavior where the entire process seems to run in a loop. What I mean by that is this example:

User: Say hi, then make an image.

ChatGPT: Hi! -instant
…making image
Here’s the image, oh wait, let me try again.
…making image

In that rough example, ChatGPT can call functions as many times in a row as it wants, and it also returns the first answer instantly without using a function first. In my code, the model has to decide to call the function before writing anything, and then the second API call handles putting it all together. I'm not sure why this is, but that is the only differing feature. This effect can be reached by putting the entire process within a loop somehow, but frankly, I've been trying to figure this out for 6 hours straight now, and I'm scared to touch the code, but it works now.
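For what it's worth, here is one possible way to get that looping behavior. It is an untested sketch that just wraps the same request/assemble/execute steps from the code above in a while loop, and it assumes an available_functions dict like the one already defined:

def run_conversation_loop(messages, tools, available_functions):
    while True:
        stream = client.chat.completions.create(
            model="gpt-4-0125-preview",
            messages=messages,
            tools=tools,
            tool_choice="auto",
            stream=True,
        )
        tool_calls = []
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                print(delta.content, end="", flush=True)  # stream text as it arrives
            if delta.tool_calls:
                for tc in delta.tool_calls:
                    if len(tool_calls) <= tc.index:
                        tool_calls.append({"id": "", "type": "function",
                                           "function": {"name": "", "arguments": ""}})
                    if tc.id:
                        tool_calls[tc.index]["id"] += tc.id
                    if tc.function.name:
                        tool_calls[tc.index]["function"]["name"] += tc.function.name
                    if tc.function.arguments:
                        tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
        if not tool_calls:
            break  # the model answered in plain text, so stop looping
        # Record the assistant's tool request, run each function, and add the results
        messages.append({"role": "assistant", "tool_calls": tool_calls})
        for call in tool_calls:
            result = available_functions[call["function"]["name"]](
                **json.loads(call["function"]["arguments"]))
            messages.append({"tool_call_id": call["id"], "role": "tool",
                             "name": call["function"]["name"], "content": result})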


Hello, I saw that you were talking about calling one function in a streamed response. I need to call several functions after one request to the completions endpoint. It works without streaming, where the response has several functions in tool_calls, but when I make the request with streaming, I always get only one function in the tool_calls response. Do you have any ideas?

I was trying to figure this out myself and could not find a complete resource. That led me to the OpenAI forums and this topic. I implemented a full chat loop with asynchronous requests, streaming, and function-calling support.

Function calling snippet

import os
import json
import asyncio
import openai
from typing import Any, Tuple
from dotenv import load_dotenv

"""
Initialize the client
- Setup the client to use either Azure, OpenAI or Ollama API
- Uses the Async client to handle asynchronous requests
- Uses the environment variables
"""
load_dotenv()
API_HOST = os.getenv("API_HOST")
if API_HOST == "azure":
    client = openai.AsyncAzureOpenAI(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    )
    DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
elif API_HOST == "openai":
    client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_KEY"))
    DEPLOYMENT_NAME = os.getenv("OPENAI_MODEL")
elif API_HOST == "ollama":
    client = openai.AsyncOpenAI(
        base_url="",
        api_key="nokeyneeded",
    )
    DEPLOYMENT_NAME = os.getenv("OLLAMA_MODEL")

"""
Get the current weather
- This function returns hard-coded weather values
- In production, this could be from your backend data or external API
"""
def get_current_weather(location, unit="fahrenheit"):
    """Get the current weather in a given location"""
    if "tokyo" in location.lower():
        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit})
    elif "san francisco" in location.lower():
        return json.dumps(
            {"location": "San Francisco", "temperature": "72", "unit": unit}
        )
    elif "paris" in location.lower():
        return json.dumps({"location": "Paris", "temperature": "22", "unit": unit})
    else:
        return json.dumps({"location": location, "temperature": "unknown"})

"""
Initialize messages
- Returns the initial messages to start the conversation
- In this case, it's a single message to introduce the assistant
"""
def init_messages():
    return [
        {
            "role": "system",
            "content": """
                You are a helpful assistant.
                You have access to a function that can get the current weather in a given location.
                Determine a reasonable Unit of Measurement (Celsius or Fahrenheit) for the temperature based on the location.
            """
        }
    ]

"""
Get tools
- Returns the tools available to the model.
- In this case, it's a single function to get the current weather
"""
def get_tools():
    return [
        {
            "type": "function",
            "function": {
                "name": "get_current_weather",
                "description": """
                    Get the current weather in a given location.
                    Note: any US cities have temperatures in Fahrenheit
                """,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "The city and state, e.g. San Francisco, CA",
                        },
                        "unit": {
                            "type": "string",
                            "description": "Unit of Measurement (Celsius or Fahrenheit) for the temperature based on the location",
                            "enum": ["celsius", "fahrenheit"]
                        },
                    },
                    "required": ["location"],
                },
            },
        }
    ]

"""
Get available functions
- This function returns a dictionary of available functions
"""
def get_available_functions():
    return {"get_current_weather": get_current_weather}

"""
Get user input
- Handle 'exit' command and exceptions
"""
def get_user_input() -> str:
    try:
        user_input = input("User:> ")
    except KeyboardInterrupt:
        print("\n\nExiting chat...")
        return ""
    except EOFError:
        print("\n\nExiting chat...")
        return ""

# Handle exit command
if user_input == "exit":
    print("\n\nExiting chat...")
    return ""

return user_input

"""
Send the chat request to the model
- Handle asynchronous responses
- Handle streaming responses
- Handle tool calls
"""
async def send_chat_request(messages):

    # Step 1: send the conversation and available functions to the model
    stream_response1 = await client.chat.completions.create(
        model=DEPLOYMENT_NAME,
        messages=messages,
        tools=get_tools(),
        tool_choice="auto",
        temperature=0.1,
        top_p=0.95,
        max_tokens=4096,
        stream=True
    )

    # Convert the stream response to a list
    stream_response1_list = [item async for item in stream_response1]

    tool_calls = []  # Accumulator for tool calls to process later
    full_delta_content = ""  # Accumulator for delta content to process later

    # Process the stream response for tool calls and delta content
    for chunk in stream_response1_list:
        delta = chunk.choices[0].delta if chunk.choices and chunk.choices[0].delta is not None else None

        if delta and delta.content:
            full_delta_content += delta.content

        elif delta and delta.tool_calls:
            tc_chunk_list = delta.tool_calls
            for tc_chunk in tc_chunk_list:
                if len(tool_calls) <= tc_chunk.index:
                    tool_calls.append({"id": "", "type": "function", "function": {"name": "", "arguments": ""}})
                tc = tool_calls[tc_chunk.index]

                if tc_chunk.id:
                    tc["id"] += tc_chunk.id
                if tc_chunk.function.name:
                    tc["function"]["name"] += tc_chunk.function.name
                if tc_chunk.function.arguments:
                    tc["function"]["arguments"] += tc_chunk.function.arguments

    # Step 2: check if the model wanted to call a function
    if not tool_calls and full_delta_content:
        messages.append({"role": "assistant", "content": full_delta_content})

        # Convert the list to a stream to return as a response
        async def list_to_stream():
            for item in stream_response1_list:
                yield item

        return list_to_stream()
    elif tool_calls:
        # Extend conversation by appending the tool calls to the messages
        messages.append({"role": "assistant", "tool_calls": tool_calls})

        # Map of function names to the actual functions
        available_functions = get_available_functions()

        for tool_call in tool_calls:

            # Note: the JSON response may not always be valid; be sure to handle errors
            function_name = tool_call['function']['name']
            if function_name not in available_functions:
                return "Function " + function_name + " does not exist"

            # Step 3: call the function with arguments if any
            function_to_call = available_functions[function_name]
            function_args = json.loads(tool_call['function']['arguments'])
            function_response = function_to_call(**function_args)

            # Step 4: send the info for each function call and function response to the model
            messages.append(
                {
                    "tool_call_id": tool_call['id'],
                    "role": "tool",
                    "name": function_name,
                    "content": function_response,
                }
            )  # extend conversation with function response

        stream_response2 = await client.chat.completions.create(
            model=DEPLOYMENT_NAME,
            messages=messages,
            temperature=0,  # Adjust the variance by changing the temperature value (default is 0.8)
            top_p=0.95,
            max_tokens=4096,
            stream=True,
        )
        return stream_response2

"""
Format the response for the stream
- Use case: Fit an expected response payload format to send to a web client chat UI
"""
def format_stream_response(chatCompletionChunk):
    response_obj = {
        "id": chatCompletionChunk.id,
        "model": chatCompletionChunk.model,
        "created": chatCompletionChunk.created,
        "object": chatCompletionChunk.object,
        "choices": [{
            "messages": []
        }]
    }

    if len(chatCompletionChunk.choices) > 0:
        delta = chatCompletionChunk.choices[0].delta
        if delta:
            if hasattr(delta, "context"):
                messageObj = {
                    "role": "tool",
                    "content": json.dumps(delta.context)
                }
                response_obj["choices"][0]["messages"].append(messageObj)
                return response_obj
            if delta.role == "assistant" and hasattr(delta, "context"):
                messageObj = {
                    "role": "assistant",
                    "context": delta.context,
                }
                response_obj["choices"][0]["messages"].append(messageObj)
                return response_obj
            else:
                if delta.content:
                    messageObj = {
                        "role": "assistant",
                        "content": delta.content,
                    }
                    response_obj["choices"][0]["messages"].append(messageObj)
                    return response_obj
    return {}

"""
Stream the chat request
- Sends the chat request to the model and waits for the response
- Returns an async generator to stream the response
"""
async def stream_chat_request(messages):
    response = await send_chat_request(messages)

    async def generate():
        async for completionChunk in response:
            await asyncio.sleep(0.1)  # smooth out the stream
            yield format_stream_response(completionChunk)

    return generate()

"""
Process the chat response
- If in a Client/Server environment, this function would be on the client and receive the response from the server
- It would then need to parse the payload and display the response in the chat UI
- In this example, we simply print the response to the console instead as this is a standalone script
"""
async def process_chat_response(async_generator):
    async for result in async_generator:
        content = result.get('choices', [{}])[0].get('messages', [{}])[0].get('content')
        if content:
            print(content, end="")
    print()

"""
Chat
- The main chat loop
- Handles the user input, sends the chat request, and processes the chat response
"""
async def chat(messages) -> bool:

    # User's input
    user_input = get_user_input()
    if not user_input:
        return False
    messages.append({"role": "user", "content": user_input})

    # Send the chat request
    async_generator = await stream_chat_request(messages)

    # Assistant's response
    print("Assistant:> ", end="")
    await process_chat_response(async_generator)  # Process the chat response

    return True

# Initialize the messages
messages = init_messages()

async def main() -> None:
    chatting = True
    while chatting:
        chatting = await chat(messages)

if __name__ == "__main__":
    asyncio.run(main())

For more, check out my github repo with function calling examples (not allowed to post links here):

github: john-carroll-sw/chat-completions-function-calling-examples

I think the OpenAI community forum could certainly use better code snippet formatting :slight_smile:


Can finish_reason help? When the finish_reason is "tool_calls", keep generating responses.

This is perfect, thank you for this