I was wondering what the best pattern is for using custom functions with the OpenAI Beta Assistants API.
I was struggling with this at first, but got it to run nicely with this code thanks to this forum, the GitHub repository of the OpenAI Python library and, of course, my coding helper GPT. Of course, this is a development version and not for production; I am just playing around with the API to make it work. I also added many comments to make it clear for all levels what is happening here.
Here is my code: Please feel free to critique it freely - if you struggle to understand the API initially as I did (very much so, how to actually call it etc, the official docs are misleading and omit a lot in my opinion) you might also find this helpful.
Above all, I am looking for people with more experience managing assistants and custom functions to share their views on this pattern and the other ways they have solved this.
For instance, I had this as an HTTP request in the beginning, but my frontend timed out and displayed an internal server error during long assistant runs that eventually completed successfully in the backend - so I switched to websockets to cater for this. How are you setting this up?
from flask import Flask, request, session, jsonify
from openai import OpenAI
import time
import requests
from dotenv import load_dotenv
import os
from app import app
import asyncio
import json
import requests
from flask_socketio import SocketIO
# Read the variables defined in the local .env file into the environment.
load_dotenv()

# OpenAI API client, constructed with its default settings.
client = OpenAI()

# Socket.IO server bound to the Flask app. CORS is open to every origin here;
# restrict it to the frontend URL for production.
socketio = SocketIO(app, cors_allowed_origins="*")

# ID of the assistant; stays None until assistant_function creates one.
assistant_id = None
# Run states after which polling must stop. Besides the three states the loop
# originally checked, a run can also end as "expired" or "incomplete"; without
# them the polling loop would never terminate for such a run.
_TERMINAL_RUN_STATUSES = ("completed", "failed", "cancelled", "expired", "incomplete")

# Pause between two status polls, to avoid spamming the API with requests.
_POLL_INTERVAL_SECONDS = 5

_ALPHA_VANTAGE_URL = "https://www.alphavantage.co/query"

# Upper bound for a single Alpha Vantage request so a stalled connection
# cannot block the assistant run indefinitely.
_HTTP_TIMEOUT_SECONDS = 30

# Tool definitions handed to the assistant: the built-in code interpreter plus
# the two custom functions that are executed locally by _run_tool_call.
_ASSISTANT_TOOLS = [
    {"type": "code_interpreter"},
    {
        "type": "function",
        "function": {
            "name": "get_stock_quote",
            "description": "Fetch the latest stock price for a given symbol.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string", "description": "The stock symbol for which to fetch the price."},
                },
                "required": ["symbol"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_time_series",
            "description": "Fetch a time series per day of the stock price for a given symbol in order to have the data to analyse stock price performance - returned as JSON of stock quotes.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string", "description": "The stock symbol for which to fetch the price."},
                    "includeHistory": {"type": "boolean", "description": "Whether to include historical price data."},
                },
                "required": ["symbol"],
            },
        },
    },
]


def _ensure_assistant():
    """Create the assistant on first use and store its ID in ``assistant_id``."""
    global assistant_id  # Module-level cache of the assistant's ID
    if assistant_id is not None:
        return
    socketio.emit('assistant_update', {'message': 'Initializing assistant...'})
    print("Initializing assistant...")
    assistant = client.beta.assistants.create(
        name="Financial Analyst",
        instructions="You are a helpful financial analyst that specialises on the stock market - use the functions provided as tools to fetch stock quotes and time series for further analysis",
        tools=_ASSISTANT_TOOLS,
        model="gpt-4-turbo-preview",
    )
    assistant_id = assistant.id
    print(f"Assistant initialized with ID: {assistant_id}")


def _query_alpha_vantage(function, symbol, **extra_params):
    """Send one query to Alpha Vantage and return the decoded JSON body."""
    parameters = {
        "function": function,
        "symbol": symbol,
        **extra_params,
        "apikey": os.getenv('ALPHA_SECRET'),
    }
    response = requests.get(_ALPHA_VANTAGE_URL, params=parameters, timeout=_HTTP_TIMEOUT_SECONDS)
    return response.json()


def _run_tool_call(tool):
    """Execute one function tool call and return its ``tool_outputs`` entry."""
    try:
        tool_args = json.loads(tool.function.arguments)
        symbol = tool_args['symbol']
        if tool.function.name == 'get_time_series':
            # Fetch the daily time series and hand the raw JSON to the assistant
            socketio.emit('assistant_update', {'message': "Time series fetched"})
            tool_output = _query_alpha_vantage("TIME_SERIES_DAILY", symbol, outputsize="compact")
        elif tool.function.name == 'get_stock_quote':
            # Fetch the latest quote and hand a one-sentence summary to the assistant
            socketio.emit('assistant_update', {'message': "Stock Quote fetched"})
            data = _query_alpha_vantage('GLOBAL_QUOTE', symbol)
            if 'Global Quote' in data and '05. price' in data['Global Quote']:
                price = data['Global Quote']['05. price']
                tool_output = f"The current price of {symbol} is ${price}."
            else:
                tool_output = "Stock price could not be fetched."
            print(f"Tool Output: {tool_output}")
        else:
            # Fallback for a function name this code does not implement
            tool_output = {'status': 'error', 'message': 'function not found'}
    except (KeyError, ValueError, requests.RequestException) as exc:
        # Report the failure as the tool's output so outputs are still
        # submitted for this call instead of aborting the whole request.
        tool_output = {'status': 'error', 'message': f"{type(exc).__name__}: {exc}"}
    return {'tool_call_id': tool.id, 'output': json.dumps(tool_output)}


