Flask Streaming Examples?

I’m trying to build a Flask interface for an Assistant.

I have the Assistant set up correctly, with streaming printing to the console and working perfectly. However, I am really struggling to make a Flask interface that streams the output.

Does anyone have any examples of Flask streaming chatbots that might help me out?

Here is what I have so far. It works, but the Flask app only updates when the buffer hits a certain size (several hundred words) instead of on every delta (see the buffering note after the code below). I’m also not sure how to approach integrating the streaming into a chat interface that shows back-and-forth replies.

import openai
from flask import Response, current_app, stream_with_context

# `main` (blueprint) and `logger` are defined elsewhere in the app.

@main.route('/chat_stream', methods=['POST', 'GET'])
def chat_stream():
    logger.info("Chat route accessed.")

    # Use the chat_session assigned to the current app instance.
    chat_session = current_app.config['chat_session']

    user_input = 'please write a 100 word poem'

    message = chat_session.client.beta.threads.messages.create(
        thread_id=chat_session.thread.id,
        role='user',
        content=user_input
    )

    def generate():
        try:
            # No event_handler argument here: the events are consumed by
            # iterating the stream directly, so a separate handler isn't needed.
            with chat_session.client.beta.threads.runs.stream(
                    thread_id=chat_session.thread.id,
                    assistant_id=chat_session.assistant_id,
                    instructions='system_prompt',  # placeholder
            ) as stream:
                for event in stream:
                    # Only message deltas carry incremental text;
                    # isinstance is the idiomatic type check here.
                    if isinstance(event, openai.types.beta.assistant_stream_event.ThreadMessageDelta):
                        yield event.data.delta.content[0].text.value

        except Exception as e:
            logger.error(f"Error during streaming: {e}")
            yield f"Error: {e}"

    # stream_with_context wraps the generator call (it isn't used as a
    # decorator), and werkzeug applies chunked transfer encoding on its own,
    # so the Transfer-Encoding header doesn't need to be set by hand.
    return Response(stream_with_context(generate()), content_type='text/plain')
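For reference, here's the response wrapper I've been experimenting with to rule out buffering in front of Flask rather than in it. This is just a sketch: X-Accel-Buffering is an nginx-specific hint, and whether any of this helps depends on how the app is deployed (gunicorn, nginx, etc.).

from flask import Response, stream_with_context

def streaming_response(gen):
    # Ask reverse proxies not to buffer the stream: nginx honours
    # X-Accel-Buffering, and Cache-Control discourages intermediaries
    # from holding the body back until it is complete.
    return Response(
        stream_with_context(gen),
        mimetype='text/plain',
        headers={
            'X-Accel-Buffering': 'no',
            'Cache-Control': 'no-cache',
        },
    )

With the dev server (flask run) each yield should arrive immediately; if it doesn't, the buffering is most likely happening in a proxy or in the WSGI server rather than in Flask itself.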

This is my generate function (written a while ago and not for Assistants):

def generate(user_input):
    # NOTE: this loop expects raw chunk objects from the API; if
    # stream_openai_response yields pre-formatted SSE strings (as in the
    # version below), iterate it directly instead of re-parsing here.
    client = stream_openai_response(user_input)
    for chunk in client:
        choices = getattr(chunk, 'choices', [])
        if choices:
            choice = choices[0]
            delta = choice.delta

            if getattr(delta, 'content', None):
                text = delta.content
                yield f"data: {text}\n\n"
            elif getattr(choice, 'finish_reason', None) == 'stop':
                # Handle the end of a message more explicitly if needed
                logging.info("End of message received.")
                yield "data: \n\n"  # Modify as needed to signal end of content more clearly
            else:
                logging.info("Content is missing, None, or empty in delta.")
        else:
            logging.info("No choices found in chunk or choices list is empty.")

The stream function is a little specific to my use case, but hopefully it makes sense:

def stream_openai_response(prompt):
    if not prompt:
        logging.error("Received empty prompt")
        yield "data: Error: Received empty prompt\n\n"
        return

    # Proceed with existing code to call OpenAI API
    stream = openai_client.chat.completions.create(
        model="gpt-4-0125-preview",
        temperature=0.5,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    for event in stream:
        # Log the event to console
        logging.info(f"Streaming event: {event}")
        try:
            # Streaming chunks are objects, not dicts, and the incremental
            # text lives on choices[0].delta.content (not 'message').
            if event.choices:
                text = event.choices[0].delta.content or ""
                if text:  # Ensure there is text to send
                    formatted_data = f"data: {text}\n\n"
                    logging.info(f"Formatted for SSE: {formatted_data}")
                    yield formatted_data
                else:
                    logging.info("No text to send, skipping.")
            else:
                logging.warning(f"Unexpected event format: {event}")
        except Exception as e:
            logging.error(f"Error while processing stream: {e}")
            yield f"data: Error: {str(e)}\n\n"

I had a custom data format I sent the deltas in, since the other side dealt with them in a specific way; you could just omit the data: part of the response string and yield the text directly.
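i.e. the plain-text variant of that branch would just be:

if getattr(delta, 'content', None):
    yield delta.content  # raw text chunk, no SSE framing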


And after the assistant uses one tool output (or multiple tool outputs), does it still stream, or does it deliver an empty message?