Hello all!
Here is the repository for my Aerospace Certification Chatbot app:
rocketpoweryul/AeroChatBot
The assistant is set up manually at OpenAI, so you'll need to customize your own with your own prompts and docs.
However, the point of sharing this is to show how Streamlit can work with streaming OpenAI Assistant outputs, from message deltas or tool output submissions.
Is there a better way to do this? For example, I didn’t use an event handler, and was mostly inspired by this repo:
gabrielchua/oai-assistant-streaming-st-demo/
… and also the freeCodeCamp assistant tutorial, which was great!
Any suggestions to improve this OAI-Streamlit interface would be great!
Thanks
rP
Here is the relevant streamlit code from main.py
# Accept user input. st.chat_input returns None until the user submits,
# so the walrus guard makes the whole turn conditional on a new prompt.
if prompt := st.chat_input("Ask anything about aerospace certification!"):
    # Display user message in chat message container
    with st.chat_message("user"):
        st.markdown(prompt)

    # Add user message to app chat history (ss is st.session_state)
    ss.chat_history.append({"role": "user", "content": prompt})
    # Send message to chatbot backend
    ss.agent.add_user_prompt("user", prompt)

    # Display assistant response in chat message container
    with st.chat_message("assistant"):
        # Placeholder the streaming backend overwrites as tokens arrive
        assistant_reply_box = st.empty()
        # stream_response renders into the placeholder and returns the
        # complete reply text once the stream finishes.
        # (Removed the original's dead `assistant_reply = ""` assignment,
        # which was immediately overwritten by this call.)
        assistant_reply = ss.agent.stream_response(assistant_reply_box)

    # Once the stream is over, persist the full reply in chat history
    ss.chat_history.append({"role": "assistant", "content": assistant_reply})
Here is the relevant code from openai_backend.py, namely the stream_response method for my assistant class:
def stream_response(self, assistant_reply_box):
    """Stream an assistant run into a Streamlit placeholder.

    Creates a streaming run on this agent's thread, appends message-delta
    text to ``assistant_reply_box`` as it arrives, and services any
    ``requires_action`` events (tool/function calls) before resuming.

    Args:
        assistant_reply_box: A Streamlit placeholder (e.g. ``st.empty()``)
            whose markdown content is updated in place while streaming.

    Returns:
        str: The accumulated assistant reply, or a generic error message
        if anything raised during streaming.
    """
    try:
        # NOTE(review): the original used the bare module-level `client`
        # here but `self.client` everywhere else in this method;
        # standardized on `self.client` for consistency.
        with self.client.beta.threads.runs.create(
            assistant_id=self.assistant.id,
            thread_id=self.thread.id,
            stream=True,
        ) as stream:
            assistant_reply = ""
            # Hard cap on total streaming time, in seconds.
            deadline = time.time() + 120

            for event in stream:
                # Abort if the stream has been running too long.
                if time.time() > deadline:
                    print("Stream timeout exceeded.")
                    break

                if isinstance(event, ThreadMessageDelta):
                    # Incremental token(s) for the in-progress message.
                    delta_block = event.data.delta.content[0]
                    if isinstance(delta_block, TextDeltaBlock):
                        assistant_reply += delta_block.text.value
                        assistant_reply_box.markdown(assistant_reply)
                elif isinstance(event, ThreadRunRequiresAction):
                    # The run is paused waiting for tool outputs.
                    # event.data is the Run object itself, so its id and
                    # required_action are available directly -- the
                    # original re-listed the thread's runs and took
                    # runs[0], which is redundant and racy.
                    assistant_reply = self._service_tool_calls(
                        event.data, assistant_reply, assistant_reply_box
                    )
                elif isinstance(event, ThreadMessageCompleted):
                    print("Message completed.")  # Debug statement
                elif isinstance(event, ThreadRunCompleted):
                    print("Run completed.")  # Debug statement
                # NOTE(review): the original slept 1s on every
                # ThreadMessageInProgress event, needlessly throttling the
                # stream; that delay has been removed.

            return assistant_reply
    except Exception as e:
        # Best-effort boundary: surface a generic message to the UI
        # rather than crashing the Streamlit script.
        print("An error occurred during streaming: ", str(e))
        return "An error occurred while processing your request."

def _service_tool_calls(self, run, assistant_reply, assistant_reply_box):
    """Execute a run's required tool calls and append the follow-up reply.

    Runs every requested function via ``execute_required_function``,
    submits the outputs, waits for the run to settle, then appends the
    latest assistant message to ``assistant_reply`` and re-renders it.

    Args:
        run: The Run object carried by the requires_action event.
        assistant_reply: Text accumulated so far.
        assistant_reply_box: Streamlit placeholder to re-render into.

    Returns:
        str: The updated ``assistant_reply``.
    """
    required_actions = run.required_action.submit_tool_outputs.model_dump()
    tool_outputs = []
    for action in required_actions["tool_calls"]:
        # Identify the function and its JSON-encoded arguments.
        func_name = action["function"]["name"]
        arguments = json.loads(action["function"]["arguments"])
        print(f"Executing function: {func_name} with arguments: {arguments}")  # Debug statement
        # Run the agent function caller.
        output = execute_required_function(func_name, arguments)
        print(f"Function {func_name} complete")  # Debug statement
        tool_outputs.append({"tool_call_id": action["id"], "output": str(output)})

    if not tool_outputs:
        return assistant_reply

    # Submit the outputs so the run can resume.
    self.client.beta.threads.runs.submit_tool_outputs(
        run_id=run.id,
        thread_id=self.thread.id,
        tool_outputs=tool_outputs,
        stream=True,
    )
    print("Tool outputs submitted")  # Debug statement

    # Poll until the run settles. NOTE(review): the original looped
    # forever unless the status became "completed"; terminal failure
    # states now also exit the loop.
    while True:
        run_status = self.client.beta.threads.runs.retrieve(
            thread_id=self.thread.id, run_id=run.id
        )
        if run_status.status == "completed":
            print("Waiting complete")
            break
        if run_status.status in ("failed", "cancelled", "expired"):
            print(f"Run ended with status: {run_status.status}")
            break
        print("Waiting for tool outputs to be processed...")
        time.sleep(2)  # Check every 2 seconds

    # Append just the most recent message of the thread (messages.list
    # returns newest first).
    messages = self.client.beta.threads.messages.list(thread_id=self.thread.id)
    last_message = messages.data[0]
    assistant_reply += last_message.content[0].text.value
    assistant_reply_box.markdown(assistant_reply)
    return assistant_reply