Sure, I can provide my code:
import openai
import os
import pinecone
import time
class Bot:
    def __init__(self, api_key=None, pinecone_api_key=None, index_name="chat-bot-index"):
        self.api_key = api_key or os.environ.get("API_KEY")
        openai.api_key = self.api_key
        self.client = openai.OpenAI(api_key=self.api_key)
        self.pinecone_api_key = pinecone_api_key or os.environ.get("PINECONE_API_KEY")
        self.index_name = index_name
        self.pc = pinecone.Pinecone(api_key=self.pinecone_api_key)
        if self.index_name not in self.pc.list_indexes().names():
            print(f"Creating index {self.index_name}...")
            self.pc.create_index(
                name=self.index_name,
                dimension=1536,  # matches the output size of text-embedding-3-small
                metric='cosine',
                spec=pinecone.ServerlessSpec(
                    cloud='aws',
                    region='us-east-1'
                )
            )
        else:
            print(f"Index {self.index_name} already exists.")
        self.index = self.pc.Index(self.index_name)
        self.assistant_id = "assistant_id"  # placeholder: replace with your actual assistant ID
        self.thread_id = None
    def create_thread(self):
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
    def create_message(self, user_query, enhanced_content):
        # Construct the prompt content
        prompt_content = (
            f"Based on the following content, generate an SQL query without including database schema or schema descriptions: {enhanced_content}. "
            f"User's query: {user_query}"
        )
        self.message = self.client.beta.threads.messages.create(
            thread_id=self.thread_id,
            role="user",
            content=prompt_content
        )
    def run_assistant(self):
        self.run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id
        )
    def is_greeting(self, user_query):
        greetings = [
            "hello", "hi", "hey", "good morning", "good afternoon", "good evening",
            "greetings", "howdy", "what's up", "yo", "namaste", "hola", "bonjour",
            "hallo", "ciao", "olá", "konnichiwa"
        ]
        user_query_lower = user_query.lower().strip()
        # Check if the query matches one of the known greetings (list is already lowercase)
        return user_query_lower in greetings
    def get_response(self, user_query):
        start_time = time.time()
        if self.thread_id:
            try:
                self.client.beta.threads.retrieve(thread_id=self.thread_id)
            except openai.OpenAIError:
                # Thread does not exist, create a new one
                self.create_thread()
        else:
            self.create_thread()
        if self.is_greeting(user_query):
            greeting_response = f"{user_query.capitalize()}! How can I assist you today with your inquiries?"
            print(f"Greeting response: {greeting_response}")
            return greeting_response, 0, 0, 0, 0
        embedding_response = self.client.embeddings.create(input=user_query, model="text-embedding-3-small")
        embedding = embedding_response.data[0].embedding
        print("Embedding created:", embedding)  # Debug print
        matching_results = self.index.query(
            vector=embedding,
            top_k=10,
            include_values=True,
            include_metadata=True
        )
        print("Query Results:", matching_results)  # Debug print
        matched_texts = [match.get("metadata", {}).get("text", "") for match in matching_results["matches"]]
        combined_texts = " ".join(matched_texts)
        print("Combined texts:", combined_texts)  # Debug print
        enhanced_content = f"{combined_texts}\n\nUser Input: {user_query}"
        print("Full enhanced content:", enhanced_content)  # Debug print
        # Truncate enhanced_content to 10-20 characters
        #truncated_content = enhanced_content[:20]  # Adjust the slicing as needed
        #print("Truncated content sent to GPT model:", truncated_content)  # Debug print
        self.create_message(user_query, enhanced_content)  # Pass both user query and enhanced content
        self.run_assistant()  # Call without arguments
        try:
            while True:
                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=self.run.id)
                if run.status == "completed":
                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                    assistant_response = [msg.content[0].text.value for msg in messages.data if msg.role == "assistant"]
                    chat_history_content = assistant_response[0]
                    total_time = time.time() - start_time
                    prompt_tokens_used = run.usage.prompt_tokens
                    completion_tokens_used = run.usage.completion_tokens
                    tokens_used = prompt_tokens_used + completion_tokens_used
                    print(f"Chat history content: {chat_history_content}")  # Debug print
                    print(f"Total time: {total_time}, Prompt tokens used: {prompt_tokens_used}, Completion tokens used: {completion_tokens_used}, Tokens used: {tokens_used}")  # Debug print
                    return chat_history_content, total_time, prompt_tokens_used, completion_tokens_used, tokens_used
                if run.status in ("failed", "cancelled", "expired"):
                    # Stop polling instead of looping forever on a dead run
                    raise RuntimeError(f"Run ended with status: {run.status}")
                time.sleep(1)  # Avoid hammering the API while polling
        except Exception as e:
            total_time = time.time() - start_time
            print(f"Error occurred: {e}")  # Debug print
            raise e
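
For context, this is a minimal sketch of how I call the class. It assumes the API keys are available as the API_KEY and PINECONE_API_KEY environment variables, that the "assistant_id" placeholder above has been replaced with a real assistant ID, and the query string here is just an example:

if __name__ == "__main__":
    # Keys are picked up from API_KEY / PINECONE_API_KEY environment variables
    bot = Bot()
    reply, total_time, prompt_tokens, completion_tokens, tokens = bot.get_response(
        "Show me the total sales per region for last month"  # example user query
    )
    print("Assistant reply:", reply)
    print(f"Took {total_time:.2f}s, {prompt_tokens} prompt + {completion_tokens} completion = {tokens} tokens")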