Migration Guide for Assistants API to Responses API is now available

Here is a sample I wrote that uses the Conversations API together with the Responses API:

  • import time
    import traceback
    import asyncio
    import json
    import signal
    import sys
    
    from openai import AsyncOpenAI
    from openai.types.conversations import Conversation
    from openai.resources.conversations import AsyncConversations
    from custom_tools.OpenAIWebSearchTool import OpenAIWebSearchTool
    from custom_tools.generate_openai_function_schema import generate_openai_function_spec
    from typing import Optional, List, Dict, Any
    from AsyncTimedIterable import AsyncTimedIterable
    from openai.types.responses.response_created_event import ResponseCreatedEvent
    from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall
    
    
    class WebSearchAssistant:
        def __init__(self):
            """Build the OpenAI client, the tool registry and the conversations resource.

            The API key is an empty string in this sample, so a real key has to
            be filled in before any request can succeed.
            """
            self.__system_prompt = "You are a helpful AI agent"

            # Client first: the conversations resource below is built on top of it.
            self.__openai_api_key = ""
            self.__openai_client = AsyncOpenAI(api_key=self.__openai_api_key)
            self.__async_conversation = AsyncConversations(self.__openai_client)

            # Also populates self.__tool_instances as a side effect.
            self.__custom_tools = self.__load_tools()
    
        def __load_tools(self):
            """Assemble the tool specs sent with every request and instantiate the tools.

            Returns:
                A list that starts with the built-in code interpreter spec and is
                followed by one function spec per custom tool class. The matching
                tool instances are stored on ``self.__tool_instances`` keyed by
                tool name.
            """
            tool_classes = [OpenAIWebSearchTool]

            # The hosted code interpreter is always offered alongside custom tools.
            specs = [{"type": "code_interpreter", "container": {"type": "auto"}}]
            for tool_cls in tool_classes:
                specs.append(generate_openai_function_spec(tool_cls))

            instances = {}
            for tool_cls in tool_classes:
                instances[tool_cls.get_name()] = tool_cls()
            self.__tool_instances = instances

            print(f"loaded tools: {specs}")
            print(f"tool_instances: {self.__tool_instances}")
            return specs
    
        async def initiate_streaming_conversation(self) -> Conversation:
            """Create a new conversation seeded with the system prompt.

            Returns:
                The conversation object returned by the conversations resource.
            """
            system_item = {"type": "message", "role": "system", "content": self.__system_prompt}
            request = {
                "items": [system_item],
                "metadata": {"topic": "demo"},
                "timeout": 10,
            }
            conversation = await self.__async_conversation.create(**request)

            print("created conversation: ", conversation)
            return conversation
    
        async def delete_conversation(self, conversation: Conversation):
            """Delete ``conversation`` through the conversations resource and log the result."""
            conversation_id = conversation.id
            outcome = await self.__async_conversation.delete(conversation_id)
            print("deleted conversation: ", outcome)
    
        async def __handle_function_call(self, tool_call: dict, **kwargs):
            """Run one tool call and attach its result to the call record.

            Args:
                tool_call: Dict with at least ``name`` and ``arguments`` (a dict of
                    keyword arguments for the tool).
                **kwargs: Extra keyword arguments forwarded to the tool's ``arun``.

            Returns:
                The same ``tool_call`` dict, mutated to include ``function_result``.
            """
            name, arguments = tool_call["name"], tool_call["arguments"]

            print(f"calling function {name} with args: {arguments}")
            tool = self.__tool_instances[name]
            tool_call["function_result"] = await tool.arun(**arguments, **kwargs)
            print(f"got result from {name}")
            return tool_call
    
        async def __handle_function_calls(self, tool_calls: Dict[int, ResponseFunctionToolCall], **kwargs) -> List[dict]:
            """Execute the collected function calls concurrently and build their outputs.

            Args:
                tool_calls: Output items keyed by output index. Entries whose
                    ``type`` is not ``"function_call"`` (for example messages or
                    code interpreter calls) are ignored.
                **kwargs: Extra keyword arguments forwarded to every tool.

            Returns:
                One ``function_call_output`` item per function call. A call that
                fails (bad JSON arguments, unknown tool, tool exception) still gets
                an output describing the error, because every function call sent
                back to the model needs a matching output.
            """
            tool_calls_list = []
            input_messages = []

            for tool_call in tool_calls.values():
                # Items without a type attribute are treated as function calls,
                # which is what the caller historically passed in.
                if getattr(tool_call, "type", "function_call") != "function_call":
                    continue

                try:
                    function_args = json.loads(tool_call.arguments)
                except (TypeError, ValueError) as exc:
                    print(f"could not parse arguments for {tool_call.name}: {exc}")
                    input_messages.append(
                        {
                            "type": "function_call_output",
                            "call_id": tool_call.call_id,
                            "output": f"Error: could not parse arguments for {tool_call.name}: {exc}",
                        }
                    )
                    continue

                tool_calls_list.append(
                    {
                        "type": "function_call",
                        "id": tool_call.id,
                        "call_id": tool_call.call_id,
                        "name": tool_call.name,
                        "arguments": function_args,
                    }
                )

            function_call_results = await asyncio.gather(
                *(self.__handle_function_call(tool_call, **kwargs) for tool_call in tool_calls_list),
                return_exceptions=True,
            )

            # gather() preserves order, so each result lines up with its request.
            for tool_call, result in zip(tool_calls_list, function_call_results):
                if isinstance(result, BaseException):
                    print(f"function {tool_call['name']} failed: {result!r}")
                    output = f"Error while running {tool_call['name']}: {result!r}"
                else:
                    output = result["function_result"]

                input_messages.append(
                    {
                        "type": "function_call_output",
                        "call_id": tool_call["call_id"],
                        "output": output,
                    }
                )

            return input_messages
    
        async def __submit_tool_outputs(self, tool_outputs: List[dict], conversation_id: str, token_timeout=10, **kwargs):
            """
            Send function call outputs back to the model and open the follow-up stream.

            Args:
                tool_outputs: ``function_call_output`` items (``call_id`` + ``output``)
                conversation_id: Conversation the new response is attached to
                token_timeout: Per-event timeout applied to the returned stream

            Returns:
                The new response stream wrapped in AsyncTimedIterable, or None when
                there is nothing to submit or the request fails.
            """
            try:
                if not tool_outputs:
                    print("No tool outputs to submit")
                    return None

                output_count = len(tool_outputs)
                print(f"๐Ÿ”„ Submitting {output_count} tool outputs back to OpenAI...")

                # The tool outputs are the whole input of the follow-up response.
                request = {
                    "model": "gpt-4o",
                    "input": tool_outputs,
                    "temperature": 0.1,
                    "conversation": conversation_id,
                    "stream": True,
                    "tools": self.__custom_tools,
                }
                follow_up = await self.__openai_client.responses.create(**request)

                return AsyncTimedIterable(follow_up, timeout=token_timeout)

            except Exception as e:
                # Failures are reported and turned into None; the caller skips
                # the follow-up stream in that case.
                print(f"Error submitting tool outputs: {e}")
                print(f"Tool outputs were: {tool_outputs}")
                traceback.print_exc()
                return None
    
        async def __process_streaming_events(self, stream, conversation_id: str, token_timeout=10):
            """
            Consume a response stream and yield its text tokens as an async generator.

            Function calls emitted by the model are collected while the stream is
            running and executed only after the response has completed. Running
            them on the first ``function_call_arguments.done`` event would execute
            parallel calls whose arguments are still partial, and would re-run
            earlier calls on every later ``done`` event. Once the tools have run,
            their outputs are submitted and the follow-up stream is processed
            recursively, so its tokens are yielded from this generator too.

            Args:
                stream: Async iterable of response stream events
                conversation_id: Conversation used for the follow-up request
                token_timeout: Maximum seconds to wait for each event
            """
            timed_stream = AsyncTimedIterable(stream, timeout=token_timeout)
            final_tool_calls = {}
            response_completed = False

            async for event in timed_stream:
                event_type = getattr(event, 'type', None)
                if event_type is None:
                    continue

                if event_type == "response.output_text.delta":
                    text_content = getattr(event, 'text', None) or getattr(event, 'delta', None)
                    if text_content:
                        yield text_content

                elif event_type == "response.code_interpreter_call.in_progress":
                    print("\n๐Ÿ”ง Code interpreter started and in progress")

                elif event_type == "response.code_interpreter_call_code.done":
                    print("\n๐Ÿ”ง Code interpreter is done with generating code")

                elif event_type == "response.code_interpreter_call.interpreting":
                    print("\n๐Ÿ”ง Code interpreter is running the generated code")

                elif event_type == "response.code_interpreter_call.completed":
                    print("\n๐Ÿ”ง Code interpreter has executed the generated code")

                elif event_type == 'response.output_item.added':
                    # Only function calls need local handling; messages and hosted
                    # tool items are resolved by the API itself.
                    if getattr(event.item, 'type', None) == "function_call":
                        final_tool_calls[event.output_index] = event.item
                        print("final tool calls: ", final_tool_calls)

                elif event_type == 'response.function_call_arguments.delta':
                    pending_call = final_tool_calls.get(event.output_index)
                    if pending_call is not None:
                        pending_call.arguments += event.delta

                elif event_type == "response.function_call_arguments.done":
                    print("\n๐Ÿ”ง Function call arguments are done")
                    pending_call = final_tool_calls.get(event.output_index)
                    finished_arguments = getattr(event, 'arguments', None)
                    # Prefer the complete argument string when the event carries one.
                    if pending_call is not None and finished_arguments is not None:
                        pending_call.arguments = finished_arguments

                elif event_type == "response.completed":
                    response_completed = True
                    print("\nโœ… Streaming completed")
                    print()

            # Nothing to run, or the response never completed (failed/incomplete).
            if not (response_completed and final_tool_calls):
                return

            tool_outputs = await self.__handle_function_calls(tool_calls=final_tool_calls)
            print("tool outputs: ", tool_outputs)
            if not tool_outputs:
                return

            tool_output_stream = await self.__submit_tool_outputs(tool_outputs, conversation_id, token_timeout)
            if tool_output_stream is None:
                return

            print("\n๐Ÿ”„ Processing tool output stream...")
            # The follow-up response may request more tools, hence the recursion.
            async for token in self.__process_streaming_events(tool_output_stream, conversation_id, token_timeout):
                yield token
    
        async def get_conversation(self, conversation_id: str) -> Conversation:
            """Retrieve a conversation by id.

            Returns:
                The retrieved conversation, or None when the retrieved object has
                a falsy ``id``.
            """
            fetched = await self.__async_conversation.retrieve(conversation_id=conversation_id)
            return fetched if fetched.id else None
    
        async def do_streaming_conversation(self, conversation: Conversation, user_query: str, token_timeout=10):
            """Send one user message in ``conversation`` and print the streamed reply.

            Args:
                conversation: Conversation the response is attached to
                user_query: Text of the user message
                token_timeout: Maximum seconds to wait for each stream event
            """
            request = {
                "model": "gpt-4o",
                "input": [{"role": "user", "content": user_query}],
                "temperature": 0.1,
                "conversation": conversation.id,
                "stream": True,
                "tools": self.__custom_tools,
            }
            stream = await self.__openai_client.responses.create(**request)

            # All tokens, including those from tool follow-up streams, are
            # printed from this single place.
            tokens = self.__process_streaming_events(stream, conversation.id, token_timeout)
            async for text_content in tokens:
                if not text_content:
                    continue
                print(text_content, end="", flush=True)
    
        async def chat(self, token_timeout=30):
            """
            Interactive chat loop that reads user queries from the keyboard.

            A conversation is created up front and always deleted in ``finally``,
            which is the single cleanup path. SIGINT, SIGTERM and (where the
            platform defines it) SIGQUIT are routed to a handler that either
            interrupts the blocking ``input()`` call or cancels this task, so the
            cleanup runs exactly once instead of racing a ``sys.exit`` call.
            The previous signal handlers are restored before returning.

            Args:
                token_timeout: Maximum seconds to wait for each stream event
            """
            conversation = None
            loop = asyncio.get_running_loop()
            chat_task = asyncio.current_task()
            waiting_for_input = False

            def signal_handler(signum, frame):
                print(f"\n\n๐Ÿ›‘ Received signal {signum}. Shutting down gracefully...")
                if waiting_for_input:
                    # input() blocks the event loop, so a scheduled callback could
                    # not run; raising here is what unblocks the prompt.
                    raise KeyboardInterrupt
                # Otherwise cancel the chat task so the finally block does cleanup.
                loop.call_soon_threadsafe(chat_task.cancel)

            # SIGQUIT does not exist on every platform (e.g. Windows).
            handled_signals = [signal.SIGINT, signal.SIGTERM]
            if hasattr(signal, "SIGQUIT"):
                handled_signals.append(signal.SIGQUIT)

            previous_handlers = {}
            for handled in handled_signals:
                previous_handlers[handled] = signal.signal(handled, signal_handler)

            try:
                # Initialize conversation
                print("๐Ÿš€ Initializing conversation...")
                conversation = await self.initiate_streaming_conversation()
                print(f"โœ… Conversation created: {conversation.id}")
                print()

                # Print welcome message and instructions
                print("=" * 60)
                print("๐Ÿค– Welcome to OpenAI Assistant Chat!")
                print("=" * 60)
                print("Commands:")
                print("  โ€ข Type your message and press Enter")
                print("  โ€ข Type 'exit', 'quit', or 'bye' to end the chat")
                print("  โ€ข Press Ctrl+C to force quit")
                print("=" * 60)
                print()

                while True:
                    try:
                        # Get user input; the flag tells the signal handler that
                        # the loop is blocked inside input().
                        waiting_for_input = True
                        try:
                            user_input = input("You: ").strip()
                        finally:
                            waiting_for_input = False

                        # Check for exit commands
                        if user_input.lower() in ['exit', 'quit', 'bye', 'q']:
                            print("\n๐Ÿ‘‹ Goodbye! Ending conversation...")
                            break

                        # Skip empty inputs
                        if not user_input:
                            continue

                        # Process the user query
                        print("\nAssistant: ", end="", flush=True)
                        await self.do_streaming_conversation(
                            conversation=conversation,
                            user_query=user_input,
                            token_timeout=token_timeout
                        )
                        print("*" * 100)
                        print("\n")  # Add newline after response

                    except KeyboardInterrupt:
                        print("\n\n๐Ÿ›‘ Ctrl+C pressed. Shutting down gracefully...")
                        break

                    except EOFError:
                        print("\n\n๐Ÿ›‘ EOF received. Shutting down gracefully...")
                        break

                    except Exception as e:
                        print(f"\nโŒ Error during conversation: {e}")
                        print("Traceback:", traceback.format_exc())
                        print("Continuing chat...\n")

            except asyncio.CancelledError:
                # The cancellation was requested by signal_handler above; absorb
                # it so the cleanup below can still await the delete request.
                uncancel = getattr(chat_task, "uncancel", None)
                if uncancel is not None:
                    uncancel()

            except Exception as e:
                print(f"โŒ Error initializing conversation: {e}")
                print("Traceback:", traceback.format_exc())

            finally:
                try:
                    # Cleanup conversation
                    if conversation:
                        try:
                            print("๐Ÿงน Cleaning up conversation...")
                            await self.delete_conversation(conversation)
                            print("โœ… Conversation deleted successfully")
                        except Exception as e:
                            print(f"โŒ Error deleting conversation: {e}")

                    print("๐Ÿ‘‹ Chat session ended.")
                finally:
                    # A previous handler of None means it was not installed from
                    # Python and cannot be re-installed through signal.signal().
                    for handled, previous in previous_handlers.items():
                        if previous is not None:
                            signal.signal(handled, previous)
    
    
    if __name__ == '__main__':

        mode = "chat"
        token_timeout = 30

        # asyncio.get_event_loop() is deprecated when no loop is running, so the
        # loop is created explicitly and closed when the script is done.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        assistant = WebSearchAssistant()

        try:
            if mode == 'chat':
                # Interactive chat mode
                print("๐Ÿš€ Starting interactive chat mode...")
                try:
                    loop.run_until_complete(assistant.chat(token_timeout=token_timeout))
                except Exception:
                    print("โŒ Error in chat mode:", traceback.format_exc())

            else:
                # Test mode with predefined query
                print("๐Ÿงช Running in test mode...")
                conversation = loop.run_until_complete(assistant.initiate_streaming_conversation())
                # user_query = "search the web for what is periodontitis and what is necrosis. use two separate parallel web search calls. don't search them together"
                user_query = "write a code to reverse the string periodontitis. use the python/code interpreter tool to write and execute the code"

                try:
                    # Keep the created conversation in its own variable: the
                    # lookup may return None and the cleanup below still needs
                    # the original object.
                    fetched = loop.run_until_complete(assistant.get_conversation(conversation_id=conversation.id))
                    loop.run_until_complete(assistant.do_streaming_conversation(conversation=fetched,
                                                                                user_query=user_query,
                                                                                token_timeout=token_timeout))
                except Exception:
                    print("error: ", traceback.format_exc())
                    print()

                finally:
                    loop.run_until_complete(assistant.delete_conversation(conversation=conversation))
        finally:
            loop.close()
    
    
    
    
