Currently I am call the submit_tools_output api, loop through the events to look for message completed and injecting it into the queue. My micro is based on the work by @brandonareid2
def on_tool_call_created(self, tool_call):
function_name = tool_call.function.name
arguments_str = tool_call.function.arguments
# Parse arguments if they are in JSON format
try:
arguments = json.loads(arguments_str) if arguments_str else {}
except json.JSONDecodeError as e:
logger.error(f"Failed to parse arguments: {e}")
arguments = {}
# Log function name and arguments
logger.info(f'on_tool_call_created {function_name} with arguments:')
for arg_name, arg_value in arguments.items():
logger.info(f' {arg_name}: {arg_value}')
asyncio.run_coroutine_threadsafe(self.queue.put(f"\nI guess it's a tool call > {tool_call.function.name}\n"),
self.loop)
async def submit_tool_outputs(self, tool_outputs, data):
logger.info(f'submit_tool_outputs {tool_outputs}, thread_id {data.thread_id}, run_id {data.id}')
with openai_client.beta.threads.runs.submit_tool_outputs_stream(
thread_id=data.thread_id,
run_id=data.id,
tool_outputs=tool_outputs,
event_handler=CustomEventHandler(self.loop),
) as stream:
for text in stream.text_deltas:
logger.info(f"Stream text delta: {text}") # I see it in logs
asyncio.run_coroutine_threadsafe(self.queue.put(f"{text}"), self.loop) # I don't get it in response
print(text, end="", flush=True)
print()
logger.info("submit_tool_outputs_stream call passed")
await self.queue.put("Some strange output") # I have it in response
async for chunk in get_openai_response_stream(data.thread_id, self.loop):
logger.info(f"async chunk: {chunk}")
# await self.queue.put(chunk)
asyncio.run_coroutine_threadsafe(self.queue.put(f"{chunk}"), self.loop)
And I get “guess it’s a tool call”, as well as "Some strange output| in the FastAPI results, and I don’t get text there. But I get it in logs