Here is a sample program I wrote that uses the OpenAI Conversations and Responses APIs:
import time
import traceback
import asyncio
import json
import signal
import sys
from openai import AsyncOpenAI
from openai.types.conversations import Conversation
from openai.resources.conversations import AsyncConversations
from custom_tools.OpenAIWebSearchTool import OpenAIWebSearchTool
from custom_tools.generate_openai_function_schema import generate_openai_function_spec
from typing import Optional, List, Dict, Any
from AsyncTimedIterable import AsyncTimedIterable
from openai.types.responses.response_created_event import ResponseCreatedEvent
from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall
class WebSearchAssistant:
    """Streaming chat assistant built on the OpenAI Conversations and Responses APIs.

    The assistant offers the model the hosted code interpreter plus the custom
    function tools registered in ``__load_tools``. Function calls requested by
    the model are executed locally and their outputs are submitted back to the
    same conversation so the model can finish its answer.
    """

    # Progress messages printed for code-interpreter lifecycle events.
    _CODE_INTERPRETER_STATUS = {
        "response.code_interpreter_call.in_progress": "\n🔧 Code interpreter started and in progress",
        "response.code_interpreter_call_code.done": "\n🔧 Code interpreter is done with generating code",
        "response.code_interpreter_call.interpreting": "\n🔧 Code interpreter is running the generated code",
        "response.code_interpreter_call.completed": "\n🔧 Code interpreter has executed the generated code",
    }

    def __init__(self, model: str = "gpt-4o"):
        """Create the OpenAI client and load the tool definitions.

        Args:
            model: Model name used for every Responses API request.
        """
        import os

        # Read the key from the environment; the empty-string fallback keeps
        # construction from failing when the variable is not set.
        self.__openai_api_key = os.environ.get("OPENAI_API_KEY", "")
        self.__openai_client = AsyncOpenAI(api_key=self.__openai_api_key)
        self.__model = model
        self.__custom_tools = self.__load_tools()
        self.__system_prompt = "You are a helpful AI agent"
        self.__async_conversation = AsyncConversations(self.__openai_client)

    def __load_tools(self):
        """Build the tool list sent to the API and instantiate the custom tools.

        Returns:
            The code-interpreter tool followed by one function spec per custom
            tool class. Tool instances are stored by name for later dispatch.
        """
        custom_tools = [OpenAIWebSearchTool]
        tools = [
            {
                "type": "code_interpreter",
                "container": {"type": "auto"},
            }
        ]
        tools.extend(generate_openai_function_spec(tool_cls) for tool_cls in custom_tools)
        self.__tool_instances = {tool_cls.get_name(): tool_cls() for tool_cls in custom_tools}
        print(f"loaded tools: {tools}")
        print(f"tool_instances: {self.__tool_instances}")
        return tools

    async def initiate_streaming_conversation(self) -> Conversation:
        """Create a conversation seeded with the system prompt and return it."""
        conversation = await self.__async_conversation.create(
            items=[
                {"type": "message", "role": "system", "content": self.__system_prompt}
            ],
            metadata={"topic": "demo"},
            timeout=10,
        )
        print("created conversation: ", conversation)
        return conversation

    async def delete_conversation(self, conversation: Conversation):
        """Delete ``conversation`` on the server."""
        deleted = await self.__async_conversation.delete(conversation.id)
        print("deleted conversation: ", deleted)

    async def __handle_function_call(self, tool_call: dict, **kwargs):
        """Run one tool call and store its result under ``"function_result"``.

        Args:
            tool_call: Dict with at least ``"name"`` and decoded ``"arguments"``.
            **kwargs: Extra keyword arguments forwarded to the tool's ``arun``.

        Returns:
            The same ``tool_call`` dict, with ``"function_result"`` added.
        """
        function_name = tool_call["name"]
        function_args = tool_call["arguments"]
        print(f"calling function {function_name} with args: {function_args}")
        function_result = await self.__tool_instances[function_name].arun(**function_args, **kwargs)
        tool_call["function_result"] = function_result
        print(f"got result from {function_name}")
        return tool_call

    async def __handle_function_calls(self, tool_calls: Dict[int, ResponseFunctionToolCall], **kwargs) -> List[dict]:
        """Execute all requested function calls concurrently.

        Every call yields exactly one ``function_call_output`` item. A call that
        fails (bad JSON arguments, unknown tool, tool exception) reports the
        error text as its output instead of being dropped, so the model always
        receives an answer for each ``call_id``.

        Args:
            tool_calls: Function-call items keyed by their output index.
            **kwargs: Extra keyword arguments forwarded to each tool.

        Returns:
            ``function_call_output`` dicts, in the iteration order of ``tool_calls``.
        """

        async def run_one(tool_call) -> dict:
            try:
                call = {
                    "type": "function_call",
                    "id": tool_call.id,
                    "call_id": tool_call.call_id,
                    "name": tool_call.name,
                    # A call without parameters may arrive with empty arguments.
                    "arguments": json.loads(tool_call.arguments or "{}"),
                }
                result = (await self.__handle_function_call(call, **kwargs))["function_result"]
                # The output field carries text; serialize anything that is not a string.
                output = result if isinstance(result, str) else json.dumps(result, default=str)
            except Exception as exc:
                print(f"Error running tool {tool_call.name}: {exc}")
                traceback.print_exc()
                output = f"Error while running tool '{tool_call.name}': {exc}"
            return {
                "type": "function_call_output",
                "call_id": tool_call.call_id,
                "output": output,
            }

        return list(await asyncio.gather(*(run_one(tool_call) for tool_call in tool_calls.values())))

    async def __submit_tool_outputs(self, tool_outputs: List[dict], conversation_id: str, token_timeout=10, **kwargs):
        """
        Submit function call outputs back to the OpenAI API to continue the conversation.
        Returns the stream for the caller to process.
        Args:
            tool_outputs: List of tool output dictionaries with call_id and output
            conversation_id: The conversation ID to continue
            token_timeout: Timeout for the stream
        Returns:
            AsyncTimedIterable stream, or None if there are no tool outputs or
            the request fails (the error is printed).
        """
        try:
            if not tool_outputs:
                print("No tool outputs to submit")
                return None
            print(f"🔄 Submitting {len(tool_outputs)} tool outputs back to OpenAI...")
            # Create a new response with the tool outputs as input
            stream = await self.__openai_client.responses.create(
                model=self.__model,
                input=tool_outputs,
                temperature=0.1,
                conversation=conversation_id,
                stream=True,
                tools=self.__custom_tools,
            )
            return AsyncTimedIterable(stream, timeout=token_timeout)
        except Exception as e:
            print(f"Error submitting tool outputs: {e}")
            print(f"Tool outputs were: {tool_outputs}")
            traceback.print_exc()
            return None

    async def __process_streaming_events(self, stream, conversation_id: str, token_timeout=10):
        """
        Consume a response stream and yield its text tokens as an async generator.

        Function calls are collected while the stream runs and are executed only
        after the response has completed; their outputs are then submitted and
        the follow-up stream is processed recursively.

        Args:
            stream: Event stream from ``responses.create(stream=True)``, or an
                already wrapped ``AsyncTimedIterable``.
            conversation_id: Conversation the tool outputs are submitted to.
            token_timeout: Maximum seconds to wait for each event.
        """
        # Streams returned by __submit_tool_outputs are already wrapped.
        if isinstance(stream, AsyncTimedIterable):
            timed_stream = stream
        else:
            timed_stream = AsyncTimedIterable(stream, timeout=token_timeout)

        pending_calls: Dict[int, ResponseFunctionToolCall] = {}
        response_completed = False

        async for event in timed_stream:
            event_type = getattr(event, "type", None)
            if event_type == "response.output_text.delta":
                text_content = getattr(event, "text", None) or getattr(event, "delta", None)
                if text_content:
                    yield text_content
            elif event_type in self._CODE_INTERPRETER_STATUS:
                print(self._CODE_INTERPRETER_STATUS[event_type])
            elif event_type == "response.output_item.added":
                # Only function calls are dispatched locally; messages and
                # hosted-tool items carry no call_id/arguments.
                if getattr(event.item, "type", None) == "function_call":
                    pending_calls[event.output_index] = event.item
                    print("final tool calls: ", pending_calls)
            elif event_type == "response.function_call_arguments.delta":
                call = pending_calls.get(event.output_index)
                if call is not None:
                    call.arguments += event.delta
            elif event_type == "response.function_call_arguments.done":
                print("\n🔧 Function call arguments are done")
                call = pending_calls.get(event.output_index)
                final_arguments = getattr(event, "arguments", None)
                if call is not None and final_arguments is not None:
                    call.arguments = final_arguments
            elif event_type == "response.completed":
                response_completed = True
                print("\n✅ Streaming completed")
                print()

        if not pending_calls:
            return
        if not response_completed:
            # Arguments may be partial if the response did not finish.
            print("\n⚠️ Stream ended before the response completed; skipping tool calls")
            return

        tool_outputs = await self.__handle_function_calls(tool_calls=pending_calls)
        print("tool outputs: ", tool_outputs)
        if not tool_outputs:
            return
        tool_output_stream = await self.__submit_tool_outputs(tool_outputs, conversation_id, token_timeout)
        if tool_output_stream is None:
            return
        print("\n🔄 Processing tool output stream...")
        async for token in self.__process_streaming_events(tool_output_stream, conversation_id, token_timeout):
            yield token

    async def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
        """Retrieve a conversation by id; returns None if the result has no id."""
        conversation = await self.__async_conversation.retrieve(conversation_id=conversation_id)
        if conversation.id:
            return conversation
        return None

    async def do_streaming_conversation(self, conversation: Conversation, user_query: str, token_timeout=10):
        """Send ``user_query`` to the conversation and print the streamed reply.

        Args:
            conversation: Conversation to continue.
            user_query: The user's message.
            token_timeout: Maximum seconds to wait for each streamed event.
        """
        conversation_id = conversation.id
        stream = await self.__openai_client.responses.create(
            model=self.__model,
            input=[{"role": "user", "content": user_query}],
            temperature=0.1,
            conversation=conversation_id,
            stream=True,
            tools=self.__custom_tools,
        )
        async for text_content in self.__process_streaming_events(stream, conversation_id, token_timeout):
            if text_content:
                print(text_content, end="", flush=True)

    async def chat(self, token_timeout=30):
        """
        Interactive chat method that accepts user queries from keyboard.

        SIGINT/SIGTERM (and SIGQUIT where available) end the session; the
        conversation is deleted in the ``finally`` block on every exit path,
        and the previous signal handlers are restored afterwards.

        Args:
            token_timeout: Maximum seconds to wait for each streamed event.
        """
        conversation = None
        loop = asyncio.get_running_loop()
        chat_task = asyncio.current_task()
        awaiting_input = False

        def signal_handler(signum, frame):
            print(f"\n\n🛑 Received signal {signum}. Shutting down gracefully...")
            if awaiting_input or chat_task is None:
                # The loop is blocked inside input(); only an exception can
                # interrupt that call.
                raise KeyboardInterrupt
            # Otherwise cancel the chat task so it unwinds through `finally`.
            loop.call_soon_threadsafe(chat_task.cancel)

        # SIGQUIT is not defined on every platform.
        handled_signals = [
            getattr(signal, sig_name)
            for sig_name in ("SIGINT", "SIGTERM", "SIGQUIT")
            if hasattr(signal, sig_name)
        ]
        previous_handlers = {sig: signal.signal(sig, signal_handler) for sig in handled_signals}

        try:
            print("🚀 Initializing conversation...")
            conversation = await self.initiate_streaming_conversation()
            print(f"✅ Conversation created: {conversation.id}")
            print()
            print("=" * 60)
            print("🤖 Welcome to OpenAI Assistant Chat!")
            print("=" * 60)
            print("Commands:")
            print(" • Type your message and press Enter")
            print(" • Type 'exit', 'quit', or 'bye' to end the chat")
            print(" • Press Ctrl+C to force quit")
            print("=" * 60)
            print()
            while True:
                try:
                    awaiting_input = True
                    try:
                        user_input = input("You: ").strip()
                    finally:
                        awaiting_input = False

                    if user_input.lower() in ('exit', 'quit', 'bye', 'q'):
                        print("\n👋 Goodbye! Ending conversation...")
                        break
                    if not user_input:
                        continue

                    print("\nAssistant: ", end="", flush=True)
                    await self.do_streaming_conversation(
                        conversation=conversation,
                        user_query=user_input,
                        token_timeout=token_timeout,
                    )
                    print("*" * 100)
                    print("\n")  # Add newline after response
                except KeyboardInterrupt:
                    print("\n\n🛑 Ctrl+C pressed. Shutting down gracefully...")
                    break
                except asyncio.CancelledError:
                    # Raised when a signal cancels the in-flight request.
                    print("\n\n🛑 Request cancelled. Shutting down gracefully...")
                    break
                except EOFError:
                    print("\n\n🛑 EOF received. Shutting down gracefully...")
                    break
                except Exception as e:
                    print(f"\n❌ Error during conversation: {e}")
                    print("Traceback:", traceback.format_exc())
                    print("Continuing chat...\n")
        except asyncio.CancelledError:
            print("\n\n🛑 Cancelled before the chat could start.")
        except Exception as e:
            print(f"❌ Error initializing conversation: {e}")
            print("Traceback:", traceback.format_exc())
        finally:
            try:
                if conversation:
                    try:
                        print("🧹 Cleaning up conversation...")
                        await self.delete_conversation(conversation)
                        print("✅ Conversation deleted successfully")
                    except Exception as e:
                        print(f"❌ Error deleting conversation: {e}")
                print("👋 Chat session ended.")
            finally:
                for sig, previous in previous_handlers.items():
                    # signal.signal returns None for handlers not installed from Python.
                    if previous is not None:
                        signal.signal(sig, previous)
