Hello,
I'm using the Responses API with streaming and function calling in a chatbot application. Here's a minimal version of how I'm handling conversations:
import asyncio
import json
from typing import Any, Dict, List, Optional
from openai import AsyncOpenAI, AsyncStream
from openai.types.responses import (
ResponseStreamEvent,
ResponseTextDeltaEvent,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseOutputMessage,
ResponseOutputText,
ResponseFunctionToolCall,
ResponseCompletedEvent,
)
class MinimalResponder:
"""
Minimal reproduction of:
- starting a Responses API stream
- handling deltas and function calls
- sending function_call_output back to the model
- recursively processing the follow-up stream
"""
def __init__(self, *, instructions: Optional[str] = None):
self.client = AsyncOpenAI()
self.instructions = (
instructions
or "You are a helpful assistant. You may call functions if useful."
)
        # Define a simple tool so the model has something to call
self.tools = [
{
"type": "function",
"name": "add",
"description": "Add two numbers.",
"parameters": {
"type": "object",
"properties": {"a": {"type": "number"}, "b": {"type": "number"}},
"required": ["a", "b"],
},
},
]
async def process(self, message: str, role: str = "user"):
"""Entry point: create the initial stream and process it."""
stream = await self.perform_responses_api_call(
inputs=[{"role": role, "content": message}]
)
await self._process_stream(stream)
async def perform_responses_api_call(self, inputs) -> AsyncStream[ResponseStreamEvent]:
return await self.client.responses.create(
instructions=self.instructions,
input=inputs,
tools=self.tools,
stream=True
)
async def _process_stream(self, stream: AsyncStream[ResponseStreamEvent]):
"""
Core event loop:
- collect text deltas
- on completion, inspect response.output for function calls
- execute tools and feed results back via function_call_output
- recurse on the follow-up stream
"""
async with stream as events:
async for event in events:
if isinstance(event, ResponseTextDeltaEvent):
# Stream text deltas to client (here we just print)
print(event.delta, end="", flush=True)
elif isinstance(event, ResponseOutputItemAddedEvent):
# Start of an output message
if isinstance(event.item, ResponseOutputMessage):
print("\n[message start]", flush=True)
elif isinstance(event, ResponseOutputItemDoneEvent):
# End of an output message; gather its full text
if isinstance(event.item, ResponseOutputMessage):
full_text = "\n".join(
c.text
for c in event.item.content
if isinstance(c, ResponseOutputText)
)
print("\n[message complete]")
print(full_text)
elif isinstance(event, ResponseCompletedEvent):
# Response is complete: check for tool calls
response = event.response
function_calls = [
{
"name": i.name,
"call_id": i.call_id,
"arguments": i.arguments or "{}",
}
for i in (response.output or [])
if isinstance(i, ResponseFunctionToolCall)
]
if function_calls:
follow_up_stream = await self._handle_function_calls(function_calls)
# Recurse: process the follow-up stream
await self._process_stream(follow_up_stream)
async def _handle_function_calls(self, function_calls: List[Dict[str, Any]]):
"""
Execute server-side tools and return a follow-up stream
created by sending `function_call_output` messages.
"""
outputs_messages = []
for call in function_calls:
name = call.get("name")
call_id = call.get("call_id")
args_str = call.get("arguments") or "{}"
try:
args = json.loads(args_str) if isinstance(args_str, str) else args_str
except Exception:
args = {}
# Minimal server-side tool implementations
result: Any
            try:
                fns = {
                    "add": lambda a, b: {"sum": a + b},
                }
                # The example tools are plain synchronous callables, so no await here
                result = fns[name](**args)
            except Exception:
                result = {"error": "tool_error"}
outputs_messages.append(
{
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(result),
}
)
# Feed all tool outputs back to the model in a single follow-up call
if outputs_messages:
return await self.perform_responses_api_call(inputs=outputs_messages)
return None
So essentially, at the end of a response I look for function call items in the output, execute the tools, send the results back to the Responses API as function_call_output items, and process the new stream.
Sometimes I get this error on the second (follow-up) call to the Responses API:
Error: 400 "Another process is currently operating on this conversation. Please retry in a few seconds." code: "conversation_locked"
It doesn't seem to be deterministic; some calls go through without problems.
I couldn’t find anything in the docs about this error.
Adding a retry with a small delay on the follow-up call seems to help, but how do I fix this issue for good? Is there anything wrong with the way I'm handling function calls?
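For reference, here is roughly the retry workaround I have in place right now. It's only a sketch: the method name, the getattr-based error-code check, the attempt count, and the delay are just what I picked, not something I found in the docs.

# Rough sketch of the workaround, added as a method on MinimalResponder above.
# Extra import needed: from openai import BadRequestError (asyncio is already imported)
async def perform_responses_api_call_with_retry(
    self, inputs, attempts: int = 3, delay: float = 2.0
):
    """Retry the follow-up call while the conversation is reported as locked."""
    for attempt in range(attempts):
        try:
            return await self.perform_responses_api_call(inputs=inputs)
        except BadRequestError as e:
            # 400 with code "conversation_locked": wait briefly, then try again
            if getattr(e, "code", None) == "conversation_locked" and attempt < attempts - 1:
                await asyncio.sleep(delay)
                continue
            raise

It works most of the time, but it feels like a band-aid rather than a real fix.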
Thank you for your time.