Conversation_locked error in Responses API

Hello,

I am using the responses API with streaming and function calling in a chatbot application. Here’s a minimal version of how I’m handling conversations:

import asyncio
import json
from typing import Any, Dict, List, Optional

from openai import AsyncOpenAI, AsyncStream
from openai.types.responses import (
    ResponseStreamEvent,
    ResponseTextDeltaEvent,
    ResponseOutputItemAddedEvent,
    ResponseOutputItemDoneEvent,
    ResponseOutputMessage,
    ResponseOutputText,
    ResponseFunctionToolCall,
    ResponseCompletedEvent,
)


class MinimalResponder:
    """
    Minimal reproduction of:
    - starting a Responses API stream
    - handling deltas and function calls
    - sending function_call_output back to the model
    - recursively processing the follow-up stream
    """

    def __init__(self, *, instructions: Optional[str] = None):
        self.client = AsyncOpenAI()
        self.instructions = (
            instructions
            or "You are a helpful assistant. You may call functions if useful."
        )

        # Define a couple of tools to make the model call them
        self.tools = [
            {
                "type": "function",
                "name": "add",
                "description": "Add two numbers.",
                "parameters": {
                    "type": "object",
                    "properties": {"a": {"type": "number"}, "b": {"type": "number"}},
                    "required": ["a", "b"],
                },
            },
        ]

    async def process(self, message: str, role: str = "user"):
        """Entry point: create the initial stream and process it."""
        stream = await self.perform_responses_api_call(
            inputs=[{"role": role, "content": message}]
        )
        await self._process_stream(stream)

    async def perform_responses_api_call(self, inputs) -> AsyncStream[ResponseStreamEvent]:
        return await self.client.responses.create(
            instructions=self.instructions,
            input=inputs,
            tools=self.tools,
            stream=True
        )

    async def _process_stream(self, stream: AsyncStream[ResponseStreamEvent]):
        """
        Core event loop:
        - collect text deltas
        - on completion, inspect response.output for function calls
        - execute tools and feed results back via function_call_output
        - recurse on the follow-up stream
        """

        async with stream as events:
            async for event in events:
                if isinstance(event, ResponseTextDeltaEvent):
                    # Stream text deltas to client (here we just print)
                    print(event.delta, end="", flush=True)

                elif isinstance(event, ResponseOutputItemAddedEvent):
                    # Start of an output message
                    if isinstance(event.item, ResponseOutputMessage):
                        print("\n[message start]", flush=True)

                elif isinstance(event, ResponseOutputItemDoneEvent):
                    # End of an output message; gather its full text
                    if isinstance(event.item, ResponseOutputMessage):
                        full_text = "\n".join(
                            c.text
                            for c in event.item.content
                            if isinstance(c, ResponseOutputText)
                        )
                        print("\n[message complete]")
                        print(full_text)

                elif isinstance(event, ResponseCompletedEvent):
                    # Response is complete: check for tool calls
                    response = event.response
                    function_calls = [
                        {
                            "name": i.name,
                            "call_id": i.call_id,
                            "arguments": i.arguments or "{}",
                        }
                        for i in (response.output or [])
                        if isinstance(i, ResponseFunctionToolCall)
                    ]

                    if function_calls:
                        follow_up_stream = await self._handle_function_calls(function_calls)
                        # Recurse: process the follow-up stream
                        await self._process_stream(follow_up_stream)

    async def _handle_function_calls(self, function_calls: List[Dict[str, Any]]):
        """
        Execute server-side tools and return a follow-up stream
        created by sending `function_call_output` messages.
        """
        outputs_messages = []
        for call in function_calls:
            name = call.get("name")
            call_id = call.get("call_id")
            args_str = call.get("arguments") or "{}"
            try:
                args = json.loads(args_str) if isinstance(args_str, str) else args_str
            except Exception:
                args = {}

            # Minimal server-side tool implementations
            result: Any
            try:
                fns = {
                    "add": lambda a, b: {"sum": a + b},
                }
                result = await fns[name](**args)
            except Exception as e:
                result = {"error": "tool_error"}

            outputs_messages.append(
                {
                    "type": "function_call_output",
                    "call_id": call_id,
                    "output": json.dumps(result),
                }
            )

        # Feed all tool outputs back to the model in a single follow-up call
        if outputs_messages:
            return await self.perform_responses_api_call(inputs=outputs_messages)
        return None

So essentially, at the end of a response, I look for tool call messages, process them, then send a request to the responses API with the outputs and process the new stream.

Sometimes, I get this error on the second call to the responses API:

Error: 400 “'Another process is currently operating on this conversation. Please retry in a few seconds” code: “conversation_locked”

It doesn’t seem to be deterministic as some calls do go through without problems.

I couldn’t find anything in the docs about this error.

Adding a retry on the second call with a small delay seems to help but - How do I fix this issue for good? Is there anything wrong with the way I’m handling function calls?

Thank you for your time.

For anyone wondering, moving this bit outside of the with statement fixed the issue.

1 Like