Best pattern for implementing the OpenAI Beta Assistants API with custom functions in Flask and Next.js

I was wondering what the best pattern is for using custom functions with the OpenAI Beta Assistants API.

I was struggling with this at first, but got it to run nicely with the code below thanks to this forum, the GitHub repository of the OpenAI Python library and, of course, my coding-helper GPT. This is a development version and not meant for production; I was just playing around with the API to make it work. I also added many comments to make clear, for all levels, what is happening here.

Here is my code. Please feel free to critique it. If you struggled to understand the API initially, as I did (very much so: how to actually call it, etc. In my opinion the official docs are misleading and omit a lot), you might also find this helpful.

Above all, I am looking for people with more experience managing assistants and custom functions to share their views on this pattern and the other ways they have solved this.

For instance, I had this as an HTTP request in the beginning, but my frontend timed out and displayed an internal server error during long assistant runs that eventually completed successfully in the backend, so I switched to WebSockets to cater for this. How are you setting this up?

from flask import Flask, request, session, jsonify
from openai import OpenAI
import time
import requests
from dotenv import load_dotenv
import os
from app import app
import asyncio
import json
import requests
from flask_socketio import SocketIO

load_dotenv()  # This loads the environment variables from .env
client = OpenAI()
# Initialize SocketIO with your Flask app (the original assigned the name twice: `socketio = socketio = ...`)
socketio = SocketIO(app, cors_allowed_origins="*")  # Adjust CORS as needed - limit to url for production
# ID of the assistant, shared by all requests; stays None until assistant_function creates the assistant
assistant_id = None

# Alpha Vantage endpoint used by both custom tools.
_ALPHA_VANTAGE_URL = "https://www.alphavantage.co/query"
# Upper bound for one Alpha Vantage request, so a stalled connection cannot block the handler forever.
_HTTP_TIMEOUT_SECONDS = 30
# Pause between two run-status polls, to avoid spamming the OpenAI API with requests.
_POLL_INTERVAL_SECONDS = 5
# Run states after which polling stops. "expired" (and "incomplete" in newer API
# versions) must be included, otherwise such a run would be polled forever.
_TERMINAL_RUN_STATUSES = frozenset(
    {"completed", "failed", "cancelled", "expired", "incomplete"}
)

# Tool definitions handed to the assistant: the code interpreter plus the two custom functions.
_ASSISTANT_TOOLS = [
    {"type": "code_interpreter"},
    {
        "type": "function",
        "function": {
            "name": "get_stock_quote",
            "description": "Fetch the latest stock price for a given symbol.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string", "description": "The stock symbol for which to fetch the price."},
                },
                "required": ["symbol"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_time_series",
            "description": "Fetch a time series per day of the stock price for a given symbol in order to have the data to analyse stock price performance - returned as JSON of stock quotes.",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string", "description": "The stock symbol for which to fetch the price."},
                    "includeHistory": {"type": "boolean", "description": "Whether to include historical price data."},
                },
                "required": ["symbol"],
            },
        },
    },
]


def _ensure_assistant():
    """Create the assistant on first use and return the ID cached in the global ``assistant_id``."""
    global assistant_id  # Module-level cache shared by all calls
    if assistant_id is not None:
        return assistant_id

    socketio.emit('assistant_update', {'message': 'Initializing assistant...'})
    print("Initializing assistant...")
    assistant = client.beta.assistants.create(
        name="Financial Analyst",
        instructions="You are a helpful financial analyst that specialises on the stock market - use the functions provided as tools to fetch stock quotes and time series for further analysis",
        tools=_ASSISTANT_TOOLS,
        model="gpt-4-turbo-preview",
    )
    assistant_id = assistant.id
    print(f"Assistant initialized with ID: {assistant_id}")
    return assistant_id


def _query_alpha_vantage(function, symbol, **extra_params):
    """Call the Alpha Vantage query endpoint and return the decoded JSON body."""
    parameters = {
        "function": function,
        "symbol": symbol,
        **extra_params,
        "apikey": os.getenv('ALPHA_SECRET'),
    }
    response = requests.get(_ALPHA_VANTAGE_URL, params=parameters, timeout=_HTTP_TIMEOUT_SECONDS)
    return response.json()