if __name__ == '__main__':
    # Entry point: "chat" runs the interactive loop; any other value runs a
    # single predefined query against a throwaway conversation.
    mode = "chat"
    token_timeout = 30
    assistant = WebSearchAssistant()

    async def run_test_query(user_query: str) -> None:
        """Create a conversation, stream one query, and always delete it."""
        created = await assistant.initiate_streaming_conversation()
        try:
            conversation = await assistant.get_conversation(conversation_id=created.id)
            await assistant.do_streaming_conversation(
                conversation=conversation,
                user_query=user_query,
                token_timeout=token_timeout,
            )
        except Exception:
            print("error: ", traceback.format_exc())
            print()
        finally:
            # Delete via the originally created object: the retrieved value
            # may be None, which would make cleanup fail.
            await assistant.delete_conversation(conversation=created)

    if mode == 'chat':
        # Interactive chat mode
        print("🚀 Starting interactive chat mode...")
        try:
            asyncio.run(assistant.chat(token_timeout=token_timeout))
        except Exception:
            print("❌ Error in chat mode:", traceback.format_exc())
    else:
        # Test mode with predefined query
        print("🧪 Running in test mode...")
        # user_query = "search the web for what is periodontitis and what is necrosis. use two separate parallel web search calls. don't search them together"
        user_query = "write a code to reverse the string periodontitis. use the python/code interpreter tool to write and execute the code"
        asyncio.run(run_test_query(user_query))
