Hello, I often get a “failed” status from the Assistants API while streaming a response. The only output I receive is “Sorry, something went wrong”.
Does anyone know what the issue could be? It happens on perhaps every fifth run, so I added a grace period: if the run fails within the first 8 seconds it retries — but sometimes the retry fails with the same error.
grace_period = 8 # seconds
max_retries = 1
attempt = 0
local_start = time.time()
response_text = ""
while attempt <= max_retries:
try:
self.logger.debug(f"Initiating run for thread {thread_id} (attempt {attempt + 1}).")
response = requests.post(run_url, headers=self.headers, json=run_payload, stream=True)
if response.status_code != 200:
self.logger.error(f"Failed to start streaming: {response.text}")
raise RuntimeError(f"Failed to start streaming: {response.text}")
for line in response.iter_lines():
if line:
decoded_line = line.decode("utf-8").strip()
# Skip event metadata
if decoded_line.startswith("event:"):
continue
if decoded_line.startswith("data: "):
data_content = decoded_line[6:].strip()
if data_content == "[DONE]":
duration_sec = time.time() - local_start
self.logger.info(f"Run finished. Total run duration: {duration_sec:.2f} seconds")
if self.promo is not None:
self.promo.setGaugeValue("openai_run_duration", duration_sec)
return response_text # finish on [DONE]
try:
json_data = json.loads(data_content)
# Check for run object updates and error statuses
if json_data.get("object") == "thread.run":
status = json_data.get("status")
self.logger.debug(f"Run update - status: {status}")
if status == "failed":
last_error = json_data.get("last_error", {})
error_msg = last_error.get("message", "Unknown error")
elapsed = time.time() - local_start
self.logger.error(f"Run failed with error: {error_msg}")
if elapsed < grace_period and attempt < max_retries:
self.logger.info(
f"Error occurred quickly (elapsed {elapsed:.2f}s); retrying run (attempt {attempt + 1})..."
)
attempt += 1
break # exit the for-loop to retry the run
else:
raise AssistantModelError(f"Run failed with error: {error_msg}")
# Process content delta
if "delta" in json_data and "content" in json_data["delta"]:
for part in json_data["delta"]["content"]:
if part["type"] == "text":
text_chunk = part["text"]["value"]
response_text += text_chunk
if callable(stream_handler):
stream_handler(text_chunk)
else:
self.logger.error(
"Invalid stream_handler: Expected function or class with 'on_llm_new_token'"
)
except Exception as e:
self.logger.error(f"Error processing streaming response: {e}")
raise
except Exception as e:
self.logger.error(f"Exception during streaming run: {e}")
elapsed = time.time() - local_start
if elapsed < grace_period and attempt < max_retries:
self.logger.info(
f"Exception occurred quickly (elapsed {elapsed:.2f}s); retrying run (attempt {attempt + 1})..."
)
attempt += 1
continue # retry the run
else:
raise
finally:
self.logger.info(f"Finished streaming response for thread {thread_id} (attempt {attempt + 1}).")
return response_text