Retrieve the last message streamed by a GPT Assistant

Dear community,

I first disclose that I am a beginner in coding.

I am facing some difficulties while trying to store, in a Python variable, the last message streamed by a GPT Assistant in a thread.

Starting from the script below (provided in the OpenAI docs API Reference) I am able to get a streamed response, but how do I assign it to a variable in order to recall it later?


from typing_extensions import override
from openai import AssistantEventHandler
 
# First, we create an EventHandler class to define
# how we want to handle the events in the response stream.
 
class EventHandler(AssistantEventHandler):    
  @override
  def on_text_created(self, text) -> None:
    print(f"\nassistant > ", end="", flush=True)
      
  @override
  def on_text_delta(self, delta, snapshot):
    print(delta.value, end="", flush=True)
      
  def on_tool_call_created(self, tool_call):
    print(f"\nassistant > {tool_call.type}\n", flush=True)
  
  def on_tool_call_delta(self, delta, snapshot):
    if delta.type == 'code_interpreter':
      if delta.code_interpreter.input:
        print(delta.code_interpreter.input, end="", flush=True)
      if delta.code_interpreter.outputs:
        print(f"\n\noutput >", flush=True)
        for output in delta.code_interpreter.outputs:
          if output.type == "logs":
            print(f"\n{output.logs}", flush=True)
 
# Then, we use the `stream` SDK helper 
# with the `EventHandler` class to create the Run 
# and stream the response.

with client.beta.threads.runs.stream(
  thread_id='thread_vO00Obb1UByWlA79y6XrNnWF',
  assistant_id='asst_hGOwD3YbnBrHHpSxB2wGWUUM', 
  #assistant_id=assistant.id,
  #instructions="store the answer in a variable called answer",
  event_handler=EventHandler(),
) as stream:
  stream.until_done()

For instance, the value that I would like to assign to a variable here is the JSON: {"Action":"f"}

Thank you in advance for your kind support

Here is more documentation about the helpers that are provided with the OpenAI Python library. While a helpful starting point, I’ve often found that writing your own code is better than the time spent learning another’s proprietary convolutions that have no applicability elsewhere.

Therefore - I make an AI learn it. And fix it. And write about it.

Enhancing Your OpenAI Assistant Streaming Script: Before and After Guide

Here’s an easy-to-understand guide that highlights the changes needed to your original script. I show the “Before” and “After” versions of small portions of code (just the modified code snippets from what you had before), along with clear reasoning for each change. This will help you understand how the enhancements allow the script to now collect and retain the full streaming text response from the OpenAI Assistant API.


1. Initialize a Variable to Store the Accumulated Text

Before

In your original EventHandler class, there was no mechanism to store the accumulated text from the streaming response.

class EventHandler(AssistantEventHandler):    
    @override
    def on_text_created(self, text) -> None:
        print(f"\nassistant > ", end="", flush=True)
      
    @override
    def on_text_delta(self, delta, snapshot):
        print(delta.value, end="", flush=True)
      
    # ... other methods ...

After

Introduce an __init__ method to initialize an answer variable that will store the accumulated text.

class EventHandler(AssistantEventHandler):
    def __init__(self):
        super().__init__()
        self.answer = ""
    
    @override
    def on_text_created(self, text: Text) -> None:
        print("\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        self.answer += delta.value
        print(delta.value, end="", flush=True)
    
    # ... other methods ...

Reasoning

  • Purpose: To collect and retain the full text response from the streaming API for later use.
  • Change: Added an __init__ method to initialize self.answer as an empty string.
  • Benefit: Allows the script to accumulate all text fragments received during streaming into a single variable (self.answer), making it accessible after the streaming is complete.

2. Accumulate Text in the on_text_delta Method

Before

The original on_text_delta method simply prints each text fragment without storing it.

@override
def on_text_delta(self, delta, snapshot):
    print(delta.value, end="", flush=True)

After

Modify the on_text_delta method to append each text fragment to self.answer.

@override
def on_text_delta(self, delta: TextDelta, snapshot: Text):
    self.answer += delta.value
    print(delta.value, end="", flush=True)

Reasoning

  • Purpose: To ensure every piece of text received is stored.
  • Change: Added self.answer += delta.value to accumulate the text.
  • Benefit: Enables the script to have the complete response stored in self.answer, which can be used later in your program for further processing or logging.

3. Access the Accumulated Text After Streaming

Before

The original script streams the response and doesn’t retain the full text for later use.

with client.beta.threads.runs.stream(
    thread_id='thread_vO00Obb1UByWlA79y6XrNnWF',
    assistant_id='asst_hGOwD3YbnBrHHpSxB2wGWUUM', 
    event_handler=EventHandler(),
) as stream:
    stream.until_done()

After

After streaming is complete, access the accumulated text from the handler.answer attribute.

# Create an instance of the EventHandler
handler = EventHandler()

# Stream the response from the Assistant
with client.beta.threads.runs.stream(
    thread_id='thread_vO00Obb1UByWlA79y6XrNnWF',
    assistant_id='asst_hGOwD3YbnBrHHpSxB2wGWUUM', 
    event_handler=handler,
) as stream:
    # Start streaming and wait until the stream is done
    stream.until_done()

# After streaming is complete, access the accumulated answer
answer = handler.answer
print("\n\nFull Answer Collected:")
print(answer)

Reasoning

  • Purpose: To utilize the full response after the streaming process has finished.
  • Change: Created an instance of EventHandler named handler and accessed handler.answer after stream.until_done().
  • Benefit: Allows you to use the complete response (answer) elsewhere in your code, such as storing it in a database, displaying it in a UI, or performing further analysis.

4. Complete Enhanced Script for Reference

For clarity, here’s how the complete enhanced script looks with all the modifications integrated:

from typing_extensions import override
from openai import AssistantEventHandler, OpenAI
from openai.types.beta.threads import Text, TextDelta
from openai.types.beta.threads.runs import ToolCall, ToolCallDelta

# Initialize the OpenAI client
client = OpenAI()

# Define the EventHandler class to handle streaming events
class EventHandler(AssistantEventHandler):
    def __init__(self):
        super().__init__()
        self.answer = ""

    @override
    def on_text_created(self, text: Text) -> None:
        """
        Called when a new text message is created.
        """
        print("\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        """
        Called for each text delta (fragment) received in the stream.
        Appends the fragment to the accumulated answer.
        """
        self.answer += delta.value
        print(delta.value, end="", flush=True)

    @override
    def on_tool_call_created(self, tool_call: ToolCall):
        """
        Called when a tool call is created.
        """
        print(f"\nassistant > {tool_call.type}\n", flush=True)

    @override
    def on_tool_call_delta(self, delta: ToolCallDelta, snapshot: ToolCall):
        """
        Called for each tool call delta received in the stream.
        Handles code interpreter inputs and outputs.
        """
        if delta.type == "code_interpreter" and delta.code_interpreter:
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)
            if delta.code_interpreter.outputs:
                print("\n\noutput >", flush=True)
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)

# Create an instance of the EventHandler
handler = EventHandler()

# Stream the response from the Assistant
with client.beta.threads.runs.stream(
    thread_id='thread_vO00Obb1UByWlA79y6XrNnWF',
    assistant_id='asst_hGOwD3YbnBrHHpSxB2wGWUUM', 
    event_handler=handler,
) as stream:
    # Start streaming and wait until the stream is done
    stream.until_done()

# After streaming is complete, access the accumulated answer
answer = handler.answer
print("\n\nFull Answer Collected:")
print(answer)

Key Additions:

  • __init__ Method: Initializes self.answer to store the accumulated text.
  • self.answer += delta.value: Appends each text fragment to self.answer in the on_text_delta method.
  • Accessing handler.answer: Retrieves the full accumulated text after streaming is complete.

5. Validation

Running the actual code (on another prepared thread and assistant, of course) produces the real-time streamed output, followed by a presentation of the final accumulated string.

assistant > Hello! I’m your helpful assistant, here to assist you with a wide range of tasks, from answering questions and solving problems to running Python code and analyzing data. How can I help you today?

Full Answer Collected:
Hello! I’m your helpful assistant, here to assist you with a wide range of tasks, from answering questions and solving problems to running Python code and analyzing data. How can I help you today?

6. Conclusion

By implementing these changes, your script now effectively collects and retains the entire streaming text response from the OpenAI Assistant API.

This enhancement allows you to utilize the full response for various purposes, such as displaying it to users, logging, or further processing within your application, such as parsing a JSON text response that you receive back as a final fulfillment with Python’s JSON library.
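As a sketch of that last point, once the full text is in answer, and assuming the assistant was instructed to reply with pure JSON (the answer string below is just a stand-in for a real collected response), Python's json module can parse it:

```python
import json

# Stand-in for handler.answer, assuming the assistant was prompted
# to reply with a pure JSON object like the one mentioned earlier.
answer = '{"Action": "f"}'

data = json.loads(answer)   # parse the JSON text into a Python dict
print(data["Action"])       # prints: f
```

If the model wraps its JSON in extra prose or markdown fences, json.loads will raise a ValueError, so tighter instructions (or stripping the fences first) may be needed.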


Thank you _j for your answer.

It is indeed interesting and a good starting point to solve my issue.

In fact, applying the code you mentioned, the variable ‘answer’ now retains the first response from the Assistant.

However, if I do further interactions with the assistant, the variable ‘answer’ does not change and always keeps the same value.

I might be getting lost in something really easy here, but after quite some time and multiple tries I have not achieved what I wanted, e.g. being able to retrieve the last answer after the response from a stream.

Thank you in advance for any support

The `handler` object in the code is an instance of the EventHandler class. This is an object that retains properties, performs tasks, and can be reused in code, as long as you don’t use it solely within a function that returns a value and then stops existing.

You will see that when a new instance is created, the __init__ method defines a self.answer instance attribute as an empty string; self. is how code within the class refers to its own variables, constants, and internal methods.

When outside the class, you can use the instance attribute .answer like a variable, while at the same time, methods inside the class can modify the attribute’s value. But if you don’t know the why, it’s best that you just not reuse the handler.
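To see the same mechanics in isolation, here is a toy class (plain Python, not part of the OpenAI SDK) demonstrating how an instance attribute is set in __init__, modified by methods through self, and read from outside:

```python
class Accumulator:
    def __init__(self):
        self.answer = ""           # instance attribute, fresh per instance

    def add(self, fragment):
        self.answer += fragment    # methods modify it through self

a = Accumulator()
a.add("Hello, ")
a.add("world")
print(a.answer)        # readable from outside like a variable: Hello, world

b = Accumulator()      # a new instance starts with an empty answer again
print(repr(b.answer))  # prints: ''
```

The key point: a's accumulated text lives on a, not in the class, so a second instance b is empty. The same holds for EventHandler and its .answer.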

Here’s example usage that more explicitly shows the demonstration class (given before) then being used inside a called function, so the instance of the class doesn’t persist and the answer doesn’t keep getting appended to by misunderstanding.

# the imports and the class you got before

def send_message_to_thread(thread_id, message):
    # Add the user message to the thread before creating a run
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message,
    )

def get_ai_response(thread_id, assistant_id):
    handler = EventHandler()
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id, 
        event_handler=handler,
    ) as stream:
        stream.until_done()
    return handler.answer

def main():
    thread_id = 'thread_vO00Obb1UByWlA79y6XrNnWF'
    assistant_id = 'asst_hGOwD3YbnBrHHpSxB2wGWUUM'

    # First interaction
    message1 = "Hello, AI!"
    send_message_to_thread(thread_id, message1)
    answer1 = get_ai_response(thread_id, assistant_id)
    print("\n\nFull Answer Collected:")
    print(answer1)

    # Second interaction
    message2 = "Can you tell me a joke?"
    send_message_to_thread(thread_id, message2)
    answer2 = get_ai_response(thread_id, assistant_id)
    print("\n\nFull Answer Collected:")
    print(answer2)

if __name__ == "__main__":
    main()

This should clarify misunderstandings, such as

  • Reusing the same EventHandler instance
  • Not resetting self.answer in the handler object
  • Not actually sending another user message to the thread first, while the run continues to add assistant messages (AI confused why it has to answer again).
  • Asking for programming help (without showing what you did), instead of asking for help with opaque API library methods themselves.
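If you do prefer to reuse a single handler object across runs, the first two bullets can be addressed by clearing the attribute between runs. A hypothetical reset() sketch follows, using a toy stand-in class rather than the real SDK handler (the SDK's AssistantEventHandler keeps its own internal stream state, which is why a fresh instance per run is the safer pattern):

```python
class ResettableHandler:
    """Toy stand-in for EventHandler, showing the reset pattern."""
    def __init__(self):
        self.answer = ""

    def on_fragment(self, value):  # stand-in for on_text_delta
        self.answer += value

    def reset(self):
        self.answer = ""           # clear accumulated text before the next run

h = ResettableHandler()
h.on_fragment("first answer")
h.reset()                          # call between runs
h.on_fragment("second answer")
print(h.answer)                    # prints: second answer
```

Without the reset() call, the second run's text would be appended onto the first, which is exactly the "same value kept growing" confusion described above.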