import asyncio


class AsyncTimedIterable:
    """Wrap an async iterable so that each item must arrive within a time limit.

    Args:
        iterable: Any object implementing ``__aiter__``.
        timeout: Maximum number of seconds to wait for each item. Fractional
            values are honoured. ``None`` or a non-positive value means "wait
            indefinitely"; this includes the default of 0, which would
            otherwise make every wait time out immediately.
        cancel_token: Optional object; when it has a truthy ``is_cancelled``
            attribute the iteration stops before the next item is requested.

    Raises (from iteration):
        asyncio.TimeoutError: when an item does not arrive in time.

    A falsy item ends the iteration, so the wrapped source can use one as an
    end-of-stream marker.
    """

    def __init__(self, iterable, timeout=0, cancel_token=None):
        # Keep fractional seconds: int() would turn 0.5 into 0, and a limit of 0
        # makes asyncio.wait_for() give up straight away.
        wait_limit = None
        if timeout is not None and float(timeout) > 0:
            wait_limit = float(timeout)

        class AsyncTimedIterator:
            def __init__(self):
                self.__cancel_token = cancel_token
                self._iterator = iterable.__aiter__()

            def __aiter__(self):
                return self

            async def __anext__(self):
                try:
                    token = self.__cancel_token
                    if token and getattr(token, "is_cancelled", False):
                        raise StopAsyncIteration
                    result = await asyncio.wait_for(self._iterator.__anext__(), wait_limit)
                    if not result:
                        raise StopAsyncIteration
                    return result
                except asyncio.TimeoutError:
                    print(f"โฐ AsyncTimedIterable: Timeout after {wait_limit:g}s waiting for next event")
                    raise
                except StopAsyncIteration:
                    print("๐Ÿ AsyncTimedIterable: Stream ended naturally")
                    raise

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        # A fresh iterator per `async for`, each wrapping iterable.__aiter__().
        return self._factory()
