I’m building a chatbot using the OpenAI Assistants API. The chatbot is supposed to search my pension institute’s files and answer the user based on those files. Below is the code I wrote. In most cases it answers the user’s request flawlessly, but sometimes it gives misinformation or answers that don’t follow the format I want. How can I modify the code so that it gives better responses?
P.S.: I just started learning both coding in Python and using the OpenAI API.
# Store thread IDs for reuse.
# Maps an assistant ID to the ID of the thread that was created for it, so
# later messages for the same assistant continue in the same thread.
thread_id_store = {}
# Define the EventHandler class to process streaming events
class MyEventHandler(AssistantEventHandler):
    """Collect streamed assistant text and convert it to simple HTML.

    Text deltas are appended to ``message_content`` as they arrive; when
    ``on_message_done`` fires, the accumulated text is run through
    ``format_message``. ``get_message`` returns the current content.
    """

    def __init__(self):
        super().__init__()
        self.message_content = ""  # Text accumulated from the stream so far

    @override
    def on_text_delta(self, delta: TextDelta, snapshot):
        """Append the text carried by ``delta`` to the accumulated message."""
        # ``value`` may be None (the SDK types it as optional), and
        # ``str += None`` would raise TypeError, so skip empty deltas.
        if delta.value:
            self.message_content += delta.value

    @override
    def on_message_done(self, message) -> None:
        """Format the accumulated text once the message is complete."""
        self.message_content = self.format_message(self.message_content)

    def format_message(self, message_content: str) -> str:
        """Strip citation markers and convert light Markdown to HTML.

        Args:
            message_content: Raw text produced by the assistant.

        Returns:
            The text with numeric footnotes and file-citation markers
            removed, ``===``-underlined lines wrapped in ``<h2>``,
            ``**bold**`` as ``<strong>``, ``__underline__`` as ``<u>``, and
            newlines replaced by ``<br>``.

        NOTE(review): the text is not HTML-escaped, so any markup present in
        the model output is passed through unchanged — confirm the front end
        is fine with that.
        """
        # Remove numeric footnote markers such as "[1]".
        message_content = re.sub(r'\[\d+\]', '', message_content)
        # Remove file-citation markers. Matching any 【...†...】 span covers
        # "【4:0†source】" as well as variants such as "【13†source】" or
        # "【4:0†rules.pdf】", which the previous literal pattern left in
        # the answer.
        message_content = re.sub(r'【[^】]*†[^】]*】', '', message_content)
        # A line followed by a line of "===" becomes a subtitle.
        message_content = re.sub(
            r'(\n)([^\n]+)(\n===+)', r'\1<h2>\2</h2>\1', message_content
        )
        # Bold and underline.
        message_content = re.sub(
            r'\*\*(.*?)\*\*', r'<strong>\1</strong>', message_content
        )
        message_content = re.sub(r'__(.*?)__', r'<u>\1</u>', message_content)
        # Line breaks go last so the patterns above still see real newlines.
        message_content = message_content.replace('\n', '<br>')
        return message_content

    def get_message(self):
        """Return the message content collected (and formatted) so far."""
        return self.message_content
def get_or_create_thread(assistant_id):
    """Return the thread ID cached for ``assistant_id``, creating it on first use.

    NOTE(review): the cache is keyed by assistant only, so every caller that
    uses the same assistant ends up in the same thread — confirm that is
    intended.
    """
    try:
        # Fast path: a thread was already created for this assistant.
        return thread_id_store[assistant_id]
    except KeyError:
        pass

    # First request for this assistant: create a thread and remember its ID.
    new_thread = client.beta.threads.create()
    thread_id_store[assistant_id] = new_thread.id
    return thread_id_store[assistant_id]
