Hi all, I found it hard to implement an Assistant that can handle file search and function calling in parallel and stream the response to the frontend (without the polling hack).
Below is the code that worked for me.
BUT: from time to time my system behaves strangely: the API calls are very slow and sometimes fail, even when I go to the Playground.
Is there anything fundamentally wrong with my implementation that could cause an error like that? I know that in theory I should create a run explicitly, but I did not get that to work - I only got it working by implicitly creating the run in my code. Is this a problem?
def generate_llm_response(client, user_message, chatbot_id, instruct1, model1):
    """Stream an assistant reply to the caller as server-sent events.

    Adds ``user_message`` to the chatbot's thread, starts a streaming run and
    relays every text delta as an SSE ``data:`` event. When the run stops with
    ``requires_action``, the function tool calls are answered and the
    continuation of the run is streamed as well, so text produced after a
    function call also reaches the client.

    Args:
        client: API client exposing ``beta.assistants`` and ``beta.threads``.
        user_message: Text of the user's message.
        chatbot_id: Key used to look up / store the chatbot's active thread.
        instruct1: Instructions passed to the assistant.
        model1: Model name passed to the assistant.

    Returns:
        A ``Response`` with content type ``text/event-stream`` wrapping the
        event generator. The stream ends with a ``[END]`` event on success or
        an ``Error: ...`` event if an exception occurs while streaming.
    """
    # NOTE(review): the official OpenAI SDK exposes create/retrieve/update on
    # ``beta.assistants``; ``get_or_create`` is presumably a project wrapper.
    # Confirm it reuses an existing assistant instead of creating a new one
    # on every request, which would be slow.
    assistant = client.beta.assistants.get_or_create(
        instructions=instruct1,
        model=model1,
        tools=[
            {"type": "function", "function": Functions.save_username},
            {"type": "file_search"},
        ],
    )

    # Reuse the chatbot's thread when one exists; otherwise create and record it.
    active_thread = get_active_thread(chatbot_id)
    if active_thread:
        thread_id = active_thread["thread_id"]
    else:
        thread_id = client.beta.threads.create().id
        save_active_thread(chatbot_id, assistant.id, thread_id)

    def generate():
        try:
            client.beta.threads.messages.create(
                thread_id=thread_id, role="user", content=user_message
            )
            stream_manager = client.beta.threads.runs.stream(
                thread_id=thread_id, assistant_id=assistant.id
            )
            # A stream ends once the run asks for tool outputs, so keep
            # opening a continuation stream until no further action is needed.
            while stream_manager is not None:
                waiting_run = yield from _relay_stream(stream_manager)
                if waiting_run is None:
                    stream_manager = None
                else:
                    stream_manager = client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=thread_id,
                        run_id=waiting_run.id,
                        tool_outputs=_function_tool_outputs(waiting_run),
                    )
            yield "data: [END]\n\n"
        except Exception as e:
            # Boundary of the streaming response: log with traceback and tell
            # the client, since the HTTP status line has already been sent.
            logging.exception("Error during message processing: %s", e)
            yield _sse_event(f"Error: {e}")

    return Response(generate(), content_type="text/event-stream")


def _sse_event(text):
    """Frame ``text`` as one SSE event, one ``data:`` line per line of text."""
    # A raw newline inside a data line would end the event early; for
    # single-line text this yields exactly "data: <text>\n\n".
    return "".join(f"data: {line}\n" for line in str(text).split("\n")) + "\n"


def _function_tool_outputs(run):
    """Build a success output for every function tool call the run requests."""
    # TODO(review): the function arguments are not acted upon here; every
    # call is simply acknowledged with {"status": "success"}.
    return [
        {"tool_call_id": tool_call.id, "output": json.dumps({"status": "success"})}
        for tool_call in run.required_action.submit_tool_outputs.tool_calls
        if getattr(tool_call, "function", None) is not None
    ]


def _relay_stream(stream_manager):
    """Yield SSE events for text deltas; return the run awaiting tool outputs, if any."""
    waiting_run = None
    with stream_manager as stream:
        for event in stream:
            data = event.data
            delta = getattr(data, "delta", None)
            # ``content`` may be missing or None on deltas that carry no text.
            for part in getattr(delta, "content", None) or ():
                if part.type != "text":
                    continue
                value = getattr(part.text, "value", None)
                if value is not None:
                    yield _sse_event(value)
            if getattr(data, "status", None) == "requires_action":
                action = getattr(data, "required_action", None)
                if getattr(action, "submit_tool_outputs", None) is not None:
                    waiting_run = data
    return waiting_run