After I added many messages to the thread (more than 30,000 tokens in total), every subsequent request failed. Is there a problem with the thread’s token calculation adding all the message tokens?
this is my code
import json
import time
from openai import AssistantEventHandler
from openai import OpenAI
from _openai.config import OPENAI_API_KEY
client = OpenAI(api_key=OPENAI_API_KEY)
debug = False
def print_debug(s, end="\n", flush=False):
if debug:
print(s, end=end, flush=flush)
class EventHandler(AssistantEventHandler):
def __init__(self, file):
super().__init__()
self.file = file
def on_text_created(self, text) -> None:
print_debug(f"\nassistant > ", end="", flush=True)
self.file.write(f"\nassistant >")
def on_text_delta(self, delta, snapshot):
print_debug(delta.value, end="", flush=True)
self.file.write(delta.value)
def on_tool_call_created(self, tool_call):
print_debug(f"\nassistant > {tool_call.type}\n", flush=True)
self.file.write(f"\nassistant > {tool_call.type}\n")
def on_tool_call_delta(self, delta, snapshot):
if delta.type == 'code_interpreter':
if delta.code_interpreter.input:
print_debug(delta.code_interpreter.input, end="", flush=True)
self.file.write(delta.code_interpreter.input)
if delta.code_interpreter.outputs:
print_debug(f"\n\noutput >", flush=True)
self.file.write(f"\n\noutput >")
for output in delta.code_interpreter.outputs:
if output.type == "logs":
print_debug(f"\n{output.logs}", flush=True)
self.file.write(f"\n{output.logs}")
def process(__country, __thread_id, __assistant_id, start=0):
product_split_path = f"{dir}/product_split_{__country}.txt"
result_path = f"{dir}/product_result_{__country}_{__thread_id}.txt"
count = -1
with open(product_split_path, "r") as psf, open(result_path, 'a') as rf:
for line in psf:
count += 1
if count < start:
continue
info = line.strip().split("\t")
product_word = info[0]
rf.write(f"\n******{product_word} {count} start******\n")
words = json.loads(info[1])
message_id = send_message(__thread_id, __assistant_id, product_word, "\n".join(words))
rf.write(f"******{message_id}******\n")
event_handler(__thread_id, __assistant_id, rf)
rf.write(f"\n******{product_word} {count} end******\n")
rf.flush()
time.sleep(10)
def send_message(_thread_id, _assistant_id, _product_word, _words):
message = client.beta.threads.messages.create(
thread_id=_thread_id,
role="user",
content=f"Process the classification of attribute words related to the {_product_word} word :\n{_words}"
)
return message.id
def event_handler(_thread_id, _assistant_id, file):
with client.beta.threads.runs.stream(
thread_id=_thread_id,
assistant_id=_assistant_id,
instructions="Please address the user as Liusx. The user has a premium account.",
event_handler=EventHandler(file),
timeout=300
) as stream:
stream.until_done()
run = stream.get_final_run()
print_debug(run)
def init(__country, __thread_id, __assistant_id, _product_word, message_id, start=0):
result_path = f"{dir}/product_result_{__country}_{__thread_id}.txt"
with open(result_path, 'a') as rf:
rf.write(f"\n******{_product_word} {start} start******\n")
rf.write(f"******{message_id}******\n")
event_handler(__thread_id, __assistant_id, rf)
rf.write(f"\n******{_product_word} {start} end******\n")
if __name__ == "__main__":
from data import thread_id, assistant_id
# read file , classification , save to file
dir = "H:/word/words"
debug = True
# init("us", thread_id, assistant_id, "1000xm5","msg_Snj3cAzk2SHGnsgTS1AoWDNI", 1)
process("us", thread_id, assistant_id, 10)