Migration Guide for Assistants API to Responses API is now available

So I am posting to let you all know that the migration guide has now been uploaded to the Assistants API documentation page. If any of you were waiting for it, please go to the documentation page: https://platform.openai.com/docs/assistants/migration.

Also, let us know how the migration went and whether the new system is good or not.
Also, I was looking for the migration guide myself and stumbled upon it, so I decided to post in case it helps others. I don’t know why there was no announcement for this — or maybe I just didn’t come across one.

4 Likes

Thanks for sharing.

We could really use some change logs in the docs, but unfortunately there is none ATM.

On the other hand, there have been many new discrete updates and new features, which I do see as a good sign that things are improving.

1 Like

Hmmm. I switched a chatbot of mine to Responses a while ago, but this conversation id thing is news to me. I have been using previous_response_id to track the conversation. Do I need to switch to conversation id? Is there a benefit to doing so? It’s not clear what the difference is.

The conversation id is supposed to persist, as threads did, as long as you don’t delete it yourself. The conversation id is the replacement for the thread id. The previous_response_id persisted for (I believe) 30 days and then got deleted, so you also had to find ways to store the chats on your end. The conversation id reduces this effort and maintains the chat history automatically, instead of you passing the previous_response_id yourself on each turn.

Adding the link to the official community announcement:

