This is my code to retrieve a streaming response from OpenAI's model, which is event-based. (I have shown only the core part.)
```python
client = OpenAI(api_key=OPEN_AI_API_KEY)

class EventHandler(AssistantEventHandler):
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        print(delta.value)

with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler()
) as stream:
    stream.until_done()
```
The on_text_delta event fires as tokens arrive from the API. I want to forward this response through FastAPI instead of printing it to the console:
```python
@app.get("/stream")
async def stream():
    return ...something...
```
I have tried sending the result as part of the HTTP body:
```python
from fastapi.responses import StreamingResponse
...

@app.post("/stream")
async def stream():
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=EventHandler()
    ) as stream:
        stream.until_done()
    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")
```
I have created a generator_function inside the EventHandler class, but the problem is that execution does not reach the return statement until the stream is over.
I have also tried websockets, but the problem remains: how should my program's execution flow? The stream blocks further execution until the API response is complete.
I found the solution!
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI, AsyncOpenAI

OPEN_AI_API_KEY = 'your_api_key'

async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)
client = OpenAI(api_key=OPEN_AI_API_KEY)
app = FastAPI()

async def stream_assistant_response(assistant_id, thread_id):
    stream = async_client.beta.threads.runs.stream(
        assistant_id=assistant_id,
        thread_id=thread_id
    )
    async with stream as stream:
        async for text in stream.text_deltas:
            yield f"data: {text}\n\n"

@app.get("/message")
async def add_message(assistant_id, thread_id, message):
    # make sure the thread exists
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message
    )
    return StreamingResponse(stream_assistant_response(assistant_id, thread_id), media_type="text/event-stream")
```
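If you need to keep the synchronous client with its EventHandler callbacks, another option (not from this thread, just a sketch of the pattern) is to bridge the blocking stream into an async generator via a queue. Here `fake_blocking_stream` is a stand-in for the real `stream.until_done()` call, which fires a callback per token:

```python
import asyncio
import queue
import threading

# Stand-in for the synchronous OpenAI stream: calls the callback once
# per token, blocking until the whole response has been delivered.
def fake_blocking_stream(on_delta):
    for token in ["Hello", ", ", "world"]:
        on_delta(token)

async def token_generator():
    q = queue.Queue()
    sentinel = object()

    def worker():
        # The blocking stream runs in a background thread;
        # every delta lands in the queue as it arrives.
        fake_blocking_stream(q.put)
        q.put(sentinel)  # end-of-stream marker

    threading.Thread(target=worker, daemon=True).start()
    loop = asyncio.get_running_loop()
    while True:
        # q.get blocks, so hand it to the default thread pool
        item = await loop.run_in_executor(None, q.get)
        if item is sentinel:
            break
        yield item

async def collect():
    return [t async for t in token_generator()]

print(asyncio.run(collect()))  # → ['Hello', ', ', 'world']
```

The async generator can then be passed to `StreamingResponse` the same way as `stream_assistant_response` above, with the real `on_text_delta` callback doing the `q.put`.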
Can you let me know if this stream honors whitespace and newlines as they come from the stream? For me, the whole stream is displayed on a single line.
@vkoprivica.education
Not sure if I completely understood your problem, but whitespace won't be a problem in forwarding, as it is equivalent to doing this:

```python
yield f"data:  \n\n"  # two spaces, two \n
```
But yes, forwarding \n is problematic due to the Server-Sent Events (SSE) format, which would result in something like this:

```python
yield f"data: \n\n\n"  # one space, three \n
```
To handle such a situation you can do the following:

```python
async def stream_assistant_response(assistant_id, thread_id):
    stream = async_client.beta.threads.runs.stream(
        assistant_id=assistant_id,
        thread_id=thread_id
    )
    async with stream as stream:
        async for text in stream.text_deltas:
            # escape literal newlines so they survive the SSE framing
            formatted_text = text.replace('\n', '\\n')
            yield f"data: {formatted_text}\n\n"
```
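For completeness, here is what that escaping looks like end to end; `sse_frame` and `sse_unescape` are illustrative helper names, not part of any library, and the client must reverse the replace after reading the data field:

```python
def sse_frame(text: str) -> str:
    # Server side: escape literal newlines so the payload
    # stays on a single SSE "data:" line
    return "data: " + text.replace("\n", "\\n") + "\n\n"

def sse_unescape(data: str) -> str:
    # Client side: restore the real newlines (assumes the text
    # itself never contains a literal backslash-n sequence)
    return data.replace("\\n", "\n")

chunk = "first line\nsecond line"
framed = sse_frame(chunk)            # 'data: first line\\nsecond line\n\n'
payload = framed[len("data: "):-2]   # strip the SSE framing
print(sse_unescape(payload) == chunk)  # → True
```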
Thank you for responding. In my case this actually worked:

FastAPI implementation:

```python
# note: backslashes inside f-string expressions require Python 3.12+;
# on older versions, do the replace on a separate line first
yield f"data: {text.replace('\n', '<br>')}\n\n"
```
```javascript
const outputDiv = document.getElementById('streamOutput');
eventSource.onmessage = (event) => {
    outputDiv.innerHTML += event.data;
};
```

```html
<pre id="streamOutput"></pre>
```