Streaming using Structured Outputs

Is it possible to stream using structured outputs?

Say I define, using Pydantic or an equivalent, a structure that looks like this:

Person = {
    "name": <string>,
    "age": <number>,
    "profession": <string>
}
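
For concreteness, the Pydantic equivalent would be roughly:

from pydantic import BaseModel

# Rough Pydantic equivalent of the structure sketched above
class Person(BaseModel):
    name: str
    age: int
    profession: str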

And I give GPT a prompt like “The user input will be a story. Read the story and identify all of the characters in the story.”

And I want GPT to return an array of Person objects.

Rather than return them all at once as an array, is it possible to have GPT send me each person as it identifies them, one by one? This would be very helpful as it would allow me to reduce the time to get the first piece of information back.

It is possible, but you need code to parse the chunks. I have not used the helper libraries, so I don't know whether they can give you values before the result is complete, but basically that is what you need to do on your end.
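
To give a sense of what that chunk parsing could look like, here is a rough, untested sketch that accumulates the streamed text and yields each complete object nested inside the top-level array as soon as its closing brace arrives (the function name and the wrapper shape are just assumptions):

import json

def iter_complete_objects(text_deltas):
    """Yield each complete JSON object nested one level deep, e.g. the items
    of {"people": [ {...}, {...} ]}, as soon as its closing brace arrives."""
    buffer = ""
    depth = 0
    start = None
    in_string = False
    escaped = False
    for delta in text_deltas:
        for ch in delta:
            buffer += ch
            if escaped:
                escaped = False
                continue
            if in_string:
                if ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                if depth == 1:  # depth 0 is the outer wrapper object
                    start = len(buffer) - 1
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 1 and start is not None:
                    yield json.loads(buffer[start:])
                    start = None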


Thanks @supershaneski

Please @expertise.ai.chat, check out @katiagg's tutorial at Introduction to Structured Outputs | OpenAI Cookbook

strict: true rocks


Hey @allyssonallan , I’ve just gone through the tutorial you linked. There were some good examples in there, but nothing about streaming. Did I miss something in there?


Hi @expertise.ai.chat , I managed to find a workaround by creating a wrapper for the Pydantic base class and processing the JSON schema the same way the streaming beta API does.

from pydantic.json_schema import (
    DEFAULT_REF_TEMPLATE,
    GenerateJsonSchema,
    JsonSchemaMode,
    model_json_schema
)
from typing import Any
from pydantic import BaseModel, Field
from openai.lib._pydantic import _ensure_strict_json_schema


class BaseModelOpenAI(BaseModel):
    @classmethod
    def model_json_schema(
        cls,
        by_alias: bool = True,
        ref_template: str = DEFAULT_REF_TEMPLATE,
        schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema,
        mode: JsonSchemaMode = 'serialization'
    ) -> dict[str, Any]:
        json_schema = model_json_schema(
            cls,
            by_alias=by_alias,
            ref_template=ref_template,
            schema_generator=schema_generator,
            mode=mode
        )
        # Apply the same strict-schema transformation the beta parse/stream helpers use
        return _ensure_strict_json_schema(json_schema, path=(), root=json_schema)

Your classes should inherit from BaseModelOpenAI and then you need to pass the response format as follows:

{
    "type": "json_schema",
    "json_schema": {
        "name": response_class.__name__,
        "schema": response_class.model_json_schema(),
        "strict": True
    }
}

Then you can use the standard client.chat.completions.create to send your request and get a streaming response.
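
Putting it together, a minimal sketch of the full call might look like this (untested; the Person/People models and the story prompt are placeholders, and the classes inherit from the BaseModelOpenAI wrapper above):

from openai import OpenAI

class Person(BaseModelOpenAI):
    name: str
    age: int
    profession: str

class People(BaseModelOpenAI):
    people: list[Person]

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "Read the story and list every character as a Person."},
        {"role": "user", "content": "Once upon a time..."},
    ],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": People.__name__,
            "schema": People.model_json_schema(),
            "strict": True,
        },
    },
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta.content if chunk.choices else None
    if delta:
        print(delta, end="")  # raw JSON fragments; parse them incrementally on your side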


Wow @andreasantoro.pvt! This is really cool. It is streaming the json response.

Do you happen to know, though, whether there is a way to get it to stream one key and value at a time rather than word by word?

So for example, if I should get back

{
    key1: val1,
    key2: val2,
    key3: val3
}

I’ll get back

{key1: val1}
{key2: val2}
{key3: val3}

or something similar instead of

{
key1
:
val1
,
key2
:
val2
,
key3
:
val3
}

Thanks


@expertise.ai.chat
I don’t think so, since the generation happens token by token. If it’s just a matter of displaying the result to your users, you could accumulate the chunk contents until the next “:” (or “}”) has been reached.
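
That buffering could be as simple as something like this (a rough sketch, assuming you already have an iterator of raw text deltas):

def emit_on_delimiters(text_deltas, delimiters=(":", ",", "}")):
    """Buffer streamed text and flush it whenever a delimiter arrives,
    so the display updates roughly once per key or value instead of per token."""
    buffer = ""
    for delta in text_deltas:
        buffer += delta
        if any(d in delta for d in delimiters):
            yield buffer
            buffer = ""
    if buffer:
        yield buffer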


Maybe I’m missing something, but I had no issues streaming output from the structured outputs API in a simple way. The client.beta.chat.completions object has a .stream method seemingly tailor-made for this.

e.g. this function I created works perfectly fine, and extracts the cumulative streamed response as well as the final token usage:

from openai import OpenAI

# Generator
def openai_structured_outputs_stream(**kwargs):
    client = OpenAI()

    with client.beta.chat.completions.stream(**kwargs, stream_options={"include_usage": True}) as stream:
        for chunk in stream:
            if chunk.type == 'chunk':
                latest_snapshot = chunk.to_dict()['snapshot']
                # The first chunk doesn't have the 'parsed' key, so using .get to prevent raising an exception
                latest_parsed = latest_snapshot['choices'][0]['message'].get('parsed', {})
                # Note that usage is not available until the final chunk
                latest_usage  = latest_snapshot.get('usage', {})
                latest_json   = latest_snapshot['choices'][0]['message']['content']

                yield latest_parsed, latest_usage, latest_json

Usage:
So you can stream the output e.g. as a pandas dataframe as below (though it looks ugly, since this example refreshes the entire dataframe every chunk - purely done for illustrative purposes):

import pandas as pd
from IPython.display import display, clear_output

for parsed_completion, completion_usage, completion_json in openai_structured_outputs_stream(
    model=model_name,
    temperature=temperature,
    messages=messages,
    response_format=YourPydanticModel
):
    clear_output()
    display(pd.DataFrame(parsed_completion))

Notes:
There are three chunk types: chunk.type == 'chunk', chunk.type == 'content.delta', and chunk.type == 'content.done', hence the need for the if statement to use only one of them (they share a lot of data). I believe the content.delta type contains the changes between consecutive chunks.
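
If you only want the incremental text rather than the whole snapshot each time, the content.delta events should work too. A minimal variant of the generator above (untested, assuming the event exposes a .delta attribute):

def openai_structured_outputs_delta_stream(**kwargs):
    client = OpenAI()
    with client.beta.chat.completions.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content.delta":
                # .delta holds only the new JSON text since the previous event
                yield event.delta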


@sebastian.chejniak
Last time I checked, the parse API did not accept the stream parameter, so we weren't able to use the stream function you provided.

Anyway, I'm glad they added it. Thanks for pointing it out.


Sorry, but would you mind spelling it out for me?

Say I have the following code.

stream = self.client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {
            "role": "system",
            "content": "You are a helpful math tutor. Guide the user through the solution step by step.",
        },
        {"role": "user", "content": "how can I solve 8x + 7 = -23"},
    ],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

This code streams the response to the provided math question, but it does so in plain text. Say I still want to have that response be streamed, but now in JSON that conforms to MathReasoning defined below. How would I modify this code to achieve that?

from pydantic import BaseModel


class Step(BaseModel):
    explanation: str
    output: str


class MathReasoning(BaseModel):
    steps: list[Step]
    final_answer: str

Thanks

@expertise.ai.chat you would just need to use the model_validate_json method of the Pydantic class on the accumulated JSON text. In your case you would need to do

MathReasoning.model_validate_json(plain_text_response)

And your result will be parsed as a MathReasoning object
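
Putting that together with your streaming loop, and assuming the request is made with a JSON-schema response_format (e.g. one of the options below), something like this sketch should work:

buffer = ""
for chunk in stream:
    delta = chunk.choices[0].delta.content if chunk.choices else None
    if delta:
        buffer += delta
        print(delta, end="")  # stream the raw JSON as it arrives

# Once the stream is finished, the accumulated text is complete JSON
result = MathReasoning.model_validate_json(buffer)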

Option 1:

# pip install git+https://github.com/nicholishen/tooldantic.git

import tooldantic as td

class Step(td.OpenAiResponseFormatBaseModel):
    explanation: str
    output: str

class MathReasoning(td.OpenAiResponseFormatBaseModel):
    steps: list[Step]
    final_answer: str

Option 2:

import pydantic


class OpenAiResponseFormatGenerator(pydantic.json_schema.GenerateJsonSchema):
    # https://docs.pydantic.dev/latest/concepts/json_schema/#customizing-the-json-schema-generation-process
    def generate(self, schema, mode="validation"):
        json_schema = super().generate(schema, mode=mode)
        json_schema = {
            "type": "json_schema",
            "json_schema": {
                "name": json_schema.pop("title"),
                "schema": json_schema,
            }
        }
        return json_schema


class StrictBaseModel(pydantic.BaseModel):
    model_config = {"extra": "forbid"}

    @classmethod
    def model_json_schema(cls, **kwargs):
        return super().model_json_schema(
            schema_generator=OpenAiResponseFormatGenerator, **kwargs
        )


class Step(StrictBaseModel):
    explanation: str
    output: str


class MathReasoning(StrictBaseModel):
    steps: list[Step]
    final_answer: str

Calling the LLM:

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {
            "role": "system",
            "content": "You are a helpful math tutor. Guide the user through the solution step by step.",
        },
        {"role": "user", "content": "how can I solve 8x + 7 = -23"},
    ],
    stream=True,
    response_format=MathReasoning.model_json_schema(),
)

The below code works, using my function openai_structured_outputs_stream as defined above. Like I said, it's not pretty due to the constant refreshing, but it works. You could write code that prints consecutive chunks of text instead of reprinting the entire object, but this is unnecessary for my use case, due to React's reconciliation behaviour (i.e. React only re-renders the changes in the text, not the entire text every time).

from pydantic import BaseModel
from pydantic.fields import Field
from IPython.display import display, clear_output
import pandas as pd

class Step(BaseModel):
    explanation: str
    output: str


class MathReasoning(BaseModel):
    steps: list[Step]
    final_answer: str

messages=[
    {
        "role": "system",
        "content": "You are a helpful math tutor. Guide the user through the solution step by step.",
    },
    {"role": "user", "content": "how can I solve 8x + 7 = -23"},
]

for parsed_completion, *_ in openai_structured_outputs_stream(
    model='gpt-4o-mini',
    temperature=0,
    messages=messages,
    response_format=MathReasoning
):
    clear_output()
    display(parsed_completion)

Does this second approach then need the mandatory extra keys added, i.e. additionalProperties: false and every property listed in the required array?

Yes. That applies to the new structured output mode. Tool calling is a bit more flexible, and does not require those additional properties, but extra validation is required on your end.
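
For reference, a hand-written strict-mode response format for the earlier Person example would look roughly like this (illustrative only; the schema generators above produce this shape for you):

person_response_format = {
    "type": "json_schema",
    "json_schema": {
        "name": "Person",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "number"},
                "profession": {"type": "string"},
            },
            # Strict mode requires every property to be listed here
            "required": ["name", "age", "profession"],
            # ...and additionalProperties must be explicitly set to false
            "additionalProperties": False,
        },
    },
}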

Here’s a post with a printer class (now obsolete to me) that prints words or chunks from a generator to the console, with word wrapping.