Infinite Loop Issue When Executing Tool with OpenAI Assistant Integration in Flask Application

Hello everyone,

I’m new to programming and have been learning gradually, working on integrating an OpenAI assistant into my Flask application using Flask-SocketIO. I’ve encountered a problem where, after providing details to the assistant and it attempts to initiate a tool (specifically the submit_hotel_request function), the application enters an infinite loop and the tool never gets executed.

Here’s a brief overview of what happens:

  1. I send a message with details to the assistant.
  2. The assistant processes the message and tries to call the submit_hotel_request function.
  3. Instead of executing the function and sending a response, the application seems to get stuck and doesn’t proceed further.

I’ve translated my code comments to English to make it easier for everyone to understand. Despite my efforts, I haven’t been able to identify why the tool isn’t executing and why the application gets stuck. Any guidance or suggestions on what might be causing this issue would be greatly appreciated.

Thank you for your help!

P.S.: I’m just starting out with programming, so please bear with me if my explanation isn’t perfect.

My full code:


def fetch_assistant_config(assistant_id):
    config = AssistantConfig.query.filter_by(id=assistant_id).first()
    if config:
        app.logger.debug(f"fetch_assistant_config: Configuration found for assistant_id={assistant_id}")
    else:
        app.logger.debug(f"fetch_assistant_config: No configuration found for assistant_id={assistant_id}")
    return config


# Function to send emails
def send_email(to_email, subject, body):
    app.logger.debug(f"send_email: Preparing to send email to {to_email} with subject '{subject}'")
    smtp_server = os.getenv('SMTP_SERVER', 'smtp.gmail.com')
    smtp_port = int(os.getenv('SMTP_PORT', 587))
    smtp_username = os.getenv('SMTP_USERNAME')
    smtp_password = os.getenv('SMTP_PASSWORD')

    if not smtp_username or not smtp_password:
        app.logger.error("send_email: SMTP credentials not configured.")
        return False

    msg = MIMEMultipart()
    msg['From'] = smtp_username
    msg['To'] = to_email
    msg['Subject'] = subject

    msg.attach(MIMEText(body, 'plain'))

    try:
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        app.logger.debug(f"send_email: Starting SMTP session at {smtp_server}:{smtp_port}")
        server.login(smtp_username, smtp_password)
        app.logger.debug("send_email: SMTP login successful.")
        text = msg.as_string()
        server.sendmail(smtp_username, to_email, text)
        server.quit()
        app.logger.info(f"send_email: Email sent to {to_email} with subject '{subject}'.")
        return True
    except Exception as e:
        app.logger.error(f"send_email: Error sending email to {to_email}: {e}")
        return False

# Function to handle hotel requests and send emails
def handle_hotel_request(arguments):
    app.logger.debug(f"handle_hotel_request: Processing arguments: {arguments}")
    guest_name = arguments.get('guest_name')
    room_number = arguments.get('room_number')
    request_type = arguments.get('request_type')  # e.g., "incident", "report", "reservation"
    details = arguments.get('details')

    if not all([guest_name, room_number, request_type, details]):
        app.logger.warning("handle_hotel_request: Missing data in guest request.")
        return {"success": False, "message": "Missing data in the request."}

    subject = f"{request_type.capitalize()} Request from {guest_name} - Room {room_number}"
    body = f"""
    Guest Name: {guest_name}
    Room Number: {room_number}
    Request Type: {request_type}
    Details: {details}
    """

    recipient_email = os.getenv('HOTEL_SUPPORT_EMAIL')  # Define this variable in the .env file

    if not recipient_email:
        app.logger.error("handle_hotel_request: Hotel support email not configured.")
        return {"success": False, "message": "Support email not configured."}

    # Send the email
    email_sent = send_email(recipient_email, subject, body)

    if email_sent:
        app.logger.info(f"handle_hotel_request: Request successfully sent for {guest_name}.")
        return {"success": True, "message": "Request successfully sent."}
    else:
        app.logger.error(f"handle_hotel_request: Error sending request for {guest_name}.")
        return {"success": False, "message": "Error sending the request."}

# Define the JSON for the 'submit_hotel_request' function
submit_hotel_request_function_json = {
    "name": "submit_hotel_request",
    "description": "Allows hotel guests to submit requests such as incidents, reports, or reservations.",
    "parameters": {
        "type": "object",
        "properties": {
            "guest_name": {
                "type": "string",
                "description": "Guest's name."
            },
            "room_number": {
                "type": "string",
                "description": "Room number."
            },
            "request_type": {
                "type": "string",
                "enum": ["incident", "report", "reservation"],
                "description": "Type of request the guest wants to make."
            },
            "details": {
                "type": "string",
                "description": "Details of the request."
            }
        },
        "required": ["guest_name", "room_number", "request_type", "details"],
        "additionalProperties": False
    },
}

