Best strategy for managing concurrent calls? (Python/Asyncio)

Hi everyone,

I’m trying to understand the best approach to handling concurrent calls to the Whisper Transcriptions API, say 50 at the same time with an average audio size of 10 MB per call.
My stack is Python and Asyncio.
I have two main concerns:

  1. Memory (RAM): reading the whole audio file before sending it to the Transcriptions API is a huge problem (50 concurrent calls with 10 MB audio files…). I tried to implement chunked transfer encoding with 8 KB chunks to work around this memory-exhaustion risk, but it doesn’t seem like the Whisper API supports it?
  2. API rate limits: being on Tier 2 restricts me to 50 RPM, so a pydub chunking approach would burn through my RPM…

Context: I’m hosting my web app on Heroku with 512 MB of RAM.
I admit I’m a newbie at handling things concurrently, and I’d appreciate a ton any advice you could give me on this!

Thanks a lot !

Kinimod

Here is some code :grin:

import os
import asyncio
import aiohttp
import aiofiles
from typing import List


async def async_transcribe_audio(
    session: aiohttp.ClientSession,
    input_file_path: str,
    output_file_path: str,
    rate_limit_semaphore: asyncio.Semaphore,
):
    url = "https://api.openai.com/v1/audio/transcriptions"
    headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}

    async with rate_limit_semaphore:
        print(f"Starting transcription for '{input_file_path}'")
        async with aiofiles.open(input_file_path, "rb") as file:
            data = aiohttp.FormData()
            data.add_field(
                "file",
                file,  # aiohttp streams the file object; it is never fully buffered in RAM
                filename=os.path.basename(input_file_path),
                content_type="audio/mpeg",
            )
            data.add_field("model", "whisper-1")
            data.add_field("language", "en")
            data.add_field("prompt", "Welcome to our radio show.")
            data.add_field("response_format", "json")
            data.add_field("temperature", "0.2")

            transcription = None
            retry_needed = False
            try:
                async with session.post(url, headers=headers, data=data) as response:
                    if response.status == 429:
                        body = await response.text()
                        if "check your plan" in body:
                            print("Aborting due to exceeded quota.")
                            return
                        # NOTE: this retries without limit; consider a max-attempts counter
                        print(f"Rate limited on '{input_file_path}'. Backing off 5 seconds.")
                        await asyncio.sleep(5)
                        retry_needed = True
                    else:
                        response.raise_for_status()
                        transcription = await response.json()

                        remaining_requests = int(
                            response.headers.get("x-ratelimit-remaining-requests", 0)
                        )
                        total_requests = int(
                            response.headers.get("x-ratelimit-limit-requests", 1)
                        )
                        remaining_percentage = remaining_requests / total_requests
                        print(
                            f"Rate: {remaining_requests}/{total_requests}: "
                            f"{remaining_percentage * 100:.1f}%"
                        )

                        if remaining_percentage < 0.1:  # Adjust this threshold as needed
                            backoff = 5  # Hold the semaphore for an extra 5 seconds
                            print(
                                f"Low remaining requests "
                                f"({remaining_requests}/{total_requests}). "
                                f"Pausing for {backoff} seconds."
                            )
                            await asyncio.sleep(backoff)
            except Exception as e:
                print(f"An API error occurred: {e}")
                # add more backoff, retry, or error-handling cases here
                return

        print(f"Sending complete for '{input_file_path}'")

    # Retry outside the semaphore block so the permit is released first;
    # retrying while still holding it could deadlock if every task retries at once.
    if retry_needed:
        return await async_transcribe_audio(
            session, input_file_path, output_file_path, rate_limit_semaphore
        )

    transcribed_text = transcription["text"]

    try:
        async with aiofiles.open(output_file_path, "w") as file:
            await file.write(transcribed_text)
        print(f"--- Transcribed text successfully saved to '{output_file_path}'.")
    except Exception as e:
        print(f"Output file error: {e}")


async def main(input_file_paths: List[str]):
    # Cap in-flight uploads at 20; note a semaphore limits concurrency, not requests per minute
    rate_limit_semaphore = asyncio.Semaphore(20)
    output_file_paths = [path + "-transcript.txt" for path in input_file_paths]

    async with aiohttp.ClientSession() as session:
        tasks = [
            async_transcribe_audio(
                session, input_file_path, output_file_path, rate_limit_semaphore
            )
            for input_file_path, output_file_path in zip(
                input_file_paths, output_file_paths
            )
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    input_file_paths = ["audio1.mp3", "audio2.mp3", "audio3.mp3"]
    asyncio.run(main(input_file_paths))

"""
x-ratelimit-limit-requests	60	The maximum number of requests that are permitted before exhausting the rate limit.
x-ratelimit-limit-tokens	150000	The maximum number of tokens that are permitted before exhausting the rate limit.
x-ratelimit-remaining-requests	59	The remaining number of requests that are permitted before exhausting the rate limit.
x-ratelimit-remaining-tokens	149984	The remaining number of tokens that are permitted before exhausting the rate limit.
x-ratelimit-reset-requests	1s	The time until the rate limit (based on requests) resets to its initial state.
x-ratelimit-reset-tokens	6m0s	The time until the rate limit (based on tokens) resets to its initial state.
"""
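
The `x-ratelimit-reset-*` headers come back as compact duration strings like `1s` or `6m0s`. A small helper (my own, not part of any SDK) to turn those into seconds for use in a sleep could look like:

```python
import re

def parse_reset(value: str) -> float:
    """Convert a duration like '1s', '6m0s', or '250ms' into seconds."""
    seconds = 0.0
    # 'ms' must come before 'm' and 's' in the alternation so it wins the match
    for amount, unit in re.findall(r"(\d+(?:\.\d+)?)(ms|h|m|s)", value):
        seconds += float(amount) * {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}[unit]
    return seconds

print(parse_reset("6m0s"))  # 360.0
```

You could feed the result straight into `await asyncio.sleep(...)` instead of the hard-coded 5-second backoff.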

The code is 100% working, but only about 50% of the checking and error handling that would be prudent to avoid emptying your account balance is in place.

This is a starting point for the typical next step: speeding up the process and reducing errors by splitting the audio into one-minute chunks on silence, batching those, and reassembling the text.
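If you go the pydub route, `split_on_silence` hands back variable-length segments that you then need to pack into chunks under the one-minute mark. Here is a sketch of just that grouping logic (pure Python, so it runs without pydub; `durations_ms` would come from `len(seg)` on each pydub segment):

```python
def group_segments(durations_ms, max_chunk_ms=60_000):
    """Group consecutive segment lengths (ms) into chunks of at most max_chunk_ms.

    Returns lists of segment indices; an over-long single segment
    becomes its own chunk rather than being dropped.
    """
    chunks, current, current_len = [], [], 0
    for i, d in enumerate(durations_ms):
        if current and current_len + d > max_chunk_ms:
            chunks.append(current)
            current, current_len = [], 0
        current.append(i)
        current_len += d
    if current:
        chunks.append(current)
    return chunks

print(group_segments([20_000, 25_000, 30_000, 50_000]))  # [[0, 1], [2], [3]]
```

Each index group then maps back to `sum()` of the corresponding pydub segments before export and upload.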


Here’s ChatGPT doing the documentation for me:

Here’s a detailed explanation of how the provided code addresses your concerns about concurrent calls to the Whisper Transcriptions API, focusing on memory usage, rate limiting, and handling rate limit errors:

Overview

The code aims to transcribe audio files concurrently while addressing memory constraints and API rate limits using aiohttp and asyncio. It includes mechanisms for reading audio files in a memory-efficient way and respecting the API rate limits.

Key Concepts and Methods

  1. Memory Management:

    • The code reads audio files asynchronously in chunks using aiofiles. This approach helps manage memory usage more efficiently compared to loading entire files into memory at once.
    • Using aiofiles.open with a context manager ensures that the file is properly closed after processing.
  2. Rate Limiting:

    • The rate_limit_semaphore controls the number of concurrent API requests. By initializing it with asyncio.Semaphore(20), the code allows up to 20 concurrent requests.
    • Note that a semaphore caps simultaneous in-flight requests, not requests per minute; it greatly reduces the chance of rate limit errors but does not strictly enforce an RPM limit.
  3. Handling Rate Limit Errors:

    • The code checks the x-ratelimit-remaining-requests header in the API response to monitor the remaining requests.
    • If the remaining requests fall below a certain threshold (10% in this case), it introduces a 5-second pause before allowing further requests.
    • If an API error occurs, the code retries the request if the error is related to rate limiting (status 429).
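
Since a semaphore only caps concurrency, a stricter way to honor an actual requests-per-minute budget is a rolling-window limiter. This is my own sketch, not part of the script above; `acquire()` would be awaited just before each `session.post`:

```python
import asyncio
import time

class RpmLimiter:
    """Allow at most `rpm` acquisitions per rolling 60-second window (sketch)."""

    def __init__(self, rpm: int):
        self.rpm = rpm
        self.calls = []            # monotonic timestamps of recent acquisitions
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            # drop timestamps that have aged out of the window
            self.calls = [t for t in self.calls if now - t < 60]
            if len(self.calls) >= self.rpm:
                # sleep until the oldest call leaves the window
                await asyncio.sleep(60 - (now - self.calls[0]))
            self.calls.append(time.monotonic())

async def demo():
    limiter = RpmLimiter(rpm=50)
    await limiter.acquire()   # would pause only once 50 calls land in the window
    return len(limiter.calls)

print(asyncio.run(demo()))  # 1
```

Combining this with the semaphore gives you both a concurrency cap and an RPM cap.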

Step-by-Step Tutorial

  1. Initialize the Semaphore:

    rate_limit_semaphore = asyncio.Semaphore(20)
    

    This semaphore limits the number of concurrent API requests to 20, reducing the likelihood of hitting the rate limit.

  2. Asynchronous File Reading:

    async with aiofiles.open(input_file_path, "rb") as file:
    

    aiofiles is used for asynchronous file operations, reading files in a non-blocking manner to conserve memory.

  3. FormData for API Request:

    data = aiohttp.FormData()
    data.add_field("file", file, filename=input_file_path.split("/")[-1], content_type="audio/mpeg")
    

    This creates a form-data payload for the API request, including the audio file and additional parameters required by the Whisper API.

  4. Handling API Rate Limits:

    remaining_requests = int(response.headers.get("x-ratelimit-remaining-requests", 0))
    total_requests = int(response.headers.get("x-ratelimit-limit-requests", 1))
    remaining_percentage = remaining_requests / total_requests
    

    These lines extract the rate limit information from the API response headers to monitor the remaining requests.

  5. Backoff Mechanism:

    if remaining_percentage < 0.1:
        backoff = 5
        await asyncio.sleep(backoff)
    

    If the remaining requests fall below 10% of the total allowed, the code waits for 5 seconds before making further requests.
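
The fixed 5-second pause is the simplest option; a common refinement (my suggestion, not in the script) is exponential backoff with full jitter, so many concurrent tasks don't all retry at the same instant:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

for attempt in range(5):
    # upper bound grows 1s, 2s, 4s, 8s, 16s, capped at 60s
    print(f"attempt {attempt}: sleep up to {min(60.0, 2.0 ** attempt):.0f}s")
```

You would call `await asyncio.sleep(backoff_delay(attempt))` inside a retry loop, incrementing `attempt` on each failure.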

Usage Instructions

To use this script with a long list of audio files and customize the rate limits according to your API plan, follow these steps:

  1. List Your Audio Files:
    Modify the input_file_paths list to include all your audio file paths.

    input_file_paths = ["audio1.mp3", "audio2.mp3", "audio3.mp3", ...]
    
  2. Adjust the Semaphore for Your Rate Limits:
    If your API plan allows a different number of requests per minute, adjust the semaphore value accordingly (remember it caps concurrency, so pick a value your RPM budget can absorb). For example, to allow up to 60 in-flight requests:

    rate_limit_semaphore = asyncio.Semaphore(60)
    
  3. Run the Script:
    Execute the script using a Python environment with the necessary packages installed (aiohttp and aiofiles).

Example

Here’s a modified example for a larger list of files and a custom rate limit:

if __name__ == "__main__":
    input_file_paths = ["audio1.mp3", "audio2.mp3", "audio3.mp3", ..., "audio50.mp3"]
    asyncio.run(main(input_file_paths))

And adjust the semaphore if your rate limit is different:

rate_limit_semaphore = asyncio.Semaphore(30)  # Adjust this based on your API rate limit

Conclusion

This script effectively handles memory usage and rate limits for concurrent API requests. By using asynchronous file reading and a semaphore to control concurrency, it ensures efficient memory management and compliance with API rate limits. Customize the file paths and semaphore value as per your requirements to handle larger lists of audio files and different API rate limits.

Alternate solution explanation

Explanation of the Solution

1. Memory Management

The challenge highlighted involves handling large audio files (10 MB each) concurrently, which can lead to substantial memory usage especially when dealing with many files simultaneously. To address this, the script uses an asynchronous approach with aiohttp and aiofiles for non-blocking I/O operations. This means that the file reading, network requests, and writing to disk do not block the execution of the program, allowing other tasks to proceed in parallel. This approach minimizes memory consumption as:

  • Asynchronous File Handling: The script uses aiofiles to handle file reading and writing operations. This library provides an asynchronous interface to file I/O, ensuring that these potentially slow operations do not block the asynchronous loop. By reading and writing files asynchronously, the application can manage other tasks in the meantime, such as handling other file transcriptions or responding to rate limit constraints.
  • Streaming Uploads: Although the provided script does not implement chunked transfer encoding directly due to API limitations, it efficiently manages file uploads by creating a streaming form data object (aiohttp.FormData). This potentially reduces the memory footprint as the file does not need to be loaded entirely into memory before being sent.
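
If you want explicit control over the upload chunk size rather than relying on the file object's own iteration, an async generator can serve as the payload. This is a sketch under the assumption that aiohttp accepts an async iterable as a form field (it does via its payload registry); the blocking `open`/`read` is kept for brevity, and aiofiles would replace it in production:

```python
import asyncio
import tempfile

async def file_chunks(path: str, chunk_size: int = 8192):
    """Yield a file's bytes in fixed-size chunks so the upload can stream."""
    with open(path, "rb") as f:  # sketch: swap in aiofiles for non-blocking reads
        while chunk := f.read(chunk_size):
            yield chunk
            await asyncio.sleep(0)  # give the event loop a chance to run other tasks

async def chunk_sizes(path: str, chunk_size: int):
    return [len(c) async for c in file_chunks(path, chunk_size)]

# Tiny demo on a throwaway file: 10 bytes read in 4-byte chunks
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
print(asyncio.run(chunk_sizes(tmp.name, 4)))  # [4, 4, 2]

# With aiohttp you could then pass the generator as the field payload:
# data.add_field("file", file_chunks(path), filename="audio.mp3",
#                content_type="audio/mpeg")
```

Either way the key point stands: the request body is produced incrementally, so the 10 MB file never sits in memory all at once.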

2. Rate Limit Management

The script addresses the API’s rate limits in multiple ways:

  • Semaphore for Rate Limiting: A semaphore (asyncio.Semaphore) is used to limit the number of concurrent API calls, without needing to explicitly check the rate limit status before each request. The semaphore’s value should be chosen with the API’s rate limit in mind (e.g., 50 requests per minute for Tier 2), though strictly speaking it regulates how many requests are in flight at once rather than requests per minute.
  • Dynamic Response to Rate Limits: The script checks the response headers for remaining requests and adapts its behavior accordingly. If the available rate limit is low (less than 10% of the total limit available), it introduces a backoff delay (await asyncio.sleep(backoff)), which pauses the execution before attempting further requests. This proactive measure helps in preventing rate limit errors.
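
A tiny self-contained demo (mine, not taken from the script) showing that a semaphore really does cap in-flight tasks:

```python
import asyncio

async def worker(sem, state):
    # state = [currently_active, observed_peak]
    async with sem:
        state[0] += 1
        state[1] = max(state[1], state[0])
        await asyncio.sleep(0.01)  # simulate an in-flight API call
        state[0] -= 1

async def demo():
    sem = asyncio.Semaphore(3)   # permit at most 3 workers inside at once
    state = [0, 0]
    await asyncio.gather(*(worker(sem, state) for _ in range(10)))
    return state[1]              # peak concurrency observed

print(asyncio.run(demo()))  # 3
```

Ten tasks are launched, but the observed peak never exceeds the semaphore's value of 3.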

Usage Instructions for Long Lists of Files and Custom Rate Limits

Handling Long Lists of Files:

  • For handling long lists of audio files, ensure that the list input_file_paths is populated with the paths to all the audio files you wish to transcribe. The script can handle these files in batches or one-by-one based on the limits imposed by the semaphore.
  • For very long lists that could potentially exhaust daily or monthly API limits, consider adding additional logic to monitor and control usage over longer periods (e.g., daily caps).

Setting Custom Rate Limits:

  • If you are on a different tier or have a custom arrangement with specific rate limits, you can adjust the semaphore’s value accordingly. For example, if your limit is 100 requests per minute, initialize the semaphore with a value of 100:
    rate_limit_semaphore = asyncio.Semaphore(100)
    
  • Adapt the backoff and retry strategies based on your specific needs and the API’s rate limit reset intervals.

Execution:

  • To run the script, simply execute it in an environment where Python 3.7+ is installed, and ensure that aiohttp and aiofiles libraries are installed. Use the following commands to install necessary libraries if they are not already installed:
    pip install aiohttp aiofiles
    
  • Ensure that the OPENAI_API_KEY environment variable is set with your API key, or modify the script to read this from a secure configuration.

By addressing these areas, the script effectively manages both memory and API rate limits, enabling efficient and scalable use of the Whisper Transcription API in a resource-constrained environment like Heroku.


If you’d like to invest more time in different parallel-queue solutions, you can look at this OpenAI cookbook code, along with the snippet that accompanies the “rate limit” API documentation.

PS: Example output

Starting transcription for 'audio1.mp3'
Starting transcription for 'audio2.mp3'
Starting transcription for 'audio3.mp3'
Rate: 497/500: 99.4%
Sending complete for 'audio3.mp3'
--- Transcribed text successfully saved to 'audio3.mp3-transcript.txt'.
Rate: 499/500: 99.8%
Sending complete for 'audio1.mp3'
--- Transcribed text successfully saved to 'audio1.mp3-transcript.txt'.
Rate: 497/500: 99.4%
Sending complete for 'audio2.mp3'
--- Transcribed text successfully saved to 'audio2.mp3-transcript.txt'.