Problem with Responses API + Stream + Function Calling with OpenAI Python SDK

Hello,
I have a problem in my Python implementation with the OpenAI SDK, using the Responses API with streaming and function calling.

Everything else is fine: the endpoint streams text messages and file search results without problems.
I'm now implementing function calling with streaming. I receive the model's function call request and execute my local function, but I don't know how to send the function output back to the model…

In the Assistants API there is a submit_tool_outputs function, but I can't find a way to do this with the Responses API.

This is my code:

for event in response:
    event_type = event.type
    print(f"[DEBUG] event_type received: {event_type}")

    if event_type == "response.created" and not current_response_id:
        current_response_id = event.response.id
        response_data = {
            "response_id": current_response_id,
            "conversation_id": conversation_id,
            "previous_response_id": previous_response_id if previous_response_id else None,
            "created_at": datetime.datetime.fromtimestamp(event.response.created_at),
            "model": event.response.model,
            "input": input_messages  # Save the full messages, including the system message
        }
        yield f"event: current_response_id\ndata: {json.dumps({'response_id': current_response_id, 'conversation_id': conversation_id})}\n\n"

    if event_type == "response.completed":
        response_data.update({
            "output": event.response.output,
            "total_tokens": event.response.usage.total_tokens,
            "input_tokens": event.response.usage.input_tokens,
            "output_tokens": event.response.usage.output_tokens,
            "response_object": json.loads(event.model_dump_json())
        })
        await save_response_to_db(response_data, db)

    if event_type == "response.output_item.added":
        # The function name usually arrives here
        if hasattr(event, "output_item") and event.output_item.type == "function_call":
            function_name = event.output_item.name
            item_id = event.output_item.id
            pending_function_calls[item_id] = function_name

    if event_type == "response.function_call_arguments.done":
        function_args = json.loads(event.arguments)
        function_call_id = event.item_id
        function_name = pending_function_calls.get(function_call_id)
        # You need the function name (it may arrive in an earlier event or in the output)
        # If you have the name, execute the function:
        function_to_call = TOOL_FUNCTIONS.get(function_name)
        if function_to_call:
            if inspect.iscoroutinefunction(function_to_call):
                result = await function_to_call(**function_args)
            else:
                result = function_to_call(**function_args)
        else:
            result = {"error": f"Function {function_name} not implemented"}
        print(f"[FUNCALL] Output of function {function_name}: {result}")
        # Submit tool outputs -- this is the part I can't solve:
        # there is no submit_tool_outputs in the Responses API
        client.responses.submit_tool_outputs(
            response_id=event.response_id,
            tool_outputs=[{
                "tool_call_id": function_call_id,
                "output": json.dumps(result)
            }]
        )

    yield f"event: {event_type}\ndata: {event.model_dump_json()}\n\n"
    await asyncio.sleep(0)

Can anybody help me? :slight_smile:

You need to follow up with the function output as a new input.

Here is an example:

import json

import requests
from openai import OpenAI

client = OpenAI()

select_model = "gpt-4.1-mini"

tools = [{
    "type": "function",
    "name": "get_weather",
    "description": "Get current temperature for provided coordinates in celsius.",
    "parameters": {
        "type": "object",
        "properties": {
            "latitude": {"type": "number"},
            "longitude": {"type": "number"}
        },
        "required": ["latitude", "longitude"],
        "additionalProperties": False
    },
    "strict": True
}]

input_messages = [{"role": "user", "content": "What's the weather like in Paris today?"}]

stream = client.responses.create(
    model=select_model,
    input=input_messages,
    tools=tools,
    stream=True,
)

for event in stream:
    # uncomment if you want to see all events
    #print(event.type, event)
    if event.type in ['response.output_text.delta', 'response.function_call_arguments.delta']:
        print(event.delta, end='')
    if event.type == 'response.completed':
        response = event.response
        #print(f"usage: {event.response.usage}")
        #print(event.response.output_text)

#your full response object
print("\n",response)

#your function implementation
def get_weather(latitude, longitude):
    response = requests.get(f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&current=temperature_2m,wind_speed_10m&hourly=temperature_2m,relative_humidity_2m,wind_speed_10m")
    data = response.json()
    return data['current']['temperature_2m']

tool_call = response.output[0]
args = json.loads(tool_call.arguments)

result = get_weather(args["latitude"], args["longitude"])
print("get_weather returned: ", result)

input_messages = [{  # the function result becomes the next input
    "type": "function_call_output",
    "call_id": tool_call.call_id,
    "output": str(result)
}]
stream = client.responses.create(
    model=select_model,
    previous_response_id=response.id,
    input=input_messages,
    tools=tools,
    stream=True,
)

for event in stream:
    # uncomment if you want to see all events
    #print(event.type, event)
    if event.type in ['response.output_text.delta', 'response.function_call_arguments.delta']:
        print(event.delta, end='')
    if event.type == 'response.completed':
        response_2 = event.response

print("\n", response_2)

You can take a look at the docs for more details.


Thanks so much for the answer. :blush:

But are you sure?
Yes, I’m familiar with the documentation you linked, but I’m not sure it’s the best solution.

I was thinking about your solution and will try it, but…

What happens if the model requests another function again? :face_with_monocle:

User > Model > Call Function1 > Model > Call Function2 >>>>> ?

Thanks so much for the answer, I’m a little unsure about this. :shaking_face:

Well, I thought you were asking for this.

The Responses API workflow will simply ask you again for another function output, which you produce in your backend and send back to the model in the next request.
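
To make that loop concrete, here is a rough sketch (non-streaming for brevity; client, tools, input_messages and the run_local_function helper are placeholders for your own code) of the backend resolving function calls until the model returns only text:

import json

# Sketch: keep resolving function calls until the model stops requesting them.
response = client.responses.create(model="gpt-4.1-mini", input=input_messages, tools=tools)

for _ in range(5):  # safety limit on consecutive tool-call rounds
    function_calls = [item for item in response.output if item.type == "function_call"]
    if not function_calls:
        break  # no more tool requests; only text output is left

    follow_up = []
    for call in function_calls:
        result = run_local_function(call.name, json.loads(call.arguments))  # your local execution
        follow_up.append({
            "type": "function_call_output",
            "call_id": call.call_id,
            "output": json.dumps(result),
        })

    # Send the outputs back; the model may request yet another function here.
    response = client.responses.create(
        model="gpt-4.1-mini",
        previous_response_id=response.id,
        input=follow_up,
        tools=tools,
    )

print(response.output_text)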

Were you perhaps looking for a function in the Python API to send back the value, or for something that handles it automatically without you providing a follow-up?


Thanks for your quick response. :blush:

I’m currently migrating from the Assistants API to the Responses API.

When working with the Assistants API, there is a function called submit_tool_outputs_stream that allows you to send the result of a function call back to the model within the same stream, without interrupting the connection.

However, in the Responses API, this function does not exist. From what I see and from your explanation, the solution is to start a new stream and send the function results in that new request.

But in this case, it seems that the frontend (JavaScript) will have to handle starting a new stream and sending the function results back to the backend, right?

So the flow would be something like:

Frontend → Backend → Stream → Model → Backend → Function → Frontend → Backend → Stream → Model…

Is this correct? Or is there a way to keep everything in a single stream as with the Assistants API?

Thanks! :shaking_face:


Yes (automatically, the user will only see more streaming). Just a correction here: the frontend will not (or shouldn't) deal with this; it is the backend that awaits the full response after handling all of the follow-up function steps.

In fact it is the same “thread” (conversation); it is just that, due to how the API works, the response is produced in multiple steps (the model needs your backend's reply to know the function output).
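
As a sketch of that idea (reusing client and json from the earlier snippets; execute_function_call is a hypothetical helper that runs your local function), the backend generator can hold one SSE stream open towards the frontend while it makes several streamed Responses calls behind it:

async def event_stream(input_messages, tools):
    # One SSE stream to the frontend; possibly several streamed Responses calls behind it.
    kwargs = {"model": "gpt-4.1-mini", "input": input_messages, "tools": tools, "stream": True}

    while True:
        function_outputs = []
        last_response = None

        for event in client.responses.create(**kwargs):
            # Forward every event to the frontend as-is
            yield f"event: {event.type}\ndata: {event.model_dump_json()}\n\n"

            if event.type == "response.completed":
                last_response = event.response
                for item in last_response.output:
                    if item.type == "function_call":
                        result = await execute_function_call(item.name, item.arguments)  # hypothetical helper
                        function_outputs.append({
                            "type": "function_call_output",
                            "call_id": item.call_id,
                            "output": json.dumps(result),
                        })

        if not function_outputs:
            break  # final answer reached; the SSE stream can close

        # Continue the same conversation, sending the function results as the next input
        kwargs = {
            "model": "gpt-4.1-mini",
            "previous_response_id": last_response.id,
            "input": function_outputs,
            "tools": tools,
            "stream": True,
        }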

Another thing you might be interested in is the Agents SDK, which simplifies these interactions somewhat.

Here is an example of an agent calling a function:

import asyncio

from agents import Agent, Runner, function_tool


@function_tool
def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny."


agent = Agent(
    name="Hello world",
    instructions="You are a helpful agent.",
    tools=[get_weather],
)


async def main():
    result = await Runner.run(agent, input="What's the weather in Tokyo?")
    print(result.final_output)
    # The weather in Tokyo is sunny.


if __name__ == "__main__":
    asyncio.run(main())

Thank you very much for your answer! :star_struck:

I’m going to implement this approach and will reply to this thread with the results. Thanks again for your detailed explanation and solution.

Regarding the Agents SDK, that will probably be my next step.
Would you recommend using it for building an assistant?
Does it support using fine-tuned models? (That was one of the reasons I started migrating to the Responses API.)

Thanks again, your help is much appreciated! :grin:


I haven’t explored fine-tuning much, but theoretically it should work. The project description says it even allows other LLMs, but I haven’t tried those yet:

The OpenAI Agents SDK is a lightweight yet powerful framework for building multi-agent workflows. It is provider-agnostic, supporting the OpenAI Responses and Chat Completions APIs, as well as 100+ other LLMs.

My guess is that it is just a wrapper, so it shouldn’t prevent you from using any model, but it would be best if you try it (in the Agent creation there is a model parameter).
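
For example (untested on my side; the fine-tuned model ID below is just a placeholder), you would pass it through that model parameter:

from agents import Agent

agent = Agent(
    name="Weather assistant",
    instructions="You are a helpful agent.",
    model="ft:gpt-4.1-mini-2025-04-14:my-org::abc12345",  # placeholder fine-tuned model ID
    tools=[get_weather],  # the function tool from the earlier example
)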

It depends on your use case. The Responses API is easily portable to any language, but the Agents SDK will tie you to Python, which might be a problem if your deployment strategy does not allow it.


It is helpful to consider:

When managing your own chat, the API is stateless.

Streaming an API call response with the "stream": true parameter gives you essentially the same product generated by the model as when not streaming. It is just more dynamic, allowing you to display incremental output to the user as it is being generated, or to collect tool_calls and start them asynchronously as soon as they are received if functions are called in parallel.

What you send back is the same, just using information collected differently out of the events (or you can use the final response.completed event as if it were the non-streamed return object).
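
For instance, if you prefer to assemble the calls from the events themselves, a minimal sketch (assuming a stream created as in the earlier examples, and that the Python SDK exposes event.item, event.item_id and event.delta as shown) could look like:

function_calls = {}  # item id -> {"name": ..., "call_id": ..., "arguments": ...}

for event in stream:
    if event.type == "response.output_item.added" and event.item.type == "function_call":
        function_calls[event.item.id] = {
            "name": event.item.name,
            "call_id": event.item.call_id,
            "arguments": "",
        }
    elif event.type == "response.function_call_arguments.delta":
        function_calls[event.item_id]["arguments"] += event.delta
    elif event.type == "response.completed":
        final_response = event.response  # or simply read the finished calls from here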

Here, for example, is an “output” from the response.completed event, if you didn't want to rely on what can be gathered from the function-call delta or function-call-argument events that appear in the stream:

    "output": [
      {
        "id": "msg_msg_345678",
        "type": "message",
        "status": "completed",
        "content": [
          {
            "type": "output_text",
            "annotations": [],
            "text": "I will check the current weather in both Los Angeles and San Francisco for you."
          }
        ],
        "role": "assistant"
      },
      {
        "id": "fc_98765",
        "type": "function_call",
        "status": "completed",
        "arguments": "{\"location\":\"Los Angeles\"}",
        "call_id": "call_X8Xf9LPRRccFURA0Tw4bSlfK",
        "name": "get_weather"
      },
      {
        "id": "fc_98765",
        "type": "function_call",
        "status": "completed",
        "arguments": "{\"location\":\"San Francisco\"}",
        "call_id": "call_JEjHOGbZbo3FjYUBSQkjdfjF",
        "name": "get_weather"
      }
    ],

This will give you an example of the AI producing both language output and tool call output in the same turn, which must be coded for, and also, what a parallel tool call looks like.

  • What you send back as your API call is the same – whether you use streaming or non-streaming.

You are returning the entire chat history as input, plus the newest assistant and tool output (unlike Chat Completions, these are split into individual input/output items), plus your new “input”. In this case, the addition is the pair of parallel tool call outputs with matching call IDs.

    {
      "type": "function_call_output",
      "call_id": "call_X8Xf9LPRRccFURA0Tw4bSlfK",
      "output": "Los Angeles, CA - Current conditions: 72F, partly cloudy"
    },
    {
      "type": "function_call_output",
      "call_id": "call_JEjHOGbZbo3FjYUBSQkjdfjF",
      "output": "San Francisco, CA - Error 408: timeout"
    },

You will note the pairing of tool call to tool return, along with the matching IDs.
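
Expressed as code (a sketch for the self-managed-state case, without previous_response_id, reusing names like client, select_model, tools and input_messages from the earlier example; tool_results is a hypothetical mapping of call_id to the string output of your local functions):

# Re-send the whole history plus the new items each turn.
input_messages += response.output  # the assistant message and function_call items from this turn
for call_id, output in tool_results.items():
    input_messages.append({
        "type": "function_call_output",
        "call_id": call_id,
        "output": output,
    })

next_stream = client.responses.create(
    model=select_model,
    input=input_messages,
    tools=tools,
    stream=True,
)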

Then, after sending back, you still await any type of response and allow for the possibility of more tool calls coming back at you, which you continue to handle in the same way until none are emitted and there is only an output_text response for the user (or until you hit the maximum number of consecutive iterations you allow the AI).

In this case, a clever AI might retry the tool call for San Francisco, since the timeout error I sent to it might be recoverable on a second try.


If reusing a response ID, with server-side state, the only addition that needs to be sent back as input is the tool return, still with the matching call_id that the server-side chat holds.

Lesson:

  • streaming is a different way of getting the same model output; only the way it is delivered differs.

(writing code for you not included today :slight_smile: )

Note: Chat Completions supports fine-tuned AI models just as Responses does; go there for inference if you also want better analysis products, such as logprobs, and the ability to tune generation with logit_bias.
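
A minimal sketch of that (the fine-tuned model ID and the token ID in logit_bias are placeholders):

completion = client.chat.completions.create(
    model="ft:gpt-4.1-mini-2025-04-14:my-org::abc12345",  # placeholder fine-tuned model ID
    messages=[{"role": "user", "content": "What's the weather like in Paris today?"}],
    logprobs=True,
    top_logprobs=5,            # per-token alternatives, useful for analysis
    logit_bias={1734: -100},   # placeholder token ID; -100 effectively bans that token
)
print(completion.choices[0].message.content)
print(completion.choices[0].logprobs.content[0].top_logprobs)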
