Assistants API doesn't allow performing two concurrent runs on the same thread

Hello,

My application uses an async function that calls the Assistants API. When two requests are sent to the same thread, it throws this error.

{'error': {'message': "Can't add messages to thread_Fmzxw5GjqKcFBaVeMeM2ozTF while a run run_6b3A7jCcFwHikdR9si409rCj is active.", 'type': 'invalid_request_error', 'param': None, 'code': None}}

At this point it seems that I can't find a way to retrieve in-progress runs, so that I could wait until there are no in-progress runs.

My code is simple, just like this.

import asyncio
from typing import Literal

from openai import OpenAI
from openai.types.beta.threads import Run

_openai = OpenAI()


async def send_openai(assistant_id, message, thread_id=None):
    # Create a new thread if one was not supplied
    if not thread_id:
        thread_id = _openai.beta.threads.create().id
    _openai.beta.threads.messages.create(
        thread_id=thread_id, role="user", content=message
    )
    run = _openai.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
        instructions="Some instruction",
    )
    # Poll the run until it reaches a terminal state
    while True:
        run = _openai.beta.threads.runs.retrieve(
            thread_id=thread_id, run_id=run.id
        )
        run_status = get_response_status(run)
        if run_status == "ok":
            return run
        if run_status == "failed":
            return None
        if run_status == "retry":
            await asyncio.sleep(10)

def get_response_status(data: Run) -> Literal["retry", "failed", "ok"]:
    if data.status == "completed":
        return "ok"
    if data.status in (
        "queued",
        "in_progress",
    ):
        return "retry"
    # requires_action is treated as a failure here because this code
    # does not submit tool outputs
    if data.status in (
        "requires_action",
        "cancelling",
        "cancelled",
        "failed",
        "expired",
    ):
        return "failed"
    return "failed"
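Ideally I would do something like this before adding the next message. This is just a sketch of what I'm looking for; I'm assuming the runs on a thread can be listed, and the name wait_for_no_active_run is made up:

ACTIVE_RUN_STATUSES = {"queued", "in_progress", "requires_action", "cancelling"}


async def wait_for_no_active_run(thread_id: str, poll_seconds: int = 5) -> None:
    # Block until the thread has no run in an active state,
    # so that messages.create() will be accepted again.
    while True:
        runs = _openai.beta.threads.runs.list(thread_id=thread_id)
        if not any(r.status in ACTIVE_RUN_STATUSES for r in runs.data):
            return
        await asyncio.sleep(poll_seconds)

Calling await wait_for_no_active_run(thread_id) before messages.create would serialize runs on the same thread, at the cost of making callers wait.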

A thread is a “chat history” of a session.

It would make sense to lock out multiple runs using the same thread object, as you would get two different AI replies added, breaking conversational context and chat continuation based on previous inputs and responses.

So, the question here is, how do I queue the new incoming messages so the assistant can receive them after finishing the current run?

The answer here is to assign threads per user.

Imagine the conversations in ChatGPT as individual threads (conversation history). You cannot have a user send another query in a conversation until the answer to their last one has been received. In a chatbot, each following conversation turn is contextual.

The run is the “asking and processing and getting an answer”, an individual question to ChatGPT.

The assistant is equivalent to a model with its own special behaviors, or in this case, the configuration of files, functional modes, and instructions. Multiple simultaneous users can interact with a single assistant. It is like selecting GPT-4 or custom instructions in ChatGPT.

So there is no queue. Only having users wait for their conversation answer to be finished and retrieved.
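To illustrate (just a sketch, not an official pattern): per-user thread assignment can be as simple as a lookup keyed by user ID. The dict below stands in for whatever storage you use, and get_thread_for_user is a made-up name:

# user_id -> thread_id; in production this would live in a database
user_threads: dict[str, str] = {}


def get_thread_for_user(user_id: str) -> str:
    # Reuse the user's existing thread so conversational context is preserved
    if user_id not in user_threads:
        user_threads[user_id] = _openai.beta.threads.create().id
    return user_threads[user_id]

Each incoming message then goes onto that user's own thread, and only that user ever waits on its run.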

Thank you, that was a fast answer.
My use case is this. I have a WhatsApp Chatbot for customer service. The thing is that when the users write in WhatsApp, they usually do this:
“Hi there”
“I have a problem”
“I cannot find my recent purchase in my history”

I’m currently using a lambda function that saves the message in my backend, then creates a thread with my assistant, and runs it to answer my user. The lambda uses a while loop until the run is completed and then I serve the answer to my user.

Of course, this takes time (around 10-30 secs), so, in my example, only "Hi there" gets a run, because the other messages can't run while the current run is active, and the answer is something like "Hello, how can I help you".

So, I was hoping to use the "queued" status from the assistant, but maybe I'll need to add a debouncer and concatenate all the messages before sending my request…

It seems you need to look into some more asynchronous and threaded event-driven programming. Except with assistants, you have to create your own events…

And that is the oddity about the whole system. You essentially need similar management of async tasks, and even of the conversations, on your side, as if you were using chat completions.

To demonstrate the complexity: Let’s give your user exactly the ChatGPT interface. They submit their question, know it will be a bit, select another conversation tab and submit a different question on that other topic. When switching between conversations, who’s keeping track of the IDs and contents? Are you going to pull down a thread every time a tab is loaded as your only conversation database? Add metadata to a message or thread so you can see who it belongs to when you pull down the whole list of them every second with statuses? Try to keep this in sync with your own database suitable to the complete job anyway? Individual tasks whirring even longer per user?

Assistants doesn’t really solve any problem. OpenAI could have open-sourced libraries for file extraction and embedding retrieval, function shims and dynamic JupyterHub deployment examples for python-coding AI, tunable conversation management solutions. But instead you get to write your code for your own stateful interactions with…let’s see…28 different API methods.

There's nothing to prevent anyone from maintaining two threads for each conversation: the main thread on which the interaction with the assistant occurs, and a second thread that simply acts as a queue (FIFO). The queue is the vehicle through which the messages get sent to the assistant on the main thread.
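A rough sketch of that two-thread idea, assuming the same client as in the earlier snippets. The function names are made up, and the queue thread is never run; it only stores pending messages:

def enqueue_message(queue_thread_id: str, text: str) -> None:
    # The queue thread is never run; it only holds pending user messages.
    _openai.beta.threads.messages.create(
        thread_id=queue_thread_id, role="user", content=text
    )


def drain_queue_into_main(queue_thread_id: str, main_thread_id: str) -> str:
    # Copy queued messages (oldest first) onto the main thread,
    # then discard the queue thread and start a fresh, empty one.
    pending = _openai.beta.threads.messages.list(
        thread_id=queue_thread_id, order="asc"
    )
    for msg in pending.data:
        for part in msg.content:
            if part.type == "text":
                _openai.beta.threads.messages.create(
                    thread_id=main_thread_id, role="user", content=part.text.value
                )
    _openai.beta.threads.delete(queue_thread_id)
    return _openai.beta.threads.create().id  # ID of the new, empty queue thread

Once the main thread has no active run, you drain the queue onto it and create the run as usual.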

Thank you again for the answer, although I don't see how to apply what you're saying. What I understand is that I need to give more context…

  • A user writes to my WhatsApp number.

  • The message is received, then processed (type of message, sender, etc.).

  • If it's the first time the user writes, a record is generated using their WhatsApp ID as an ID (Firestore).

  • If it's the first time the user writes, a thread is generated with the first message, and the ID of the thread is saved with the user data (see the sketch after this list).

  • A run is then created, and a loop checks until the run ends.

  • When the run ends, the message is retrieved and sent to the user.

  • All messages (from the user and from the assistant) are saved in a subcollection inside the user document for history purposes (future front end).

  • If it's not the first message, the user is identified by their WhatsApp ID and the thread ID is recovered.

  • A run is created with the new message and the user's thread ID.

  • The logic is then the same as above.
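A minimal sketch of that lookup-or-create step, assuming the google-cloud-firestore client; the collection and field names are illustrative, not the actual schema:

from google.cloud import firestore
from openai import OpenAI

db = firestore.Client()
client = OpenAI()


def get_or_create_thread(wa_id: str) -> str:
    # One document per WhatsApp user, holding that user's single thread ID.
    doc_ref = db.collection("users").document(wa_id)
    snapshot = doc_ref.get()
    if snapshot.exists:
        return snapshot.get("thread_id")
    thread = client.beta.threads.create()
    doc_ref.set({"wa_id": wa_id, "thread_id": thread.id})
    return thread.id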

Maybe this will give enough context. As you can see, there's only one thread per user ID, the conversations don't mix, and I keep track of the user, the thread, and the sender at every moment.

I don't look at every message to see who it belongs to, because the threads are isolated by user. I just check whether the run has ended to retrieve the message generated by the assistant (consistently the first one returned by the thread's "list messages" method).

For now, it's all working as expected (fortunately), but I couldn't solve the queue. Given that I'm using a lambda function, which by design is "stateless", every new post is isolated. In other words, I don't know how to "debounce" (wait for fast incoming messages and queue them before merging them and sending them to my assistant) because every new message is processed by a different lambda instance and they don't share state…

It could be done if I were able to control the front end, but that's not the case and we need the WhatsApp communication channel.

The best choice here might be to use a VM, but I don't have experience with them, and I was hoping that the "queued" status on the run would be my solution.
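One way to approximate a debounce without shared memory would be to keep the pending batch in Firestore and have each invocation wait out a short quiet period, proceeding only if it still holds the newest message. This is a rough sketch with made-up names (a "pending" collection, quiet_seconds) and without the locking a production version would need:

import time

from google.cloud import firestore

db = firestore.Client()


def debounce_message(wa_id: str, text: str, quiet_seconds: float = 5.0):
    # Append the message to the user's pending batch and record its arrival time.
    doc_ref = db.collection("pending").document(wa_id)
    arrived_at = time.time()
    doc_ref.set(
        {"messages": firestore.ArrayUnion([text]), "last_at": arrived_at},
        merge=True,
    )
    # Wait out the quiet period inside this invocation.
    time.sleep(quiet_seconds)
    snapshot = doc_ref.get()
    if snapshot.get("last_at") != arrived_at:
        # A newer message arrived; that invocation will send the merged batch.
        return None
    merged = "\n".join(snapshot.get("messages"))
    doc_ref.delete()
    return merged  # hand this to the existing thread/run logic

Note that ArrayUnion collapses exact duplicate texts, so a per-user subcollection of messages would be more robust; the idea is only that the last invocation to receive a message is the one that sends the merged text to the assistant.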