Hello everyone,
I’m new to programming and have been learning gradually, working on integrating an OpenAI assistant into my Flask application using Flask-SocketIO. I’ve encountered a problem where, after providing details to the assistant and it attempts to initiate a tool (specifically the submit_hotel_request
function), the application enters an infinite loop and the tool never gets executed.
Here’s a brief overview of what happens:
- I send a message with details to the assistant.
- The assistant processes the message and tries to call the
submit_hotel_request
function. - Instead of executing the function and sending a response, the application seems to get stuck and doesn’t proceed further.
I’ve translated my code comments to English to make it easier for everyone to understand. Despite my efforts, I haven’t been able to identify why the tool isn’t executing and why the application gets stuck. Any guidance or suggestions on what might be causing this issue would be greatly appreciated.
Thank you for your help!
P.S.: I’m just starting out with programming, so please bear with me if my explanation isn’t perfect.
My full code:
def fetch_assistant_config(assistant_id):
config = AssistantConfig.query.filter_by(id=assistant_id).first()
if config:
app.logger.debug(f"fetch_assistant_config: Configuration found for assistant_id={assistant_id}")
else:
app.logger.debug(f"fetch_assistant_config: No configuration found for assistant_id={assistant_id}")
return config
# Function to send emails
def send_email(to_email, subject, body):
app.logger.debug(f"send_email: Preparing to send email to {to_email} with subject '{subject}'")
smtp_server = os.getenv('SMTP_SERVER', 'smtp.gmail.com')
smtp_port = int(os.getenv('SMTP_PORT', 587))
smtp_username = os.getenv('SMTP_USERNAME')
smtp_password = os.getenv('SMTP_PASSWORD')
if not smtp_username or not smtp_password:
app.logger.error("send_email: SMTP credentials not configured.")
return False
msg = MIMEMultipart()
msg['From'] = smtp_username
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
try:
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
app.logger.debug(f"send_email: Starting SMTP session at {smtp_server}:{smtp_port}")
server.login(smtp_username, smtp_password)
app.logger.debug("send_email: SMTP login successful.")
text = msg.as_string()
server.sendmail(smtp_username, to_email, text)
server.quit()
app.logger.info(f"send_email: Email sent to {to_email} with subject '{subject}'.")
return True
except Exception as e:
app.logger.error(f"send_email: Error sending email to {to_email}: {e}")
return False
# Function to handle hotel requests and send emails
def handle_hotel_request(arguments):
app.logger.debug(f"handle_hotel_request: Processing arguments: {arguments}")
guest_name = arguments.get('guest_name')
room_number = arguments.get('room_number')
request_type = arguments.get('request_type') # e.g., "incident", "report", "reservation"
details = arguments.get('details')
if not all([guest_name, room_number, request_type, details]):
app.logger.warning("handle_hotel_request: Missing data in guest request.")
return {"success": False, "message": "Missing data in the request."}
subject = f"{request_type.capitalize()} Request from {guest_name} - Room {room_number}"
body = f"""
Guest Name: {guest_name}
Room Number: {room_number}
Request Type: {request_type}
Details: {details}
"""
recipient_email = os.getenv('HOTEL_SUPPORT_EMAIL') # Define this variable in the .env file
if not recipient_email:
app.logger.error("handle_hotel_request: Hotel support email not configured.")
return {"success": False, "message": "Support email not configured."}
# Send the email
email_sent = send_email(recipient_email, subject, body)
if email_sent:
app.logger.info(f"handle_hotel_request: Request successfully sent for {guest_name}.")
return {"success": True, "message": "Request successfully sent."}
else:
app.logger.error(f"handle_hotel_request: Error sending request for {guest_name}.")
return {"success": False, "message": "Error sending the request."}
# Define the JSON for the 'submit_hotel_request' function
submit_hotel_request_function_json = {
"name": "submit_hotel_request",
"description": "Allows hotel guests to submit requests such as incidents, reports, or reservations.",
"parameters": {
"type": "object",
"properties": {
"guest_name": {
"type": "string",
"description": "Guest's name."
},
"room_number": {
"type": "string",
"description": "Room number."
},
"request_type": {
"type": "string",
"enum": ["incident", "report", "reservation"],
"description": "Type of request the guest wants to make."
},
"details": {
"type": "string",
"description": "Details of the request."
}
},
"required": ["guest_name", "room_number", "request_type", "details"],
"additionalProperties": False
},
}
# Assistant ID for ChatWindowLuis
CHATWINDOWLUIS_ASSISTANT_ID = "asst_yQW3Amu7CXAcmrGXWtqmXTZh"
# Function to update the assistant with functions
def update_assistant_with_functions(assistant_id, functions):
try:
app.logger.debug(f"update_assistant_with_functions: Updating assistant {assistant_id} with functions: {functions}")
updated_assistant = client.beta.assistants.update(
assistant_id,
tools=functions
)
app.logger.info(f"update_assistant_with_functions: Functions updated for assistant {assistant_id}.")
return updated_assistant
except Exception as e:
app.logger.error(f"update_assistant_with_functions: Error updating functions for assistant {assistant_id}: {e}")
return None
# Custom class to handle assistant events
class MyEventHandler(AssistantEventHandler):
def __init__(self, socketio, thread_id, session_id, assistant_config):
super().__init__()
self.message_buffer = ""
self.previous_text = ""
self.socketio = socketio
self.thread_id = thread_id
self.session_id = session_id # Renamed for clarity
self.assistant_config = assistant_config # Store assistant configuration
app.logger.debug(f"MyEventHandler: Initialized for thread_id={thread_id}, session_id={session_id}, assistant_id={assistant_config.id}")
def on_text_created(self, text) -> None:
markdown_text = text.value if hasattr(text, 'value') else str(text)
app.logger.debug(f"on_text_created: Received text: {markdown_text}")
if markdown_text != self.previous_text:
self.message_buffer += markdown_text
self.previous_text = markdown_text
app.logger.debug(f"on_text_created: Emitting message: {markdown_text}")
self.socketio.emit(
'receive_message',
{"content": markdown_text, "thread_id": self.thread_id},
room=self.session_id # Use session_id for clarity
)
def on_text_delta(self, delta, snapshot):
markdown_text = delta.value
app.logger.debug(f"on_text_delta: Received text delta: {markdown_text}")
if markdown_text != self.previous_text:
self.message_buffer += markdown_text
self.previous_text = markdown_text
app.logger.debug(f"on_text_delta: Emitting delta message: {markdown_text}")
self.socketio.emit(
'receive_message',
{"content": markdown_text, "thread_id": self.thread_id},
room=self.session_id # Use session_id for clarity
)
def on_function_call(self, function_call):
function_name = function_call.name
arguments = function_call.arguments # Expected to be a dict
app.logger.info(f"on_function_call: Function called: {function_name} with arguments: {arguments}")
if self.assistant_config.id != CHATWINDOWLUIS_ASSISTANT_ID:
app.logger.warning(f"on_function_call: Function call not supported for assistant {self.assistant_config.id}")
return
if function_name == "submit_hotel_request":
app.logger.info("on_function_call: Executing handle_hotel_request.")
result = handle_hotel_request(arguments)
if result['success']:
response = f"Request successfully sent: {result['message']}"
app.logger.info(f"on_function_call: {response}")
else:
response = f"Error sending request: {result['message']}"
app.logger.warning(f"on_function_call: {response}")
# Emit the bot's response to the client
self.socketio.emit(
'receive_message',
{"content": response, "thread_id": self.thread_id},
room=self.session_id
)
self.message_buffer += response
self.previous_text = response
# Check if run_id and tool_call_id are present
run_id = getattr(function_call, 'run_id', None)
tool_call_id = getattr(function_call, 'id', None)
if not run_id or not tool_call_id:
app.logger.error("on_function_call: run_id or tool_call_id not present in function_call.")
return
else:
app.logger.debug(f"on_function_call: run_id found: {run_id}")
app.logger.debug(f"on_function_call: tool_call_id found: {tool_call_id}")
# Send the function outputs to OpenAI
output = result['message'] # Must be a string
app.logger.debug(f"on_function_call: Preparing to send tool_outputs: run_id={run_id}, tool_call_id={tool_call_id}, output={output}")
try:
client.beta.threads.runs.submit_tool_outputs(
thread_id=self.thread_id,
run_id=run_id, # Use the correct run_id
tool_outputs=[
{
"tool_call_id": tool_call_id,
"output": output # Must be a string
}
]
)
app.logger.info(f"on_function_call: Tool outputs sent for run_id {run_id}.")
except Exception as e:
app.logger.error(f"on_function_call: Error sending tool outputs for run_id {run_id}: {e}")
# Do not return here to allow the flow to continue
else:
app.logger.warning(f"on_function_call: Unknown function called: {function_name}")
# OpenAI configuration
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
thread_runs = {}
thread_runs_lock = threading.Lock()
# If the assistant is ChatWindowLuis, update it with functions
if assistant_id == CHATWINDOWLUIS_ASSISTANT_ID:
functions = [
{
"type": "function",
"function": submit_hotel_request_function_json
}
]
# Update the assistant with the functions
try:
app.logger.debug(f"create_assistant_config_admin: Updating assistant {assistant_id} with functions.")
client.beta.assistants.update(
assistant_id,
tools=functions
)
app.logger.info(f"create_assistant_config_admin: Functions updated for assistant {assistant_id}.")
except Exception as e:
app.logger.error(f"create_assistant_config_admin: Error updating functions for assistant {assistant_id}: {e}")
return jsonify({'message': 'Assistant configuration created but failed to update functions.'}), 201
return jsonify({'message': 'Assistant config created successfully'}), 201
@app.route('/api/admin/assistant_configs/<assistant_id>', methods=['PUT'])
@admin_required
def update_assistant_config_admin(user_id, assistant_id):
try:
app.logger.info(f"update_assistant_config_admin: Updating assistant configuration: {assistant_id}")
config = AssistantConfig.query.filter_by(id=assistant_id).first()
if not config:
app.logger.warning(f"update_assistant_config_admin: Assistant configuration not found: {assistant_id}")
return jsonify({'message': 'Assistant config not found'}), 404
data = request.get_json()
app.logger.debug(f"update_assistant_config_admin: Received data for update: {data}")
# Do not allow updating the 'id'
if 'id' in data:
app.logger.warning("update_assistant_config_admin: Attempt to modify assistant ID.")
return jsonify({'message': 'Modifying the ID is not allowed.'}), 400
# Validate required fields
name = data.get('name')
if not name:
app.logger.warning("update_assistant_config_admin: Name not provided in update.")
return jsonify({'message': 'Name is required.'}), 400
# Validate theme fields
theme_updates = data.get('theme', {})
for key, value in theme_updates.items():
if key in COLOR_FIELDS:
if not is_valid_hex_color(value):
app.logger.warning(f"update_assistant_config_admin: Invalid color value for {key}: {value}")
return jsonify({'message': f'Invalid color value for {key}.'}), 400
elif key in URL_FIELDS:
if not is_valid_url(value):
app.logger.warning(f"update_assistant_config_admin: Invalid URL value for {key}: {value}")
return jsonify({'message': f'Invalid URL value for {key}.'}), 400
elif key == 'label':
label = value
if not isinstance(label, dict):
app.logger.warning("update_assistant_config_admin: The 'label' field must be an object.")
return jsonify({'message': "The 'label' field must be an object."}), 400
# Validate label subfields
label_text = label.get('text', config.theme.get('label', {}).get('text', ''))
label_bg_color = label.get('bg_color', config.theme.get('label', {}).get('bg_color', '#ffffff'))
label_text_color = label.get('text_color', config.theme.get('label', {}).get('text_color', '#000000'))
label_visible = label.get('visible', config.theme.get('label', {}).get('visible', True))
if not isinstance(label_text, str):
app.logger.warning("update_assistant_config_admin: The 'label.text' field must be a string.")
return jsonify({'message': "The 'label.text' field must be a string."}), 400
if not is_valid_hex_color(label_bg_color):
app.logger.warning(f"update_assistant_config_admin: Invalid color value for 'label.bg_color': {label_bg_color}")
return jsonify({'message': "Invalid color value for 'label.bg_color'."}), 400
if not is_valid_hex_color(label_text_color):
app.logger.warning(f"update_assistant_config_admin: Invalid color value for 'label.text_color': {label_text_color}")
return jsonify({'message': "Invalid color value for 'label.text_color'."}), 400
if not isinstance(label_visible, bool):
app.logger.warning("update_assistant_config_admin: The 'label.visible' field must be a boolean.")
return jsonify({'message': "The 'label.visible' field must be a boolean."}), 400
else:
app.logger.warning(f"update_assistant_config_admin: Unknown field in theme: {key}")
return jsonify({'message': f'Unknown field in theme: {key}.'}), 400
# Update other fields
config.name = name
config.logo_url = data.get('logo_url', config.logo_url)
# Update is_full_screen if provided
if 'is_full_screen' in data:
is_full_screen = data['is_full_screen']
if not isinstance(is_full_screen, bool):
app.logger.warning("update_assistant_config_admin: is_full_screen must be a boolean.")
return jsonify({'message': 'is_full_screen must be a boolean.'}), 400
config.is_full_screen = is_full_screen
app.logger.debug(f"update_assistant_config_admin: is_full_screen updated to {is_full_screen}.")
# Update theme fields by assigning the complete or partial object
if theme_updates:
if not config.theme:
config.theme = {}
config.theme = {**config.theme, **theme_updates}
app.logger.debug(f"update_assistant_config_admin: New 'theme' object after update: {config.theme}")
db.session.commit()
app.logger.info(f"update_assistant_config_admin: Assistant configuration {assistant_id} successfully updated.")
# If the assistant is ChatWindowLuis, update it with functions
if assistant_id == CHATWINDOWLUIS_ASSISTANT_ID:
functions = [
{
"type": "function",
"function": submit_hotel_request_function_json
}
]
# Update the assistant with the functions
try:
app.logger.debug(f"update_assistant_config_admin: Updating assistant {assistant_id} with functions.")
client.beta.assistants.update(
assistant_id,
tools=functions
)
app.logger.info(f"update_assistant_config_admin: Functions updated for assistant {assistant_id}.")
except Exception as e:
app.logger.error(f"update_assistant_config_admin: Error updating functions for assistant {assistant_id}: {e}")
return jsonify({'message': 'Assistant configuration updated but failed to update functions.'}), 200
return jsonify({'message': 'Assistant configuration successfully updated.'}), 200
except Exception as e:
app.logger.error(f"update_assistant_config_admin: Error updating assistant configuration: {e}")
return jsonify({'message': 'Internal server error.'}), 500
# Additional routes (e.g., create_assistant_config, get_session_token, refresh_token) are omitted for brevity
# SocketIO event for connection
@socketio.on('connect')
def handle_connect():
access_token = request.args.get('access_token') # Use 'access_token'
app.logger.info(f"handle_connect: Attempting to connect with access_token: {access_token}")
if not access_token:
app.logger.warning("handle_connect: Access token required.")
emit('error', {'message': 'Access token required.'})
disconnect()
return
# Unpack the tuple returned by decode_session_token
session_id, assistant_id = decode_session_token(access_token)
if not session_id or not assistant_id:
app.logger.warning("handle_connect: Invalid or expired access token.")
emit('error', {'message': 'Invalid or expired access token.'})
disconnect()
return
app.logger.info(f"handle_connect: Valid connection with session_id: {session_id}")
join_room(session_id)
emit('connected', {'msg': f'Connected with session_id {session_id}'}, room=session_id)
# Store the mapping of socket_id to session_id
socket_session_map[request.sid] = session_id
app.logger.debug(f"handle_connect: socket_id {request.sid} mapped to session_id {session_id}")
@socketio.on('disconnect')
def handle_disconnect():
session_id = socket_session_map.get(request.sid)
if session_id:
leave_room(session_id)
app.logger.info(f"handle_disconnect: User with session_id {session_id} disconnected and left rooms.")
# Remove the mapping once the user disconnects
del socket_session_map[request.sid]
else:
app.logger.info(f"handle_disconnect: User with unknown session_id {request.sid} disconnected.")
# SocketIO event for sending messages
@socketio.on('send_message')
@session_required_socketio
def handle_send_message(message, session_id=None, assistant_id=None):
app.logger.info(f"handle_send_message: Received message: {message}, session_id={session_id}, assistant_id={assistant_id}")
username = session_id # Use session_id as identifier
content = message.get("content", None)
create_new_thread = message.get("new_thread", False)
# Get thread_id from Redis using session_id
thread_id = redis_client.get(f"thread_id:{username}")
if thread_id:
thread_id = thread_id.decode('utf-8')
app.logger.debug(f"handle_send_message: thread_id retrieved from Redis: {thread_id}")
if create_new_thread or not thread_id:
thread = client.beta.threads.create()
thread_id = thread.id
redis_client.set(f"thread_id:{username}", thread_id, ex=3600) # Expiration consistent with JWT
app.logger.info(f"handle_send_message: Created new thread_id: {thread_id}")
if create_new_thread:
emit('thread_created', room=username)
app.logger.debug(f"handle_send_message: Emitted 'thread_created' event for room={username}")
if content:
# Get the current date and time
date_info = get_current_date_time()
date_str = f"[Message date and time: {date_info['fecha_hoy']} ({date_info['timezone']})]"
# Combine user content with date and time
combined_content = f"{content}\n{date_str}"
app.logger.debug(f"handle_send_message: combined_content: {combined_content}")
# Send the user's message with date and time information
try:
client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=combined_content
)
app.logger.info(f"handle_send_message: Message sent to thread_id={thread_id}")
except Exception as e:
app.logger.error(f"handle_send_message: Error sending message to thread_id={thread_id}: {e}")
emit('error', {'message': 'Error sending the message.'}, room=username)
return
# Check if there is an active execution in this thread
with thread_runs_lock:
if thread_runs.get(thread_id, False):
app.logger.warning(f"handle_send_message: Active execution in thread_id={thread_id}")
emit('error', {'message': 'There is an active execution in this thread. Please wait.'}, room=username)
return
thread_runs[thread_id] = True
app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to True")
current_thread_id = thread_id
# Get specific assistant configuration using the helper function
assistant_config = fetch_assistant_config(assistant_id)
if not assistant_config:
app.logger.error(f"handle_send_message: Assistant configuration not found for assistant_id={assistant_id}")
emit('error', {'message': 'Assistant configuration not found.'}, room=username)
with thread_runs_lock:
thread_runs[thread_id] = False
app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to False due to nonexistent configuration")
return
def run_assistant_thread():
app.logger.info(f"handle_send_message: Running run_assistant_thread for thread_id={current_thread_id}, session_id={session_id}")
run_assistant(current_thread_id, session_id, assistant_config)
# Use eventlet to run run_assistant cooperatively
try:
eventlet.spawn(run_assistant_thread)
app.logger.debug(f"handle_send_message: run_assistant_thread launched for thread_id={current_thread_id}")
except Exception as e:
app.logger.error(f"handle_send_message: Error launching run_assistant_thread for thread_id={current_thread_id}: {e}")
emit('error', {'message': 'Error processing your request.'}, room=username)
with thread_runs_lock:
thread_runs[thread_id] = False
app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to False due to error launching run_assistant_thread")
def run_assistant(thread_id_param, session_id, assistant_config_param):
try:
app.logger.info(f"run_assistant: Starting execution for thread_id={thread_id_param}, session_id={session_id}")
event_handler = MyEventHandler(socketio, thread_id_param, session_id, assistant_config_param)
tools = [{"type": "file_search"}]
if assistant_config_param.id == CHATWINDOWLUIS_ASSISTANT_ID:
tools.append({
"type": "function",
"function": submit_hotel_request_function_json
})
with client.beta.threads.runs.stream(
thread_id=thread_id_param,
assistant_id=assistant_config_param.id, # Use the specific assistant_id
event_handler=event_handler,
tools=tools
) as stream:
app.logger.info(f"run_assistant: Stream started for thread_id={thread_id_param}")
stream.until_done()
app.logger.info(f"run_assistant: Stream ended for thread_id={thread_id_param}")
except Exception as e:
app.logger.error(f"run_assistant: Error in run_assistant for thread_id={thread_id_param}: {e}")
finally:
with thread_runs_lock:
thread_runs[thread_id_param] = False
app.logger.info(f"run_assistant: thread_runs updated for thread_id={thread_id_param}")
if __name__ == "__main__":
app.logger.info("Flask application starting...")
socketio.run(app, debug=True)