There are not many examples out there, but I'm curious whether anyone has had any luck using the Assistants API (beta) in an async manner to push the stream to a front end.
My application is in Python, using FastAPI as the backend server.
Responses are taking a while to arrive in full, and my hope is that with streaming the user will at least start seeing the response much sooner.
I have NOT used the async Assistants API, primarily because I am using Flask, and while I can create a thread on which to run asyncio, that thread does not share context with the main Flask app (and I'm too lazy to figure out how).
That said, in the context of Flask, I have had to call eventlet.sleep(0) inside the "for event in stream" loop… YMMV.
For FastAPI, I have implemented streaming with chat completions successfully.
I think the Assistants API should work as well, but one thing to check: do you have gzip middleware (or anything similar) enabled?
Just removing the gzip middleware made my streaming work. FYI.
@brandonareid2
Got it to work with my Telegram message handler. A Telegram user is able to ask questions about the files attached to the assistant. I tried to incorporate functions; however, I could not figure out how to get them to work. I got as far as adding on_event. When I used the API to send the output via the Assistants API and examined the stream response, I could see the message created by ChatGPT but could not stream it out to Telegram. Will you be working to enhance your solution to support functions?
Currently, I am calling the submit_tool_outputs API with stream=True via requests.post, parsing the string to find "thread.message.completed" for the associated data, and issuing self.queue.put_nowait(data["content"][0]["text"]["value"]).
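That manual parsing step can be sketched roughly as follows; `extract_completed_texts` is a hypothetical helper name, and the payload shape assumes the standard Assistants SSE event framing:

```python
import json

def extract_completed_texts(sse_body: str):
    """Scan a raw SSE response body for thread.message.completed events
    and pull out the message text, as in the manual approach above."""
    texts = []
    event = None
    for line in sse_body.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:") and event == "thread.message.completed":
            data = json.loads(line[len("data:"):].strip())
            texts.append(data["content"][0]["text"]["value"])
    return texts

# Example body shaped like an Assistants streaming response:
sample = (
    "event: thread.message.completed\n"
    'data: {"content": [{"text": {"value": "Hello"}}]}\n'
    "\n"
)
print(extract_completed_texts(sample))  # ['Hello']
```

Note this only surfaces whole completed messages; to stream token by token you would instead watch for the delta events.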
Some Medium articles cover the topic, but I found them too complex for what I need; you can search Medium to check whether the solutions there better fit your use case.
After looking around a bit, though, I found a simpler way to achieve streaming using SSE with FastAPI, with the following example snippet:
```python
from openai import AsyncOpenAI

async def stream_assistant_response(thread_id, assistant_id):
    client = AsyncOpenAI()
    # runs.stream(...) returns an async context manager over the event stream
    async with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
    ) as stream:
        async for text in stream.text_deltas:
            # frame each delta as a Server-Sent Event
            yield f"data: {text}\n\n"
```
Make sure you also set the media_type correctly in the API response logic:
This code works fine for me when I test the endpoint in Postman; I am able to see every event with its corresponding text delta.
Not sure if there are underlying issues with the logic that I can't catch at the moment.
You can find the references that were helpful for me in the helpers.md file in OpenAI's Python API library repo, and in issue #1473 on that repo as well.
Hope that helps!