The code seems to be just one method of a class, with no visibility into what lies outside it or what it calls, so:
Code reconstructed and AI-analyzed to see if it is useful to us
Below is a clean read of what that snippet is doing, what it depends on, and where it likely lives in the codebase—followed by a properly indented, fully contained version of the same logic.
Discovery
What it is:
A single `async` instance method (likely on some worker/analysis class) that concurrently:
- analyzes file contents via an API, and
- performs web searches for queries in an analysis plan,
and returns two lists of results (`file_answers`, `web_answers`). It defines two nested async helper functions (closures) that share a single semaphore:

- `sem_analyze(file)`: throttles and calls `analyzeFile(...)` via `safeAPICall`.
- `sem_websearch(query)`: throttles and calls `webSearch(...)` via `safeAPICall`.

Concurrency/flow:
- A shared `asyncio.Semaphore(4)` gates the total concurrency across both file-analysis and web-search tasks.
- Each task awaits a fixed sleep (`delay_between_calls = 5`) inside the semaphore context before calling the API. This means each slot is occupied while sleeping, effectively reducing useful parallelism. Practically: batches of ≤4 will sleep for 5 seconds each before the API call, then release the slot after the call completes.
- All tasks are scheduled together via `asyncio.gather(*all_tasks)`. `gather` preserves the order of the input coroutines, so slicing the results back into `file_answers` and `web_answers` is correct.

Dependencies / unseen methods & fields (likely members of the same class):
- `self.logger`: used to log start/finish and elapsed time.
- `self.folderId`: included in log messages.
- `self.folder_dict["files"]`: iterable of files to analyze.
- `self.buildFileContent(file)`: transforms a file reference into content for the analysis call.
- `self.safeAPICall(callable, *args)`: an async wrapper that probably handles retries/backoff/rate limits, returning the result of calling `callable(*args)`.
- `self.analyzeFile(key_analysis_areas, content)`: async method that likely issues the actual LLM (Chat Completions) request for file analysis.
- `self.webSearch(query)`: async method that likely issues a web request or another LLM call for search results.

What we can infer (and not infer) about the API calling approach:
- All outbound calls are funneled through `safeAPICall`, which is where rate limiting, retries, and exception policies would exist. Without that code, we cannot conclusively assess the scheduling/retry policy or diagnose latency variance beyond the visible concurrency throttle.
- The code intentionally adds a fixed 5-second delay before every call (inside the semaphore), which dominates latency and can mask or exacerbate perceived variance.
- There’s a textual claim of “semaphore (limit = 5)”; the code shows `asyncio.Semaphore(4)`. That mismatch is likely an oversight in the post, not in the logic, but worth noting.

On “quality scheduling of workers”:
This is a simple throttle pattern rather than a queue/worker pool. It works, but:
- Sleeping while holding the semaphore reduces effective concurrency and throughput (a minimal alternative is sketched after this list).
- There’s no per-call timing/metrics at the task level, only a total elapsed-time log.
- `asyncio.gather` will schedule all coroutines immediately; the semaphore does the real gating.
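As a hedged illustration of the first point only, here is a minimal, self-contained sketch that paces launches before taking a semaphore slot, so a slot is never occupied by a sleeping task. `fake_api_call` is a stand-in for the unseen `safeAPICall` wrapper; it is not from the original code.

```python
import asyncio
import random


# Stand-in for the real API call; the snippet's safeAPICall wrapper is not shown.
async def fake_api_call(i: int) -> str:
    await asyncio.sleep(random.uniform(0.2, 1.0))
    return f"answer {i}"


async def throttled(i: int, sem: asyncio.Semaphore, delay: float) -> str:
    await asyncio.sleep(i * delay)  # pace the launch *before* taking a slot
    async with sem:                 # a slot is now held only for the actual call
        return await fake_api_call(i)


async def main() -> None:
    sem = asyncio.Semaphore(4)
    print(await asyncio.gather(*(throttled(i, sem, 0.25) for i in range(8))))


asyncio.run(main())
```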
Reconstructed code (proper indentation & a plausible container)
```python
import asyncio
import time
from typing import Any


class Analyzer:
    logger: Any
    folderId: str
    folder_dict: dict[str, Any]

    async def analyzeFilesAndWeb(
        self, analysis_plan: dict[str, Any]
    ) -> tuple[list[Any], list[Any]]:
        self.logger.info(
            f"{self.folderId}: analyzeFilesAndWeb started with semaphore limit"
        )
        semaphore = asyncio.Semaphore(4)
        delay_between_calls = 5
        start_time = time.time()

        async def sem_analyze(file: Any) -> Any:
            async with semaphore:
                await asyncio.sleep(delay_between_calls)
                content = self.buildFileContent(file)
                return await self.safeAPICall(
                    self.analyzeFile, analysis_plan["key_analysis_areas"], content
                )

        async def sem_websearch(query: str) -> Any:
            async with semaphore:
                await asyncio.sleep(delay_between_calls)
                return await self.safeAPICall(self.webSearch, query)

        file_tasks = [sem_analyze(file) for file in self.folder_dict["files"]]
        web_tasks = [
            sem_websearch(q) for q in analysis_plan["internet_search_queries"]
        ]
        all_tasks = file_tasks + web_tasks

        results = await asyncio.gather(*all_tasks)

        file_answers = results[: len(file_tasks)]
        web_answers = results[len(file_tasks):]

        self.logger.info(
            f"{self.folderId}: analyzeFilesAndWeb finished in "
            f"{time.time() - start_time:.2f}s"
        )
        return file_answers, web_answers
```

Wrap-up:
This is one class method that spawns concurrent coroutine calls for file analysis and web search, gated by a shared semaphore and a fixed pre-call delay, returning results split by source. The actual API-calling strategy is encapsulated inside `safeAPICall`, which isn’t shown, so we can’t draw deeper conclusions about retry/backoff or why latencies vary beyond the visible throttling pattern.
The technique for limiting the API call rate isn’t really useful. The AI analysis inferred that you are making every call sleep first, and only then sending up to the semaphore limit in parallel.
I benchmark Chat Completions at 600 calls per minute, easily, when the need fits, but I do it by scheduled dispatch into a worker pool so the calls aren’t all burst at once. That also keeps the performance evaluation from depending on initial network contention or sending latency. OpenAI has the cloud capacity to handle enterprises; they can deal with your five calls.
The only limit is your organization’s tier rate limit, where you try to consume your per-minute allocation over the full minute instead of blasting a minute’s worth in a second, both to avoid a premature cutoff and to avoid disrupting other users in your organization.
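A minimal sketch of that kind of scheduled dispatch, assuming a hypothetical list of zero-argument coroutine factories (for example, lambdas wrapping your Chat Completions calls) and an illustrative requests-per-minute budget:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def paced_gather(
    factories: list[Callable[[], Awaitable[Any]]],
    rpm: int = 600,
    max_in_flight: int = 50,
) -> list[Any]:
    """Launch calls evenly across the minute instead of bursting them all at once."""
    interval = 60.0 / rpm                    # spacing between launches
    gate = asyncio.Semaphore(max_in_flight)  # caps concurrent calls, not launch rate

    async def run(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with gate:
            return await factory()

    tasks: list[asyncio.Task] = []
    for factory in factories:
        tasks.append(asyncio.create_task(run(factory)))
        await asyncio.sleep(interval)        # the pacing happens at dispatch time
    return await asyncio.gather(*tasks)
```

The point of the design is that the sleep governs when work is handed out, while the semaphore only bounds how many calls are in flight; `gather` still returns results in input order, so the slicing approach from the reconstructed method still works.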
Technique:
Do you have enough common input to be cached, meaning over 1200 identical tokens at the start of the input?
If not, I would give each call a random or sequential `prompt_cache_key` to indicate that routing to the same inference server is not desired or needed.
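For example, a sketch assuming a recent `openai` Python SDK that exposes the `prompt_cache_key` request parameter (older SDK versions could pass it via `extra_body`); the model name and message here are placeholders:

```python
import uuid

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def analyze_one(content: str) -> str:
    # A distinct prompt_cache_key per request signals that cache-affinity routing
    # to the same inference server is neither expected nor useful for these prompts.
    response = await client.chat.completions.create(
        model="gpt-4.1-mini",  # placeholder model name
        messages=[{"role": "user", "content": content}],
        prompt_cache_key=f"nocache-{uuid.uuid4()}",  # random key per call
    )
    return response.choices[0].message.content
```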
Then: this just seems to be a common effect when making API calls: every now and then you get, besides timeouts, a server that is generating tokens at extremely slow rates, like you got routed to “Backwater Joe’s Server Corral”.
If you get the batch of calls instantiated and processing much faster, you’ll have faster final results.
If you use streaming, you can see when an API call has returned nothing for an extended time, or measure its output rate against minimum expectations (adapting for any expected reasoning-model delay). That can let you start a concurrent retry, and maybe then close() the connection on that 159-second call when the retry you kicked off after 15 seconds of nothing finishes faster.
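A rough sketch of that hedged-retry idea, simplified to trigger on total elapsed time rather than on a measured streaming token rate (which a fuller version would watch); `make_call` is a hypothetical zero-argument coroutine factory for one API request, and the 15-second threshold is illustrative:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def hedged_call(
    make_call: Callable[[], Awaitable[Any]],
    hedge_after: float = 15.0,
) -> Any:
    """Start a second attempt if the first hasn't finished after `hedge_after`
    seconds, then return whichever attempt completes first and cancel the other."""
    first = asyncio.create_task(make_call())
    done, _ = await asyncio.wait({first}, timeout=hedge_after)
    if done:
        return first.result()

    # The first attempt is slow or silent: kick off a concurrent retry.
    second = asyncio.create_task(make_call())
    done, pending = await asyncio.wait(
        {first, second}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # abandon the slower attempt instead of waiting out a 159-second call
    return done.pop().result()
```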
I’ll assume you’re doing other pretty clever things with your “web” and “files” that aren’t the tool services offered on Chat Completions, and thus you can’t blame OpenAI’s lazy tools for a call’s slow response.