Assistants API helpers: async polling, with a single thread per user, for asking an assistant that has a document tool.

Sharing this to help anyone building with API assistants that have documents attached. The code checks whether a thread already exists for a user and creates one if it does not. Each user always reuses that single thread, so adjust the code if you need a new thread on every request.

It also writes to a debug log for data capture and easier debugging, and it uses a polling loop that keeps checking the run until a response arrives or the timeout is reached.

# Entering Assistant API calls.

# Global dictionary mapping each user ID to that user's single thread ID.
# It lives only in this process's memory (lost on restart, not shared between
# processes) -- replace with a database in production.
user_threads = {}

# Assistant doc handlers
# Constants for adaptive polling (all values are in seconds)
INITIAL_POLLING_INTERVAL = 2.0  # First wait between run-status checks
MAX_POLLING_INTERVAL = 10.0  # Cap for the growing wait between checks
TIMEOUT = 180  # Total time to wait for a response (upper limit)

def log_assistant_response(assistant_name, message, log_dir="/app"):
    """Append ``message`` to ``<log_dir>/<assistant_name>_responses.out``.

    Each entry is followed by a ``-----`` delimiter line so entries can be
    told apart when reading the file back. The file is created if missing;
    ``log_dir`` itself must already exist.

    Args:
        assistant_name: Used verbatim in the file name.
        message: Text to record.
        log_dir: Directory holding the log files (defaults to ``/app``).
    """
    import os

    filename = os.path.join(log_dir, f"{assistant_name}_responses.out")
    # Explicit UTF-8: model output often contains emoji / non-ASCII text, and
    # the default text encoding of open() depends on the platform locale.
    with open(filename, "a", encoding="utf-8") as log_file:
        log_file.write(f"{message}\n-----\n")


def _latest_assistant_text(messages, run_id):
    """Return the text of the newest assistant message from ``run_id``, or None.

    ``messages`` must be ordered newest-first. Messages tagged with a
    different ``run_id`` are skipped, so a reply from an earlier turn on the
    same reused thread is never returned. Messages without a ``run_id``
    attribute are accepted. Non-text content blocks are ignored.
    """
    for msg in messages:
        if msg.role != "assistant":
            continue
        msg_run_id = getattr(msg, "run_id", None)
        if msg_run_id is not None and msg_run_id != run_id:
            continue
        for block in msg.content:
            if getattr(block, "type", None) == "text":
                return block.text.value
    return None


def _cancel_run(thread_id, run_id):
    """Best-effort cancel of a run that is being abandoned; errors are only logged.

    The user's thread is reused for their next prompt, and the API does not
    accept new messages on a thread while one of its runs is still active.
    """
    try:
        openai_client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run_id)
    except Exception as e:  # e.g. the run finished in the meantime
        runic_logging_debug(f"Could not cancel run {run_id}: {e}")