Here is a sample code example that I have written that uses conversation and response API -

  • import time
    import traceback
    import asyncio
    import json
    import signal
    import sys
    
    from openai import AsyncOpenAI
    from openai.types.conversations import Conversation
    from openai.resources.conversations import AsyncConversations
    from custom_tools.OpenAIWebSearchTool import OpenAIWebSearchTool
    from custom_tools.generate_openai_function_schema import generate_openai_function_spec
    from typing import Optional, List, Dict, Any
    from AsyncTimedIterable import AsyncTimedIterable
    from openai.types.responses.response_created_event import ResponseCreatedEvent
    from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall
    
    
    class WebSearchAssistant:
        """Streaming chat assistant built on the OpenAI Responses + Conversations APIs.

        Combines:
          * a hosted code_interpreter tool plus locally-executed custom tools
            (currently OpenAIWebSearchTool) exposed via function calling,
          * server-side Conversation objects for chat-history persistence,
          * streamed responses, where function calls are executed locally and
            their outputs are submitted back recursively until the model is done.
        """

        def __init__(self):
            # NOTE(review): the API key is hard-coded empty here — fill it in or
            # load it from the environment before the client can authenticate.
            self.__openai_api_key = ""

            self.__openai_client = AsyncOpenAI(api_key=self.__openai_api_key)
            self.__custom_tools = self.__load_tools()
            self.__system_prompt = "You are a helpful AI agent"
            self.__async_conversation = AsyncConversations(self.__openai_client)

        def __load_tools(self):
            """Build the tool spec list sent to the API and instantiate local tools.

            Returns the combined list: the hosted code_interpreter tool plus one
            generated function spec per custom tool class. Side effect: populates
            self.__tool_instances (tool name -> instance) for local dispatch.
            """
            default_tools = [
                {
                    "type": "code_interpreter",
                    "container": {"type": "auto"}

                }
            ]
            custom_tools = [OpenAIWebSearchTool]

            tools_spec = [generate_openai_function_spec(tool_cls) for tool_cls in custom_tools]
            default_tools.extend(tools_spec)
            self.__tool_instances = {tool_cls.get_name(): tool_cls() for tool_cls in custom_tools}
            print(f"loaded tools: {default_tools}")
            print(f"tool_instances: {self.__tool_instances}")
            return default_tools

        async def initiate_streaming_conversation(self) -> Conversation:
            """Create a new server-side Conversation seeded with the system prompt."""
            conversation = await self.__async_conversation.create(
                items=[
                    {"type": "message", "role": "system", "content": self.__system_prompt}
                ],
                metadata={"topic": "demo"},
                timeout=10
            )

            print("created conversation: ", conversation)
            return conversation

        async def delete_conversation(self, conversation: Conversation):
            """Delete the server-side conversation (its history is not recoverable)."""
            deleted = await self.__async_conversation.delete(conversation.id)
            print("deleted conversation: ", deleted)

        async def __handle_function_call(self, tool_call: dict, **kwargs):
            """Run one local tool and attach its result to the tool_call dict.

            ``tool_call["arguments"]`` is expected to already be a parsed dict
            (see __handle_function_calls). Mutates and returns the same dict,
            adding the "function_result" key.
            """
            function_name = tool_call["name"]
            function_args = tool_call["arguments"]

            print(f"calling function {function_name} with args: {function_args}")
            function_result = await self.__tool_instances[function_name].arun(**function_args, **kwargs)
            tool_call["function_result"] = function_result
            print(f"got result from {function_name}")
            return tool_call

        async def __handle_function_calls(self, tool_calls: Dict[int, ResponseFunctionToolCall], **kwargs) -> List[dict]:
            """Execute all accumulated function calls concurrently.

            Returns a list of "function_call_output" items ready to be submitted
            back to the Responses API. Calls that raised are silently dropped
            from the output (return_exceptions=True, then filtered below).
            """
            tool_calls_list = []
            for index, tool_call in tool_calls.items():
                function_call_id = tool_call.call_id
                function_id = tool_call.id
                function_name = tool_call.name
                # Arguments arrive as a JSON string accumulated from the stream.
                function_args = json.loads(tool_call.arguments)
                tool_calls_list.append(
                    {
                        "type": "function_call",
                        "id": function_id,
                        "call_id": function_call_id,
                        "name": function_name,
                        "arguments": function_args
                    }
                )

            input_messages = []
            # Run all local tool executions in parallel; exceptions are returned
            # as values rather than propagated.
            function_call_results = await asyncio.gather(
                *(self.__handle_function_call(tool_call, **kwargs) for tool_call in tool_calls_list),
                return_exceptions=True
            )

            for result in function_call_results:
                if isinstance(result, Exception) is False:
                    input_messages.append(
                        {
                            "type": "function_call_output",
                            "call_id": result["call_id"],
                            "output": result["function_result"]
                        }
                    )

            return input_messages

        async def __submit_tool_outputs(self, tool_outputs: List[dict], conversation_id: str, token_timeout=10, **kwargs):
            """
            Submit function call outputs back to the OpenAI API to continue the conversation.
            Returns the stream for the caller to process.
            
            Args:
                tool_outputs: List of tool output dictionaries with call_id and output
                conversation_id: The conversation ID to continue
                token_timeout: Timeout for the stream
                
            Returns:
                AsyncTimedIterable stream or None if no tool outputs
            """
            try:
                if not tool_outputs:
                    print("No tool outputs to submit")
                    return None
                
                print(f"🔄 Submitting {len(tool_outputs)} tool outputs back to OpenAI...")
                
                # Create a new response with the tool outputs as input
                stream = await self.__openai_client.responses.create(
                    model="gpt-4o",
                    input=tool_outputs,  # Submit the tool outputs as input
                    temperature=0.1,
                    conversation=conversation_id,
                    stream=True,
                    tools=self.__custom_tools,
                )
                
                # Return the timed stream for the caller to process.
                # NOTE(review): the caller (__process_streaming_events) wraps the
                # stream in AsyncTimedIterable again, so this is double-wrapped —
                # confirm whether one layer can be removed.
                return AsyncTimedIterable(stream, timeout=token_timeout)
                            
            except Exception as e:
                print(f"Error submitting tool outputs: {e}")
                print(f"Tool outputs were: {tool_outputs}")
                traceback.print_exc()
                return None

        async def __process_streaming_events(self, stream, conversation_id: str, token_timeout=10):
            """
            Process streaming events recursively and yield text tokens as an async generator.
            Handles tool calls internally with recursion.
            """
            timed_stream = AsyncTimedIterable(stream, timeout=token_timeout)
            # Accumulates in-progress function calls keyed by output_index.
            final_tool_calls = {}

            async for event in timed_stream:
                # NOTE(review): response_id is reset on every iteration and never
                # read after assignment — looks like dead code; confirm intent.
                response_id = None

                if isinstance(event, ResponseCreatedEvent):
                    response_id = event.response.id

                if hasattr(event, 'type'):
                    if event.type == "response.output_text.delta":
                        # Text tokens are yielded to the caller for printing.
                        text_content = getattr(event, 'text', None) or getattr(event, 'delta', None)
                        if text_content:
                            yield text_content

                    if event.type == "response.code_interpreter_call.in_progress":
                        print("\n🔧 Code interpreter started and in progress")

                    if event.type == "response.code_interpreter_call_code.done":
                        # NOTE(review): `code` is captured but never used.
                        code = event.code
                        print("\n🔧 Code interpreter is done with generating code")

                    if event.type == "response.code_interpreter_call.interpreting":
                        print("\n🔧 Code interpreter is running the generated code")

                    if event.type == "response.code_interpreter_call.completed":
                        print("\n🔧 Code interpreter has executed the generated code")

                    if event.type == 'response.output_item.added':
                        final_tool_calls[event.output_index] = event.item
                        print("final tool calls: ", final_tool_calls)

                    if event.type == 'response.function_call_arguments.delta':
                        index = event.output_index

                        # NOTE(review): raises KeyError if no matching
                        # output_item.added event was seen first — confirm the
                        # API guarantees that ordering.
                        if final_tool_calls[index]:
                            final_tool_calls[index].arguments += event.delta

                    if event.type == "response.function_call_arguments.done":
                        print("\n🔧 Function call arguments are done")
                        # NOTE(review): fires once per completed function call but
                        # always executes ALL accumulated calls — with multiple
                        # parallel calls, earlier ones may run more than once.
                        tool_outputs = await self.__handle_function_calls(tool_calls=final_tool_calls)
                        print("tool outputs: ", tool_outputs)
                        
                        # Submit tool outputs and recursively process the new stream
                        if tool_outputs:
                            tool_output_stream = await self.__submit_tool_outputs(tool_outputs, conversation_id, token_timeout)
                            if tool_output_stream:
                                print("\n🔄 Processing tool output stream...")
                                # Recursively yield tokens from the tool output stream
                                async for token in self.__process_streaming_events(tool_output_stream, conversation_id, token_timeout):
                                    yield token

                    elif event.type == "response.completed":
                        print("\n✅ Streaming completed")
                        print()

        async def get_conversation(self, conversation_id: str) -> Conversation:
            """Retrieve a Conversation by id; returns None if it has no id."""
            conversation = await self.__async_conversation.retrieve(conversation_id=conversation_id)
            if conversation.id:
                return conversation
            return None

        async def do_streaming_conversation(self, conversation: Conversation, user_query: str, token_timeout=10):
            """Send one user turn to the conversation and print the streamed reply."""
            input_messages = [{"role": "user", "content": user_query}]

            stream = await self.__openai_client.responses.create(
                model="gpt-4o",
                input=input_messages,
                temperature=0.1,
                conversation=conversation.id,
                stream=True,
                tools=self.__custom_tools,
            )

            conversation_id = conversation.id
            
            # Process the stream and print all tokens centrally here
            async for text_content in self.__process_streaming_events(stream, conversation_id, token_timeout):
                if text_content:
                    print(text_content, end="", flush=True)

        async def chat(self, token_timeout=30):
            """
            Interactive chat method that accepts user queries from keyboard.
            Handles graceful termination and signal handling.
            """
            conversation = None
            
            # Setup signal handlers for graceful shutdown
            def signal_handler(signum, frame):
                print(f"\n\n🛑 Received signal {signum}. Shutting down gracefully...")
                if conversation:
                    # Run cleanup in the current event loop
                    loop = asyncio.get_event_loop()
                    if loop.is_running():
                        # Schedule cleanup as a task.
                        # NOTE(review): sys.exit(0) follows immediately, so this
                        # task may never actually run — confirm cleanup happens.
                        asyncio.create_task(self.delete_conversation(conversation))
                    else:
                        # If loop is not running, use run_until_complete
                        loop.run_until_complete(self.delete_conversation(conversation))
                sys.exit(0)
            
            # Register signal handlers
            # NOTE(review): SIGQUIT does not exist on Windows — confirm the
            # target platform is POSIX-only.
            signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
            signal.signal(signal.SIGTERM, signal_handler)  # Termination signal
            signal.signal(signal.SIGQUIT, signal_handler)  # Quit signal
            
            try:
                # Initialize conversation
                print("🚀 Initializing conversation...")
                conversation = await self.initiate_streaming_conversation()
                print(f"✅ Conversation created: {conversation.id}")
                print()
                
                # Print welcome message and instructions
                print("=" * 60)
                print("🤖 Welcome to OpenAI Assistant Chat!")
                print("=" * 60)
                print("Commands:")
                print("  • Type your message and press Enter")
                print("  • Type 'exit', 'quit', or 'bye' to end the chat")
                print("  • Press Ctrl+C to force quit")
                print("=" * 60)
                print()
                
                while True:
                    try:
                        # Get user input (blocking call inside the event loop).
                        user_input = input("You: ").strip()
                        
                        # Check for exit commands
                        if user_input.lower() in ['exit', 'quit', 'bye', 'q']:
                            print("\n👋 Goodbye! Ending conversation...")
                            break
                        
                        # Skip empty inputs
                        if not user_input:
                            continue
                        
                        # Process the user query
                        print("\nAssistant: ", end="", flush=True)
                        await self.do_streaming_conversation(
                            conversation=conversation,
                            user_query=user_input,
                            token_timeout=token_timeout
                        )
                        print("*" * 100)
                        print("\n")  # Add newline after response
                        
                    except KeyboardInterrupt:
                        print("\n\n🛑 Ctrl+C pressed. Shutting down gracefully...")
                        break
                        
                    except EOFError:
                        print("\n\n🛑 EOF received. Shutting down gracefully...")
                        break
                        
                    except Exception as e:
                        # Per-turn errors are logged and the chat loop continues.
                        print(f"\n❌ Error during conversation: {e}")
                        print("Traceback:", traceback.format_exc())
                        print("Continuing chat...\n")
                        
            except Exception as e:
                print(f"❌ Error initializing conversation: {e}")
                print("Traceback:", traceback.format_exc())
                
            finally:
                # Cleanup conversation
                if conversation:
                    try:
                        print("🧹 Cleaning up conversation...")
                        await self.delete_conversation(conversation)
                        print("✅ Conversation deleted successfully")
                    except Exception as e:
                        print(f"❌ Error deleting conversation: {e}")
                
                print("👋 Chat session ended.")
    
    
    if __name__ == '__main__':
        # Entry point: "chat" runs the interactive REPL; any other value runs a
        # one-shot scripted query against a freshly created conversation.
        run_mode = "chat"
        per_token_timeout = 30
        event_loop = asyncio.get_event_loop()
        agent = WebSearchAssistant()

        if run_mode == 'chat':
            # Interactive chat mode
            print("🚀 Starting interactive chat mode...")
            try:
                event_loop.run_until_complete(agent.chat(token_timeout=per_token_timeout))
            except Exception:
                print("❌ Error in chat mode:", traceback.format_exc())
        else:
            # Test mode with predefined query
            print("🧪 Running in test mode...")
            conversation = event_loop.run_until_complete(agent.initiate_streaming_conversation())
            user_query = "write a code to reverse the string periodontitis. use the python/code interpreter tool to write and execute the code"

            try:
                conversation = event_loop.run_until_complete(agent.get_conversation(conversation_id=conversation.id))
                event_loop.run_until_complete(
                    agent.do_streaming_conversation(
                        conversation=conversation,
                        user_query=user_query,
                        token_timeout=per_token_timeout,
                    )
                )
            except Exception:
                print("error: ", traceback.format_exc())
                print()

            finally:
                # Always delete the server-side conversation, even on failure.
                event_loop.run_until_complete(agent.delete_conversation(conversation=conversation))
    
    
    
    
