Streamlit Assistant Chatbot with Function Calling

Hello all!

Here is the repository for my Aerospace Certification Chatbot app:
rocketpoweryul/AeroChatBot

The assistant is set up manually on the OpenAI platform, so you'll need to configure your own with your own prompts and documents.

However, the point of sharing this is to show how Streamlit can stream OpenAI Assistant output, whether it arrives as message deltas or as the response to a tool-output submission.

Is there a better way to do this? For example, I didn’t use an event handler, and was mostly inspired by this repo:
gabrielchua/oai-assistant-streaming-st-demo/

… and also the freeCodeCamp Assistants tutorial, which was great!

Any suggestions to improve this OAI-Streamlit interface would be great!

Thanks
rP

Here is the relevant Streamlit code from main.py:

# Accept user input
if prompt := st.chat_input("Ask anything about aerospace certification!"):
    # Display user message in chat message container
    with st.chat_message("user"):
        st.markdown(prompt)
    # Add user message to app chat history
    ss.chat_history.append({"role": "user", "content": prompt})
    
    # Add the user's message to the Assistants API thread
    ss.agent.add_user_prompt("user", prompt)

    # Display assistant response in chat message container
    with st.chat_message("assistant"):
        # Empty container to display the assistant's reply
        assistant_reply_box = st.empty()

        # Stream the assistant's response into the placeholder
        assistant_reply = ss.agent.stream_response(assistant_reply_box)

        # Once the stream is over, update chat history
        ss.chat_history.append({"role": "assistant", "content": assistant_reply})

Here is the relevant code from openai_backend.py, namely the stream_response method for my assistant class:

def stream_response(self, assistant_reply_box):
        try:
            # Create a run on the existing thread and stream its events
            with self.client.beta.threads.runs.create(
                assistant_id=self.assistant.id,
                thread_id=self.thread.id,
                stream=True
            ) as stream:
                assistant_reply = ""
                start_time = time.time()
                max_duration = 120  # Maximum duration in seconds for streaming

                # Iterate through the stream of events
                for event in stream:
                    print("Event received!")  # Debug statement

                    # Check if the maximum duration has been exceeded
                    if time.time() - start_time > max_duration:
                        print("Stream timeout exceeded.")
                        break

                    # Handle different types of events
                    if isinstance(event, ThreadMessageDelta):
                        print("ThreadMessageDelta event data")  # Debug statement
                        if isinstance(event.data.delta.content[0], TextDeltaBlock):
                            # add the new text
                            assistant_reply += event.data.delta.content[0].text.value
                            # display the new text
                            assistant_reply_box.markdown(assistant_reply)

                    elif isinstance(event, ThreadRunRequiresAction):
                        print("ThreadRunRequiresAction event data")  # Debug statement

                        # The event payload is the run itself, so the run id and
                        # required actions can be read directly from it
                        run_id = event.data.id
                        required_actions = event.data.required_action.submit_tool_outputs.model_dump()
                        tool_outputs = []

                        # Loop through the requested tool calls
                        for action in required_actions["tool_calls"]:
                            # Identify the function and its parameters
                            func_name = action["function"]["name"]
                            arguments = json.loads(action["function"]["arguments"])
                            print(f"Executing function: {func_name} with arguments: {arguments}")  # Debug statement

                            # Run the agent function caller
                            output = execute_required_function(func_name, arguments)
                            print(f"Function {func_name} complete")  # Debug statement

                            # Collect the tool outputs
                            tool_outputs.append({"tool_call_id": action["id"], "output": str(output)})

                        # Submit the outputs
                        if tool_outputs:
                            self.client.beta.threads.runs.submit_tool_outputs(
                                run_id=run_id,
                                thread_id=self.thread.id,
                                tool_outputs=tool_outputs,
                                stream=True
                            )
                            print("Tool outputs submitted")  # Debug statement

                        # Wait for the tool outputs to be processed
                        while True:
                            run_status = self.client.beta.threads.runs.retrieve(thread_id=self.thread.id, run_id=run_id)
                            if run_status.status == "completed":
                                print("Waiting complete")
                                break
                            print("Waiting for tool outputs to be processed...")
                            time.sleep(2)  # Check every 2 seconds

                        # Fetch just the last message of the thread
                        messages = self.client.beta.threads.messages.list(thread_id=self.thread.id)
                        last_message = messages.data[0]
                        assistant_reply += last_message.content[0].text.value
                        assistant_reply_box.markdown(assistant_reply)
                                

                    elif isinstance(event, ThreadMessageInProgress):
                        # No sleep here: pausing would only delay the next delta
                        print("ThreadMessageInProgress event received")  # Debug statement

                    elif isinstance(event, ThreadMessageCompleted):
                        print("Message completed.")  # Debug statement

                    elif isinstance(event, ThreadRunCompleted):
                        print("Run completed.")  # Debug statement

                    print("Loop iteration completed.")  # Debug statement to check loop progress

                return assistant_reply

        except Exception as e:
            print("An error occurred during streaming: ", str(e))
            return "An error occurred while processing your request."
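For context, `execute_required_function` on my side is just a name-to-callable dispatcher. A minimal sketch of that pattern (the registry and the `add_numbers` function here are hypothetical stand-ins, not the functions in the repo):

```python
import json

# Hypothetical registry mapping tool names to Python callables;
# the real app would register its own certification-lookup functions here.
FUNCTION_REGISTRY = {
    "add_numbers": lambda a, b: a + b,
}

def execute_required_function(func_name, arguments):
    """Dispatch a tool call by name, passing the parsed JSON arguments as kwargs."""
    func = FUNCTION_REGISTRY.get(func_name)
    if func is None:
        return f"Unknown function: {func_name}"
    return func(**arguments)

# The arguments arrive from the API as a JSON string, exactly as in the loop above
args = json.loads('{"a": 2, "b": 3}')
print(execute_required_function("add_numbers", args))  # → 5
```

Returning a string for unknown names (rather than raising) keeps the run alive so the model can see the error in the tool output and recover.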


Actually, I just pushed a fix for the function call output streaming as the solution to issue 3 in the repo.

I just used the same approach as in the plain-chat case, where no function calling is needed.

What I still don't understand is the difference between the different flavours of submit_tool_outputs (stream, poll, etc.), and how, for example, the stream variant differs from passing stream=True in the input params.

                                # Submit the outputs and stream the follow-up response
                                if tool_outputs:
                                    print("Tool output acquired")
                                    with self.client.beta.threads.runs.submit_tool_outputs(
                                        thread_id=self.thread.id,
                                        run_id=run_id,
                                        tool_outputs=tool_outputs,
                                        stream=True
                                    ) as stream:
                                        print("Streaming response to tool output...")
                                        # Handle different types of events
                                        for event in stream:
                                            if isinstance(event, ThreadMessageDelta):
                                                print("MSG ThreadMessageDelta event data")  # Debug statement
                                                if isinstance(event.data.delta.content[0], TextDeltaBlock):
                                                    # add the new text
                                                    assistant_reply += event.data.delta.content[0].text.value
                                                    # display the new text
                                                    assistant_reply_box.markdown(assistant_reply)

Maybe not the cleanest solution, but I'm not a pro by any means.
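As an aside, the accumulate-and-redraw pattern that both snippets share is easy to factor out and test without touching the API. A rough sketch, where the list of strings stands in for the text deltas and `render` stands in for `assistant_reply_box.markdown`:

```python
def accumulate_deltas(deltas, render):
    """Concatenate text deltas, re-rendering the full reply after each one."""
    reply = ""
    for text in deltas:
        reply += text
        # In the Streamlit app this is assistant_reply_box.markdown(reply),
        # which overwrites the placeholder with the text received so far
        render(reply)
    return reply

# Simulate a stream of message deltas; collect each rendered frame
frames = []
final = accumulate_deltas(["Hel", "lo ", "world"], frames.append)
print(final)       # → Hello world
print(frames[-1])  # → Hello world
```

Keeping the loop body this small is what makes it reusable for both the initial run stream and the post-submit_tool_outputs stream.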

rP