def _fetch_time_series(symbol):
    """Return the compact daily time series for ``symbol`` as delivered by Alpha Vantage."""
    data = _query_alpha_vantage("TIME_SERIES_DAILY", symbol, outputsize="compact")
    # Announce the fetch only after the request has actually returned.
    socketio.emit('assistant_update', {'message': "Time series fetched"})
    return data


def _fetch_stock_quote(symbol):
    """Return a one-sentence description of the latest price of ``symbol``."""
    data = _query_alpha_vantage("GLOBAL_QUOTE", symbol)
    quote = data.get('Global Quote') or {}
    if '05. price' in quote:
        socketio.emit('assistant_update', {'message': "Stock Quote fetched"})
        output = f"The current price of {symbol} is ${quote['05. price']}."
    else:
        output = "Stock price could not be fetched."
    print(f"Tool Output: {output}")
    return output


# Maps the function name requested by the assistant to its local implementation.
_TOOL_HANDLERS = {
    'get_time_series': _fetch_time_series,
    'get_stock_quote': _fetch_stock_quote,
}


def _run_tool_call(tool):
    """Execute one tool call requested by the assistant and return its ``tool_outputs`` entry."""
    handler = _TOOL_HANDLERS.get(tool.function.name)
    if handler is None:
        tool_output = {'status': 'error', 'message': 'function not found'}
    else:
        try:
            tool_args = json.loads(tool.function.arguments)
            tool_output = handler(tool_args['symbol'])
        except (KeyError, TypeError, ValueError, requests.RequestException) as exc:
            # Report the failure to the assistant instead of aborting: an
            # unanswered tool call would leave the run stuck in
            # 'requires_action' until it expires.
            tool_output = {'status': 'error', 'message': f"{tool.function.name} failed: {exc}"}
    return {'tool_call_id': tool.id, 'output': json.dumps(tool_output)}


def _latest_assistant_text(thread_id):
    """Return the text of the first assistant message in the thread's message list.

    Relies on the list being returned newest-first, so the first assistant
    message is the latest answer. Only content items of type "text" are
    used; other items (e.g. images produced by the code interpreter) have
    no ``text`` attribute and are skipped.
    """
    messages_response = client.beta.threads.messages.list(thread_id=thread_id)
    for thread_message in messages_response.data:
        if thread_message.role != "assistant":
            continue
        text_parts = [
            content_item.text.value
            for content_item in thread_message.content
            if getattr(content_item, "type", None) == "text"
        ]
        return "\n".join(text_parts).strip()
    return ""