import asyncio


class AsyncTimedIterable:
    """Wrap an async iterable so each item fetch is bounded by a timeout.

    Each call to ``__anext__`` on the wrapped iterator is raced against
    ``timeout`` seconds via ``asyncio.wait_for``; exceeding it raises
    ``asyncio.TimeoutError`` to the consumer. An optional ``cancel_token``
    with an ``is_cancelled`` attribute allows cooperative early termination.

    Fixes over the previous version:
      * the timeout is no longer truncated with ``int()`` — a fractional
        timeout like 0.5 previously became 0, which makes ``wait_for`` time
        out immediately;
      * a falsy timeout (the old default of 0) now means "no timeout"
        instead of "always time out instantly".
    """

    def __init__(self, iterable, timeout=0, cancel_token=None):
        # 0/None -> no per-item deadline (wait_for(None) waits indefinitely).
        per_item_timeout = float(timeout) if timeout else None

        class AsyncTimedIterator:
            def __init__(self):
                self.__cancel_token = cancel_token
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    # Cooperative cancellation: stop cleanly if the token says so.
                    if self.__cancel_token and hasattr(self.__cancel_token, "is_cancelled"):
                        if self.__cancel_token.is_cancelled:
                            raise StopAsyncIteration
                    result = await asyncio.wait_for(self._iterator.__anext__(), per_item_timeout)
                    # A falsy chunk is treated as end-of-stream (preserved from
                    # the original implementation).
                    if not result:
                        raise StopAsyncIteration
                    return result
                except asyncio.TimeoutError as e:
                    print(f"⏰ AsyncTimedIterable: Timeout after {per_item_timeout}s waiting for next event")
                    raise e
                except StopAsyncIteration:
                    print(f"🏁 AsyncTimedIterable: Stream ended naturally")
                    raise

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        # A fresh iterator per iteration request, like a normal iterable.
        return self._factory()
