Sharing this to help anyone building with the Assistants API and assistants that have documents attached. The code checks whether a thread already exists for a user and, if not, creates one. In this setup each user always reuses a single thread, so adjust it if you need a new thread on each pass.
It also writes each response to a debug file for data capture and debugging, and it uses a polling loop to keep checking for the assistant's response.
# Entering Assistant API calls.

# Global dictionary to store user thread IDs (replace with a database in production).
# Defined exactly once: a second module-level assignment would only rebind the
# name to another empty dict.
user_threads = {}

# Assistant doc handlers

# Constants for adaptive polling
INITIAL_POLLING_INTERVAL = 2.0  # Initial polling interval in seconds
MAX_POLLING_INTERVAL = 10.0  # Maximum polling interval in seconds
TIMEOUT = 180  # Total time to wait for a response, in seconds (upper limit)
def log_assistant_response(assistant_name, message, log_dir="/app"):
    """Append ``message`` to the response log file for ``assistant_name``.

    The entry is written to ``<log_dir>/<assistant_name>_responses.out`` and is
    followed by a ``-----`` delimiter line so entries can be told apart.

    Args:
        assistant_name: Used to build the log file name.
        message: Text to append; a trailing newline is added.
        log_dir: Directory that holds the log file. Defaults to ``/app``, the
            location this function has always written to.

    Raises:
        OSError: If the log file cannot be opened or written.
    """
    filename = f"{log_dir}/{assistant_name}_responses.out"
    # Explicit UTF-8: logged prompts/responses may contain non-ASCII text, and
    # the platform default encoding is not guaranteed to be able to encode it.
    with open(filename, 'a', encoding='utf-8') as log_file:
        log_file.write(f"{message}\n-----\n")
async def handle_assistant_response(assistant_id, prompt, user):
    """Post ``prompt`` to the user's thread, run the assistant, and return its reply.

    Steps:
      1. Look up (or create) the thread for ``user``.
      2. Add ``prompt`` to the thread as a user message and start a run.
      3. Poll the run with a doubling interval (capped at
         ``MAX_POLLING_INTERVAL``) until it finishes or ``TIMEOUT`` elapses.
      4. On completion, return the text of the newest assistant message that
         was produced by this run, and append it to the assistant's log file.

    Args:
        assistant_id: ID of the assistant to run; also used as the log file name.
        prompt: The user's message text.
        user: Key identifying the user in ``user_threads``.

    Returns:
        The assistant's reply text, or one of the fixed fallback strings
        ("Sorry, there was an error in processing your request.",
        "Assistant response timed out.", "Assistant run failed.",
        "No response received from assistant.").

    Raises:
        Exceptions from thread creation, run polling, message listing, or the
        log file write are not caught here and propagate to the caller.
    """
    runic_logging_debug(f"🧠🧠🧠 Handling assistant response for user '{user}' with prompt '{prompt}'")

    # NOTE(review): the client calls below are not awaited; if they perform
    # network I/O they block the event loop for the duration of each call.

    # Ensure a thread exists for the user or create one
    thread_id = await ensure_thread_exists(openai_client, user)

    # Send user's message to the thread and create a run
    try:
        message_response = openai_client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=prompt,
        )
        runic_logging_debug(f"Message sent to thread {thread_id}, response: {message_response}")
        run = trigger_assistant(openai_client, thread_id, assistant_id)
        runic_logging_debug(f"Triggered assistant {assistant_id} for thread {thread_id}, run ID: {run.id}")
    except Exception as e:
        runic_logging_debug(f"Error in handling response: {e}")
        return "Sorry, there was an error in processing your request."

    # Statuses after which the run will never reach "completed"; polling on
    # would only burn the whole TIMEOUT. "requires_action" is deliberately not
    # listed: this handler submits no tool outputs, so such a run keeps being
    # polled until it expires or TIMEOUT is hit.
    dead_statuses = ("failed", "cancelled", "expired", "incomplete")

    # Adaptive polling for assistant's response. A monotonic clock is used so
    # that system clock adjustments cannot distort the timeout.
    start_time = time.monotonic()
    current_polling_interval = INITIAL_POLLING_INTERVAL
    last_notification_time = start_time
    while True:
        now = time.monotonic()
        if now - start_time > TIMEOUT:
            runic_logging_debug("Assistant response timed out.")
            return "Assistant response timed out."

        # Emit a progress line to the debug log every 10 seconds
        if now - last_notification_time >= 10.0:
            runic_logging_debug("🧠🧠🧠...THINKING/RESEARCHING...🧠🧠🧠")
            last_notification_time = now

        run_status = openai_client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
        status = run_status.status
        runic_logging_debug(f"Polling assistant run status: {status}")

        if status == "completed":
            break
        if status in dead_statuses:
            runic_logging_debug(f"Run {status}: {run_status.last_error}")
            return "Assistant run failed."

        await asyncio.sleep(current_polling_interval)
        current_polling_interval = min(MAX_POLLING_INTERVAL, current_polling_interval * 2)

    # Ask for newest-first explicitly. The thread is reused across requests, so
    # taking the last element of the listing would return the OLDEST assistant
    # message rather than the answer to this prompt.
    messages = openai_client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for msg in messages.data:
        # Only consider assistant messages created by the run we just started.
        if msg.role != 'assistant' or msg.run_id != run.id:
            continue
        # A message may carry non-text parts (e.g. images); keep the text only.
        response = "\n".join(part.text.value for part in msg.content if part.type == "text")
        if response:
            runic_logging_debug(f"Assistant response: {response}")
            log_assistant_response(assistant_id, f"User: {user}, Prompt: {prompt}, Response: {response}")
            return response

    runic_logging_debug("No response from assistant.")
    return "No response received from assistant."