async def handle_assistant_response(assistant_id, prompt, user):
    """Ask the assistant ``prompt`` on ``user``'s thread and return its reply.

    Steps:
      1. Ensure the user has a thread (``ensure_thread_exists``).
      2. Post ``prompt`` as a user message.
      3. Start a run (``trigger_assistant``).
      4. Poll the run.

    The wait between polls starts at ``INITIAL_POLLING_INTERVAL`` and doubles
    up to ``MAX_POLLING_INTERVAL``. Polling stops when the run reaches a
    terminal status or more than ``TIMEOUT`` seconds have elapsed.

    Args:
        assistant_id: ID of the assistant to run; also passed to
            ``log_assistant_response`` to name the response log file.
        prompt: Text of the user's message.
        user: Key under which the user's thread ID is cached.

    Returns:
        The reply text on success. Otherwise a human-readable status string for:
        send/trigger error; failed, cancelled, expired or incomplete run;
        unsupported ``requires_action``; timeout; or no assistant message.
        Exceptions raised while creating the thread or while polling are not
        caught here and propagate to the caller.

    NOTE(review): ``openai_client`` is called synchronously (nothing is
    awaited), so each API call blocks the event loop while in flight.
    Consider an async client or ``asyncio.to_thread`` if other tasks share
    the loop.
    """
    runic_logging_debug(f"🧠🧠🧠 Handling assistant response for user '{user}' with prompt '{prompt}'")

    # Ensure a thread exists for the user or create one
    thread_id = await ensure_thread_exists(openai_client, user)

    # Send the user's message to the thread and start a run on it
    try:
        message_response = openai_client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=prompt,
        )
        runic_logging_debug(f"Message sent to thread {thread_id}, response: {message_response}")

        run = trigger_assistant(openai_client, thread_id, assistant_id)
        runic_logging_debug(f"Triggered assistant {assistant_id} for thread {thread_id}, run ID: {run.id}")
    except Exception as e:
        runic_logging_debug(f"Error in handling response: {e}")
        return "Sorry, there was an error in processing your request."

    # Adaptive polling. The monotonic clock keeps wall-clock adjustments
    # from shortening or stretching the timeout.
    start_time = time.monotonic()
    current_polling_interval = INITIAL_POLLING_INTERVAL
    last_notification_time = start_time

    while True:
        now = time.monotonic()
        if now - start_time > TIMEOUT:
            runic_logging_debug("Assistant response timed out.")
            # Don't leave the run active on the user's reused thread.
            _cancel_run(thread_id, run.id)
            return "Assistant response timed out."

        # Heartbeat line in the debug log roughly every 10 seconds
        # (only logged; nothing is sent to the user).
        if now - last_notification_time >= 10.0:
            runic_logging_debug("🧠🧠🧠...THINKING/RESEARCHING...🧠🧠🧠")
            last_notification_time = now

        run_status = openai_client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
        status = run_status.status
        runic_logging_debug(f"Polling assistant run status: {status}")

        if status == "completed":
            # Request newest-first explicitly. The reply must be the latest
            # message from *this* run, not an earlier turn on the shared thread.
            messages = openai_client.beta.threads.messages.list(thread_id=thread_id, order="desc")
            response = _latest_assistant_text(messages.data, run.id)
            if response is None:
                break
            runic_logging_debug(f"Assistant response: {response}")
            log_assistant_response(assistant_id, f"User: {user}, Prompt: {prompt}, Response: {response}")
            return response
        if status == "failed":
            runic_logging_debug(f"Run failed: {run_status.last_error}")
            return "Assistant run failed."
        if status in ("cancelled", "expired", "incomplete"):
            # Other terminal states: stop now instead of polling until TIMEOUT.
            runic_logging_debug(f"Run ended with status '{status}'")
            return f"Assistant run ended with status '{status}'."
        if status == "requires_action":
            # This handler submits no tool outputs, so the run could not
            # progress; cancel it so the thread stays usable.
            runic_logging_debug("Run requires an action this handler does not support; cancelling.")
            _cancel_run(thread_id, run.id)
            return "Assistant run requires an action that is not supported."

        await asyncio.sleep(current_polling_interval)
        current_polling_interval = min(MAX_POLLING_INTERVAL, current_polling_interval * 2)

    runic_logging_debug("No response from assistant.")
    return "No response received from assistant."

async def ensure_thread_exists(openai_client, user_id):
    """Return the cached thread ID for ``user_id``, creating a thread on first use.

    The mapping lives in the module-level ``user_threads`` dict, so every
    later call for the same user gets back the same thread ID.
    """
    try:
        return user_threads[user_id]
    except KeyError:
        pass

    # First request from this user: open a fresh thread and remember it.
    thread = openai_client.beta.threads.create()
    thread_id = thread.id
    user_threads[user_id] = thread_id
    runic_logging_debug(f"Created new thread for user '{user_id}' with thread ID '{thread_id}'")
    return thread_id

def trigger_assistant(openai_client, thread_id, assistant_id, **run_options):
    """Start a run of ``assistant_id`` on ``thread_id`` and return the run object.

    Any extra keyword arguments are forwarded unchanged to
    ``openai_client.beta.threads.runs.create`` (for example
    ``additional_instructions``). This lets a caller adjust a single run
    without modifying the assistant.

    Raises:
        Whatever the client raises; errors propagate unchanged to the caller.
    """
    return openai_client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
        **run_options,
    )

def _get_field(obj, name, default=None):
    """Read ``name`` from a dict, or as an attribute of any other object; else ``default``."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


def get_assistant_responses(openai_client, thread_id):
    """Collect the non-empty text of every text content block in a thread.

    Messages of every role are included, in the order ``messages.list``
    returns them (its default ordering; no ``order`` is passed here). Both
    SDK model objects (attribute access, as ``handle_assistant_response``
    reads them) and plain dicts are accepted. Messages whose ``content`` is
    not a list are logged as unexpected and skipped; non-text content blocks
    are ignored.

    Returns:
        A list of text strings.

    Raises:
        Any error from the client or from parsing, after logging it.
    """
    try:
        thread_messages = openai_client.beta.threads.messages.list(thread_id=thread_id)
        logging.debug("Raw thread messages: %s", thread_messages)
        responses = []
        for message in thread_messages.data:
            content = _get_field(message, "content")
            if not isinstance(content, list):
                logging.warning("Unexpected message structure: %s", message)
                continue
            for block in content:
                if _get_field(block, "type") != "text":
                    continue
                # A missing "text" yields None, and _get_field(None, ...) falls
                # back to the "" default.
                text_value = _get_field(_get_field(block, "text"), "value", "")
                if text_value:
                    responses.append(text_value)
        return responses
    except Exception as e:
        logging.exception("Error in get_assistant_responses: %s", e)
        raise
3 Likes