def assistant_function(question='how does encryption work'):
    """Send ``question`` to the assistant and return its latest answer.

    The assistant is created on first use, the thread ID is kept in the Flask
    ``session`` so follow-up questions share one thread, and progress is
    reported to clients through ``assistant_update`` Socket.IO events. While
    the run is polled, any ``requires_action`` state is served by executing
    the requested custom functions and submitting their outputs.

    Args:
        question: The user's question, posted to the thread as a user message.

    Returns:
        The text of the latest assistant message, stripped of surrounding
        whitespace. An empty string is returned (after an ``assistant_error``
        event) when the run ends in any state other than ``completed``. If an
        exception is raised, an ``assistant_error`` event is emitted and the
        tuple ``(jsonify({"error": ...}), 500)`` is returned instead.
    """
    try:
        # Initialize assistant if not already done
        _ensure_assistant()

        # Emit a message indicating the initiation of the assistant run
        socketio.emit('assistant_update', {'message': 'Initiating the assistant run...'})
        print(f"Question: {question}")

        # Create or retrieve the thread ID from session for continuity
        if 'thread_id' not in session:
            print("Creating new thread...")
            thread = client.beta.threads.create()
            session['thread_id'] = thread.id
            print(f"New thread created with ID: {session['thread_id']}")
        thread_id = session['thread_id']

        # Send the user's question to the assistant within the thread
        print("Sending message to the thread...")
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=question,
        )

        # Create a run to process the question by the assistant
        socketio.emit('assistant_update', {'message': 'Initiating the assistant run...'})
        print("Initiating the assistant run...")
        run = client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=assistant_id,
        )

        # Poll the run's status and process tool outputs if necessary
        socketio.emit('assistant_update', {'message': 'Waiting for the assistant to process the request...'})
        print("Waiting for the assistant to process the request...")
        while True:
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run.status in _TERMINAL_RUN_STATUSES:
                # Leave immediately; no need to sleep once the run is over
                break
            # The assistant asks for the custom functions by switching the
            # run to 'requires_action'
            if run.status == 'requires_action':
                print("Action required")
                socketio.emit('assistant_update', {'message': "Action required"})
                tool_calls = run.required_action.submit_tool_outputs.tool_calls
                tool_outputs = [_run_tool_call(tool) for tool in tool_calls]
                client.beta.threads.runs.submit_tool_outputs(
                    thread_id=thread_id,
                    run_id=run.id,
                    tool_outputs=tool_outputs,
                )
            # Sleep to prevent spamming the API with requests
            time.sleep(_POLL_INTERVAL_SECONDS)

        if run.status != "completed":
            # Without this guard the newest assistant message in the thread
            # (the answer to an earlier question) would be returned as if it
            # answered this one.
            error_text = f"Assistant run ended with status '{run.status}'."
            print(error_text)
            socketio.emit('assistant_error', {'error': error_text})
            return ""

        # Retrieve the messages of the thread now that the run has completed
        socketio.emit('assistant_update', {'message': "Retrieving messages from the thread..."})
        messages_response = client.beta.threads.messages.list(thread_id=thread_id)

        # Use only the first assistant message in the listing (the latest answer)
        answer = ""
        for thread_message in messages_response.data:
            if thread_message.role != "assistant":
                continue
            for content_item in thread_message.content:
                # Skip content items that carry no text (e.g. image files)
                text = getattr(content_item, "text", None)
                if text is not None:
                    answer += text.value + "\n"
            break

        # After processing is complete and the result is available
        print(f"Returning the formatted response. {answer}")
        socketio.emit('assistant_update', {'message': 'Processing complete.'})
        return answer.strip()
    except Exception as e:
        # Emit an error message if something goes wrong
        socketio.emit('assistant_error', {'error': str(e)})
        # NOTE: this HTTP-style return value is kept for backward
        # compatibility; it is not a string and cannot be emitted over
        # Socket.IO as part of an event payload.
        return jsonify({"error": str(e)}), 500
@socketio.on('connect')
def handle_connect():
    """Log that a Socket.IO client has connected."""
    print("Client connected")
@socketio.on('disconnect')
def handle_disconnect():
    """Log that a Socket.IO client has disconnected."""
    print("Client disconnected")
# The 'start_assistant' event passes the client's question to
# assistant_function and sends the answer back to the frontend.
@socketio.on('start_assistant')
def handle_start_assistant(json):
    """Answer a ``start_assistant`` event with the assistant's reply.

    Args:
        json: Event payload sent by the frontend; its ``'question'`` entry is
            forwarded to ``assistant_function``. The parameter name shadows
            the ``json`` module inside this handler only.
    """
    print(str(json))
    # Extract the question from the frontend message
    question = json['question']
    # Call the assistant_function (that creates the assistant and provides
    # the custom functions) and get the result
    result = assistant_function(question=question)
    if not isinstance(result, str):
        # On failure assistant_function returns an HTTP-style
        # (response, status) tuple after emitting 'assistant_error'. That
        # tuple cannot be serialised into a Socket.IO payload, so there is
        # nothing further to send.
        return
    socketio.emit('assistant_update', {'answer': result, "question": question})
# Start the Socket.IO server (debug mode on) only when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    socketio.run(app, debug=True)