async def ensure_thread_exists(openai_client, user_id):
    """Return the thread ID stored for ``user_id``, creating a thread if needed.

    Thread IDs are kept in the module-level ``user_threads`` dictionary. When
    ``user_id`` has no entry, a new thread is created through
    ``openai_client.beta.threads.create()``, its ID is stored, and the creation
    is written to the debug log.

    Args:
        openai_client: Client object exposing ``beta.threads.create()``.
        user_id: Key under which the thread ID is stored.

    Returns:
        The thread ID associated with ``user_id``.
    """
    # Fast path: this user already has a thread.
    if user_id in user_threads:
        return user_threads[user_id]

    created_thread = openai_client.beta.threads.create()
    user_threads[user_id] = created_thread.id
    runic_logging_debug(f"Created new thread for user '{user_id}' with thread ID '{created_thread.id}'")
    return user_threads[user_id]
def trigger_assistant(openai_client, thread_id, assistant_id):
    """Start a run of ``assistant_id`` on ``thread_id`` and return the run object.

    Args:
        openai_client: Client object exposing ``beta.threads.runs.create()``.
        thread_id: ID of the thread to run the assistant on.
        assistant_id: ID of the assistant to run.

    Returns:
        Whatever ``openai_client.beta.threads.runs.create()`` returns.

    Raises:
        Any exception raised by the client call propagates unchanged. The
        previous ``except Exception as e: raise e`` wrapper did nothing except
        add a frame to the traceback, so it has been removed.
    """
    return openai_client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )
def _read_field(obj, name, default=None):
    """Return ``obj[name]`` for dicts, or ``obj.name`` for attribute-style objects."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


def get_assistant_responses(openai_client, thread_id):
    """Collect the non-empty text values found in the messages of a thread.

    Messages are listed via ``openai_client.beta.threads.messages.list()``.
    Both dict-shaped messages and attribute-style message objects (the shape
    the rest of this file reads with ``msg.content[0].text.value``) are
    supported; previously only dicts were recognised, so object-style messages
    never yielded any text.

    Messages are not filtered by role: text from every listed message is
    returned, in listing order.

    Args:
        openai_client: Client object exposing ``beta.threads.messages.list()``.
        thread_id: ID of the thread whose messages are read.

    Returns:
        A list of text strings, one per non-empty text content part.

    Raises:
        Any exception raised while listing or reading messages is logged with
        ``logging.error`` and re-raised.
    """
    try:
        thread_messages = openai_client.beta.threads.messages.list(thread_id=thread_id)
        logging.debug(f"Raw thread messages: {thread_messages}")
        responses = []
        for message in thread_messages.data:
            parts = _read_field(message, 'content')
            if not isinstance(parts, list):
                logging.warning(f"Unexpected message structure: {message}")
                continue
            for part in parts:
                # Skip non-text parts (e.g. images) and parts without a text payload.
                if _read_field(part, 'type') != 'text':
                    continue
                text = _read_field(part, 'text')
                if text is None:
                    continue
                text_value = _read_field(text, 'value', '')
                if text_value:
                    responses.append(text_value)
        return responses
    except Exception as e:
        logging.error(f"Error in get_assistant_responses: {e}")
        raise