High User Volume with Async OpenAI

I am developing a Telegram bot with the async OpenAI client (AsyncOpenAI), but I'm struggling to handle a high volume of simultaneous users. The bot currently responds to user queries one at a time, so response times degrade quickly under load.

The bot handles several features: fetching token prices, generating images with DALL-E, producing voice responses, fetching Google search results, and maintaining a separate Assistants thread per user. Here are the relevant snippets of my code:

Generating a response:

async def answerResponse(thread, message):
    try:
        print(f"Preparing to answer: {message}")
        await client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=message,
        )

        run = await client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistantId,
        )
        old_status = ""
        print(f"Thread run status: {run.status}")

        while True:
            run = await client.beta.threads.runs.retrieve(
                run_id=run.id,
                thread_id=thread.id,
            )
            current_status = run.status
            if current_status != old_status:
                print(f"Run status: {run.status} For Thread: {thread.id}")
                old_status = current_status

            if run.status not in [
                "queued",
                "in_progress",
                "cancelling",
                "requires_action",
            ]:
                break
            elif run.status == "requires_action":
                tool_calls = run.required_action.submit_tool_outputs.tool_calls
                tool_outputs = []
                for tool_call in tool_calls:
                    function_name = tool_call.function.name
                    try:
                        if tool_call.function.arguments:
                            arguments = json.loads(tool_call.function.arguments)
                            print(
                                f"Should call '{function_name}' with args {arguments}"
                            )
                            if function_name == "get_token_price":
                                token_name = arguments["token_name"]
                                price = await get_token_price(token_name)
                                tool_outputs.append(
                                    {"tool_call_id": tool_call.id, "output": price}
                                )
                            elif function_name == "fetch_google_results":
                                query = arguments["query"]
                                results = await fetch_google_results(query)
                                tool_outputs.append(
                                    {
                                        "tool_call_id": tool_call.id,
                                        "output": json.dumps(results),
                                    }
                                )
                            elif function_name == "generate_image_with_dalle":
                                prompt = arguments["prompt"]
                                try:
                                    image_data = await generate_image_with_dalle(prompt)
                                    tool_outputs.append(
                                        {
                                            "tool_call_id": tool_call.id,
                                            "output": image_data,
                                        }
                                    )
                                    print(image_data)
                                    # NOTE: returning here skips submit_tool_outputs,
                                    # so the run is left in "requires_action"
                                    return image_data
                                except ValueError as e:
                                    print(f"Error: {str(e)}")
                                    tool_outputs.append(
                                        {
                                            "tool_call_id": tool_call.id,
                                            "output": "Error generating image",
                                        }
                                    )

                            print(f"Called '{function_name}' with args {arguments}")
                        else:
                            print("No arguments provided for the function call")
                    except json.JSONDecodeError as e:
                        print(f"An error occurred while decoding JSON: {e}")
                await client.beta.threads.runs.submit_tool_outputs(
                    run_id=run.id,
                    thread_id=thread.id,
                    tool_outputs=tool_outputs,
                )

            else:
                # non-blocking pause between polls; a plain time.sleep()
                # here would stall the event loop for every user
                await asyncio.sleep(0.1)

        if run.status == "completed":
            messages = await client.beta.threads.messages.list(
                thread_id=thread.id,
                limit=1,
            )

            first_message_content = messages.data[0].content[0].text.value
            response = clean_api_response(first_message_content)
            print(f"Generated response: {response}")
            return response
        else:
            print(f"Run status issue: {run.status}")
            print(run.last_error)
            return "Server Error, please try again."

    except openai.BadRequestError as e:
        print(f"BadRequestError encountered: {e}. Deleting old thread and retrying...")
        return "Error"
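
As an aside, I have started factoring the polling loop out into a helper with a growing, non-blocking delay, so a slow run does not hammer the retrieve endpoint. This is only a sketch: the name wait_for_run, the max_wait parameter, and the backoff constants are mine, not part of the OpenAI SDK.

import asyncio

async def wait_for_run(client, thread_id, run_id, max_wait=60.0):
    """Poll a run until it leaves the queued/in_progress states.

    asyncio.sleep (never time.sleep) keeps other users' coroutines
    running while this one waits; the delay grows from 0.2s to 2s.
    """
    delay = 0.2
    waited = 0.0
    while waited < max_wait:
        run = await client.beta.threads.runs.retrieve(
            thread_id=thread_id, run_id=run_id
        )
        # "requires_action" is returned to the caller, which handles tool calls
        if run.status not in ("queued", "in_progress", "cancelling"):
            return run
        await asyncio.sleep(delay)
        waited += delay
        delay = min(delay * 2, 2.0)  # exponential backoff, capped at 2s
    raise TimeoutError(f"Run {run_id} did not finish within {max_wait}s")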

Processing messages:

async def process_message(message_text, update, context, is_voice=False):
    global threads
    counter = 0

    while counter < 3:
        async with aiofiles.open(threads_path, "r") as file:
            threads = json.loads(await file.read())

        user_thread_id = threads.get(str(update.effective_user.id))
        thread = None

        if user_thread_id:
            try:
                thread = await client.beta.threads.retrieve(thread_id=user_thread_id)
            except openai.NotFoundError:
                thread = await create_new_thread(
                    threads, update.effective_user.id, threads_path
                )
        else:
            thread = await create_new_thread(
                threads, update.effective_user.id, threads_path
            )

        answer = await answerResponse(thread, message_text)
        if answer != "Error":
            break
        else:
            await delete_thread_from_json(update.effective_user.id, threads_path)
            counter += 1
    if is_voice:
        print("Generating voice message...")
        voice_message_path = await generate_voice_response(answer)

        if voice_message_path:
            # read the audio without blocking the event loop (aiofiles is
            # already used elsewhere in the bot), then send the raw bytes
            async with aiofiles.open(voice_message_path, "rb") as voice:
                voice_bytes = await voice.read()
            await context.bot.send_voice(
                chat_id=update.effective_chat.id, voice=voice_bytes
            )
            os.remove(voice_message_path)
        else:
            print("Error: voice message generation failed.")
    else:
        if answer and answer != "Error":
            await handle_text_or_image_response(answer, update, context)
        else:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="I'm sorry, but I'm unable to process your message at the moment.",
            )
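
One thing I am unsure about: once handlers run concurrently, several coroutines could read and rewrite the shared threads JSON at the same time. My tentative fix is a per-user asyncio.Lock, so messages from the same user stay ordered while different users proceed in parallel. user_locks and process_message_safely are hypothetical names, not existing code:

import asyncio
from collections import defaultdict

# One lock per Telegram user id: messages from different users proceed
# concurrently, while messages from the same user are handled in order.
user_locks: dict[int, asyncio.Lock] = defaultdict(asyncio.Lock)

async def process_message_safely(message_text, update, context, is_voice=False):
    async with user_locks[update.effective_user.id]:
        await process_message(message_text, update, context, is_voice)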

Main function:

async def main():
    nest_asyncio.apply()
    application = Application.builder().token(TOKEN).build()

    voice_handler = MessageHandler(filters.VOICE, handle_voice_message)

    reply_to_bot_handler = MessageHandler(
        filter_reply_to_bot | filters.ChatType.PRIVATE, echo
    )
    bot_mention_or_start_handler = MessageHandler(filter_bot_mention_or_start, echo)
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("clear", clear))

    application.add_handler(voice_handler)
    application.add_handler(reply_to_bot_handler)
    application.add_handler(bot_mention_or_start_handler)

    print("Bot started")
    # run_polling() is a blocking, regular method in PTB v20 and is not
    # awaited; nest_asyncio allows the nested event loop it starts
    application.run_polling()


if __name__ == "__main__":
    asyncio.run(main())

Specific Problem

The bot is not scaling as the number of users grows. Despite using asynchronous programming, requests are still processed one after another, leading to slow response times. I'm looking for advice on enabling the bot to handle multiple requests concurrently, improving response time and scalability.
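
From reading the python-telegram-bot v20 docs, I believe the Application dispatches updates strictly one at a time unless concurrency is explicitly enabled, which would explain the sequential behaviour regardless of my async code. This is the change I am considering, though I have not load-tested it yet:

# Let the Application process updates concurrently instead of one by one.
application = (
    Application.builder()
    .token(TOKEN)
    .concurrent_updates(True)
    .build()
)

# Alternatively, opt in per handler:
voice_handler = MessageHandler(filters.VOICE, handle_voice_message, block=False)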

  1. Is there a way to optimize the async operations to handle multiple user requests more efficiently? (The semaphore sketch after this list is my current idea.)
  2. Can threading or multiprocessing be integrated with the current async setup, or is something like run_in_executor, shown below, the better fit?
  3. Are there any specific design patterns or architectural changes recommended for bots like this to enhance scalability?
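
For questions 1 and 2, the direction I have in mind is to stay fully async: bound the number of simultaneous OpenAI calls with a semaphore, and push any leftover blocking calls into the default executor rather than mixing in raw threads. This is a sketch, and the limit of 20 is an arbitrary starting point:

import asyncio

# Cap concurrent OpenAI requests so a burst of users cannot exhaust
# rate limits or connections; the limit should be tuned experimentally.
openai_semaphore = asyncio.Semaphore(20)

async def answer_with_limit(thread, message):
    async with openai_semaphore:
        return await answerResponse(thread, message)

async def run_blocking(func, *args):
    # Offload a blocking call (plain open(), CPU-bound parsing, etc.) to a
    # worker thread without restructuring the async code around it.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)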