def assistant_function(question='how does encryption work'):
    """Send ``question`` to the assistant and return its answer.

    Creates the assistant on first use, creates a thread if the session has
    none yet, posts the question, starts a run and polls it until it reaches
    a final state, answering the assistant's tool calls along the way.
    Progress is reported to the frontend via 'assistant_update' events.

    Args:
        question: The user's question as plain text.

    Returns:
        The text of the latest assistant message, or ``None`` when the run
        did not complete or an error occurred. In that case an
        'assistant_error' event has been emitted. (The original returned a
        Flask HTTP response tuple here, which a Socket.IO handler cannot
        forward.)
    """
    try:
        current_assistant_id = _ensure_assistant()
        print(f"Question: {question}")

        # Create or retrieve the thread ID from session for continuity
        if 'thread_id' not in session:
            print("Creating new thread...")
            thread = client.beta.threads.create()
            session['thread_id'] = thread.id
            print(f"New thread created with ID: {session['thread_id']}")
        thread_id = session['thread_id']

        print("Sending message to the thread...")
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=question,
        )

        # Create a run to process the question by the assistant
        socketio.emit('assistant_update', {'message': 'Initiating the assistant run...'})
        print("Initiating the assistant run...")
        run = client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=current_assistant_id,
        )

        socketio.emit('assistant_update', {'message': 'Waiting for the assistant to process the request...'})
        print("Waiting for the assistant to process the request...")
        while run.status not in _TERMINAL_RUN_STATUSES:
            # Sleep before (not after) fetching the status, so the loop exits
            # as soon as a final state is seen.
            time.sleep(_POLL_INTERVAL_SECONDS)
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)

            # 'requires_action' means the assistant wants us to execute custom functions
            if run.status == 'requires_action':
                print("Action required")
                socketio.emit('assistant_update', {'message': "Action required"})
                tool_calls = run.required_action.submit_tool_outputs.tool_calls
                client.beta.threads.runs.submit_tool_outputs(
                    thread_id=thread_id,
                    run_id=run.id,
                    tool_outputs=[_run_tool_call(tool) for tool in tool_calls],
                )

        if run.status != "completed":
            # Without this check, the answer of an earlier question in the
            # same thread would be returned for a failed run.
            error_text = f"Assistant run ended with status '{run.status}'"
            last_error = getattr(run, "last_error", None)
            if last_error is not None:
                error_text += f": {getattr(last_error, 'message', last_error)}"
            print(error_text)
            socketio.emit('assistant_error', {'error': error_text})
            return None

        socketio.emit('assistant_update', {'message': "Retrieving messages from the thread..."})
        answer = _latest_assistant_text(thread_id)

        print(f"Returning the formatted response. {answer}")
        socketio.emit('assistant_update', {'message': 'Processing complete.'})
        return answer

    except Exception as e:
        # Top-level boundary of the handler: report the problem to the
        # frontend instead of letting the Socket.IO event fail silently.
        print(f"Assistant request failed: {e}")
        socketio.emit('assistant_error', {'error': str(e)})
        return None
    
@socketio.on("connect")
def handle_connect():
    """Log that a Socket.IO client has connected."""
    print("Client connected")

@socketio.on("disconnect")
def handle_disconnect():
    """Log that a Socket.IO client has disconnected."""
    print("Client disconnected")

# The 'start_assistant' event runs assistant_function for the submitted question
@socketio.on('start_assistant')
def handle_start_assistant(json):
    """Answer a question sent by the frontend and emit the result.

    Args:
        json: Event payload; expected to be a dict with a non-empty string
            under the key 'question'. NOTE: the parameter name shadows the
            imported ``json`` module inside this function only; it is kept
            so the handler signature stays unchanged.

    Emits:
        'assistant_update' with the keys 'answer' and 'question' on success,
        'assistant_error' when the payload carries no usable question.
    """
    print(str(json))
    # Extract the question from the frontend message; reject malformed
    # payloads instead of failing with a KeyError.
    question = json.get('question') if isinstance(json, dict) else None
    if not isinstance(question, str) or not question.strip():
        socketio.emit('assistant_error', {'error': 'No question provided.'})
        return

    result = assistant_function(question=question)
    if not isinstance(result, str):
        # assistant_function signals failure with a non-string value and has
        # already emitted 'assistant_error'; there is no answer to forward.
        return
    # NOTE(review): socketio.emit without a recipient appears to reach every
    # connected client, not only the one that asked; confirm before
    # multi-user use.
    socketio.emit('assistant_update', {'answer': result, "question": question})

# Start the Socket.IO server in debug mode when this module is run as a script.
if __name__ == "__main__":
    socketio.run(app, debug=True)

TL;DR: what the working code example above does. This code integrates Flask with Flask-SocketIO and OpenAI's API to create an interactive assistant application. It initializes a Flask app with SocketIO for real-time communication and loads environment variables, including API keys. The assistant_function handles user questions, creates an OpenAI Assistant with custom tools for stock quotes and time-series analysis, and processes each request by polling the run until it finishes, executing the requested tool calls along the way. WebSocket events are used to emit status updates and responses to the frontend, giving the user real-time feedback.

For those interested, here is the corresponding Next.js frontend:

