Hello.
I am experimenting with Assistants, streaming, and tool calls.
I was following the function-calling example from the OpenAI docs. After digging around, I figured out that I need some extra methods in the EventHandler class that I use to extend AssistantEventHandler. In particular, I needed on_text_delta, since not every part of the conversation results in tool calls.
```python
import json

from openai import AssistantEventHandler, OpenAI
from rich.console import Console

client = OpenAI()
console = Console()


class EventHandler(AssistantEventHandler):
    # @override
    def on_event(self, event):
        # Retrieve events that are denoted with 'requires_action',
        # since these will have our tool_calls
        if event.event == 'thread.run.requires_action':
            run_id = event.data.id  # Retrieve the run ID from the event data
            self.handle_requires_action(event.data, run_id)

    def on_text_delta(self, delta, snapshot):
        console.print(delta.value, end="", style="black on white")

    def on_error(self, error):
        print(error)

    def handle_requires_action(self, data, run_id):
        tool_outputs = []
        for tool in data.required_action.submit_tool_outputs.tool_calls:
            console.print("Tool call detected", style="green on black")
            if tool.function.name == "db_category_tool":
                argument = json.loads(tool.function.arguments)["ARGUMENTNAME"]
                output = "\n".join(row.to_json() for row in MYFUNCTIONNAME(argument))
                tool_outputs.append({"tool_call_id": tool.id, "output": output})
        # Submit all tool_outputs at the same time
        self.submit_tool_outputs(tool_outputs, run_id)

    def submit_tool_outputs(self, tool_outputs, run_id):
        # Use the submit_tool_outputs_stream helper
        with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.current_run.thread_id,
            run_id=self.current_run.id,
            tool_outputs=tool_outputs,
            event_handler=EventHandler(),
        ) as stream:
            for text in stream.text_deltas:
                print(text, end="", flush=True)
            print()
```
But this approach results in doubled output, as each delta gets yielded twice. I am really missing more documentation on AssistantEventHandler. What are the best practices when using it? Is this the only way to submit the results of tool calls while streaming? Which other methods are worth overriding?
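To frame the question, here is a minimal sketch of the other handler methods I have been experimenting with (on_text_created, on_tool_call_created, on_end). Their signatures are my assumption from the SDK source, and the fallback base class exists only so the sketch runs without the openai package installed:

```python
# Sketch of a handler that collects streamed text instead of printing it.
# The stand-in base class is only so this snippet runs without the SDK;
# with openai installed, the real AssistantEventHandler is used.
try:
    from openai import AssistantEventHandler
except ImportError:
    class AssistantEventHandler:  # minimal stand-in for illustration
        pass


class CollectingHandler(AssistantEventHandler):
    def __init__(self):
        super().__init__()
        self.collected = []       # one string per text block
        self.tool_calls_seen = 0  # how many tool calls were started

    def on_text_created(self, text):
        # Fired once when the assistant starts a new text block.
        self.collected.append("")

    def on_text_delta(self, delta, snapshot):
        # Fired for each incremental chunk; delta.value is the new text.
        self.collected[-1] += delta.value

    def on_tool_call_created(self, tool_call):
        # Fired when the model begins a tool call, before arguments stream in.
        self.tool_calls_seen += 1

    def on_end(self):
        # Fired once after the last event of the stream; a good cleanup hook.
        pass
```

Passing an instance of this as event_handler and reading handler.collected after the stream finishes avoids printing from two places at once.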
Is this the appropriate way of streaming?
```python
with client.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=expert_assistant_id,
    event_handler=EventHandler(),
) as stream:
    stream.until_done()
```
If so, what is the proper way to handle the end of communication with an assistant?
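For comparison, the pattern I have been trying for detecting the end of a run is to override on_end and set a flag; the get_final_run/get_final_messages helpers mentioned in the comments are something I found in the SDK source, so treat their presence as an assumption. The stand-in base class is again only so the sketch runs without the openai package:

```python
import threading

try:
    from openai import AssistantEventHandler
except ImportError:
    class AssistantEventHandler:  # stand-in so the sketch runs without the SDK
        pass


class FinishingHandler(AssistantEventHandler):
    """Signals other code (e.g. a UI loop) when the stream has ended."""

    def __init__(self):
        super().__init__()
        self.done = threading.Event()

    def on_end(self):
        # Called once after the final event of the stream.
        self.done.set()

# In real use (mirroring the streaming call from the question):
#   handler = FinishingHandler()
#   with client.beta.threads.runs.stream(
#       thread_id=thread.id,
#       assistant_id=expert_assistant_id,
#       event_handler=handler,
#   ) as stream:
#       stream.until_done()      # blocks until the run finishes
#   handler.done.wait()          # now it is safe to read final state
#   final_run = handler.get_final_run()      # run object with status/usage
#   messages = handler.get_final_messages()  # messages produced by this run
```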
I appreciate any help, as the official docs sometimes miss out on important details.
Bonus question: is it possible to track the tokens used while streaming? I would really like to avoid exceeding the TPM limits, simply by waiting for a while when that would help.
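On the bonus question, what I have found so far: I see no per-delta token counts in the stream, but the completed run object carries a usage field (prompt_tokens, completion_tokens, total_tokens) that can be read once the run finishes. A minimal sliding-window budget that sleeps when the next request would exceed a TPM limit might look like this (the class and helper names are mine, and the clock/sleep parameters are injectable only to make the sketch testable):

```python
import time
from collections import deque


class TpmBudget:
    """Sliding-window tokens-per-minute budget.

    Record total_tokens after each finished run; before starting the
    next one, wait until the last minute's usage leaves enough headroom.
    """

    def __init__(self, tpm_limit, window=60.0,
                 clock=time.monotonic, sleep=time.sleep):
        self.tpm_limit = tpm_limit
        self.window = window
        self.clock = clock      # injectable for testing
        self.sleep = sleep
        self.events = deque()   # (timestamp, tokens) pairs

    def _used(self, now):
        # Drop entries that fell out of the window, then sum the rest.
        while self.events and now - self.events[0][0] >= self.window:
            self.events.popleft()
        return sum(tokens for _, tokens in self.events)

    def record(self, tokens):
        """Call with the run's total_tokens after each run completes."""
        self.events.append((self.clock(), tokens))

    def wait_for(self, expected_tokens):
        """Block until expected_tokens fits under the TPM limit."""
        while True:
            now = self.clock()
            if self._used(now) + expected_tokens <= self.tpm_limit:
                return
            if not self.events:
                # A single request larger than the limit can never fit.
                raise ValueError("expected_tokens exceeds tpm_limit")
            # Sleep until the oldest entry expires from the window.
            oldest_ts = self.events[0][0]
            self.sleep(max(0.0, self.window - (now - oldest_ts)))
```

The awkward part is that you only learn the real token count after the run completes, so expected_tokens has to be an estimate. If someone knows a way to get usage mid-stream, I would love to hear it.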