Questions about calculating tokens using the Assistants API

After I added many messages to the thread (more than 30,000 tokens in total), every subsequent request failed. Is the problem that the thread's token calculation adds up the tokens of all of its messages?

This is my code:

import json
import time

from openai import AssistantEventHandler, OpenAI

from _openai.config import OPENAI_API_KEY

client = OpenAI(api_key=OPENAI_API_KEY)
debug = False


def print_debug(s, end="\n", flush=False):
    if debug:
        print(s, end=end, flush=flush)


class EventHandler(AssistantEventHandler):

    def __init__(self, file):
        super().__init__()
        self.file = file

    def on_text_created(self, text) -> None:
        print_debug(f"\nassistant > ", end="", flush=True)
        self.file.write(f"\nassistant >")

    def on_text_delta(self, delta, snapshot):
        print_debug(delta.value, end="", flush=True)
        self.file.write(delta.value)

    def on_tool_call_created(self, tool_call):
        print_debug(f"\nassistant > {tool_call.type}\n", flush=True)
        self.file.write(f"\nassistant > {tool_call.type}\n")

    def on_tool_call_delta(self, delta, snapshot):
        if delta.type == 'code_interpreter':
            if delta.code_interpreter.input:
                print_debug(delta.code_interpreter.input, end="", flush=True)
                self.file.write(delta.code_interpreter.input)
            if delta.code_interpreter.outputs:
                print_debug(f"\n\noutput >", flush=True)
                self.file.write(f"\n\noutput >")
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        print_debug(f"\n{output.logs}", flush=True)
                        self.file.write(f"\n{output.logs}")


def process(__country, __thread_id, __assistant_id, start=0):
    product_split_path = f"{dir}/product_split_{__country}.txt"
    result_path = f"{dir}/product_result_{__country}_{__thread_id}.txt"
    count = -1
    with open(product_split_path, "r") as psf, open(result_path, 'a') as rf:
        for line in psf:
            count += 1
            if count < start:
                continue
            info = line.strip().split("\t")
            product_word = info[0]
            rf.write(f"\n******{product_word} {count} start******\n")
            words = json.loads(info[1])
            message_id = send_message(__thread_id, __assistant_id, product_word, "\n".join(words))
            rf.write(f"******{message_id}******\n")
            event_handler(__thread_id, __assistant_id, rf)
            rf.write(f"\n******{product_word} {count} end******\n")
            rf.flush()
            time.sleep(10)


def send_message(_thread_id, _assistant_id, _product_word, _words):
    message = client.beta.threads.messages.create(
        thread_id=_thread_id,
        role="user",
        content=f"Process the classification of attribute words related to the {_product_word} word :\n{_words}"
    )
    return message.id


def event_handler(_thread_id, _assistant_id, file):
    with client.beta.threads.runs.stream(
            thread_id=_thread_id,
            assistant_id=_assistant_id,
            instructions="Please address the user as Liusx. The user has a premium account.",
            event_handler=EventHandler(file),
            timeout=300
    ) as stream:
        stream.until_done()
        run = stream.get_final_run()
        print_debug(run)


def init(__country, __thread_id, __assistant_id, _product_word, message_id, start=0):
    result_path = f"{dir}/product_result_{__country}_{__thread_id}.txt"
    with open(result_path, 'a') as rf:
        rf.write(f"\n******{_product_word} {start} start******\n")
        rf.write(f"******{message_id}******\n")
        event_handler(__thread_id, __assistant_id, rf)
        rf.write(f"\n******{_product_word} {start} end******\n")


if __name__ == "__main__":
    from data import thread_id, assistant_id

    # read file, classify each product's words, save results to file
    dir = "H:/word/words"
    debug = True
    # init("us", thread_id, assistant_id, "1000xm5","msg_Snj3cAzk2SHGnsgTS1AoWDNI", 1)

    process("us", thread_id, assistant_id, 10)

Your impediment is the rate limiter for the AI models, which depends on your usage tier.

If you are in tier 1 (having paid under $50, and not having made a subsequent payment that triggers a tier recalculation after a waiting period), then the tokens-per-minute (TPM) rate limit for gpt-4o models is 30,000 tokens per minute.
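You can check the limit your account actually gets from the x-ratelimit-* response headers. A rough, untested sketch using the SDK's with_raw_response accessor (the model name and header names here are assumptions on my part and may differ for your account):

from openai import OpenAI

client = OpenAI()

# Make a tiny request only to read the rate-limit headers returned with it.
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "ping"}],
    max_tokens=1,
)
print("TPM limit:    ", raw.headers.get("x-ratelimit-limit-tokens"))
print("TPM remaining:", raw.headers.get("x-ratelimit-remaining-tokens"))
print("RPM limit:    ", raw.headers.get("x-ratelimit-limit-requests"))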

If a single call would exceed that, the rate limiter denies the API request. That also applies to the internal API requests that assistants make through their agentic behavior of calling tools autonomously and growing the context before responding; one way to detect and retry such failures is sketched below.
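For example, building on the client and EventHandler from your own code, something like this (an untested sketch; verify run.status and run.last_error.code against the SDK version you use) retries a streamed run with exponential backoff when it fails on the rate limiter:

import time

def run_with_backoff(_thread_id, _assistant_id, file, max_retries=5):
    delay = 15  # seconds; an assumed starting point, tune it to your TPM budget
    for attempt in range(max_retries):
        with client.beta.threads.runs.stream(
                thread_id=_thread_id,
                assistant_id=_assistant_id,
                event_handler=EventHandler(file),
                timeout=300
        ) as stream:
            stream.until_done()
            run = stream.get_final_run()
        if run.status != "failed":
            return run
        if run.last_error and run.last_error.code == "rate_limit_exceeded":
            time.sleep(delay)  # wait for the per-minute token window to refill
            delay *= 2         # exponential backoff
            continue
        raise RuntimeError(f"run failed: {run.last_error}")
    raise RuntimeError("rate limit retries exhausted")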

You can mitigate this by using a model with a higher TPM limit, tuning tools such as file_search so that they inject fewer chunks from a search, or manually deleting past messages from the thread by their IDs (a sketch follows). Or purchase more credits for tier elevation.
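Deleting old messages can be done with the beta threads messages endpoints. A rough sketch, reusing the client from your code (untested; check the exact list/delete signatures in your SDK version, and note the list endpoint returns at most 100 messages per page, so a very long thread would need pagination):

def trim_thread(thread_id, keep_last=10):
    # List the thread's messages oldest-first.
    messages = client.beta.threads.messages.list(
        thread_id=thread_id, order="asc", limit=100
    ).data
    # Delete everything except the newest keep_last messages.
    for message in messages[:-keep_last]:
        client.beta.threads.messages.delete(
            message_id=message.id, thread_id=thread_id
        )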

OpenAI does apply its own "chat management" that discards past thread turns so that a recent chat fits within the model's input context, but there is no awareness of, or setting for, your own needs and usage limits.
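Since the API will not track your rate-limit budget for you, one workaround is to estimate for yourself how many tokens a thread already holds before adding the next message. A rough sketch with the tiktoken library (an assumption of mine, not part of the original code; o200k_base is the encoding the gpt-4o family uses, and the count ignores per-message overhead and tool output, so treat it as a lower bound):

import tiktoken

encoding = tiktoken.get_encoding("o200k_base")

def estimate_thread_tokens(thread_id):
    total = 0
    messages = client.beta.threads.messages.list(thread_id=thread_id, limit=100).data
    for message in messages:
        for part in message.content:
            if part.type == "text":
                total += len(encoding.encode(part.text.value))
    return total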

What I mainly want to ask is why the API is designed this way. What is the reason for this design?

Hello @liusx,

I suggest you take a look at the links below that talk about OpenAI API request limits.