from types import NoneType
from .BaseTool import BaseTool
from typing import Type, get_origin, get_args, Union, Any


def infer_field_type(field_type: Any) -> str:
    """
    Infers the JSON Schema type name for a Python type.

    Fix: ``float`` now maps to ``'number'`` — JSON Schema has no ``'float'``
    type, and OpenAI's strict function schemas reject unknown type names.
    Also recognizes list/dict (including parameterized forms like
    ``list[str]``) as ``'array'``/``'object'``.

    :param field_type: The Python type to infer from.
    :return: A string naming the JSON Schema type ('string' by default).
    """
    origin = get_origin(field_type)

    if origin is Union:
        # Optional[X] is Union[X, None]; describe the first non-None member.
        non_none_types = [t for t in get_args(field_type) if t is not type(None)]
        if non_none_types:
            return infer_field_type(non_none_types[0])
        return 'null'

    # Parameterized containers such as list[str] / tuple[int, ...] / dict[str, int].
    if origin in (list, tuple):
        return 'array'
    if origin is dict:
        return 'object'

    type_mappings = {
        str: 'string',
        int: 'integer',
        float: 'number',  # JSON Schema spells floating point as 'number'
        bool: 'boolean',
        list: 'array',
        dict: 'object',
        # Add more mappings as necessary
    }

    return type_mappings.get(field_type, 'string')  # Default to 'string'


