Appropriate way of timing out an asynchronous chat completions stream

Hi all,

I am using the openai python package in an experimental FastAPI application.

I have been having issues with both the completions and chat completion acreate methods hanging for long periods, so I am trying to implement a timeout. The hang always happens before any generation has started. I don't want to wait the expected length of a full response before retrying, since that could be quite long, so I am streaming the response and basing the timeout on the streaming generator being established. Below is my function:

import asyncio
import logging
from typing import List

import openai


async def achat_stream(
    messages: List[dict] = None, timeout: float = 5, num_retries: int = 3, **kwargs
):
    async def chat():
        return await openai.ChatCompletion.acreate(
            messages=messages, stream=True, **kwargs
        )

    for _ in range(num_retries):
        aiterator = chat()
        try:
            aiterator = await asyncio.wait_for(aiterator, timeout=timeout)
            response = None
            async for chunk in aiterator:
                chunk = chunk.to_dict_recursive()
                delta = chunk["choices"][0]["delta"]
                if chunk["choices"][0]["finish_reason"]:
                    return delta_concat(response, delta)
                else:
                    response = delta_concat(response, delta)
            return response
        except asyncio.TimeoutError:
            logging.info("TimeoutError")
    raise TimeoutError(f"OpenAI API timed out {num_retries} times")

For clarity, delta_concat just joins the generations together iteratively, so at the end I get the full response in the same format as if I hadn't streamed it.
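For anyone reading along, a minimal sketch of what a delta_concat like this might look like (a hypothetical reconstruction, assuming each streamed delta is a plain dict with optional "role" and "content" keys):

```python
def delta_concat(response, delta):
    """Merge one streamed delta into the accumulated response dict.

    Hypothetical helper: assumes deltas are plain dicts with optional
    "role" and "content" keys, as yielded by chat completion streaming.
    """
    if response is None:
        response = {"role": "", "content": ""}
    if "role" in delta:
        response["role"] = delta["role"]
    if delta.get("content") is not None:
        response["content"] += delta["content"]
    return response
```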

The issue is that when asyncio.wait_for times out, the code moves on, but I don't think this stops the underlying connection attempt, and I get a warning. Does anyone know the appropriate way of dealing with this issue, specifically when using asynchronous streaming?
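Not OpenAI-specific, but the underlying asyncio behaviour can be shown with a self-contained sketch: asyncio.wait_for cancels the awaited coroutine on timeout, and any resource that coroutine opened is only released if the coroutine itself cleans up (e.g. in a try/finally). Here Resource is just a stand-in for the HTTP session:

```python
import asyncio


class Resource:
    """Stand-in for something like an aiohttp ClientSession."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


async def open_then_hang(res, cleanup):
    try:
        await asyncio.sleep(60)  # simulate a request stuck in a server queue
    finally:
        if cleanup:
            res.close()  # runs even when wait_for cancels this coroutine


async def main():
    leaked, tidy = Resource(), Resource()
    for res, cleanup in [(leaked, False), (tidy, True)]:
        try:
            await asyncio.wait_for(open_then_hang(res, cleanup), timeout=0.01)
        except asyncio.TimeoutError:
            pass
    # leaked is still open; tidy cleaned itself up on cancellation
    return leaked.closed, tidy.closed
```

Running `asyncio.run(main())` returns `(False, True)`: the resource without a finally-based cleanup leaks, which is essentially the "Unclosed client session" situation.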

The streaming parameter will return the first token the fastest, so I would take a look at that.

Hi, thanks for getting back to me.

If you look at the function, I am already using the stream=True parameter, as defined in the chat() function.

From what I have tested, and from the links you shared, there tend to be two main delays: the delay from the request sitting in the OpenAI server queue, and the delay from the iterative generation of the completion. The first delay seems to be taken up by the line:

aiterator = await asyncio.wait_for(aiterator, timeout=timeout)

This is what I have put the timeout around, as it is the one that seems to take anywhere from 0.5 seconds to over 10 seconds. If it does complete, the aiterator object is realised as an async_generator, and the delay from iterative generation shows up in this part of the code:

async for chunk in aiterator:
    chunk = chunk.to_dict_recursive()
    delta = chunk["choices"][0]["delta"]
    if chunk["choices"][0]["finish_reason"]:
        return delta_concat(response, delta)
    else:
        response = delta_concat(response, delta)

I am not too worried about this part, because once the stream has started and the servers are actually processing the request, the time taken to get the full response seems relatively consistent.

From my understanding, my issue is that when awaiting aiterator with the timeout, an aiohttp.client.ClientSession object is spawned, but I don't have access to it because aiterator is still just the coroutine. For some reason the client session is not closed when the timeout error is raised, and I can't close it myself in the except clause, as I only have access to the coroutine. This leaves me with the following warning:

ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000010DDA73E0D0>
ERROR:asyncio:Unclosed client session

I may have misunderstood something, and if so I would welcome the correction.
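One pattern worth exploring (assuming the pre-1.0 openai package, where openai.aiosession.set() lets you supply your own aiohttp.ClientSession): create the session yourself before calling acreate, so you hold a handle you can close in the except clause, regardless of how far the cancelled coroutine got. A self-contained sketch of the idea, with a dummy session standing in for aiohttp so it runs anywhere:

```python
import asyncio


class DummySession:
    """Stand-in for aiohttp.ClientSession."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def slow_request(session):
    # stands in for openai.ChatCompletion.acreate(...) using `session`
    await asyncio.sleep(60)


async def main():
    session = DummySession()
    # with the real package this would be: openai.aiosession.set(session)
    try:
        await asyncio.wait_for(slow_request(session), timeout=0.01)
    except asyncio.TimeoutError:
        # we own the session, so we can close it even though the
        # coroutine was cancelled before exposing it to us
        await session.close()
    return session.closed
```

`asyncio.run(main())` returns True: because the caller created the session, the timeout handler can close it, which should make the "Unclosed client session" warning go away. Note that on the success path you would close the session only after fully consuming the stream, not immediately.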


This will probably be remedied by moving up to a higher tier, assuming you are not already tier 3 or above.