This didn't work for me, as I am using streaming in an assistant context. The stream still returns partial JSON, which makes it kind of painful to extract the actual content from it. What follows is probably not bulletproof, but it is the quickest and smartest way I found to get streaming to work with an assistant that returns JSON.
It probably only works with a simple, flat JSON structure (a single level of keys), like
{"description": "...", "key1": "value1", ...}
In your assistant’s instructions, make sure the field you care about most, like “description”, comes first in the response.
The idea is to implement some simple JSON-repair logic like the snippet below. You could add more conditions, for example to handle unclosed lists; I just kept it simple here. The goal is to get valid JSON that you can at least parse for the “important” data you need, which in my case is the description.
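if json_buffer.count('"') % 2 != 0:
    # odd number of quotes: a string is still open, so close it and the object
    dict_response = json.loads(json_buffer + '"}')
elif json_buffer.endswith(':'):
    # a key has no value yet: add an empty string and close the object
    dict_response = json.loads(json_buffer + '""}')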
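The same two rules can be wrapped in a small helper that returns `None` instead of raising when the buffer cannot be repaired yet. This is only a sketch: `repair_partial_json` is a made-up name, not part of any library, and the extra fallback (closing the object and dropping a trailing comma) is my own addition on top of the two rules above.

```python
import json


def repair_partial_json(buffer):
    """Return a dict parsed from a partial flat JSON object, or None.

    Hypothetical helper for illustration; it only covers simple cases.
    """
    text = buffer.rstrip()
    if text.count('"') % 2 != 0:
        # A string is still open: close it, then close the object.
        candidates = [text + '"}']
    elif text.endswith(':'):
        # A key has no value yet: use an empty string as a placeholder.
        candidates = [text + '""}']
    else:
        # Try the text as-is, then closed with a brace (minus a trailing comma).
        candidates = [text, text.rstrip(',') + '}']
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


print(repair_partial_json('{"description": "Hello wor'))
print(repair_partial_json('{"description":'))
print(repair_partial_json('{"description": "Hi", "key1'))
```

The quote check runs first so that a value ending in a colon, such as `"Note:`, is treated as an open string rather than a missing value. Counting quotes is a rough heuristic: escaped quotes (`\"`) inside a value will throw the count off, in which case the helper simply returns `None` until more text arrives.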
Then I try to load the JSON. If that succeeds, I have something to return to the frontend; if not, I just return the last successful state:
import json

# The function name is arbitrary; `client` is an OpenAI client and `thread`
# an existing thread. The wrapper is needed because the code uses `yield`.
def stream_assistant_json(client, thread):
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id="asst_XYZ",
        stream=True,
        temperature=0
    )
    message_buffer = ""
    json_buffer = ""  # last repaired JSON string that parsed successfully
    for event in run:
        event_type = getattr(event, "event", None)
        if event_type == "thread.message.created":
            pass
        elif event_type == "thread.message.in_progress":
            pass
        elif event_type == "thread.message.delta":
            delta_content = event.data.delta.content
            for block in delta_content:
                if block.type == "text":
                    message_buffer += block.text.value
            # try to repair the partial JSON
            candidate = message_buffer
            if message_buffer.count('"') % 2 != 0:
                candidate = message_buffer + '"}'
            elif message_buffer.endswith(':'):
                candidate = message_buffer + '""}'
            try:
                # if we can parse the JSON and get the required key, assign it
                description = json.loads(candidate).get('description', '')
                # for testing purposes only
                print(description, end='\n', flush=True)
                # remember the repaired JSON as the state sent to the frontend
                json_buffer = candidate
            except json.JSONDecodeError:
                # not parseable yet: keep the last successful state
                pass
            yield json_buffer
        elif event_type == "thread.message.completed":
            print("\n[Message Completed]")
            yield message_buffer
        else:
            pass
This streams the important “description” back to the frontend as it arrives, as required, and once the message completes I get the whole response with all the additional information.
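To try the "last successful state" behaviour without calling the API, the loop can be run over a plain list of strings. This is a rough simulation only: `stream_descriptions` and the sample chunks are invented for illustration and stand in for the text deltas of a real streamed run.

```python
import json


def stream_descriptions(chunks):
    """Yield the latest parseable "description" after each text chunk."""
    message_buffer = ""
    last_description = ""  # last successful state
    for chunk in chunks:
        message_buffer += chunk
        # Same two repair rules as above; anything else is tried as-is.
        candidate = message_buffer
        if message_buffer.count('"') % 2 != 0:
            candidate = message_buffer + '"}'
        elif message_buffer.endswith(':'):
            candidate = message_buffer + '""}'
        try:
            parsed = json.loads(candidate)
            if isinstance(parsed, dict):
                last_description = parsed.get("description", "")
        except json.JSONDecodeError:
            pass  # not parseable yet: keep the last successful state
        yield last_description


# Made-up chunks; real deltas can split the text at any position.
chunks = ['{"descr', 'iption": "A red', ' bike", "key1"', ': "value1"}']
states = list(stream_descriptions(chunks))
print(states)  # ['', 'A red', 'A red', 'A red bike']
```

The third chunk cannot be repaired by the two rules (the key "key1" has no colon yet), so the generator repeats "A red" until the final chunk makes the whole object parseable.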