I am using the openai python package in an experimental FastAPI application.
I have been having issues with both the completions and chat completion acreate methods hanging for long periods of time, so I am trying to implement a timeout. The hanging always happens before any generation has started. I don't want to wait the expected length of a response before retrying, since this could be quite long, so I am streaming the response and basing the timeout on the streaming generator being established. Below is my function:
import asyncio
import logging
from typing import List

import openai


async def achat_stream(
    messages: List[str] = None, timeout: float = 5, num_retries: int = 3, **kwargs
):
    async def chat():
        return await openai.ChatCompletion.acreate(
            messages=messages, stream=True, **kwargs
        )

    for _ in range(num_retries):
        # chat() returns a coroutine; awaiting it below applies the timeout
        aiterator = chat()
        try:
            aiterator = await asyncio.wait_for(aiterator, timeout=timeout)
            response = None
            async for chunk in aiterator:
                chunk = chunk.to_dict_recursive()
                delta = chunk["choices"][0]["delta"]
                if chunk["choices"][0]["finish_reason"]:
                    return delta_concat(response, delta)
                else:
                    response = delta_concat(response, delta)
            return response
        except asyncio.TimeoutError:
            logging.info("TimeoutError")
    raise TimeoutError(f"OpenAI API timed out {num_retries} times")
For clarity, delta_concat just joins the generations together iteratively, so at the end I get the full response in the same format as if I hadn't streamed it.
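A minimal sketch of what that helper does (simplified here to string fields only; the real version also preserves the rest of the response structure):

```python
def delta_concat(response, delta):
    # Merge one streamed delta into the accumulated response dict.
    if response is None:
        return dict(delta)
    for key, value in delta.items():
        if key in response and isinstance(value, str):
            response[key] += value  # concatenate streamed text fragments
        else:
            response[key] = value  # first occurrence of a field
    return response
```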
The issue is that when the asyncio.wait_for times out, the code moves on, but I think this doesn't stop it looking for the connection, and I get a warning. Does anyone know the appropriate way of dealing with this issue, specifically when using asynchronous streaming?
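For reference, this is the kind of pattern I have been experimenting with, reduced to a stand-in coroutine (slow_connect here simulates the acreate call and is not my real code): wrap the call in a task and, on timeout, await the cancelled task so that any try/finally cleanup inside it can run.

```python
import asyncio

cleaned_up = []

async def slow_connect():
    # Stand-in for the acreate call; the try/finally lets the coroutine
    # release its connection when it is cancelled.
    try:
        await asyncio.sleep(10)  # simulates the request stuck in the queue
        return "stream"
    finally:
        cleaned_up.append(True)  # real code would close the session here

async def connect_with_timeout(timeout: float):
    task = asyncio.create_task(slow_connect())
    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        try:
            await task  # let cancellation and its finally block finish
        except asyncio.CancelledError:
            pass
        raise
```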
If you look in the function, I am using the stream=True parameter, as set in the chat() function.
From what I have tested and from the links you shared, there tend to be two main delays: the delay caused by the request sitting in the OpenAI server queue, and then the delay caused by the iterative generation of the response. The first delay seems to be taken up by the line:

    aiterator = await asyncio.wait_for(aiterator, timeout=timeout)
This is the line I have put the timeout around, as it is the one that seems to take anywhere from 0.5 seconds to over 10 seconds. If it does complete, then the aiterator object is realised as an async_generator object, and the delay from the iterative generation comes up in this part of the code:
async for chunk in aiterator:
    chunk = chunk.to_dict_recursive()
    delta = chunk["choices"][0]["delta"]
    if chunk["choices"][0]["finish_reason"]:
        return delta_concat(response, delta)
    else:
        response = delta_concat(response, delta)
I am not too worried about this part, because once the stream has started and the servers are actually processing the request, it is relatively consistent how long it takes to get the response.
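To separate the two delays, this is the kind of timing harness I have been using, with a stand-in for the API call (fake_acreate and its delay values are made up for illustration):

```python
import asyncio
import time

async def fake_acreate():
    # Stand-in for openai.ChatCompletion.acreate(..., stream=True):
    # a delay before the async generator exists, then per-chunk delays.
    await asyncio.sleep(0.05)  # queue delay before the stream is established

    async def gen():
        for token in ["Hel", "lo"]:
            await asyncio.sleep(0.01)  # per-chunk generation delay
            yield token

    return gen()

async def timed_stream():
    t0 = time.monotonic()
    aiterator = await fake_acreate()
    connect_delay = time.monotonic() - t0  # the part worth timing out on

    t0 = time.monotonic()
    chunks = [chunk async for chunk in aiterator]
    generation_delay = time.monotonic() - t0  # fairly consistent in practice

    return connect_delay, generation_delay, chunks
```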
From my understanding, the issue is that when awaiting aiterator with the timeout, an aiohttp.client.ClientSession object has been spawned, but I don't have access to it because aiterator is still just the coroutine. For some reason the client session is not closed when the timeout error is raised, and I can't close it myself in the except clause as I only have access to the coroutine. This leaves me getting the following warning:
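The workaround I am considering is to own the session myself, so the except clause has something to close. Sketched here with a stand-in session class (the real code would use aiohttp.ClientSession, and openai-python 0.x appears to let you inject one via openai.aiosession.set(...), though check your version):

```python
import asyncio

class FakeSession:
    """Stand-in for aiohttp.ClientSession."""
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def acreate_with_session(session):
    # Stand-in for the acreate call using an injected session.
    await asyncio.sleep(10)  # simulate a long queue delay
    return "stream"

async def main():
    session = FakeSession()  # created outside the coroutine, so we hold a reference
    try:
        await asyncio.wait_for(acreate_with_session(session), timeout=0.01)
    except asyncio.TimeoutError:
        # Because we own the session, we can close it here ourselves
        # instead of leaking it inside the cancelled coroutine.
        await session.close()
    return session
```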