def create_message_and_get_response(user_message, assistant_id, retries=3, delay=5):
    """Post ``user_message`` to the assistant's thread and return the reply.

    Args:
        user_message: Text added to the thread as a "user" message.
        assistant_id: ID of the assistant that runs on the thread.
        retries: Maximum number of streaming attempts.
        delay: Seconds to wait between failed attempts.

    Returns:
        The formatted message returned by ``MyEventHandler.get_message()``.

    Raises:
        RuntimeError: If every attempt fails. The last underlying error is
            chained as the cause.
    """
    thread_id = get_or_create_thread(assistant_id)

    # The user message is added once, outside the retry loop, so a retried
    # run does not duplicate the question in the thread.
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=user_message,
    )

    last_error = None
    for attempt in range(1, retries + 1):
        try:
            # Fresh handler per attempt so partial text from a failed stream
            # is not carried over into the next attempt.
            event_handler = MyEventHandler()
            with client.beta.threads.runs.stream(
                thread_id=thread_id,
                assistant_id=assistant_id,
                event_handler=event_handler,
            ) as stream:
                stream.until_done()  # Block until the run has finished
            return event_handler.get_message()
        except Exception as e:  # Retry boundary: any failure triggers a retry
            print(f"Attempt {attempt} failed with error: {e}")
            last_error = e
            # Only wait when another attempt will actually follow.
            if attempt < retries:
                time.sleep(delay)

    # Report the real number of attempts and keep the root cause attached.
    raise RuntimeError(
        f"Failed to get response after {retries} attempts"
    ) from last_error
@app.route('/')
def index():
    """Serve ``index.html`` as the homepage."""
    homepage_file = 'index.html'
    return send_from_directory('', homepage_file)
@app.route('/chat')
def chat():
    """Serve the chat page for the assistant named in the ``assistant`` query arg.

    Redirects to the homepage when the argument is missing or not one of the
    known assistants.
    """
    # Query-string value -> HTML page served for that assistant.
    chat_pages = {
        'vida': 'chat_vida.html',
        'invest': 'chat_invest.html',
    }
    selected = request.args.get('assistant')
    if selected not in chat_pages:
        return redirect(url_for('index'))
    return send_from_directory('', chat_pages[selected])
@app.route('/send_message', methods=['POST'])
def send_message():
    """Forward the user's message to the assistant and return its reply as JSON.

    Expects a JSON object with ``assistant_id`` and ``message``.

    Returns:
        ``{"response": <text>}`` on success;
        ``{"error": <text>}`` with status 400 when the body is not a JSON
        object or a required field is missing/empty;
        ``{"error": <text>}`` with status 502 when the assistant could not
        produce a reply after all retries.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so the client gets a JSON error rather than an HTML error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    assistant_id = data.get('assistant_id')
    user_message = data.get('message')
    if not assistant_id or not user_message:
        return jsonify(
            {'error': "Both 'assistant_id' and 'message' are required"}
        ), 400

    try:
        response = create_message_and_get_response(user_message, assistant_id)
    except RuntimeError as e:
        # Raised once every retry has failed; report it as an upstream error.
        return jsonify({'error': str(e)}), 502
    return jsonify({'response': response})
def open_browser():
    """Open the app's homepage in a new window of the default web browser.

    The port is taken from the ``PORT`` environment variable, defaulting to
    5000.
    """
    port = os.environ.get("PORT", 5000)
    # 0.0.0.0 is a "listen on every interface" bind address, not a destination
    # a browser can reliably connect to, so point the browser at loopback.
    webbrowser.open_new(f'http://127.0.0.1:{port}/')
if __name__ == '__main__':
    # Open the browser one second after start-up, but only when
    # WERKZEUG_RUN_MAIN is "true".
    # NOTE(review): that variable is presumably set only in the Werkzeug
    # reloader's child process; app.run() below is not given debug=True, so
    # confirm the browser actually opens in your setup.
    werkzeug_run_main = os.environ.get("WERKZEUG_RUN_MAIN")
    if werkzeug_run_main == "true":
        browser_timer = threading.Timer(1, open_browser)
        browser_timer.start()

    # Listen on all interfaces, on $PORT when set and 5000 otherwise.
    listen_port = int(os.environ.get("PORT", 5000))
    app.run(host='0.0.0.0', port=listen_port)