Struggling to achieve fast, parallel embeddings

Hey all,

Been struggling to get fast embeddings for a large, chunked corpus of text (about 200 pages).

I’m using Python, and I implemented an asyncio coroutine + gather loop to call the API n times concurrently.

With a batch size of 600 strings per request, a single request takes ~5.2 seconds. Since I’m using asyncio, I would expect most requests to take about that long, so 20 concurrent requests should ideally finish in under 10 seconds total.

However, the reported time increases with each subsequent request, adding roughly a second or two per execution. Why is this? I would like each task to take roughly the same time and finish fast.

Function async_openai_embed took 4.796342 seconds to execute.
Function async_openai_embed took 7.096694 seconds to execute.
Function async_openai_embed took 9.332089 seconds to execute.
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
Function async_openai_embed took 11.629549 seconds to execute.
Function async_openai_embed took 13.932997 seconds to execute.
Function async_openai_embed took 16.511327 seconds to execute.
Function async_openai_embed took 18.786935 seconds to execute.
Function async_openai_embed took 21.024530 seconds to execute.
Function async_openai_embed took 23.467683 seconds to execute.
Function async_openai_embed took 25.757097 seconds to execute.
Function async_openai_embed took 28.144605 seconds to execute.
Function async_openai_embed took 30.391189 seconds to execute.
Function async_openai_embed took 32.617231 seconds to execute.
Function async_openai_embed took 34.847542 seconds to execute.
Function async_openai_embed took 37.185364 seconds to execute.
Function async_openai_embed took 39.401796 seconds to execute.
Function async_openai_embed took 41.642024 seconds to execute.
Function async_openai_embed took 43.875063 seconds to execute.
Function async_openai_embed took 46.141818 seconds to execute.
Function async_openai_embed took 48.457224 seconds to execute.

Code:

import asyncio
import functools
import time

def timer(func):
    """Print how long an async function takes to run."""
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # start_time is taken when the coroutine actually starts running
            start_time = time.time()
            result = await func(*args, **kwargs)
            execution_time = time.time() - start_time
            print(f"Function {func.__name__} took {execution_time:.6f} seconds to execute.")
            return result
        return wrapper
    return func  # plain functions pass through untimed

from openai import AsyncOpenAI
client = AsyncOpenAI()

@timer
async def async_openai_embed(data):
    # data is one batch: a list of input strings
    return await client.embeddings.create(
        model="text-embedding-ada-002",
        input=data,
        encoding_format="float"
    )

def split_into_batches(lst, batch_size):
    """Yield successive batch_size-sized chunks from lst."""
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]

def get_batch_tasks(session, batches, sslcontext, semaphore):
    # note: session, sslcontext and semaphore are accepted but never used,
    # so nothing here limits how many requests are in flight at once
    tasks = []
    for batch in batches:
        tasks.append(asyncio.create_task(async_openai_embed(batch)))
    return tasks

import ssl
import aiohttp
import certifi

async def get_embeddings(batches):
    sslcontext = ssl.create_default_context(cafile=certifi.where())

    semaphore = asyncio.Semaphore(100)

    # the aiohttp session, ssl context and semaphore are created here but
    # never passed to the AsyncOpenAI client, which manages its own HTTP session
    async with aiohttp.ClientSession() as session:
        tasks = get_batch_tasks(session, batches, sslcontext, semaphore)
        responses = await asyncio.gather(*tasks)
    return responses

import os

@router.get("/test/", response_description="test openai embeddings", status_code=status.HTTP_200_OK)
async def test():
    # Get the directory where this script is located
    current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(current_dir, 'chunk_texts.txt')

    # Read the chunks from file
    with open(file_path, 'r', encoding='utf-8') as f:
        texts = f.read().split(',')
    
    batch_size = 600
    batches = list(split_into_batches(texts, batch_size))
    embeddings = await get_embeddings(batches)

This has been super frustrating; I’ve tried everything I can think of to speed it up. Is this just how asyncio is in Python? Am I being shadow-throttled in the OpenAI backend? As the number of requests increases, the time per request appears to increase greatly. In theory, if they’re all running in parallel, they should all finish at around the same time.

There are lots of potential slowdowns with the API that may reduce performance. The main one: you are submitting everything to one routed Cloudflare endpoint from your IPv4 address, and OpenAI’s rate limiter sits in front of it as a frontend that does token approximation and account lookups, which requires its own computation per request.
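If you want to smooth that out client-side, here is a minimal sketch of bounded concurrency with backoff; the semaphore size, retry count, and function names are my own placeholders, not anything from your code:

import asyncio
from openai import AsyncOpenAI, RateLimitError

client = AsyncOpenAI()
limit = asyncio.Semaphore(10)  # assumed cap on in-flight requests

async def embed_batch(batch, retries=3):
    # hold the semaphore so at most 10 requests run at once
    async with limit:
        for attempt in range(retries):
            try:
                return await client.embeddings.create(
                    model="text-embedding-ada-002",
                    input=batch,
                    encoding_format="float",
                )
            except RateLimitError:
                # exponential backoff before retrying the same batch
                await asyncio.sleep(2 ** attempt)
        raise RuntimeError("batch failed after retries")

async def embed_all(batches):
    return await asyncio.gather(*(embed_batch(b) for b in batches))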

For embeddings in general, I would be more future-proof by using the text-embedding-3-large model and capturing the original dimensions in base64. You can also submit long lists of text inputs, up to 2,048 strings in one API request.
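Roughly like this; it’s a sketch, and it assumes the Python SDK hands back the raw base64 string when you request encoding_format="base64" explicitly:

import base64
import numpy as np
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def embed_large(texts):
    # up to 2,048 input strings per request, full original dimensions kept
    resp = await client.embeddings.create(
        model="text-embedding-3-large",
        input=texts,
        encoding_format="base64",  # smaller payload than JSON float lists
    )
    # each embedding arrives as a base64 string of packed float32 values
    return [
        np.frombuffer(base64.b64decode(item.embedding), dtype=np.float32)
        for item in resp.data
    ]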

For programming, Python 3.12 on Linux with uvloop is the fastest parallel accelerator if you don’t want to switch to Go or another language designed with network throughput as a main consideration.
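Dropping uvloop in is essentially a one-liner; this sketch assumes your existing gather-based code lives in a main() coroutine:

import asyncio
import uvloop  # pip install uvloop (Linux/macOS)

async def main():
    ...  # your existing batch/gather embedding code

uvloop.install()   # swap in uvloop as the asyncio event loop policy
asyncio.run(main())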

For Windows, it looks like there is finally someone working on a port of it called winloop.

I’ve been thinking about a simple bulk batcher for embeddings corpora using the fastest threaded parallel techniques: something that could run batch JSONL at full price and speed, and past the artificial 1M limits of the Batch API. That seems like the most logical drop-in with the least dependence on whatever data preparation and vector database someone else runs, while still encouraging people to program against the Batch API for its discount.
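For reference, the Batch API side of that looks roughly like this; a sketch where the file name, custom_id scheme, and the `batches` list of chunk batches are placeholders:

import json
from openai import OpenAI

client = OpenAI()

# one embeddings request per line of the JSONL input file
with open("embeddings_batch.jsonl", "w", encoding="utf-8") as f:
    for i, batch in enumerate(batches):
        f.write(json.dumps({
            "custom_id": f"chunk-batch-{i}",
            "method": "POST",
            "url": "/v1/embeddings",
            "body": {"model": "text-embedding-3-large", "input": batch},
        }) + "\n")

# upload the JSONL and queue the discounted batch job (completes within 24h)
batch_file = client.files.create(file=open("embeddings_batch.jsonl", "rb"), purpose="batch")
job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/embeddings",
    completion_window="24h",
)
print(job.id, job.status)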