def generate_openai_function_spec(tool_class: Type[BaseTool]) -> dict:
    """
    Build an OpenAI function-calling tool specification from a tool class.

    Reads the tool's static metadata (name, description, pydantic args schema)
    and converts each schema field into a JSON-Schema property, marking
    non-Optional fields as required.

    :param tool_class: The class for which to generate the function specification.
    :return: A dict in the Responses API flat function-tool format.
    """
    properties = {}
    required_fields = []

    schema = tool_class.get_args_schema()
    for field_name, annotation in schema.__annotations__.items():
        field_info = schema.model_fields.get(field_name)
        properties[field_name] = {
            "type": infer_field_type(annotation),
            "description": field_info.description or '',
        }

        # Enum classes expose __members__; surface their values for the model.
        if hasattr(annotation, '__members__'):
            properties[field_name]['enum'] = [e.value for e in annotation]

        # Optional[X] (i.e. Union[..., None]) fields are not required.
        is_optional = get_origin(annotation) is Union and NoneType in get_args(annotation)
        if not is_optional:
            required_fields.append(field_name)

    # "strict": True makes the model adhere to the schema exactly rather than
    # best-effort, which OpenAI recommends for function tools.
    return {
        "type": "function",
        "name": tool_class.get_name(),
        "description": tool_class.get_description(),
        "strict": True,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required_fields,
            "additionalProperties": False,
        },
    }

from pydantic import BaseModel
from abc import ABC, abstractmethod