import asyncio
class AsyncTimedIterable:
    """Wrap an async iterable so that waiting for each item is time-limited.

    Args:
        iterable: Any object implementing ``__aiter__``.
        timeout: Maximum seconds to wait for each item. Fractional values are
            honoured; ``None`` disables the limit. ``0`` means "do not wait".
        cancel_token: Optional object exposing ``is_cancelled``; when it is
            truthy, iteration stops before the next item is requested.

    Iteration raises ``asyncio.TimeoutError`` when an item does not arrive in
    time. A falsy item ends the iteration.
    """

    def __init__(self, iterable, timeout=0, cancel_token=None):
        # Keep the limit as a float: truncating to int would turn 0.5s into 0s.
        wait_limit = None if timeout is None else float(timeout)

        class AsyncTimedIterator:
            def __init__(self):
                self.__cancel_token = cancel_token
                self._iterator = iterable.__aiter__()

            def __aiter__(self):
                # Async iterators are expected to be iterable themselves.
                return self

            async def __anext__(self):
                try:
                    token = self.__cancel_token
                    if token and hasattr(token, "is_cancelled") and token.is_cancelled:
                        raise StopAsyncIteration
                    result = await asyncio.wait_for(self._iterator.__anext__(), wait_limit)
                    # A falsy item is treated as the end of the stream.
                    if not result:
                        raise StopAsyncIteration
                    return result
                except asyncio.TimeoutError:
                    print(f"⏰ AsyncTimedIterable: Timeout after {wait_limit:g}s waiting for next event")
                    raise
                except StopAsyncIteration:
                    print("🏁 AsyncTimedIterable: Stream ended naturally")
                    raise

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        """Return a fresh timed iterator over the wrapped iterable."""
        return self._factory()