# Assistant ID for ChatWindowLuis
CHATWINDOWLUIS_ASSISTANT_ID = "asst_yQW3Amu7CXAcmrGXWtqmXTZh"

# Function to update the assistant with functions
def update_assistant_with_functions(assistant_id, functions):
    try:
        app.logger.debug(f"update_assistant_with_functions: Updating assistant {assistant_id} with functions: {functions}")
        updated_assistant = client.beta.assistants.update(
            assistant_id,
            tools=functions
        )
        app.logger.info(f"update_assistant_with_functions: Functions updated for assistant {assistant_id}.")
        return updated_assistant
    except Exception as e:
        app.logger.error(f"update_assistant_with_functions: Error updating functions for assistant {assistant_id}: {e}")
        return None

# Custom class to handle assistant events
class MyEventHandler(AssistantEventHandler):
    def __init__(self, socketio, thread_id, session_id, assistant_config):
        super().__init__()
        self.message_buffer = ""
        self.previous_text = ""
        self.socketio = socketio
        self.thread_id = thread_id
        self.session_id = session_id  # Renamed for clarity
        self.assistant_config = assistant_config  # Store assistant configuration
        app.logger.debug(f"MyEventHandler: Initialized for thread_id={thread_id}, session_id={session_id}, assistant_id={assistant_config.id}")

    def on_text_created(self, text) -> None:
        markdown_text = text.value if hasattr(text, 'value') else str(text)
        app.logger.debug(f"on_text_created: Received text: {markdown_text}")
        if markdown_text != self.previous_text:
            self.message_buffer += markdown_text
            self.previous_text = markdown_text
            app.logger.debug(f"on_text_created: Emitting message: {markdown_text}")
            self.socketio.emit(
                'receive_message',
                {"content": markdown_text, "thread_id": self.thread_id},
                room=self.session_id  # Use session_id for clarity
            )

    def on_text_delta(self, delta, snapshot):
        markdown_text = delta.value
        app.logger.debug(f"on_text_delta: Received text delta: {markdown_text}")
        if markdown_text != self.previous_text:
            self.message_buffer += markdown_text
            self.previous_text = markdown_text
            app.logger.debug(f"on_text_delta: Emitting delta message: {markdown_text}")
            self.socketio.emit(
                'receive_message',
                {"content": markdown_text, "thread_id": self.thread_id},
                room=self.session_id  # Use session_id for clarity
            )

    def on_function_call(self, function_call):
        function_name = function_call.name
        arguments = function_call.arguments  # Expected to be a dict

        app.logger.info(f"on_function_call: Function called: {function_name} with arguments: {arguments}")

        if self.assistant_config.id != CHATWINDOWLUIS_ASSISTANT_ID:
            app.logger.warning(f"on_function_call: Function call not supported for assistant {self.assistant_config.id}")
            return

        if function_name == "submit_hotel_request":
            app.logger.info("on_function_call: Executing handle_hotel_request.")
            result = handle_hotel_request(arguments)
            if result['success']:
                response = f"Request successfully sent: {result['message']}"
                app.logger.info(f"on_function_call: {response}")
            else:
                response = f"Error sending request: {result['message']}"
                app.logger.warning(f"on_function_call: {response}")

            # Emit the bot's response to the client
            self.socketio.emit(
                'receive_message',
                {"content": response, "thread_id": self.thread_id},
                room=self.session_id
            )
            self.message_buffer += response
            self.previous_text = response

            # Check if run_id and tool_call_id are present
            run_id = getattr(function_call, 'run_id', None)
            tool_call_id = getattr(function_call, 'id', None)

            if not run_id or not tool_call_id:
                app.logger.error("on_function_call: run_id or tool_call_id not present in function_call.")
                return
            else:
                app.logger.debug(f"on_function_call: run_id found: {run_id}")
                app.logger.debug(f"on_function_call: tool_call_id found: {tool_call_id}")

            # Send the function outputs to OpenAI
            output = result['message']  # Must be a string
            app.logger.debug(f"on_function_call: Preparing to send tool_outputs: run_id={run_id}, tool_call_id={tool_call_id}, output={output}")

            try:
                client.beta.threads.runs.submit_tool_outputs(
                    thread_id=self.thread_id,
                    run_id=run_id,  # Use the correct run_id
                    tool_outputs=[
                        {
                            "tool_call_id": tool_call_id,
                            "output": output  # Must be a string
                        }
                    ]
                )
                app.logger.info(f"on_function_call: Tool outputs sent for run_id {run_id}.")
            except Exception as e:
                app.logger.error(f"on_function_call: Error sending tool outputs for run_id {run_id}: {e}")
                # Do not return here to allow the flow to continue
        else:
            app.logger.warning(f"on_function_call: Unknown function called: {function_name}")

