Maybe I’m missing something, but I had no issues streaming output from the structured outputs API in a simple way. The `client.beta.chat.completions` object has a `.stream` method seemingly tailor-made for this.

For example, this function I created works perfectly fine, and extracts the cumulative streamed response as well as the final token usage:
```python
from openai import OpenAI

# Generator
def openai_structured_outputs_stream(**kwargs):
    client = OpenAI()
    with client.beta.chat.completions.stream(**kwargs, stream_options={"include_usage": True}) as stream:
        for chunk in stream:
            if chunk.type == 'chunk':
                latest_snapshot = chunk.to_dict()['snapshot']
                # The first chunk doesn't have the 'parsed' key, so use .get to avoid raising an exception
                latest_parsed = latest_snapshot['choices'][0]['message'].get('parsed', {})
                # Note that usage is not available until the final chunk
                latest_usage = latest_snapshot.get('usage', {})
                latest_json = latest_snapshot['choices'][0]['message']['content']
                yield latest_parsed, latest_usage, latest_json
```
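To illustrate the defensive access pattern the generator relies on, here is a toy walk-through with no network calls. The dict literals below are hypothetical stand-ins for the snapshots the stream helper accumulates; field names match the code above:

```python
# Hypothetical early snapshot: no 'parsed' key yet, no 'usage' yet
early_snapshot = {
    "choices": [{"message": {"content": '{"name": "Al'}}],
}

# Hypothetical final snapshot: fully parsed content plus token usage
final_snapshot = {
    "choices": [{"message": {
        "content": '{"name": "Alice", "age": 30}',
        "parsed": {"name": "Alice", "age": 30},
    }}],
    "usage": {"prompt_tokens": 12, "completion_tokens": 9, "total_tokens": 21},
}

def extract(snapshot):
    # Same defensive access as in the generator above:
    # .get with a default avoids KeyError on early snapshots
    parsed = snapshot["choices"][0]["message"].get("parsed", {})
    usage = snapshot.get("usage", {})
    content = snapshot["choices"][0]["message"]["content"]
    return parsed, usage, content

print(extract(early_snapshot))  # ({}, {}, '{"name": "Al')
print(extract(final_snapshot))
```

Early snapshots yield empty dicts for `parsed` and `usage`, so downstream code can treat every yielded tuple uniformly.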
Usage:
You can then stream the output, e.g. as a pandas DataFrame, as below (it looks ugly because this example refreshes the entire DataFrame on every chunk, purely for illustrative purposes):

```python
import pandas as pd
from IPython.display import display, clear_output

for parsed_completion, completion_usage, completion_json in openai_structured_outputs_stream(
    model=model_name,
    temperature=temperature,
    messages=messages,
    response_format=YourPydanticModel,
):
    clear_output()
    display(pd.DataFrame(parsed_completion))
```
Notes:
There are three chunk types: `chunk.type == 'chunk'`, `chunk.type == 'content.delta'`, and `chunk.type == 'content.done'` - hence the need for the `if` statement to only use one of them (they share lots of data). I believe the `content.delta` type contains the changes between consecutive chunks.
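If you only want the incremental text rather than cumulative snapshots, you could filter on the `content.delta` events instead. A minimal sketch with simulated events (the `FakeEvent` class and the delta strings below are made up for illustration; real events come from the stream object):

```python
from dataclasses import dataclass

@dataclass
class FakeEvent:
    # Stand-in for a streamed event; real events carry more fields
    type: str
    delta: str = ""

# Simulated event sequence: deltas interleaved with cumulative 'chunk' events
events = [
    FakeEvent(type="chunk"),
    FakeEvent(type="content.delta", delta='{"name": '),
    FakeEvent(type="chunk"),
    FakeEvent(type="content.delta", delta='"Alice"}'),
    FakeEvent(type="content.done"),
]

# Concatenating the deltas reconstructs the full content
pieces = [e.delta for e in events if e.type == "content.delta"]
print("".join(pieces))  # {"name": "Alice"}
```

This mirrors what the cumulative `chunk` snapshots already give you, so in practice you would use one style or the other, not both.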