As per the title, I have an assistant that I’m using with streaming in the Python SDK with a function tool that performs RAG on user data. When the user asks a question that required calling the tool, I expect the assistant to call the tool and, upon receiving the output, resume the response.
However, after I send the tool call output, the stream ends. Here’s my code for handling the stream:
def event_stream():
with openai.beta.threads.runs.stream(
thread_id=openai_thread_id,
assistant_id=assistant.assistant_id,
) as stream:
for chunk in stream:
data = chunk.data
run_id = getattr(data, "run_id", None) or run_id
status = getattr(data, "status", None)
if (
status == "requires_action"
and data.required_action.type == "submit_tool_outputs"
):
tool_call = data.required_action.submit_tool_outputs.tool_calls[
0
]
if tool_call.type == "function":
tool_call_id = tool_call.id
args = json.loads(tool_call.function.arguments)
d = json.dumps(
{
"type": "function_call",
"data": {
"name": tool_call.function.name,
"arguments": args,
},
}
).replace("\n", "")
yield f"data: {d}\n\n"
tool_call_output = agents.get_tool_call_output(
tool_call.function.name,
args
) or {"status": "OK"}
tool_outputs = [
{
"tool_call_id": tool_call_id,
"output": json.dumps(tool_call_output),
}
]
openai.beta.threads.runs.submit_tool_outputs(
thread_id=conversation.openai_thread_id,
run_id=run_id,
tool_outputs=tool_outputs
)
if data.object == "thread.message.delta":
text_delta = "".join([c.text.value for c in data.delta.content])
d = {
"type": "message_delta",
"data": {
"text": text_delta,
},
}
yield f"data: {json.dumps(d)}\n\n"
yield f"data: [DONE]\n\n"
What am I doing wrong?