How to use the async version of the Streaming API

I spent some time putting together a sample of how to use the async version of the streaming API. The general idea is the same as with the sync API; however, the exact imports can be a bit tricky.

There are two versions:

  1. Streaming iterator version
import os
from openai import AsyncOpenAI

# OpenAI API settings
OPENAI_API_KEY = os.environ['OPEN_AI_KEY']
ASSISTANT_ID = os.environ['ASSISTANT_ID']
# Predefined message
PREDEFINED_MESSAGE = "Explain the second law of thermodynamics"

# Initialize the async OpenAI client
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

async def main():
    # Create a new OpenAI thread
    thread = await openai_client.beta.threads.create()
    oa_thread_id = thread.id
    # Prepare and send the message to OpenAI
    await openai_client.beta.threads.messages.create(
        thread_id=oa_thread_id,
        role="user",
        content=PREDEFINED_MESSAGE
    )
    print(f"Message sent to OpenAI: {PREDEFINED_MESSAGE}")
    # Stream the response
    async with openai_client.beta.threads.runs.stream(
            thread_id=oa_thread_id,
            assistant_id=ASSISTANT_ID) as stream:
        async for response in stream:
            print(response)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
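
If you only care about the text tokens rather than the raw event objects, the stream helper also exposes a text_deltas iterator (available in recent versions of the openai-python SDK; treat this as a sketch and check your installed version). The block below is a drop-in replacement for the "Stream the response" part of main() above, reusing openai_client, oa_thread_id and ASSISTANT_ID:

    # Stream only the text deltas instead of raw events
    async with openai_client.beta.threads.runs.stream(
            thread_id=oa_thread_id,
            assistant_id=ASSISTANT_ID) as stream:
        async for text in stream.text_deltas:
            print(text, end="", flush=True)
        print()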

  2. Event Handler version
import os
from openai import AsyncOpenAI, OpenAIError
from openai.lib.streaming import AsyncAssistantEventHandler
from openai.types.beta.threads.message import Message

# OpenAI API settings
OPENAI_API_KEY = os.environ['OPEN_AI_KEY']
ASSISTANT_ID = os.environ['ASSISTANT_ID']

# Predefined message
PREDEFINED_MESSAGE = "Explain the second law of thermodynamics"

class CustomEventHandler(AsyncAssistantEventHandler):

    async def on_text_created(self, text) -> None:
        print(f"\nassistant > ", end="", flush=True)

    async def on_text_delta(self, delta, snapshot):
        print(delta.value, end="", flush=True)

    async def on_message_done(self, message: Message) -> None:
        """Callback fired when a message is completed."""
        # Intentionally left empty; uncomment to inspect the full message:
        # print(f"Message done: {message}")

# Initialize the async OpenAI client
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

async def main():
    try:
        # Create a new OpenAI thread
        thread = await openai_client.beta.threads.create()
        oa_thread_id = thread.id

        # Prepare and send the message to OpenAI
        await openai_client.beta.threads.messages.create(
            thread_id=oa_thread_id,
            role="user",
            content=PREDEFINED_MESSAGE
        )
        print(f"Message sent to OpenAI: {PREDEFINED_MESSAGE}")

        # Initialize event handler
        event_handler = CustomEventHandler()

        # Stream the response using the event handler
        async with openai_client.beta.threads.runs.stream(
                thread_id=oa_thread_id,
                assistant_id=ASSISTANT_ID,
                event_handler=event_handler) as stream:
            await stream.until_done()

    except OpenAIError as e:
        print(f"OpenAI API error: {e}")
    except Exception as e:
        print(f"Unexpected error in OpenAI interaction: {e}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
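
If your assistant uses tools such as code_interpreter, the same event handler can also override the tool-call callbacks. This is only a sketch based on the documented streaming helper methods (on_tool_call_created / on_tool_call_delta); verify the names against your installed SDK version. It reuses the AsyncAssistantEventHandler import from the sample above:

class ToolAwareEventHandler(AsyncAssistantEventHandler):

    async def on_tool_call_created(self, tool_call) -> None:
        # Announce which tool the assistant started using
        print(f"\nassistant > {tool_call.type}\n", flush=True)

    async def on_tool_call_delta(self, delta, snapshot) -> None:
        # Print code interpreter input as it streams in
        if delta.type == "code_interpreter" and delta.code_interpreter:
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)

Pass an instance of this class as event_handler in runs.stream(), exactly as in the second sample.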