# OpenAI configuration
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
thread_runs = {}
thread_runs_lock = threading.Lock()

    # If the assistant is ChatWindowLuis, update it with functions
    if assistant_id == CHATWINDOWLUIS_ASSISTANT_ID:
        functions = [
            {
                "type": "function",
                "function": submit_hotel_request_function_json
            }
        ]
        # Update the assistant with the functions
        try:
            app.logger.debug(f"create_assistant_config_admin: Updating assistant {assistant_id} with functions.")
            client.beta.assistants.update(
                assistant_id,
                tools=functions
            )
            app.logger.info(f"create_assistant_config_admin: Functions updated for assistant {assistant_id}.")
        except Exception as e:
            app.logger.error(f"create_assistant_config_admin: Error updating functions for assistant {assistant_id}: {e}")
            return jsonify({'message': 'Assistant configuration created but failed to update functions.'}), 201

    return jsonify({'message': 'Assistant config created successfully'}), 201

@app.route('/api/admin/assistant_configs/<assistant_id>', methods=['PUT'])
@admin_required
def update_assistant_config_admin(user_id, assistant_id):
    try:
        app.logger.info(f"update_assistant_config_admin: Updating assistant configuration: {assistant_id}")
        config = AssistantConfig.query.filter_by(id=assistant_id).first()
        if not config:
            app.logger.warning(f"update_assistant_config_admin: Assistant configuration not found: {assistant_id}")
            return jsonify({'message': 'Assistant config not found'}), 404

        data = request.get_json()
        app.logger.debug(f"update_assistant_config_admin: Received data for update: {data}")

        # Do not allow updating the 'id'
        if 'id' in data:
            app.logger.warning("update_assistant_config_admin: Attempt to modify assistant ID.")
            return jsonify({'message': 'Modifying the ID is not allowed.'}), 400

        # Validate required fields
        name = data.get('name')
        if not name:
            app.logger.warning("update_assistant_config_admin: Name not provided in update.")
            return jsonify({'message': 'Name is required.'}), 400

        # Validate theme fields
        theme_updates = data.get('theme', {})
        for key, value in theme_updates.items():
            if key in COLOR_FIELDS:
                if not is_valid_hex_color(value):
                    app.logger.warning(f"update_assistant_config_admin: Invalid color value for {key}: {value}")
                    return jsonify({'message': f'Invalid color value for {key}.'}), 400
            elif key in URL_FIELDS:
                if not is_valid_url(value):
                    app.logger.warning(f"update_assistant_config_admin: Invalid URL value for {key}: {value}")
                    return jsonify({'message': f'Invalid URL value for {key}.'}), 400
            elif key == 'label':
                label = value
                if not isinstance(label, dict):
                    app.logger.warning("update_assistant_config_admin: The 'label' field must be an object.")
                    return jsonify({'message': "The 'label' field must be an object."}), 400
                # Validate label subfields
                label_text = label.get('text', config.theme.get('label', {}).get('text', ''))
                label_bg_color = label.get('bg_color', config.theme.get('label', {}).get('bg_color', '#ffffff'))
                label_text_color = label.get('text_color', config.theme.get('label', {}).get('text_color', '#000000'))
                label_visible = label.get('visible', config.theme.get('label', {}).get('visible', True))

                if not isinstance(label_text, str):
                    app.logger.warning("update_assistant_config_admin: The 'label.text' field must be a string.")
                    return jsonify({'message': "The 'label.text' field must be a string."}), 400
                if not is_valid_hex_color(label_bg_color):
                    app.logger.warning(f"update_assistant_config_admin: Invalid color value for 'label.bg_color': {label_bg_color}")
                    return jsonify({'message': "Invalid color value for 'label.bg_color'."}), 400
                if not is_valid_hex_color(label_text_color):
                    app.logger.warning(f"update_assistant_config_admin: Invalid color value for 'label.text_color': {label_text_color}")
                    return jsonify({'message': "Invalid color value for 'label.text_color'."}), 400
                if not isinstance(label_visible, bool):
                    app.logger.warning("update_assistant_config_admin: The 'label.visible' field must be a boolean.")
                    return jsonify({'message': "The 'label.visible' field must be a boolean."}), 400
            else:
                app.logger.warning(f"update_assistant_config_admin: Unknown field in theme: {key}")
                return jsonify({'message': f'Unknown field in theme: {key}.'}), 400

        # Update other fields
        config.name = name
        config.logo_url = data.get('logo_url', config.logo_url)

        # Update is_full_screen if provided
        if 'is_full_screen' in data:
            is_full_screen = data['is_full_screen']
            if not isinstance(is_full_screen, bool):
                app.logger.warning("update_assistant_config_admin: is_full_screen must be a boolean.")
                return jsonify({'message': 'is_full_screen must be a boolean.'}), 400
            config.is_full_screen = is_full_screen
            app.logger.debug(f"update_assistant_config_admin: is_full_screen updated to {is_full_screen}.")

        # Update theme fields by assigning the complete or partial object
        if theme_updates:
            if not config.theme:
                config.theme = {}
            config.theme = {**config.theme, **theme_updates}
            app.logger.debug(f"update_assistant_config_admin: New 'theme' object after update: {config.theme}")

        db.session.commit()
        app.logger.info(f"update_assistant_config_admin: Assistant configuration {assistant_id} successfully updated.")

        # If the assistant is ChatWindowLuis, update it with functions
        if assistant_id == CHATWINDOWLUIS_ASSISTANT_ID:
            functions = [
                {
                    "type": "function",
                    "function": submit_hotel_request_function_json
                }
            ]
            # Update the assistant with the functions
            try:
                app.logger.debug(f"update_assistant_config_admin: Updating assistant {assistant_id} with functions.")
                client.beta.assistants.update(
                    assistant_id,
                    tools=functions
                )
                app.logger.info(f"update_assistant_config_admin: Functions updated for assistant {assistant_id}.")
            except Exception as e:
                app.logger.error(f"update_assistant_config_admin: Error updating functions for assistant {assistant_id}: {e}")
                return jsonify({'message': 'Assistant configuration updated but failed to update functions.'}), 200

        return jsonify({'message': 'Assistant configuration successfully updated.'}), 200
    except Exception as e:
        app.logger.error(f"update_assistant_config_admin: Error updating assistant configuration: {e}")
        return jsonify({'message': 'Internal server error.'}), 500