from types import NoneType
from .BaseTool import BaseTool
from typing import Type, get_origin, get_args, Union, Any


def infer_field_type(field_type: Any) -> str:
    """
    Infers the JSON field type from a Python type.

    :param field_type: The Python type to infer from.
    :return: A string representing the inferred JSON field type.
    """
    if get_origin(field_type) is Union:
        # Handling Optional fields, extract the actual type without NoneType
        non_none_types = [t for t in get_args(field_type) if t is not type(None)]
        if non_none_types:
            return infer_field_type(non_none_types[0])
        else:
            return 'null'

    type_mappings = {
        str: 'string',
        int: 'integer',
        float: 'float',
        bool: 'boolean'
        # Add more mappings as necessary
    }

    return type_mappings.get(field_type, 'string')  # Default to 'string'


def _split_optional(annotation: Any) -> tuple:
    """Return ``(inner_type, is_optional)`` for an annotation that may allow None."""
    # Imported here because the module header only brings in NoneType.
    from types import UnionType

    if get_origin(annotation) in (Union, UnionType):
        members = get_args(annotation)
        non_none = [member for member in members if member is not NoneType]
        if non_none and len(non_none) < len(members):
            return non_none[0], True
    return annotation, False


def generate_openai_function_spec(tool_class: Type[BaseTool]) -> dict:
    """
    Generates a dictionary in the OpenAI API function calling format for a given tool class.

    The spec is emitted with ``"strict": True``. Strict mode expects every
    property to be listed in ``required`` and ``additionalProperties`` to be
    False, so optional fields are not left out of ``required``; they are
    declared nullable (``[<type>, "null"]``) instead.

    :param tool_class: The class for which to generate the function specification.
        Its ``get_name``, ``get_description`` and ``get_args_schema`` static
        methods are used; the args schema is read through ``model_fields``.
    :return: A dictionary representing the function specification in the OpenAI API format.
    """
    function_name = tool_class.get_name()
    description = tool_class.get_description()
    args_schema = tool_class.get_args_schema()

    properties = {}
    required_fields = []

    # model_fields holds only real model fields, so every entry has a FieldInfo.
    for field_name, field_info in args_schema.model_fields.items():
        inner_type, is_optional = _split_optional(field_info.annotation)
        json_type = infer_field_type(inner_type)

        field_spec = {
            "type": [json_type, "null"] if is_optional else json_type,
            "description": field_info.description or '',
        }

        # Handling Enums specifically
        if hasattr(inner_type, '__members__'):
            field_spec['enum'] = [member.value for member in inner_type]

        properties[field_name] = field_spec
        required_fields.append(field_name)

    function_spec = {
        "type": "function",
        "name": function_name,
        "description": description,
        # Strict mode makes function calls adhere to the schema instead of being best effort.
        "strict": True,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required_fields,
            "additionalProperties": False,
        },
    }

    return function_spec