from types import NoneType
from .BaseTool import BaseTool
from typing import Type, get_origin, get_args, Union, Any
def infer_field_type(field_type: Any) -> str:
    """
    Infers the JSON Schema type name from a Python type.

    Unions (``Optional[X]``, ``Union[X, None]`` and ``X | None``) resolve to
    the type of their first non-None member.

    :param field_type: The Python type to infer from.
    :return: A JSON Schema type name; unknown types default to ``'string'``.
    """
    import types

    # ``X | None`` has origin types.UnionType on Python 3.10+; fall back to
    # Union on interpreters that lack it.
    union_origins = (Union, getattr(types, "UnionType", Union))
    if get_origin(field_type) in union_origins:
        non_none_types = [t for t in get_args(field_type) if t is not type(None)]
        if non_none_types:
            return infer_field_type(non_none_types[0])
        return 'null'

    type_mappings = {
        str: 'string',
        int: 'integer',
        float: 'number',  # JSON Schema has no 'float' type
        bool: 'boolean',
        type(None): 'null',
    }
    return type_mappings.get(field_type, 'string')  # Default to 'string'
def generate_openai_function_spec(tool_class: Type[BaseTool]) -> dict:
    """
    Generates a dictionary in the OpenAI API function calling format for a given tool class.

    The spec is emitted with ``strict: True``. Strict mode requires every
    property to be listed in ``required``, so an optional field is expressed
    as a nullable type (``[<type>, "null"]``) rather than being left out.

    :param tool_class: The class for which to generate the function specification.
    :return: A dictionary representing the function specification in the OpenAI API format.
    """
    import types

    # ``X | None`` has origin types.UnionType on Python 3.10+.
    union_origins = (Union, getattr(types, "UnionType", Union))

    function_name = tool_class.get_name()
    description = tool_class.get_description()
    args_schema = tool_class.get_args_schema()

    properties = {}
    required_fields = []
    # model_fields covers inherited fields and always yields a FieldInfo,
    # unlike the class's own __annotations__.
    for field_name, field_info in args_schema.model_fields.items():
        annotation = field_info.annotation
        base_type = annotation
        is_optional = False
        if get_origin(annotation) in union_origins:
            members = get_args(annotation)
            is_optional = NoneType in members
            non_none = [member for member in members if member is not NoneType]
            if non_none:
                base_type = non_none[0]

        json_type = infer_field_type(base_type)
        field_spec = {
            "type": [json_type, "null"] if is_optional else json_type,
            "description": field_info.description or '',
        }
        # Enums: list the allowed values (checked on the unwrapped type so
        # Optional[Enum] is handled too).
        if hasattr(base_type, '__members__'):
            allowed_values = [member.value for member in base_type]
            if is_optional:
                allowed_values.append(None)
            field_spec['enum'] = allowed_values

        properties[field_name] = field_spec
        required_fields.append(field_name)

    return {
        "type": "function",
        "name": function_name,
        "description": description,
        # Strict mode makes function calls adhere to the schema instead of being best effort.
        "strict": True,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required_fields,
            "additionalProperties": False,
        },
    }