# Additional routes (e.g., create_assistant_config, get_session_token, refresh_token) are omitted for brevity

# SocketIO event for connection
@socketio.on('connect')
def handle_connect():
    access_token = request.args.get('access_token')  # Use 'access_token'
    app.logger.info(f"handle_connect: Attempting to connect with access_token: {access_token}")
    
    if not access_token:
        app.logger.warning("handle_connect: Access token required.")
        emit('error', {'message': 'Access token required.'})
        disconnect()
        return
    
    # Unpack the tuple returned by decode_session_token
    session_id, assistant_id = decode_session_token(access_token)
    
    if not session_id or not assistant_id:
        app.logger.warning("handle_connect: Invalid or expired access token.")
        emit('error', {'message': 'Invalid or expired access token.'})
        disconnect()
        return
    
    app.logger.info(f"handle_connect: Valid connection with session_id: {session_id}")
    join_room(session_id)
    emit('connected', {'msg': f'Connected with session_id {session_id}'}, room=session_id)
    
    # Store the mapping of socket_id to session_id
    socket_session_map[request.sid] = session_id
    app.logger.debug(f"handle_connect: socket_id {request.sid} mapped to session_id {session_id}")

@socketio.on('disconnect')
def handle_disconnect():
    session_id = socket_session_map.get(request.sid)
    
    if session_id:
        leave_room(session_id)
        app.logger.info(f"handle_disconnect: User with session_id {session_id} disconnected and left rooms.")
        # Remove the mapping once the user disconnects
        del socket_session_map[request.sid]
    else:
        app.logger.info(f"handle_disconnect: User with unknown session_id {request.sid} disconnected.")