from pydantic import BaseModel
from abc import ABC, abstractmethod


class BaseTool(ABC, BaseModel):
    """Abstract base for tools: a pydantic model that carries a name and a description.

    Subclasses provide the three static accessors (name, description, argument
    schema) plus a synchronous ``run``; ``arun`` is the asynchronous entry
    point and raises NotImplementedError unless overridden.
    """

    name: str
    description: str

    def __init_subclass__(cls, **kwargs):
        # Reject subclasses that lack either of the two class attributes.
        super().__init_subclass__(**kwargs)
        has_both = hasattr(cls, 'name') and hasattr(cls, 'description')
        if not has_both:
            raise TypeError("Subclasses must define 'name' and 'description'")

    class Config:
        """Configuration for this pydantic object."""

        extra = "allow"
        arbitrary_types_allowed = True

    @staticmethod
    @abstractmethod
    def get_name():
        """Return the tool name."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_description():
        """Return the tool description."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_args_schema():
        """Return the schema class describing the tool's arguments."""
        raise NotImplementedError

    @abstractmethod
    def run(self, *args, **kwargs) -> str:
        """Run the tool synchronously."""
        raise NotImplementedError

    async def arun(self, *args, **kwargs) -> str:
        """Run the tool asynchronously; not implemented in the base class."""
        raise NotImplementedError