"use client";
import React, { useState, useEffect } from "react";
import DOMPurify from "dompurify";
import { marked } from "marked";
import io from "socket.io-client";

import "./Assistant.css"; // Assuming you name your CSS file Assistant.css

/** One question/answer pair shown in the conversation list. */
interface ConversationEntry {
  // The question as reported back by the backend together with the answer.
  question?: string;
  // The answer as HTML; it is rendered via dangerouslySetInnerHTML, so it must be sanitized before being stored here.
  answer: string;
}

// Socket.IO connection to the Flask backend. The URL can be overridden with
// the NEXT_PUBLIC_SOCKET_URL environment variable; the default matches the
// local development server.
const socket = io(
  process.env.NEXT_PUBLIC_SOCKET_URL || "http://localhost:5328/"
);

/** Payload of the "assistant_update" event: a status message or a final answer. */
interface AssistantUpdatePayload {
  message?: string;
  answer?: string;
  question?: string;
}

/** Payload of the "assistant_error" event. */
interface AssistantErrorPayload {
  error: string;
}

/**
 * Chat-style form that sends questions to the backend over Socket.IO and
 * shows status updates and the sanitized, markdown-rendered answers.
 */
const Assistant: React.FC = () => {
  const [question, setQuestion] = useState<string>("");
  const [conversation, setConversation] = useState<ConversationEntry[]>([]);
  const [statusMessage, setStatusMessage] = useState<string>("");

  useEffect(() => {
    const handleUpdate = (data: AssistantUpdatePayload) => {
      if (data.answer) {
        // Render markdown first, then sanitize the resulting HTML, because
        // it is injected with dangerouslySetInnerHTML below.
        const sanitizedAnswer = DOMPurify.sanitize(
          marked(data.answer) as string
        );
        const newEntry: ConversationEntry = {
          question: data.question,
          answer: sanitizedAnswer,
        };
        setConversation((prevConversations) => [
          ...prevConversations,
          newEntry,
        ]);
        setStatusMessage(""); // Clear the status message once we receive an answer
      } else if (data.message) {
        setStatusMessage(data.message); // Show the latest status update from the assistant
      }
    };

    const handleError = (error: AssistantErrorPayload) => {
      console.error("Error from assistant:", error.error);
      setStatusMessage(`Error: ${error.error}`); // Display the error as the status message
    };

    socket.on("assistant_update", handleUpdate);
    socket.on("assistant_error", handleError);

    return () => {
      // Pass the listener so only this component's handlers are removed;
      // off(event) alone would drop every listener for that event.
      socket.off("assistant_update", handleUpdate);
      socket.off("assistant_error", handleError);
    };
  }, []);

  // Sends the question to the backend; the answer arrives later via "assistant_update".
  const handleSubmit = (event: React.FormEvent) => {
    event.preventDefault();
    if (!question.trim()) return;
    setStatusMessage("Processing your question..."); // Set a processing message immediately after submitting
    socket.emit("start_assistant", { question });
    setQuestion(""); // Reset question input after submission
  };

  return (
    <div className="assistant-container">
      <h1>Ask the Assistant</h1>
      <form onSubmit={handleSubmit} className="question-form">
        <input
          type="text"
          value={question}
          onChange={(e) => setQuestion(e.target.value)}
          placeholder="Ask a question..."
          className="question-input"
        />
        <button type="submit" className="submit-button">
          Submit
        </button>
      </form>
      {/* Display the status message directly beneath the text input */}
      {statusMessage && <div className="status-message">{statusMessage}</div>}
      <div className="conversation">
        {conversation.map((entry, index) => (
          <div key={index} className="conversation-entry">
            {entry.question && (
              <p>
                <strong>Question:</strong> {entry.question}
              </p>
            )}
            <div>
              <strong>Answer:</strong>{" "}
              <div
                dangerouslySetInnerHTML={{ __html: entry.answer }}
                className="answer"
              ></div>
            </div>
          </div>
        ))}
      </div>
    </div>
  );
};

export default Assistant;