I spent some time creating a sample of how to use async version of the steaming API. The general idea is the same as the sync API, however, the exact imports can be a bit tricky.
There are two versions:
- Streaming iterator version
import os
from openai import AsyncOpenAI
# OpenAI API settings
OPENAI_API_KEY = os.environ['OPEN_AI_KEY']
ASSISTANT_ID = os.environ['ASSISTANT_ID']
# Predefined message
PREDEFINED_MESSAGE = "Explain the second law of thermodynamics"
# Initialize the async OpenAI client
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
async def main():
# Create a new OpenAI thread
thread = await openai_client.beta.threads.create()
oa_thread_id = thread.id
# Prepare and send the message to OpenAI
await openai_client.beta.threads.messages.create(
thread_id=oa_thread_id,
role="user",
content=PREDEFINED_MESSAGE
)
print(f"Message sent to OpenAI: {PREDEFINED_MESSAGE}")
# Stream the response
async with openai_client.beta.threads.runs.stream(
thread_id=oa_thread_id,
assistant_id=ASSISTANT_ID) as stream:
async for response in stream:
print(response)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
- Event Handler version
import os
from openai import AsyncOpenAI, OpenAIError
from openai.lib.streaming import AsyncAssistantEventHandler
from openai.types.beta.threads.message import Message
# OpenAI API settings
OPENAI_API_KEY = os.environ['OPEN_AI_KEY']
ASSISTANT_ID = os.environ['ASSISTANT_ID']
# Predefined message
PREDEFINED_MESSAGE = "Explain the second law of thermodynamics"
class CustomEventHandler(AsyncAssistantEventHandler):
async def on_text_created(self, text) -> None:
print(f"\nassistant > ", end="", flush=True)
async def on_text_delta(self, delta, snapshot):
print(delta.value, end="", flush=True)
async def on_message_done(self, message: Message) -> None:
"""Callback that is fired when a message is completed"""
# We keep this empty as in the original version,
# but with logging replaced by print, it would be:
# print(f"Message done: {message}")
# Initialize the async OpenAI client
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
async def main():
try:
# Create a new OpenAI thread
thread = await openai_client.beta.threads.create()
oa_thread_id = thread.id
# Prepare and send the message to OpenAI
await openai_client.beta.threads.messages.create(
thread_id=oa_thread_id,
role="user",
content=PREDEFINED_MESSAGE
)
print(f"Message sent to OpenAI: {PREDEFINED_MESSAGE}")
# Initialize event handler
event_handler = CustomEventHandler()
# Stream the response using the event handler
async with openai_client.beta.threads.runs.stream(
thread_id=oa_thread_id,
assistant_id=ASSISTANT_ID,
event_handler=event_handler) as stream:
await stream.until_done()
except OpenAIError as e:
print(f"OpenAI API error: {e}")
except Exception as e:
print(f"Unexpected error in OpenAI interaction: {e}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())