class BaseTool(ABC, BaseModel):
    """Abstract base for tools exposed via OpenAI function calling.

    Subclasses provide static metadata accessors (get_name, get_description,
    get_args_schema) used to generate function specs, plus run/arun for
    synchronous/asynchronous execution.
    """

    # Declared as pydantic fields; __init_subclass__ below additionally
    # requires every subclass to define them.
    name: str
    description: str

    def __init_subclass__(cls, **kwargs):
        """Fail fast at class-definition time if a subclass omits the metadata attributes."""
        super().__init_subclass__(**kwargs)
        if not hasattr(cls, 'name') or not hasattr(cls, 'description'):
            raise TypeError("Subclasses must define 'name' and 'description'")

    class Config:
        """Configuration for this pydantic object."""

        # Allow subclasses to set private/extra attributes in __init__.
        extra = "allow"
        arbitrary_types_allowed = True

    @staticmethod
    @abstractmethod
    def get_name():
        """Return the unique tool name used for function-call dispatch."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_description():
        """Return the natural-language description shown to the model."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_args_schema():
        """Return the pydantic model class describing the tool's arguments."""
        raise NotImplementedError

    @abstractmethod
    def run(self, *args, **kwargs) -> str:
        """Execute the tool synchronously; must be overridden."""
        raise NotImplementedError

    async def arun(self, *args, **kwargs) -> str:
        """Execute the tool asynchronously; override in async-capable tools."""
        raise NotImplementedError

import asyncio


from openai import AsyncOpenAI
from typing import Type
from pydantic import BaseModel, Field
from .BaseTool import BaseTool


class WebSearchInput(BaseModel):
    """Argument schema for OpenAIWebSearchTool: a single free-text search query."""

    search_query: str = Field(..., description="The search query to browse the internet")


class OpenAIWebSearchTool(BaseTool):
    """Web-search tool backed by OpenAI's hosted web_search_preview capability.

    Async-only: arun() issues a non-streaming Responses API call with the
    hosted web-search tool enabled and returns the aggregated output text;
    run() is intentionally unimplemented.
    """

    name: str = "openai_web_search"
    description: str = """
    Use this tool to search the internet for real-time information and current events. 
    Best used for:
    - Recent news and events
    - Current facts and information
    - Up-to-date data and statistics
    - Latest developments on any topic
    
    Do NOT use this tool for:
    - Personal or private information
    
    Input should be a clear, specific search query.
    """

    args_schema: Type[BaseModel] = WebSearchInput

    def __init__(self, logger=None):
        """Create the tool with its own AsyncOpenAI client.

        :param logger: Optional logger instance (stored but unused here).
        """
        super().__init__()
        self.__logger = logger
        # NOTE(review): API key is hard-coded empty — fill in or load from the
        # environment before the client can authenticate.
        self.__api_key = ""
        self.__client = AsyncOpenAI(api_key=self.__api_key)

    @staticmethod
    def get_name():
        """Return the dispatch name; must match the generated function spec."""
        return "openai_web_search"

    @staticmethod
    def get_description():
        """Return the model-facing description (mirrors the class field above)."""
        return """
    Use this tool to search the internet for real-time information and current events. 
    Best used for:
    - Recent news and events
    - Current facts and information
    - Up-to-date data and statistics
    - Latest developments on any topic
    
    Do NOT use this tool for:
    - Personal or private information
    
    Input should be a clear, specific search query.
    """

    @staticmethod
    def get_args_schema():
        """Return the pydantic schema class for this tool's arguments."""
        return WebSearchInput

    async def arun(self, *, search_query: str, **kwargs) -> str:
        """Use the tool asynchronously."""

        # "low" / "medium" / "high" context size is forwarded to the hosted tool.
        search_context_size = kwargs.get("search_context_size", "medium")
        # NOTE(review): the tool is declared as "web_search_preview" but
        # tool_choice forces type "web_search" — confirm the API accepts this
        # mismatch; otherwise align the two type strings.
        response = await self.__client.responses.create(
            input=search_query,
            model="gpt-4o",
            temperature=0,
            tools=[{"type": "web_search_preview", "search_context_size": search_context_size, }],
            tool_choice={"type": "web_search"}
        )
        return response.output_text

    def run(self, search_query: str) -> str:
        """Synchronous execution is unsupported; use arun() instead."""
        raise NotImplementedError


if __name__ == '__main__':
    async def main():
        """Smoke-test the web search tool with a sample CVE query."""
        tool = OpenAIWebSearchTool()
        # Fix: arun() declares search_query as keyword-only, so the previous
        # positional call raised TypeError before ever reaching the API.
        result = await tool.arun(
            search_query="CVE-2024-56180 affects linux-image-6.11.0-19-generic i want to know abt this cve"
        )
        print(result)


    # asyncio.run creates and closes its own event loop, replacing the
    # deprecated get_event_loop()/run_until_complete pattern.
    asyncio.run(main())

OpenAI has released an official migration guide from Assistants to Responses API.