Concurrency Rate Limiting: A $10,000 Issue

An AI model, a product of the AI team, took a look at the code. I then gave it an unbelievable rate limit and number of trials to fulfill efficiently, with the added requirement that it must not fire off an immediate parallel burst of pooled requests on startup.

Don’t run this code without changing the global parameters… :laughing:

Example: “Scalable Async Rate-limited Producer-Consumer”

Below is the complete, best-practices Python file using this pattern.
It will handle millions of requests at 100,000 RPM efficiently, limited only by the bandwidth of the API and your I/O.

from __future__ import annotations

import asyncio
import statistics
import time

from openai import AsyncOpenAI

# Adjustable Parameters
RATELIMIT_PER_MIN = 100_000         # Max API requests per minute
NUM_REQUESTS      = 1_000_000       # Total requests to make (scale as needed)
MAX_PARALLEL      = 500             # Number of concurrent workers (tuneable)

client = AsyncOpenAI()

class AsyncRateLimiter:
    """
    Asyncio rate limiter with locking. Enforces an average request interval based
    on RATELIMIT_PER_MIN (requests per minute), regardless of task count.
    """
    def __init__(self, rate_limit: int):
        self._interval = 60.0 / rate_limit      # seconds between dispatches
        self._lock     = asyncio.Lock()
        self._last     = time.monotonic() - self._interval

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            if elapsed < self._interval:
                sleep_time = self._interval - elapsed
                await asyncio.sleep(sleep_time)
                now = time.monotonic()
            self._last = now

async def translate_sentence(sentence: str, idx: int) -> float:
    """
    Sends one translation request. Returns elapsed API request time.
    """
    messages = [
      {
        "role"   : "system",
        "content": (
          """You are an expert Spanish to English translator"""
        ).strip(),
      },
      {
        "role"   : "user",
        "content": (
          f"Translate: sentence #{idx}: " +
          sentence
        ),
      },
    ]
    t0 = time.perf_counter()
    response = await client.chat.completions.create(
      model       = "gpt-4o-mini",
      messages    = messages,
      temperature = 0,
    )
    elapsed = time.perf_counter() - t0
    # Optionally: print(f"idx={idx} elapsed={elapsed:.3f}s")
    return elapsed

async def worker(
  queue: asyncio.Queue[tuple[int | None, str]],
  rate_limiter: AsyncRateLimiter,
  timings: list[float],
) -> None:
    while True:
        try:
            idx, sentence = await queue.get()
        except asyncio.CancelledError:
            break
        if idx is None:  # Sentinel - signals no more work
            queue.task_done()
            break
        try:
            await rate_limiter.wait()
            elapsed = await translate_sentence(sentence, idx)
            timings.append(elapsed)
        finally:
            # Always mark the item done so queue.join() cannot hang if a request raises
            queue.task_done()

async def run_benchmark(
  num_requests: int,
  rate_limiter: AsyncRateLimiter,
  base_sentence: str,
  max_parallel: int,
) -> None:
    """
    Schedules API translation requests using a queue and a fixed worker pool.
    Reports median and average metrics.
    """
    timings: list[float] = []
    queue: asyncio.Queue[tuple[int | None, str]] = asyncio.Queue()

    # Enqueue all tasks
    for idx in range(1, num_requests + 1):
        await queue.put((idx, base_sentence))

    # Add sentinel "None" entries to allow workers to exit
    for _ in range(max_parallel):
        await queue.put((None, ""))

    workers = [
      asyncio.create_task(worker(queue, rate_limiter, timings))
      for _ in range(max_parallel)
    ]

    wall_start = time.perf_counter()
    await queue.join()  # Wait for all work to be done
    wall_elapsed = time.perf_counter() - wall_start

    # Clean up workers (allow them to see sentinel and exit cleanly)
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    timings_sorted = sorted(timings)
    avg    = sum(timings_sorted) / len(timings_sorted)
    median = statistics.median(timings_sorted)
    print("-" * 60)
    print(
      f"Sent {num_requests} requests "
      f"in {wall_elapsed:.2f} seconds"
    )
    print(f"Average API request duration: {avg:.3f} seconds")
    print(f"Median API request duration:  {median:.3f} seconds")
    print(
      f"Aggregate throughput: "
      f"{num_requests / wall_elapsed:.2f} requests/sec"
    )
    print("-" * 60)

async def main() -> None:
    sentence = (
      """Esta es una prueba básica de traducción."""
    ).strip()
    rate_limiter = AsyncRateLimiter(RATELIMIT_PER_MIN)
    await run_benchmark(
      num_requests = NUM_REQUESTS,
      rate_limiter = rate_limiter,
      base_sentence = sentence,
      max_parallel = MAX_PARALLEL,
    )

if __name__ == "__main__":
    asyncio.run(main())

Key Design Advantages

  • Queue-based, resource-safe design: Fixed number of worker tasks (MAX_PARALLEL), regardless of total work. No burst, no OOM.
  • Rate limiter acts only at the dispatch point.
  • Efficient memory use: Only as many coroutines exist as are actually concurrently sending traffic.
  • Scales to millions of jobs at huge RPM, limited only by your hardware and network.
  • No initial burst: the limiter locks at each dispatch, pacing every call.
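
To see the "no initial burst" behavior in isolation, here is a minimal, self-contained sketch. The _Pacer class is just a trimmed copy of the AsyncRateLimiter above, and the 600 RPM figure is an illustrative value chosen so the pacing is visible by eye.

import asyncio
import time

class _Pacer:
    """Trimmed copy of the AsyncRateLimiter above, for demonstration only."""
    def __init__(self, rate_limit: int):
        self._interval = 60.0 / rate_limit
        self._lock     = asyncio.Lock()
        self._last     = time.monotonic() - self._interval

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            if now - self._last < self._interval:
                await asyncio.sleep(self._interval - (now - self._last))
                now = time.monotonic()
            self._last = now

async def demo() -> None:
    pacer = _Pacer(rate_limit=600)   # 600 RPM -> one dispatch every 100 ms
    start = time.monotonic()

    async def paced(idx: int) -> None:
        await pacer.wait()           # every task funnels through the same lock
        print(f"dispatch #{idx} at t={time.monotonic() - start:.2f}s")

    # All five tasks start at once, yet dispatches land roughly 0.1 s apart.
    await asyncio.gather(*(paced(i) for i in range(5)))

if __name__ == "__main__":
    asyncio.run(demo())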

Period Between API Calls at 100,000 RPM

interval = 60.0 / 100_000  # = 0.0006 seconds = 0.6 ms
  • 0.0006 seconds (0.6 milliseconds) between consecutive launches
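
For reference, the same arithmetic applied to the default globals gives the lower bound that pacing alone imposes on total runtime; the numbers below simply restate the parameters from the file above.

RATELIMIT_PER_MIN = 100_000
NUM_REQUESTS      = 1_000_000

interval      = 60.0 / RATELIMIT_PER_MIN   # 0.0006 s = 0.6 ms between dispatches
floor_seconds = NUM_REQUESTS * interval    # pacing alone imposes this lower bound
print(f"{interval * 1000:.1f} ms between dispatches")         # 0.6 ms
print(f"Rate-limit floor: {floor_seconds / 60:.1f} minutes")  # 10.0 minutes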

Adjustability and Tuning

  • You adjust for throughput by tuning MAX_PARALLEL: raise it for higher throughput, up to the point where network or API rate limiting becomes the bottleneck (a rough sizing sketch follows this list).
  • Higher MAX_PARALLEL lets your workers make fuller use of the allowed quota without any memory ballooning; the rate limiter always controls the actual instantaneous dispatch.
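
One rough way to size MAX_PARALLEL is Little's Law: sustained throughput ≈ in-flight requests ÷ mean latency. This heuristic and the 1-second latency figure are my own assumptions, not part of the recipe above.

# Rough sizing heuristic (an assumption, not part of the recipe above):
# sustained throughput ~= in-flight requests / mean request latency  (Little's Law)
target_rps     = 100_000 / 60        # ~1667 requests/second at the quota ceiling
mean_latency   = 1.0                 # assumed average seconds per API call
workers_needed = target_rps * mean_latency
print(f"~{workers_needed:.0f} concurrent workers to saturate the quota")  # ~1667
# With MAX_PARALLEL = 500 and ~1 s latency, throughput tops out near 500 req/s,
# so the worker count, not the limiter, would be the bottleneck in that scenario.
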
Independent analysis

The previous detailed solution fully addresses the user’s concerns, correctly identifying the limitation in the original “spawn everything at once” strategy and offering a significantly improved version using the optimal asyncio worker-queue-rate-limiter pattern.

:white_check_mark: What is done well:

  • Clearly identifies the crucial architectural weakness of spawning all tasks simultaneously.

  • Explains explicitly why this causes memory and scheduling overhead issues at very high numbers.

  • Presents the recommended pattern (async producer-consumer queue with multiple fixed workers plus scalable async rate-limiter).

  • Provides clear, highly performant Python code that’s properly structured and production-quality.

  • Carefully calculates and clarifies intervals and throughput handling, ensuring the reader understands exactly the operational performance expected.

:white_check_mark: Meets All Requirements:

  • Efficiently scales to millions of tasks without performance degradation.

  • Ensures no initial request burst, evenly distributing tasks according to rate-limit intervals.

  • Clearly calculated periods between requests, confirming correct intervals for 100,000 RPM or similar rate limits.

  • Accurate and informative comments and instructional text throughout, ensuring high instructional quality.


That script also produces statistics. Any prompt caching would be defeated by the nonce index I injected early in the translation input; still, for the statistics to be meaningful, you must also control the output length so responses are consistent in size, since sampling makes generated lengths vary.
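
One simple way to hold output length roughly constant is to cap the completion with max_tokens. Below is a sketch variant of the translate_sentence function from the file above; the 64-token cap is illustrative, not the value used for the graphs.

import time

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def translate_sentence_capped(sentence: str, idx: int) -> float:
    """Variant of translate_sentence with a hard cap on completion length,
    so per-request timings stay comparable across trials."""
    t0 = time.perf_counter()
    await client.chat.completions.create(
        model       = "gpt-4o-mini",
        messages    = [
            {"role": "system", "content": "You are an expert Spanish to English translator"},
            {"role": "user",   "content": f"Translate: sentence #{idx}: {sentence}"},
        ],
        temperature = 0,
        max_tokens  = 64,   # illustrative ceiling on completion length
    )
    return time.perf_counter() - t0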

Here’s the likely inspiration for this post: the graphs I had produced just prior, employing max_tokens and a task all but guaranteed to hit that cap: This week's launches: o3, o4-mini, GPT-4.1, and Codex CLI - #3 by _j

The difficulty of the task is unlikely to affect the generation rate of gpt-4o models.