from pydantic import BaseModel
from abc import ABC, abstractmethod
class BaseTool(ABC, BaseModel):
    """Abstract base for tools: a pydantic model with a name, a description,
    static metadata accessors and sync/async entry points."""

    name: str
    description: str

    class Config:
        """Configuration for this pydantic object."""
        extra = "allow"
        arbitrary_types_allowed = True

    def __init_subclass__(cls, **kwargs):
        # Reject subclasses that do not expose both identifying attributes.
        super().__init_subclass__(**kwargs)
        if not (hasattr(cls, 'name') and hasattr(cls, 'description')):
            raise TypeError("Subclasses must define 'name' and 'description'")

    @staticmethod
    @abstractmethod
    def get_name():
        """Return the tool's name."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_description():
        """Return the tool's description."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def get_args_schema():
        """Return the model class describing the tool's arguments."""
        raise NotImplementedError

    @abstractmethod
    def run(self, *args, **kwargs) -> str:
        """Run the tool synchronously."""
        raise NotImplementedError

    async def arun(self, *args, **kwargs) -> str:
        """Run the tool asynchronously; not abstract, raises unless overridden."""
        raise NotImplementedError
import asyncio
from openai import AsyncOpenAI
from typing import Type
from pydantic import BaseModel, Field
from .BaseTool import BaseTool
class WebSearchInput(BaseModel):
    # Argument schema for the web search tool: one required query string.
    search_query: str = Field(
        default=...,
        description="The search query to browse the internet",
    )
