Stream event tools call about function

I am interacting with OpenAI’s API using the stream event method. Although I submit the tool_outputs after calling my custom function with function tools, OpenAI does not seem to successfully receive the results from my custom function, as it does with non-event mode. Why is this? Here is my code:
@override
def on_tool_call_done(self, tool_call: ToolCall):
    """Handle completion of a tool call during a streamed run.

    For code_interpreter and retrieval calls this only logs the event.
    For function calls it executes the local function named by the tool
    call and submits the result back to the run so streaming continues.
    """
    if tool_call.type == "code_interpreter":
        self.logger.info(
            "on_tool_call_done:%s", tool_call.code_interpreter.outputs
        )
    elif tool_call.type == "retrieval":
        self.logger.info("on_tool_call_done:%s", tool_call.type)
    elif tool_call.type == "function":
        # Function names are namespaced as "<module>.<function>"; the
        # second segment is the attribute looked up on the `baidu` module.
        model_and_function = tool_call.function.name.split(".")
        func = getattr(baidu, model_and_function[1])
        func_result = func(
            location=json.loads(tool_call.function.arguments).get("location")
        )
        function_responses = []  # list literal was lost in the forum paste
        function_responses.append(
            {
                "tool_call_id": tool_call.id,
                "output": func_result,
            }
        )
        if function_responses:
            # Submit the function results back to the run.
            # NOTE(review): submit_tool_outputs_stream returns a stream
            # manager that must be consumed (e.g. `with ...: until_done()`)
            # for the follow-up events to be processed — likely the cause
            # of the reported problem; confirm against the SDK docs.
            self.client.beta.threads.runs.submit_tool_outputs_stream(
                run_id=self.current_run.id,
                thread_id=self.thread_id,
                tool_outputs=function_responses,
            )
        else:
            self.logger.info("call function error")
    self.logger.info("on_tool_call_done")

1 Like

I am having the same problem.

the documentation suggests this:

# Per the API reference: to continue streaming after a run enters
# requires_action, pass stream=True to submit_tool_outputs and iterate
# the returned event stream.
stream = client.beta.threads.runs.submit_tool_outputs(
    thread_id=self.thread_id,
    run_id=run_id,
    tool_outputs=outputs,
    stream=True
)

and I’ve tried client.beta.threads.runs.submit_tool_outputs_stream, with and without "stream = " in front, etc

nothing seems to work

This is sample Node.js code, but you can see the pattern for how to submit the tool outputs. Notice the actual function used to submit tool outputs when you want streaming — it is written below (I think the version shown when "streaming" is selected on the API reference page is incorrect).

let tool_outputs = []
let run_id

// Start the run in streaming mode.
// (The original paste was missing the closing parenthesis of this call.)
let stream = await openai.beta.threads.runs.create(
    thread_id,
    {
        assistant_id: assistant_id,
        stream: true
    }
)

for await (const event of stream) {

    if (event.event === 'thread.message.delta') {

        // send text response to the client side
        // event.data.delta.content[0].text.value

    } else if (event.event === 'thread.run.requires_action') {
        if (event.data.status === 'requires_action') {
            if (event.data.required_action && event.data.required_action.type === 'submit_tool_outputs') {

                // save the run_id for the submitToolOutputsStream call
                run_id = event.data.id

                const tools_called = event.data.required_action.submit_tool_outputs.tool_calls

                tools_called.forEach((tool) => {

                    const tool_name = tool.function.name
                    const tool_args = JSON.parse(tool.function.arguments)

                    // call your external API here to process your tools
                    const tool_output = { status: 'success' }

                    tool_outputs.push({
                        tool_call_id: tool.id,
                        output: JSON.stringify(tool_output)
                    })

                })

                // exit loop, nothing more to do
                break

            }
        }
    }

}

// submit tools output and stream the continuation of the run
stream = openai.beta.threads.runs.submitToolOutputsStream(
    thread_id,
    run_id,
    {
        tool_outputs
    }
)

for await (const event of stream) {

    if (event.event === 'thread.message.delta') {

        // send text response to the client side
        // event.data.delta.content[0].text.value

    }

}

I have completely resolved this issue.
1.Firstly, define an array in the EventHandler to store all the answers returned by the AI from function calls.
class EventHandler(AssistantEventHandler):
    """Streaming event handler that accumulates answers across nested
    function-call runs.

    The forum paste lost the dunder underscores (`init`) and the empty
    list literal; both are restored here.
    """

    def __init__(self, no, client, is_fee, index):
        super().__init__()
        self.logger = logging.getLogger("chatgpt4_stream")
        self.client = client
        self.is_fee = is_fee
        self.index = index
        self.no = no
        # Collects {"message_id", "role", "content"} dicts produced by
        # on_message_done, including those from nested streams.
        self.question_answers = []
        self.usage = None
2.Secondly, trigger the function event inside the on_tool_call_done.
# Only act on function tool calls while the run is actually waiting
# for tool outputs.
if (
    tool_call.type == "function"
    and self.current_run.status == "requires_action"
):
    function_responses = []  # list literal was lost in the forum paste
3. Lastly, submit the function execution result to the assistant once the function has completed. At this point, it is essential that the question_answers collected by the nested stream event (triggered by submitting the function output) are accumulated into the current handler's results.
# Submit the tool outputs and consume the follow-up event stream with a
# child EventHandler, then fold its collected answers into this handler.
with self.client.beta.threads.runs.submit_tool_outputs_stream(
    run_id=self.current_run.id,
    thread_id=self.current_run.thread_id,
    tool_outputs=function_responses,
    event_handler=EventHandler(
        no=self.no + 1,
        client=self.client,
        is_fee=self.is_fee,
        index=self.index,
    ),
) as stream:
    stream.until_done()
    # NOTE(review): this reads question_answers off the stream manager;
    # presumably the SDK proxies it to the event handler — confirm, or
    # keep a reference to the child handler and read from it directly.
    self.question_answers += stream.question_answers
4.Refactor the on_message_done, process the message as necessary, and assign the value to question_answers. Only in this way, after the stream event has been executed, can you obtain the answers to questions returned by the assistant.
@override
def on_message_done(self, message: Message) -> None:
    """Collect the completed message's text into question_answers.

    Concatenates all text parts of the message, then records an entry
    for citations found in the annotations.
    """
    value = ""
    for content in message.content:
        if content.type == "text":
            value += content.text.value + "\n"
            for annotation in content.text.annotations:
                if (
                    annotation.type == "file_citation"
                    and annotation.file_citation.quote
                ):
                    # NOTE(review): per the paste's line order the append
                    # is the body of this `if`, i.e. one entry per quoted
                    # file citation; if one entry per message was intended,
                    # move the append after the loops — confirm.
                    self.question_answers.append(
                        {
                            "message_id": message.id,
                            "role": message.role,
                            "content": value,
                        }
                    )
5.After the EventHandler has been refactored according to the steps above, you can then use the stream event method to call the API.
# Kick off the run in streaming mode with the refactored handler, then
# gather the answers it (and any nested streams) accumulated.
with OpenAI().beta.threads.runs.create_and_stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler(0, self.clients[index], is_fee, index),
    model=self.key_models[index].get("model"),
) as stream:
    stream.until_done()
    question_answers += stream.question_answers

1 Like

Thank you very much — I was going crazy trying to solve this problem. I will try it immediately.