Streaming using Structured Outputs

Maybe I’m missing something, but I had no issues streaming output from the structured outputs API in a simple way. The client.beta.chat.completions object has a .stream method seemingly tailor-made for this.

For example, this generator I wrote works fine. On every chunk it yields the cumulative parsed response, the token usage (empty until the final chunk), and the raw JSON text received so far:

from openai import OpenAI

# Generator: yields (parsed, usage, raw_json) once per 'chunk' event
def openai_structured_outputs_stream(**kwargs):
    client = OpenAI()

    with client.beta.chat.completions.stream(**kwargs, stream_options={"include_usage": True}) as stream:
        for chunk in stream:
            if chunk.type == 'chunk':
                latest_snapshot = chunk.to_dict()['snapshot']
                # The first chunk doesn't have the 'parsed' key, so using .get to prevent raising an exception
                latest_parsed = latest_snapshot['choices'][0]['message'].get('parsed', {})
                # Note that usage is not available until the final chunk
                latest_usage  = latest_snapshot.get('usage', {})
                latest_json   = latest_snapshot['choices'][0]['message']['content']

                yield latest_parsed, latest_usage, latest_json

Usage:
You can then stream the output, e.g. as a pandas DataFrame as below. It looks ugly because this example redraws the entire DataFrame on every chunk; that is purely for illustration.

import pandas as pd
from IPython.display import display, clear_output

for parsed_completion, completion_usage, completion_json in openai_structured_outputs_stream(
    model=model_name,
    temperature=temperature,
    messages=messages,
    response_format=YourPydanticModel
):
    clear_output(wait=True)  # wait=True delays clearing until the new output is ready, which reduces flicker
    display(pd.DataFrame(parsed_completion))
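
Because early snapshots are partial, the parsed value can be missing fields partway through the stream. Below is a minimal sketch of one way to guard the display step. It assumes a hypothetical schema with a top-level `rows` list whose items have `name` and `score` keys; neither that schema nor the helper `complete_rows` comes from the code above, so adapt both to your own model.

```python
def complete_rows(parsed, required_keys=("name", "score")):
    """Return only the row dicts that already contain every required key.

    `parsed` is a partially parsed snapshot (possibly an empty dict).
    The 'rows' field and the key names are hypothetical placeholders.
    """
    rows = parsed.get("rows") or []
    return [
        row for row in rows
        if isinstance(row, dict) and all(key in row for key in required_keys)
    ]


# Simulated snapshots, as they might look at three points in a stream.
snapshots = [
    {},
    {"rows": [{"name": "a", "score": 1}, {"name": "b"}]},
    {"rows": [{"name": "a", "score": 1}, {"name": "b", "score": 2}]},
]
for snap in snapshots:
    print(complete_rows(snap))
```

The returned list of dicts could be passed to pd.DataFrame(...) in place of parsed_completion, so half-filled rows never reach the display.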

Notes:
I saw three event types while iterating over the stream: chunk.type == 'chunk', chunk.type == 'content.delta', and chunk.type == 'content.done'. They share a lot of data, hence the if statement that uses only one of them. I believe the content.delta type contains the changes between consecutive chunks, and there may be other event types (e.g. for refusals or tool calls) that I didn't hit in this example.
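
If the content.delta events do carry only the incremental text, joining them should reproduce the final content. Here is a rough sketch of that idea. It assumes each content.delta event exposes a `delta` string and the content.done event exposes `content`. The events below are stand-ins built with SimpleNamespace, not real SDK objects, and `accumulate_deltas` is a hypothetical helper.

```python
from types import SimpleNamespace


def accumulate_deltas(events):
    """Concatenate the `delta` text of every 'content.delta' event."""
    parts = []
    for event in events:
        if event.type == "content.delta":
            parts.append(event.delta)
    return "".join(parts)


# Stand-in events; a real stream would yield SDK event objects instead.
fake_events = [
    SimpleNamespace(type="content.delta", delta='{"name": '),
    SimpleNamespace(type="chunk"),
    SimpleNamespace(type="content.delta", delta='"Ada"}'),
    SimpleNamespace(type="content.done", content='{"name": "Ada"}'),
]

text = accumulate_deltas(fake_events)
# Compare the joined deltas against the content of the final event.
print(text == fake_events[-1].content)
```

With a real stream you would pass the events from the `with ... as stream:` block instead of fake_events. That would be handy if you only want to append new text to a UI rather than redraw the whole thing on every chunk.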