class OpenAIWebSearchTool(BaseTool):
    """Tool that answers a search query through the OpenAI Responses API
    with the hosted web search tool."""

    name: str = "openai_web_search"
    description: str = """
Use this tool to search the internet for real-time information and current events.
Best used for:
- Recent news and events
- Current facts and information
- Up-to-date data and statistics
- Latest developments on any topic
Do NOT use this tool for:
- Personal or private information
Input should be a clear, specific search query.
"""
    args_schema: Type[BaseModel] = WebSearchInput

    def __init__(self, logger=None):
        """Create the tool and its OpenAI client.

        Args:
            logger: Optional logger; stored on the instance.
        """
        import os

        super().__init__()
        self.__logger = logger
        # Read the key from the environment; the empty-string fallback keeps
        # construction from failing when the variable is not set.
        self.__api_key = os.environ.get("OPENAI_API_KEY", "")
        self.__client = AsyncOpenAI(api_key=self.__api_key)

    @staticmethod
    def get_name():
        """Return the function name advertised to the model."""
        return "openai_web_search"

    @staticmethod
    def get_description():
        """Return the description advertised to the model."""
        return """
Use this tool to search the internet for real-time information and current events.
Best used for:
- Recent news and events
- Current facts and information
- Up-to-date data and statistics
- Latest developments on any topic
Do NOT use this tool for:
- Personal or private information
Input should be a clear, specific search query.
"""

    @staticmethod
    def get_args_schema():
        """Return the model class describing the tool's arguments."""
        return WebSearchInput

    async def arun(self, *, search_query: str, **kwargs) -> str:
        """Use the tool asynchronously.

        Args:
            search_query: The query to search for (keyword-only).
            **kwargs: ``search_context_size`` is read if present (default
                ``"medium"``); other keys are ignored.

        Returns:
            The ``output_text`` of the response.
        """
        search_context_size = kwargs.get("search_context_size", "medium")
        # tool_choice must name the same tool type that is declared in `tools`.
        web_search_tool_type = "web_search_preview"
        response = await self.__client.responses.create(
            input=search_query,
            model="gpt-4o",
            temperature=0,
            tools=[{"type": web_search_tool_type, "search_context_size": search_context_size}],
            tool_choice={"type": web_search_tool_type},
        )
        return response.output_text

    def run(self, search_query: str) -> str:
        """Synchronous execution is not supported; use ``arun``."""
        raise NotImplementedError
if __name__ == '__main__':
    # Manual smoke test: run one search and print the result.
    async def main():
        tool = OpenAIWebSearchTool()
        # arun takes search_query as a keyword-only argument.
        result = await tool.arun(
            search_query="CVE-2024-56180 affects linux-image-6.11.0-19-generic i want to know abt this cve"
        )
        print(result)

    asyncio.run(main())