I am developing a Telegram bot on top of the async OpenAI client (AsyncOpenAI) and the Assistants API, but I’m struggling to handle a high volume of users at once. Currently the bot answers user queries one by one, so response times get slow as soon as several people write to it.
The bot covers several features: fetching token prices, generating images with DALL-E, replying with voice messages, returning Google search results, and keeping a separate Assistants API thread per user. Here are the relevant parts of my code:
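For context, the assistant referenced by assistantId is registered with function tools for the three helpers that appear below. Roughly like this (abridged sketch, not my exact schemas or descriptions):

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_token_price",
            "description": "Fetch the current price of a token",
            "parameters": {
                "type": "object",
                "properties": {"token_name": {"type": "string"}},
                "required": ["token_name"],
            },
        },
    },
    # ...analogous entries for fetch_google_results(query)
    # and generate_image_with_dalle(prompt)
]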
Generating a response:
async def answerResponse(thread, message):
    try:
        print(f"Preparing to answer: {message}")
        # Add the user's message to their thread, then start an assistant run
        await client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=message,
        )
        run = await client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistantId,
        )
        old_status = ""
        print(f"Thread run status: {run.status}")
        # Poll the run until it reaches a terminal status
        while True:
            run = await client.beta.threads.runs.retrieve(
                run_id=run.id,
                thread_id=thread.id,
            )
            current_status = run.status
            if current_status != old_status:
                print(f"Run status: {run.status} For Thread: {thread.id}")
                old_status = current_status
            if run.status not in [
                "queued",
                "in_progress",
                "cancelling",
                "requires_action",
            ]:
                break
            elif run.status == "requires_action":
                # The assistant asked for one or more tool calls
                tool_calls = run.required_action.submit_tool_outputs.tool_calls
                tool_outputs = []
                for tool_call in tool_calls:
                    function_name = tool_call.function.name
                    try:
                        if tool_call.function.arguments:
                            arguments = json.loads(tool_call.function.arguments)
                            print(
                                f"Should call '{function_name}' with args {arguments}"
                            )
                            if function_name == "get_token_price":
                                token_name = arguments["token_name"]
                                price = await get_token_price(token_name)
                                tool_outputs.append(
                                    {"tool_call_id": tool_call.id, "output": price}
                                )
                                print(f"Called '{function_name}' with args {arguments}")
                            elif function_name == "fetch_google_results":
                                query = arguments["query"]
                                results = await fetch_google_results(query)
                                tool_outputs.append(
                                    {
                                        "tool_call_id": tool_call.id,
                                        "output": json.dumps(results),
                                    }
                                )
                            elif function_name == "generate_image_with_dalle":
                                prompt = arguments["prompt"]
                                try:
                                    image_data = await generate_image_with_dalle(prompt)
                                    tool_outputs.append(
                                        {
                                            "tool_call_id": tool_call.id,
                                            "output": image_data,
                                        }
                                    )
                                    print(image_data)
                                    # Send the image straight back; the run is left unfinished
                                    return image_data
                                except ValueError as e:
                                    print(f"Error: {str(e)}")
                                    tool_outputs.append(
                                        {
                                            "tool_call_id": tool_call.id,
                                            "output": "Error generating image",
                                        }
                                    )
                                print(f"Called '{function_name}' with args {arguments}")
                        else:
                            print("No arguments provided for the function call")
                    except json.JSONDecodeError as e:
                        print(f"An error occurred while decoding JSON: {e}")
                # Hand the tool results back so the run can continue
                await client.beta.threads.runs.submit_tool_outputs(
                    run_id=run.id,
                    thread_id=thread.id,
                    tool_outputs=tool_outputs,
                )
            else:
                time.sleep(0.1)
        if run.status == "completed":
            # The newest message in the thread is the assistant's reply
            messages = await client.beta.threads.messages.list(
                thread_id=thread.id,
                limit=1,
            )
            first_message_content = messages.data[0].content[0].text.value
            response = clean_api_response(first_message_content)
            print(f"Generated response: {response}")
            return response
        else:
            print(f"Run status issue: {run.status}")
            print(run.last_error)
            return "Server Error, please try again."
    except openai.BadRequestError as e:
        print(f"BadRequestError encountered: {e}. Deleting old thread and retrying...")
        return "Error"
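The tool helpers themselves are thin async wrappers around the respective APIs and use the same module-level client as above. generate_image_with_dalle, for instance, boils down to something like this (simplified, error handling omitted, and the exact model name may differ):

async def generate_image_with_dalle(prompt):
    # Ask the async images endpoint for a single image and return its URL
    response = await client.images.generate(
        model="dall-e-3",
        prompt=prompt,
        n=1,
        size="1024x1024",
    )
    return response.data[0].url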
Processing messages:
async def process_message(message_text, update, context, is_voice=False):
    global threads
    counter = 0
    # Try up to three times; on "Error" the user's thread is deleted and recreated
    while counter < 3:
        # Load the user -> thread id mapping from disk
        async with aiofiles.open(threads_path, "r") as file:
            threads = json.loads(await file.read())
        user_thread_id = threads.get(str(update.effective_user.id))
        thread = None
        if user_thread_id:
            try:
                thread = await client.beta.threads.retrieve(thread_id=user_thread_id)
            except openai.NotFoundError:
                thread = await create_new_thread(
                    threads, update.effective_user.id, threads_path
                )
        else:
            thread = await create_new_thread(
                threads, update.effective_user.id, threads_path
            )
        answer = await answerResponse(thread, message_text)
        if answer != "Error":
            break
        else:
            await delete_thread_from_json(update.effective_user.id, threads_path)
            counter += 1
    if is_voice:
        print("Generating voice message...")
        voice_message_path = await generate_voice_response(answer)
        if voice_message_path:
            with open(voice_message_path, "rb") as voice:
                await context.bot.send_voice(
                    chat_id=update.effective_chat.id, voice=voice
                )
            os.remove(voice_message_path)
        else:
            print("Error: voice message generation failed.")
    else:
        if answer and answer != "Error":
            await handle_text_or_image_response(answer, update, context)
        else:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="I'm sorry, but I'm unable to process your message at the moment.",
            )
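The thread bookkeeping helpers used above are simple JSON persistence around the user-to-thread mapping, using the same client, aiofiles, and json globals as the rest of the module. Roughly (simplified sketch, not the exact code):

async def create_new_thread(threads, user_id, path):
    # Start a fresh Assistants API thread and remember its id for this user
    thread = await client.beta.threads.create()
    threads[str(user_id)] = thread.id
    async with aiofiles.open(path, "w") as file:
        await file.write(json.dumps(threads))
    return thread

async def delete_thread_from_json(user_id, path):
    # Forget the stored thread id so the next message starts a new thread
    threads.pop(str(user_id), None)
    async with aiofiles.open(path, "w") as file:
        await file.write(json.dumps(threads))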
Main function:
async def main():
    nest_asyncio.apply()
    application = Application.builder().token(TOKEN).build()
    voice_handler = MessageHandler(filters.VOICE, handle_voice_message)
    reply_to_bot_handler = MessageHandler(
        filter_reply_to_bot | filters.ChatType.PRIVATE, echo
    )
    bot_mention_or_start_handler = MessageHandler(filter_bot_mention_or_start, echo)
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("clear", clear))
    application.add_handler(voice_handler)
    application.add_handler(reply_to_bot_handler)
    application.add_handler(bot_mention_or_start_handler)
    print("Bot started")
    await application.run_polling()


if __name__ == "__main__":
    asyncio.run(main())
Specific Problem
The bot does not scale as the number of users grows. Even though the code is async end to end, requests are effectively handled one at a time: while answerResponse is polling a run for one user, messages from everyone else just queue up, and response times climb quickly. I’m looking for advice on how to make the bot serve multiple requests concurrently and keep response times reasonable as it scales.
- Is there a way to optimize the async operations so that multiple user requests are handled concurrently? (See the rough sketch after this list for the direction I’m currently considering.)
- Can threading or multiprocessing be integrated with the current async setup to improve concurrency?
- Are there any specific design patterns or architectural changes recommended for bots like this to enhance scalability?
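To make the first question concrete, this is the rough direction I’m considering. It is untested, and I’m not sure these are the right levers: build_application and wait_for_run are just illustrative names, concurrent_updates is the python-telegram-bot ApplicationBuilder option, and asyncio.sleep would replace the time.sleep in my polling loop.

import asyncio

from telegram.ext import Application


def build_application(token):
    # 1) Ask python-telegram-bot to dispatch incoming updates concurrently
    #    instead of strictly one after another.
    return (
        Application.builder()
        .token(token)
        .concurrent_updates(True)
        .build()
    )


async def wait_for_run(client, thread_id, run):
    # 2) While a run is still working, yield to the event loop between polls
    #    instead of blocking every other user with time.sleep().
    while run.status in ("queued", "in_progress", "cancelling"):
        await asyncio.sleep(0.5)
        run = await client.beta.threads.runs.retrieve(
            run_id=run.id, thread_id=thread_id
        )
    return run

Would changes along these lines be enough, or is a proper worker-queue or multiprocessing architecture the better long-term approach?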