# SocketIO event for sending messages
@socketio.on('send_message')
@session_required_socketio
def handle_send_message(message, session_id=None, assistant_id=None):
    app.logger.info(f"handle_send_message: Received message: {message}, session_id={session_id}, assistant_id={assistant_id}")
    
    username = session_id  # Use session_id as identifier
    content = message.get("content", None)
    create_new_thread = message.get("new_thread", False)

    # Get thread_id from Redis using session_id
    thread_id = redis_client.get(f"thread_id:{username}")
    if thread_id:
        thread_id = thread_id.decode('utf-8')
        app.logger.debug(f"handle_send_message: thread_id retrieved from Redis: {thread_id}")
    if create_new_thread or not thread_id:
        thread = client.beta.threads.create()
        thread_id = thread.id
        redis_client.set(f"thread_id:{username}", thread_id, ex=3600)  # Expiration consistent with JWT
        app.logger.info(f"handle_send_message: Created new thread_id: {thread_id}")
        if create_new_thread:
            emit('thread_created', room=username)
            app.logger.debug(f"handle_send_message: Emitted 'thread_created' event for room={username}")

    if content:
        # Get the current date and time
        date_info = get_current_date_time()
        date_str = f"[Message date and time: {date_info['fecha_hoy']} ({date_info['timezone']})]"

        # Combine user content with date and time
        combined_content = f"{content}\n{date_str}"
        app.logger.debug(f"handle_send_message: combined_content: {combined_content}")

        # Send the user's message with date and time information
        try:
            client.beta.threads.messages.create(
                thread_id=thread_id,
                role="user",
                content=combined_content
            )
            app.logger.info(f"handle_send_message: Message sent to thread_id={thread_id}")
        except Exception as e:
            app.logger.error(f"handle_send_message: Error sending message to thread_id={thread_id}: {e}")
            emit('error', {'message': 'Error sending the message.'}, room=username)
            return

        # Check if there is an active execution in this thread
        with thread_runs_lock:
            if thread_runs.get(thread_id, False):
                app.logger.warning(f"handle_send_message: Active execution in thread_id={thread_id}")
                emit('error', {'message': 'There is an active execution in this thread. Please wait.'}, room=username)
                return
            thread_runs[thread_id] = True
            app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to True")

        current_thread_id = thread_id

        # Get specific assistant configuration using the helper function
        assistant_config = fetch_assistant_config(assistant_id)
        if not assistant_config:
            app.logger.error(f"handle_send_message: Assistant configuration not found for assistant_id={assistant_id}")
            emit('error', {'message': 'Assistant configuration not found.'}, room=username)
            with thread_runs_lock:
                thread_runs[thread_id] = False
                app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to False due to nonexistent configuration")
            return

        def run_assistant_thread():
            app.logger.info(f"handle_send_message: Running run_assistant_thread for thread_id={current_thread_id}, session_id={session_id}")
            run_assistant(current_thread_id, session_id, assistant_config)

        # Use eventlet to run run_assistant cooperatively
        try:
            eventlet.spawn(run_assistant_thread)
            app.logger.debug(f"handle_send_message: run_assistant_thread launched for thread_id={current_thread_id}")
        except Exception as e:
            app.logger.error(f"handle_send_message: Error launching run_assistant_thread for thread_id={current_thread_id}: {e}")
            emit('error', {'message': 'Error processing your request.'}, room=username)
            with thread_runs_lock:
                thread_runs[thread_id] = False
                app.logger.debug(f"handle_send_message: thread_runs[{thread_id}] set to False due to error launching run_assistant_thread")

def run_assistant(thread_id_param, session_id, assistant_config_param):
    try:
        app.logger.info(f"run_assistant: Starting execution for thread_id={thread_id_param}, session_id={session_id}")
        event_handler = MyEventHandler(socketio, thread_id_param, session_id, assistant_config_param)
        tools = [{"type": "file_search"}]
        if assistant_config_param.id == CHATWINDOWLUIS_ASSISTANT_ID:
            tools.append({
                "type": "function",
                "function": submit_hotel_request_function_json
            })
        with client.beta.threads.runs.stream(
            thread_id=thread_id_param,
            assistant_id=assistant_config_param.id,  # Use the specific assistant_id
            event_handler=event_handler,
            tools=tools
        ) as stream:
            app.logger.info(f"run_assistant: Stream started for thread_id={thread_id_param}")
            stream.until_done()
            app.logger.info(f"run_assistant: Stream ended for thread_id={thread_id_param}")
    except Exception as e:
        app.logger.error(f"run_assistant: Error in run_assistant for thread_id={thread_id_param}: {e}")
    finally:
        with thread_runs_lock:
            thread_runs[thread_id_param] = False
            app.logger.info(f"run_assistant: thread_runs updated for thread_id={thread_id_param}")

if __name__ == "__main__":
    app.logger.info("Flask application starting...")
    socketio.run(app, debug=True)