How to stream with the Assistant API and a Flask endpoint?

I’m trying to build a flask endpoint for use with a web app:

# these work, not including their code for brevity
from app.assistant import assistant
from app.assistant.event_handler import EventHandler
client = assistant.create_client() # could this cause a scaling issue?

bp = Blueprint('api', __name__, url_prefix='/api')

@bp.route('/chat_stream', methods=['POST'])
def chat_stream():
   content_type = request.headers.get('Content-Type')
   if (content_type != 'application/json'):
      return "Content type is not supported."
      
   data : dict = request.get_json()
   thread_id = data.get('thread_id')
   user_input = data.get('message')

   handler = EventHandler()
   # handler.add_mock_data()
   
   # should this be here or inside generate?
   message = client.beta.threads.messages.create(
      thread_id = thread_id,
      role = 'user',
      content = user_input
   )

   def generate():
      try:
         with client.beta.threads.runs.stream(
            thread_id = thread_id,
            assistant_id = assistant_id,
            event_handler = handler
         ) as stream:
            print("Stream started")
            stream.until_done()
            print("Stream ended")

         # nothing ever shows up here
         for events in handler:
            print(f'yielding data: {events}')
            yield events
            
      except Exception as e:
         print(f"Error during streaming: {e}")
         yield f"Error: {e}"

   return Response(generate(), content_type='text/plain')

For my EventHandler, I have the following code:

class EventHandler(AssistantEventHandler):
   def __init__(self):
      super().__init__()
      self.events = []
   
   def on_text_created(self, text) -> None:
      print(f"\nassistant > ", end="", flush=True)
      self.events.append(f"\nassistant > {text}")
   
   def on_text_delta(self, delta, snapshot):
      print(delta.value, end="", flush=True)
      self.events.append(delta.value)
   
   def on_tool_call_created(self, tool_call):
      self.events.append(f"\nassistant > {tool_call.type}\n")
   
   def on_tool_call_delta(self, delta, snapshot):
      if delta.type == 'code_interpreter':
         if delta.code_interpreter.input:
             elf.events.append(delta.code_interpreter.input)
         if delta.code_interpreter.outputs:
            self.events.append("\n\noutput >")
            for output in delta.code_interpreter.outputs:
               if output.type == 'logs':
                  self.events.append(f"\n{output.logs}")
   
   def on_text_done(self, text):
      print(f'\nComplete > {text}')
      self.events.append(f"\nComplete > {text}")
   
   def __iter__(self):
      print(f"__iter__ called with events: {self.events}")
      return iter(self.events)

   def add_mock_data(self):
      # Add mock data for testing purposes
      self.events.extend([
         "Mock data 1",
         "Mock data 2",
         "Mock data 3"
      ])

The mock data works, but none of the other events seem to fire. I get nothing in console or over the network.

However, if I use this simpler handler I can at least get console output, but there’s nothing being saved to send over the network:

class EventHandlerSimple(AssistantEventHandler):
   def on_text_created(self, text) -> None:
      print(f"\nassistant > ", end="", flush=True)

   def on_text_delta(self, delta, snapshot):
      print(delta.value, end="", flush=True)

   def on_tool_call_created(self, tool_call):
      print(f"\nassistant > {tool_call.type}\n", flush=True)

   def on_tool_call_delta(self, delta, snapshot):
      if delta.type == 'code_interpreter':
         if delta.code_interpreter.input:
            print(delta.code_interpreter.input, end="", flush=True)
         if delta.code_interpreter.outputs:
            print(f"\n\noutput >", flush=True)
            for output in delta.code_interpreter.outputs:
               if output.type == 'logs':
                  print(f"\n{output.logs}", flush=True)

   def on_text_done(self, text):
      print(f"\nComplete > {text}", end="", flush=True)

I must be breaking something with the __iter__ or __init__ definitions. I’ve also seen some other people’s code in which they use the @override decorator on some events, but I have no idea why.

Am I missing something minor with the EventHandler class, or is my whole pattern bad? I’m new to web programming, and am open to alternative patterns for providing streaming answers to a web app.

1 Like

class EventHandler(AssistantEventHandler):
@override
def on_text_created(self, text) → None:
print(f"\nassistant > “, end=”", flush=True)

@override
def on_text_delta(self, delta, snapshot):
    print(delta.value, end="", flush=True)
    
def on_tool_call_created(self, tool_call):
    print(f"\nassistant > {tool_call.type}\n", flush=True)

def on_tool_call_delta(self, delta, snapshot):
    if delta.type == 'code_interpreter':
        if delta.code_interpreter.input:
            print(delta.code_interpreter.input, end="", flush=True)
        if delta.code_interpreter.outputs:
            print(f"\n\noutput >", flush=True)
            for output in delta.code_interpreter.outputs:
                if output.type == "logs":
                    print(f"\n{output.logs}", flush=True)

This is all you should need for your Flask application. Your other methods may be extraneous.

    run = None

    with self.client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=EventHandler(),
    ) as stream:
        stream.until_done()
        run = stream.get_final_run()

The @override decorator was added in Python 3.12, which basically warns you if the name of the function you’re trying to overload changes. I’m on 3.11 so I can’t make use of it, but it doesn’t matter since I have the name right for this version of the API at least.

This is fine for printing to console (so was my original code), but it isn’t going to send anything to the socket.

I don’t know what you are suggesting… I have a fully functioning Flask app that leverages these functions and receives responses from the APIs as Messages… I’m trying to help you but I do not know what you are needing I guess? What does a socket have anything to do with the issue? I run the exact same code in both Python 3.11 and 3.12 and both work, so I don’t agree with your statement.

What is this supposed to be showing?

Just trying to demonstrate for you that Python 3.11 generates message content and responses with the provided code using a flask app endpoint to trigger the API response, which can be extracted and parsed with HTML and JS on the front end. What can I do to help you further? I’m trying to be of assistance, no pun intended…