import asyncio


from openai import AsyncOpenAI
from typing import Type
from pydantic import BaseModel, Field
from .BaseTool import BaseTool


# Argument schema for OpenAIWebSearchTool: a single required search query.
class WebSearchInput(BaseModel):
    # Required (default is ...); the description is attached to the field metadata.
    search_query: str = Field(..., description="The search query to browse the internet")


class OpenAIWebSearchTool(BaseTool):
    """Tool that answers a search query by calling the Responses API with web search.

    The API key is an empty string in this sample, so a real key has to be
    filled in before ``arun`` can succeed.
    """

    name: str = "openai_web_search"
    description: str = """
    Use this tool to search the internet for real-time information and current events. 
    Best used for:
    - Recent news and events
    - Current facts and information
    - Up-to-date data and statistics
    - Latest developments on any topic
    
    Do NOT use this tool for:
    - Personal or private information
    
    Input should be a clear, specific search query.
    """

    args_schema: Type[BaseModel] = WebSearchInput

    def __init__(self, logger=None):
        super().__init__()
        self.__logger = logger
        self.__api_key = ""
        self.__client = AsyncOpenAI(api_key=self.__api_key)

    @staticmethod
    def get_name():
        """Return the tool name."""
        return "openai_web_search"

    @staticmethod
    def get_description():
        """Return the tool description shown to the model."""
        return """
    Use this tool to search the internet for real-time information and current events. 
    Best used for:
    - Recent news and events
    - Current facts and information
    - Up-to-date data and statistics
    - Latest developments on any topic
    
    Do NOT use this tool for:
    - Personal or private information
    
    Input should be a clear, specific search query.
    """

    @staticmethod
    def get_args_schema():
        """Return the pydantic model describing the tool's arguments."""
        return WebSearchInput

    async def arun(self, *, search_query: str, **kwargs) -> str:
        """Run a web search asynchronously and return the response text.

        Args:
            search_query: The query to search for (keyword-only).
            **kwargs: ``search_context_size`` is read from here and defaults
                to ``"medium"``; other keys are ignored.

        Returns:
            The ``output_text`` of the response.
        """
        search_context_size = kwargs.get("search_context_size", "medium")
        # tool_choice must name a tool that is actually present in `tools`.
        web_search_tool_type = "web_search_preview"
        response = await self.__client.responses.create(
            input=search_query,
            model="gpt-4o",
            temperature=0,
            tools=[{"type": web_search_tool_type, "search_context_size": search_context_size}],
            tool_choice={"type": web_search_tool_type},
        )
        return response.output_text

    def run(self, search_query: str) -> str:
        """Synchronous execution is not supported; use ``arun``."""
        raise NotImplementedError


if __name__ == '__main__':
    async def main():
        """Run one sample search and print the result."""
        tool = OpenAIWebSearchTool()
        # arun() declares search_query as keyword-only, so it must be passed by name.
        result = await tool.arun(
            search_query="CVE-2024-56180 affects linux-image-6.11.0-19-generic i want to know abt this cve"
        )
        print(result)


    # asyncio.run() creates and closes the event loop; asyncio.get_event_loop()
    # is deprecated when no loop is running.
    asyncio.run(main())