Here is a sample code example that I have written that uses conversation and response API -
-
import time import traceback import asyncio import json import signal import sys from openai import AsyncOpenAI from openai.types.conversations import Conversation from openai.resources.conversations import AsyncConversations from custom_tools.OpenAIWebSearchTool import OpenAIWebSearchTool from custom_tools.generate_openai_function_schema import generate_openai_function_spec from typing import Optional, List, Dict, Any from AsyncTimedIterable import AsyncTimedIterable from openai.types.responses.response_created_event import ResponseCreatedEvent from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall class WebSearchAssistant: def __init__(self): self.__openai_api_key = "" self.__openai_client = AsyncOpenAI(api_key=self.__openai_api_key) self.__custom_tools = self.__load_tools() self.__system_prompt = "You are a helpful AI agent" self.__async_conversation = AsyncConversations(self.__openai_client) def __load_tools(self): default_tools = [ { "type": "code_interpreter", "container": {"type": "auto"} } ] custom_tools = [OpenAIWebSearchTool] tools_spec = [generate_openai_function_spec(tool_cls) for tool_cls in custom_tools] default_tools.extend(tools_spec) self.__tool_instances = {tool_cls.get_name(): tool_cls() for tool_cls in custom_tools} print(f"loaded tools: {default_tools}") print(f"tool_instances: {self.__tool_instances}") return default_tools async def initiate_streaming_conversation(self) -> Conversation: conversation = await self.__async_conversation.create( items=[ {"type": "message", "role": "system", "content": self.__system_prompt} ], metadata={"topic": "demo"}, timeout=10 ) print("created conversation: ", conversation) return conversation async def delete_conversation(self, conversation: Conversation): deleted = await self.__async_conversation.delete(conversation.id) print("deleted conversation: ", deleted) async def __handle_function_call(self, tool_call: dict, **kwargs): function_name = tool_call["name"] function_args = tool_call["arguments"] print(f"calling function {function_name} with args: {function_args}") function_result = await self.__tool_instances[function_name].arun(**function_args, **kwargs) tool_call["function_result"] = function_result print(f"got result from {function_name}") return tool_call async def __handle_function_calls(self, tool_calls: Dict[int, ResponseFunctionToolCall], **kwargs) -> List[dict]: tool_calls_list = [] for index, tool_call in tool_calls.items(): function_call_id = tool_call.call_id function_id = tool_call.id function_name = tool_call.name function_args = json.loads(tool_call.arguments) tool_calls_list.append( { "type": "function_call", "id": function_id, "call_id": function_call_id, "name": function_name, "arguments": function_args } ) input_messages = [] function_call_results = await asyncio.gather( *(self.__handle_function_call(tool_call, **kwargs) for tool_call in tool_calls_list), return_exceptions=True ) for result in function_call_results: if isinstance(result, Exception) is False: input_messages.append( { "type": "function_call_output", "call_id": result["call_id"], "output": result["function_result"] } ) return input_messages async def __submit_tool_outputs(self, tool_outputs: List[dict], conversation_id: str, token_timeout=10, **kwargs): """ Submit function call outputs back to the OpenAI API to continue the conversation. Returns the stream for the caller to process. Args: tool_outputs: List of tool output dictionaries with call_id and output conversation_id: The conversation ID to continue token_timeout: Timeout for the stream Returns: AsyncTimedIterable stream or None if no tool outputs """ try: if not tool_outputs: print("No tool outputs to submit") return None print(f"๐ Submitting {len(tool_outputs)} tool outputs back to OpenAI...") # Create a new response with the tool outputs as input stream = await self.__openai_client.responses.create( model="gpt-4o", input=tool_outputs, # Submit the tool outputs as input temperature=0.1, conversation=conversation_id, stream=True, tools=self.__custom_tools, ) # Return the timed stream for the caller to process return AsyncTimedIterable(stream, timeout=token_timeout) except Exception as e: print(f"Error submitting tool outputs: {e}") print(f"Tool outputs were: {tool_outputs}") traceback.print_exc() return None async def __process_streaming_events(self, stream, conversation_id: str, token_timeout=10): """ Process streaming events recursively and yield text tokens as an async generator. Handles tool calls internally with recursion. """ timed_stream = AsyncTimedIterable(stream, timeout=token_timeout) final_tool_calls = {} async for event in timed_stream: # print("event: ") # print(event) # print() response_id = None if isinstance(event, ResponseCreatedEvent): response_id = event.response.id if hasattr(event, 'type'): if event.type == "response.output_text.delta": text_content = getattr(event, 'text', None) or getattr(event, 'delta', None) if text_content: yield text_content if event.type == "response.code_interpreter_call.in_progress": print("\n๐ง Code interpreter started and in progress") if event.type == "response.code_interpreter_call_code.done": code = event.code print("\n๐ง Code interpreter is done with generating code") if event.type == "response.code_interpreter_call.interpreting": print("\n๐ง Code interpreter is running the generated code") if event.type == "response.code_interpreter_call.completed": print("\n๐ง Code interpreter has executed the generated code") if event.type == 'response.output_item.added': final_tool_calls[event.output_index] = event.item print("final tool calls: ", final_tool_calls) if event.type == 'response.function_call_arguments.delta': index = event.output_index if final_tool_calls[index]: final_tool_calls[index].arguments += event.delta if event.type == "response.function_call_arguments.done": print("\n๐ง Function call arguments are done") tool_outputs = await self.__handle_function_calls(tool_calls=final_tool_calls) print("tool outputs: ", tool_outputs) # Submit tool outputs and recursively process the new stream if tool_outputs: tool_output_stream = await self.__submit_tool_outputs(tool_outputs, conversation_id, token_timeout) if tool_output_stream: print("\n๐ Processing tool output stream...") # Recursively yield tokens from the tool output stream async for token in self.__process_streaming_events(tool_output_stream, conversation_id, token_timeout): yield token elif event.type == "response.completed": print("\nโ Streaming completed") print() async def get_conversation(self, conversation_id: str) -> Conversation: conversation = await self.__async_conversation.retrieve(conversation_id=conversation_id) if conversation.id: return conversation return None async def do_streaming_conversation(self, conversation: Conversation, user_query: str, token_timeout=10): input_messages = [{"role": "user", "content": user_query}] stream = await self.__openai_client.responses.create( model="gpt-4o", input=input_messages, temperature=0.1, conversation=conversation.id, stream=True, tools=self.__custom_tools, ) conversation_id = conversation.id # Process the stream and print all tokens centrally here async for text_content in self.__process_streaming_events(stream, conversation_id, token_timeout): if text_content: print(text_content, end="", flush=True) async def chat(self, token_timeout=30): """ Interactive chat method that accepts user queries from keyboard. Handles graceful termination and signal handling. """ conversation = None # Setup signal handlers for graceful shutdown def signal_handler(signum, frame): print(f"\n\n๐ Received signal {signum}. Shutting down gracefully...") if conversation: # Run cleanup in the current event loop loop = asyncio.get_event_loop() if loop.is_running(): # Schedule cleanup as a task asyncio.create_task(self.delete_conversation(conversation)) else: # If loop is not running, use run_until_complete loop.run_until_complete(self.delete_conversation(conversation)) sys.exit(0) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # Termination signal signal.signal(signal.SIGQUIT, signal_handler) # Quit signal try: # Initialize conversation print("๐ Initializing conversation...") conversation = await self.initiate_streaming_conversation() print(f"โ Conversation created: {conversation.id}") print() # Print welcome message and instructions print("=" * 60) print("๐ค Welcome to OpenAI Assistant Chat!") print("=" * 60) print("Commands:") print(" โข Type your message and press Enter") print(" โข Type 'exit', 'quit', or 'bye' to end the chat") print(" โข Press Ctrl+C to force quit") print("=" * 60) print() while True: try: # Get user input user_input = input("You: ").strip() # Check for exit commands if user_input.lower() in ['exit', 'quit', 'bye', 'q']: print("\n๐ Goodbye! Ending conversation...") break # Skip empty inputs if not user_input: continue # Process the user query print("\nAssistant: ", end="", flush=True) await self.do_streaming_conversation( conversation=conversation, user_query=user_input, token_timeout=token_timeout ) print("*" * 100) print("\n") # Add newline after response except KeyboardInterrupt: print("\n\n๐ Ctrl+C pressed. Shutting down gracefully...") break except EOFError: print("\n\n๐ EOF received. Shutting down gracefully...") break except Exception as e: print(f"\nโ Error during conversation: {e}") print("Traceback:", traceback.format_exc()) print("Continuing chat...\n") except Exception as e: print(f"โ Error initializing conversation: {e}") print("Traceback:", traceback.format_exc()) finally: # Cleanup conversation if conversation: try: print("๐งน Cleaning up conversation...") await self.delete_conversation(conversation) print("โ Conversation deleted successfully") except Exception as e: print(f"โ Error deleting conversation: {e}") print("๐ Chat session ended.") if __name__ == '__main__': mode = "chat" token_timeout = 30 loop = asyncio.get_event_loop() assistant = WebSearchAssistant() if mode == 'chat': # Interactive chat mode print("๐ Starting interactive chat mode...") try: loop.run_until_complete(assistant.chat(token_timeout=token_timeout)) except Exception: print("โ Error in chat mode:", traceback.format_exc()) else: # Test mode with predefined query print("๐งช Running in test mode...") conversation = loop.run_until_complete(assistant.initiate_streaming_conversation()) # user_query = "search the web for what is periodontitis and what is necrosis. use two separate parallel web search calls. don't search them together" user_query = "write a code to reverse the string periodontitis. use the python/code interpreter tool to write and execute the code" try: conversation = loop.run_until_complete(assistant.get_conversation(conversation_id=conversation.id)) loop.run_until_complete(assistant.do_streaming_conversation(conversation=conversation, user_query=user_query, token_timeout=token_timeout)) except Exception: print("error: ", traceback.format_exc()) print() finally: loop.run_until_complete(assistant.delete_conversation(conversation=conversation))
import asyncio
class AsyncTimedIterable:
def __init__(self, iterable, timeout=0, cancel_token=None):
class AsyncTimedIterator:
def __init__(self):
self.__cancel_token = cancel_token
self._iterator = iterable.__aiter__()
async def __anext__(self):
try:
if self.__cancel_token and hasattr(self.__cancel_token, "is_cancelled"):
if self.__cancel_token.is_cancelled:
raise StopAsyncIteration
result = await asyncio.wait_for(self._iterator.__anext__(), int(timeout))
# print(f"โ
AsyncTimedIterable: Got event: {type(result)}")
# if you want to stop the iteration just raise StopAsyncIteration using some conditions
# (when the last chunk arrives, for example)
if not result:
raise StopAsyncIteration
return result
except asyncio.TimeoutError as e:
print(f"โฐ AsyncTimedIterable: Timeout after {int(timeout)}s waiting for next event")
raise e
except StopAsyncIteration:
print(f"๐ AsyncTimedIterable: Stream ended naturally")
raise
self._factory = AsyncTimedIterator
def __aiter__(self):
return self._factory()
from types import NoneType
from .BaseTool import BaseTool
from typing import Type, get_origin, get_args, Union, Any
def infer_field_type(field_type: Any) -> str:
"""
Infers the JSON field type from a Python type.
:param field_type: The Python type to infer from.
:return: A string representing the inferred JSON field type.
"""
if get_origin(field_type) is Union:
# Handling Optional fields, extract the actual type without NoneType
non_none_types = [t for t in get_args(field_type) if t is not type(None)]
if non_none_types:
return infer_field_type(non_none_types[0])
else:
return 'null'
type_mappings = {
str: 'string',
int: 'integer',
float: 'float',
bool: 'boolean'
# Add more mappings as necessary
}
return type_mappings.get(field_type, 'string') # Default to 'string'
def generate_openai_function_spec(tool_class: Type[BaseTool]) -> dict:
"""
Generates a dictionary in the OpenAI API function calling format for a given tool class.
:param tool_class: The class for which to generate the function specification.
:return: A dictionary representing the function specification in the OpenAI API format.
"""
# tool_instance = tool_class()
# Extracting class attributes
function_name = tool_class.get_name()
description = tool_class.get_description()
args_schema = tool_class.get_args_schema()
# Constructing parameter specification from Pydantic model
properties = {}
required_fields = []
for field_name, field_model in args_schema.__annotations__.items():
# print("field_name: ", field_name)
# print("field_model: ", field_model)
field_info = args_schema.model_fields.get(field_name)
field_description = field_info.description or ''
field_type = infer_field_type(field_model)
# field_type = 'string' # Defaulting to string, can be enhanced to support more types
properties[field_name] = {"type": field_type, "description": field_description}
# Handling Enums specifically
if hasattr(field_model, '__members__'):
properties[field_name]['enum'] = [e.value for e in field_model]
# # Determining if the field is required
# if str(field_model).find("Optional") <= 0 and not str(field_model).startswith("Optional") and not str(
# field_model).startswith("typing.Optional"):
# required_fields.append(field_name)
# Check if the field is optional
# print("get_args(field_model): ", get_args(field_model))
# print()
if get_origin(field_model) is Union and NoneType in get_args(field_model):
# Field is optional, do not add to required fields
pass
else:
required_fields.append(field_name)
# function_spec = {
# "type": "function",
# "function": {
# "name": function_name,
# "description": description,
# "strict": True, # Setting strict to true will ensure function calls reliably adhere to the function schema, instead of being best effort. We recommend always enabling strict mode.
# "parameters": {
# "type": "object",
# "properties": properties,
# "required": required_fields,
# "additionalProperties": False
# }
# }
# }
function_spec = {
"type": "function",
"name": function_name,
"description": description,
"strict": True,
# Setting strict to true will ensure function calls reliably adhere to the function schema, instead of being best effort. We recommend always enabling strict mode.
"parameters": {
"type": "object",
"properties": properties,
"required": required_fields,
"additionalProperties": False
}
}
return function_spec
from pydantic import BaseModel
from abc import ABC, abstractmethod
class BaseTool(ABC, BaseModel):
name: str
description: str
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if not hasattr(cls, 'name') or not hasattr(cls, 'description'):
raise TypeError("Subclasses must define 'name' and 'description'")
class Config:
"""Configuration for this pydantic object."""
extra = "allow"
arbitrary_types_allowed = True
@staticmethod
@abstractmethod
def get_name():
raise NotImplementedError
@staticmethod
@abstractmethod
def get_description():
raise NotImplementedError
@staticmethod
@abstractmethod
def get_args_schema():
raise NotImplementedError
@abstractmethod
def run(self, *args, **kwargs) -> str:
raise NotImplementedError
async def arun(self, *args, **kwargs) -> str:
raise NotImplementedError
import asyncio
from openai import AsyncOpenAI
from typing import Type
from pydantic import BaseModel, Field
from .BaseTool import BaseTool
class WebSearchInput(BaseModel):
search_query: str = Field(..., description="The search query to browse the internet")
class OpenAIWebSearchTool(BaseTool):
name: str = "openai_web_search"
description: str = """
Use this tool to search the internet for real-time information and current events.
Best used for:
- Recent news and events
- Current facts and information
- Up-to-date data and statistics
- Latest developments on any topic
Do NOT use this tool for:
- Personal or private information
Input should be a clear, specific search query.
"""
args_schema: Type[BaseModel] = WebSearchInput
def __init__(self, logger=None):
super().__init__()
self.__logger = logger
self.__api_key = ""
self.__client = AsyncOpenAI(api_key=self.__api_key)
@staticmethod
def get_name():
return "openai_web_search"
@staticmethod
def get_description():
return """
Use this tool to search the internet for real-time information and current events.
Best used for:
- Recent news and events
- Current facts and information
- Up-to-date data and statistics
- Latest developments on any topic
Do NOT use this tool for:
- Personal or private information
Input should be a clear, specific search query.
"""
@staticmethod
def get_args_schema():
return WebSearchInput
async def arun(self, *, search_query: str, **kwargs) -> str:
"""Use the tool asynchronously."""
search_context_size = kwargs.get("search_context_size", "medium")
response = await self.__client.responses.create(
input=search_query,
model="gpt-4o",
temperature=0,
tools=[{"type": "web_search_preview", "search_context_size": search_context_size, }],
tool_choice={"type": "web_search"}
)
return response.output_text
def run(self, search_query: str) -> str:
raise NotImplementedError
if __name__ == '__main__':
async def main():
tool = OpenAIWebSearchTool()
result = await tool.arun("CVE-2024-56180 affects linux-image-6.11.0-19-generic i want to know abt this cve")
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())