#!/home/jack/miniconda3/envs/cloned_base/bin/python
import json
import logging
import os
import glob
import subprocess
import os
import string
'''
Create a working directory and, inside it, a directory called CHATGPT.
In the CHATGPT folder, unzip the downloaded zip file of the ChatGPT dataset.
Run this script from the project folder. It will create three folders:
directory1 = 'CHATGPT/JSON'
make_path_exist(directory1)
directory2 = 'CHATGPT/HTML'
make_path_exist(directory2)
directory3 = 'CHATGPT/TEXT'
make_path_exist(directory3)
It will convert the data into three forms: txt, html and json.
It will create a database called CHATGPT_files.db.
It will create a table called files.
It will insert the data into the table.
It will close the connection.
'''
def clean_title(title):
    """Turn a conversation title into a string usable as a file name.

    Only ASCII letters, digits and whitespace are kept. Every space is then
    replaced with an underscore and any remaining leading/trailing
    whitespace (tabs, newlines) is stripped.

    Args:
        title: The raw conversation title. ``None`` or ``""`` yields ``""``.

    Returns:
        str: The sanitised title (possibly empty).
    """
    if not title:
        # A missing title must not crash the export.
        return ''
    valid_chars = set(string.ascii_letters + string.digits + string.whitespace)
    cleaned_title = ''.join(char for char in title if char in valid_chars)
    cleaned_title = cleaned_title.replace(' ', '_')  # Replace spaces with underscores
    return cleaned_title.strip()
# Create a folder if it doesn't exist.
def make_path_exist(directory):
    """Create ``directory`` if it does not already exist.

    Args:
        directory: Directory path; a relative path is resolved against the
            current working directory.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    path = os.path.join(os.getcwd(), directory)
    # exist_ok=True replaces the racy "check exists, then create" sequence.
    os.makedirs(path, exist_ok=True)
def split_and_save_and_convert(conversations_file):
    """Split an exported conversations file into per-conversation files.

    Each conversation is written as JSON under ``CHATGPT/JSON`` and then
    converted to HTML (``CHATGPT/HTML``) and plain text (``CHATGPT/TEXT``).
    File names come from the cleaned conversation title. Empty titles fall
    back to ``Unknown_Title``, and repeated names get a numeric suffix so
    that conversations no longer overwrite one another.

    Args:
        conversations_file: Path to the JSON file holding a list of
            conversation objects.

    Errors are logged rather than raised.
    """
    directory1 = 'CHATGPT/JSON'
    make_path_exist(directory1)
    directory2 = 'CHATGPT/HTML'
    make_path_exist(directory2)
    directory3 = 'CHATGPT/TEXT'
    make_path_exist(directory3)
    try:
        with open(conversations_file, 'r', encoding='utf-8') as file:
            data = json.load(file)
        used_names = set()
        for conversation in data:
            title = conversation.get('title') or 'Unknown_Title'
            base_name = clean_title(title) or 'Unknown_Title'
            # Pick a name not used yet in this run so that two conversations
            # sharing a title do not silently overwrite each other.
            title_with_underscores = base_name
            suffix = 1
            while title_with_underscores in used_names:
                title_with_underscores = f"{base_name}_{suffix}"
                suffix += 1
            used_names.add(title_with_underscores)

            chapter_filename = f"{title_with_underscores}.json"
            chapter_filepath = os.path.join(directory1, chapter_filename)
            logging.info(f"Saving data for conversation '{title}' to {chapter_filepath}")
            with open(chapter_filepath, 'w', encoding='utf-8') as chapter_file:
                json.dump([conversation], chapter_file, indent=2)
            # Convert JSON to HTML
            html_output_file = os.path.join(directory2, f"{title_with_underscores}.html")
            convert_to_html(chapter_filepath, html_output_file)
            # Convert JSON to TXT
            txt_output_file = os.path.join(directory3, f"{title_with_underscores}.txt")
            convert_to_txt(chapter_filepath, txt_output_file)
    except FileNotFoundError:
        logging.error(f"File not found: {conversations_file}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON in file: {conversations_file}")
    except Exception as e:
        # Top-level boundary: keep the script alive but record the traceback.
        logging.exception(f"An unexpected error occurred: {e}")
def convert_to_html(json_file, html_output_file):
    """Render the conversations stored in ``json_file`` as a simple HTML file.

    The text from ``get_conversation_result`` is HTML-escaped (``&``, ``<``
    and ``>``) and each newline is turned into ``<br />`` plus a newline.

    The previous implementation replaced ``<`` with ``<`` (a no-op) and
    routed the text through a ``"/n"`` -> ``"XXXXXXX"`` sentinel, which
    mangled any message containing ``/n`` (for example URLs) or the
    sentinel itself. Both are fixed here.

    Args:
        json_file: Path to a JSON file holding a list of conversations.
        html_output_file: Path of the HTML file to write (UTF-8).
    """
    import html  # local import: ``html`` is not imported at file level

    with open(json_file, 'r', encoding='utf-8') as file:
        json_data = json.load(file)
    result_str = get_conversation_result(json_data)
    # quote=False: the text ends up in element content, never in attributes.
    escaped = html.escape(result_str, quote=False)
    with open(html_output_file, "w", encoding='utf-8') as html_output:
        html_output.write(escaped.replace("\n", "<br />\n"))
def convert_to_txt(json_file, txt_output_file):
    """Write the conversations stored in ``json_file`` as plain text.

    The text from ``get_conversation_result`` is written unchanged. The
    previous ``"/n"`` -> ``"XXXXXXX"`` round trip is removed because it
    turned every literal ``/n`` in a message (for example in URLs) into a
    line break and deleted any occurrence of the sentinel.

    Args:
        json_file: Path to a JSON file holding a list of conversations.
        txt_output_file: Path of the text file to write (UTF-8).
    """
    with open(json_file, 'r', encoding='utf-8') as file:
        json_data = json.load(file)
    result_str = get_conversation_result(json_data)
    with open(txt_output_file, "w", encoding='utf-8') as txt_output:
        txt_output.write(result_str)
def get_conversation_result(json_data):
    """Flatten a list of conversations into one text block.

    For each conversation the output contains the title on its own line,
    then ``author`` and ``text`` on separate lines for every message, then
    one blank line.

    Args:
        json_data: Iterable of conversation dicts.

    Returns:
        str: The concatenated text.
    """
    # NOTE(review): indentation was lost in the source; the blank line is
    # assumed to separate conversations, not individual messages - confirm.
    pieces = []
    for conversation in json_data:
        # A title may be present but null; treat it like a missing title.
        title = conversation.get('title') or ''
        messages = get_conversation_messages(conversation)
        pieces.append(title + '\n')
        for message in messages:
            pieces.append(message['author'] + '\n' + message['text'] + '\n')
        pieces.append('\n')
    return ''.join(pieces)
def get_conversation_messages(conversation):
    """Return the messages of one conversation in chronological order.

    Starting at ``conversation['current_node']`` the ``parent`` links in
    ``conversation['mapping']`` are followed, and the collected messages
    are reversed at the end. A message is kept only when its content type
    is ``'text'``, its first part is non-empty, and it is not a system
    message (unless flagged ``is_user_system_message`` in its metadata).

    Args:
        conversation: Conversation dict with ``current_node`` and ``mapping``.

    Returns:
        list[dict]: ``{'author': ..., 'text': ...}`` entries, oldest first.
        ``assistant`` is reported as ``ChatGPT`` and kept system messages
        as ``Custom user info``.

    Raises:
        KeyError: If a referenced node is missing from ``mapping``.
    """
    messages = []
    current_node = conversation.get('current_node')
    visited = set()  # guards against a parent cycle looping forever
    while current_node and current_node not in visited:
        visited.add(current_node)
        node = conversation['mapping'][current_node]
        message = node.get('message')
        content = message.get('content') if message else None
        if content and content.get('content_type') == 'text':
            parts = content.get('parts') or []
            if len(parts) > 0 and len(parts[0]) > 0:
                metadata = message.get('metadata') or {}
                is_user_system = metadata.get('is_user_system_message')
                author = message['author']['role']
                if author != 'system' or is_user_system:
                    if author == 'assistant':
                        author = 'ChatGPT'
                    elif author == 'system':
                        author = 'Custom user info'
                    messages.append({'author': author, 'text': parts[0]})
        current_node = node.get('parent')
    return messages[::-1]
# Example usage
conversations_file_path = 'CHATGPT/conversations.json'
#output_folder = 'CHATDPT/output_txt_html_json'
# Ensure the output folder exists
#os.makedirs(output_folder, exist_ok=True)
# Configure logging
logging.basicConfig(level=logging.INFO)
# Call the split, save, and convert function
split_and_save_and_convert(conversations_file_path)
import sqlite3
import os
import hashlib
# Connect to SQLite database (creates a new database if it doesn't exist)
db_path2 = 'CHATGPT_files.db'
conn = sqlite3.connect(db_path2)
try:
    cursor = conn.cursor()
    # Create a table to store file information
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY,
            filename TEXT NOT NULL,
            content BLOB NOT NULL,
            text_content TEXT NOT NULL,
            hash_value TEXT NOT NULL,
            format TEXT NOT NULL
        )
    ''')
    # Commit changes
    conn.commit()
finally:
    # Close the connection even if table creation fails
    conn.close()
# Function to calculate SHA-256 hash of a file
def calculate_hash(file_path):
    """Return the SHA-256 hex digest of the file at ``file_path``.

    The file is read in 8 KB chunks so large files are never loaded into
    memory at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        str: Hexadecimal SHA-256 digest.
    """
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as file:
        while chunk := file.read(8192):  # Read in 8KB chunks
            sha256.update(chunk)
    return sha256.hexdigest()
# Function to insert a file into the database
def insert_file(filename, content, text_content, hash_value, file_format):
    """Insert one row into the ``files`` table of the ``db_path2`` database.

    A fresh connection is opened per call and always closed, including when
    the INSERT fails.

    Args:
        filename: Name stored in the ``filename`` column.
        content: Raw file bytes stored in the ``content`` BLOB column.
        text_content: Decoded text stored in the ``text_content`` column.
        hash_value: Digest stored in the ``hash_value`` column.
        file_format: Value stored in the ``format`` column (e.g. ``'txt'``).
    """
    conn = sqlite3.connect(db_path2)
    try:
        cursor = conn.cursor()
        cursor.execute(
            'INSERT INTO files (filename, content, text_content, hash_value, format) VALUES (?, ?, ?, ?, ?)',
            (filename, content, text_content, hash_value, file_format),
        )
        conn.commit()
    finally:
        conn.close()
# Function to insert the .txt files found directly in a directory (not recursive)
def insert_text_files(directory):
    """Store every ``.txt`` file in ``directory`` via ``insert_file``.

    Files are processed in sorted name order. Each file is read once and
    the SHA-256 digest is computed from those same bytes, so the stored
    hash always matches the stored content.

    Args:
        directory: Directory to scan; sub-directories are not descended into.
    """
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.txt'):
            continue
        file_path = os.path.join(directory, filename)  # Construct full file path
        print(file_path)
        with open(file_path, 'rb') as file:
            file_content = file.read()
        # Undecodable bytes are dropped from the text copy; the BLOB keeps them.
        text_content = file_content.decode('utf-8', errors='ignore')
        hash_value = hashlib.sha256(file_content).hexdigest()
        insert_file(filename, file_content, text_content, hash_value, 'txt')
        print(f"Inserted: {filename}")
# Example: insert the text files from the specified directory
input_folder = 'CHATGPT/TEXT'
insert_text_files(input_folder)
print('Insertion process completed.')
#---------------------------------------------------
# NOTE(review): this redefines clean_title, which is already defined earlier
# in this file; consider removing one of the two copies.
def clean_title(title):
    """Turn a conversation title into a string usable as a file name.

    Only ASCII letters, digits and whitespace are kept. Every space is then
    replaced with an underscore and any remaining leading/trailing
    whitespace (tabs, newlines) is stripped.

    Args:
        title: The raw conversation title. ``None`` or ``""`` yields ``""``.

    Returns:
        str: The sanitised title (possibly empty).
    """
    if not title:
        # A missing title must not crash the export.
        return ''
    valid_chars = set(string.ascii_letters + string.digits + string.whitespace)
    cleaned_title = ''.join(char for char in title if char in valid_chars)
    cleaned_title = cleaned_title.replace(' ', '_')  # Replace spaces with underscores
    return cleaned_title.strip()
# Create a folder if it doesn't exist.
# NOTE(review): this redefines make_path_exist, which is already defined
# earlier in this file; consider removing one of the two copies.
def make_path_exist(directory):
    """Create ``directory`` if it does not already exist.

    Args:
        directory: Directory path; a relative path is resolved against the
            current working directory.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    path = os.path.join(os.getcwd(), directory)
    # exist_ok=True replaces the racy "check exists, then create" sequence.
    os.makedirs(path, exist_ok=True)
# NOTE(review): this redefines split_and_save_and_convert, which is already
# defined earlier in this file; consider removing one of the two copies.
def split_and_save_and_convert(conversations_file):
    """Split an exported conversations file into per-conversation files.

    Each conversation is written as JSON under ``CHATGPT/JSON`` and then
    converted to HTML (``CHATGPT/HTML``) and plain text (``CHATGPT/TEXT``).
    File names come from the cleaned conversation title. Empty titles fall
    back to ``Unknown_Title``, and repeated names get a numeric suffix so
    that conversations no longer overwrite one another.

    Args:
        conversations_file: Path to the JSON file holding a list of
            conversation objects.

    Errors are logged rather than raised.
    """
    directory1 = 'CHATGPT/JSON'
    make_path_exist(directory1)
    directory2 = 'CHATGPT/HTML'
    make_path_exist(directory2)
    directory3 = 'CHATGPT/TEXT'
    make_path_exist(directory3)
    try:
        with open(conversations_file, 'r', encoding='utf-8') as file:
            data = json.load(file)
        used_names = set()
        for conversation in data:
            title = conversation.get('title') or 'Unknown_Title'
            base_name = clean_title(title) or 'Unknown_Title'
            # Pick a name not used yet in this run so that two conversations
            # sharing a title do not silently overwrite each other.
            title_with_underscores = base_name
            suffix = 1
            while title_with_underscores in used_names:
                title_with_underscores = f"{base_name}_{suffix}"
                suffix += 1
            used_names.add(title_with_underscores)

            chapter_filename = f"{title_with_underscores}.json"
            chapter_filepath = os.path.join(directory1, chapter_filename)
            logging.info(f"Saving data for conversation '{title}' to {chapter_filepath}")
            with open(chapter_filepath, 'w', encoding='utf-8') as chapter_file:
                json.dump([conversation], chapter_file, indent=2)
            # Convert JSON to HTML
            html_output_file = os.path.join(directory2, f"{title_with_underscores}.html")
            convert_to_html(chapter_filepath, html_output_file)
            # Convert JSON to TXT
            txt_output_file = os.path.join(directory3, f"{title_with_underscores}.txt")
            convert_to_txt(chapter_filepath, txt_output_file)
    except FileNotFoundError:
        logging.error(f"File not found: {conversations_file}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON in file: {conversations_file}")
    except Exception as e:
        # Top-level boundary: keep the script alive but record the traceback.
        logging.exception(f"An unexpected error occurred: {e}")
# NOTE(review): this redefines convert_to_html, which is already defined
# earlier in this file; consider removing one of the two copies.
def convert_to_html(json_file, html_output_file):
    """Render the conversations stored in ``json_file`` as a simple HTML file.

    The text from ``get_conversation_result`` is HTML-escaped (``&``, ``<``
    and ``>``) and each newline is turned into ``<br />`` plus a newline.

    The previous implementation replaced ``<`` with ``<`` (a no-op) and
    routed the text through a ``"/n"`` -> ``"XXXXXXX"`` sentinel, which
    mangled any message containing ``/n`` (for example URLs) or the
    sentinel itself. Both are fixed here.

    Args:
        json_file: Path to a JSON file holding a list of conversations.
        html_output_file: Path of the HTML file to write (UTF-8).
    """
    import html  # local import: ``html`` is not imported at file level

    with open(json_file, 'r', encoding='utf-8') as file:
        json_data = json.load(file)
    result_str = get_conversation_result(json_data)
    # quote=False: the text ends up in element content, never in attributes.
    escaped = html.escape(result_str, quote=False)
    with open(html_output_file, "w", encoding='utf-8') as html_output:
        html_output.write(escaped.replace("\n", "<br />\n"))
# NOTE(review): this redefines convert_to_txt, which is already defined
# earlier in this file; consider removing one of the two copies.
def convert_to_txt(json_file, txt_output_file):
    """Write the conversations stored in ``json_file`` as plain text.

    The text from ``get_conversation_result`` is written unchanged. The
    previous ``"/n"`` -> ``"XXXXXXX"`` round trip is removed because it
    turned every literal ``/n`` in a message (for example in URLs) into a
    line break and deleted any occurrence of the sentinel.

    Args:
        json_file: Path to a JSON file holding a list of conversations.
        txt_output_file: Path of the text file to write (UTF-8).
    """
    with open(json_file, 'r', encoding='utf-8') as file:
        json_data = json.load(file)
    result_str = get_conversation_result(json_data)
    with open(txt_output_file, "w", encoding='utf-8') as txt_output:
        txt_output.write(result_str)
# NOTE(review): this redefines get_conversation_result, which is already
# defined earlier in this file; consider removing one of the two copies.
def get_conversation_result(json_data):
    """Flatten a list of conversations into one text block.

    For each conversation the output contains the title on its own line,
    then ``author`` and ``text`` on separate lines for every message, then
    one blank line.

    Args:
        json_data: Iterable of conversation dicts.

    Returns:
        str: The concatenated text.
    """
    # NOTE(review): indentation was lost in the source; the blank line is
    # assumed to separate conversations, not individual messages - confirm.
    pieces = []
    for conversation in json_data:
        # A title may be present but null; treat it like a missing title.
        title = conversation.get('title') or ''
        messages = get_conversation_messages(conversation)
        pieces.append(title + '\n')
        for message in messages:
            pieces.append(message['author'] + '\n' + message['text'] + '\n')
        pieces.append('\n')
    return ''.join(pieces)
# NOTE(review): this redefines get_conversation_messages, which is already
# defined earlier in this file; consider removing one of the two copies.
def get_conversation_messages(conversation):
    """Return the messages of one conversation in chronological order.

    Starting at ``conversation['current_node']`` the ``parent`` links in
    ``conversation['mapping']`` are followed, and the collected messages
    are reversed at the end. A message is kept only when its content type
    is ``'text'``, its first part is non-empty, and it is not a system
    message (unless flagged ``is_user_system_message`` in its metadata).

    Args:
        conversation: Conversation dict with ``current_node`` and ``mapping``.

    Returns:
        list[dict]: ``{'author': ..., 'text': ...}`` entries, oldest first.
        ``assistant`` is reported as ``ChatGPT`` and kept system messages
        as ``Custom user info``.

    Raises:
        KeyError: If a referenced node is missing from ``mapping``.
    """
    messages = []
    current_node = conversation.get('current_node')
    visited = set()  # guards against a parent cycle looping forever
    while current_node and current_node not in visited:
        visited.add(current_node)
        node = conversation['mapping'][current_node]
        message = node.get('message')
        content = message.get('content') if message else None
        if content and content.get('content_type') == 'text':
            parts = content.get('parts') or []
            if len(parts) > 0 and len(parts[0]) > 0:
                metadata = message.get('metadata') or {}
                is_user_system = metadata.get('is_user_system_message')
                author = message['author']['role']
                if author != 'system' or is_user_system:
                    if author == 'assistant':
                        author = 'ChatGPT'
                    elif author == 'system':
                        author = 'Custom user info'
                    messages.append({'author': author, 'text': parts[0]})
        current_node = node.get('parent')
    return messages[::-1]
# Example usage
conversations_file_path = 'CHATGPT/conversations.json'
#output_folder = 'CHATDPT/output_txt_html_json'
# Ensure the output folder exists
#os.makedirs(output_folder, exist_ok=True)
# Configure logging
logging.basicConfig(level=logging.INFO)
# Call the split, save, and convert function
#split_and_save_and_convert(conversations_file_path)
json_file = 'CHATGPT/conversations.json'
txt_output_file = "conversations_2_text.txt"
convert_to_txt(json_file, txt_output_file)
# Mark the start of every user turn with CHAT_DIALOGUE so the text can later
# be split into dialogue chunks, then append the result to conversations.txt.
# convert_to_txt writes UTF-8, so the file is read back as UTF-8.
with open("conversations_2_text.txt", "r", encoding="utf-8") as data:
    lines = data.read()
# NOTE(review): this also matches any line that merely ends in "user".
line = lines.replace("user\n", "CHAT_DIALOGUEuser\n")
# The with-block closes the output handle, which was previously left open.
with open("conversations.txt", "a", encoding="utf-8") as Insert:
    Insert.write(line)
import sqlite3
import logging
# Configure logging
# logging.basicConfig has already been called earlier in this file, and a
# repeated call is ignored once the root logger has handlers; force=True
# makes the DEBUG level requested here actually take effect.
logging.basicConfig(level=logging.DEBUG, force=True)
def connect_to_database(database_name):
    """
    Connect to the SQLite database.

    Args:
        database_name (str): The name of the SQLite database file.

    Returns:
        sqlite3.Connection or None: The database connection or None if connection fails.
    """
    try:
        conn = sqlite3.connect(database_name)
    except Exception as e:
        # Kept broad on purpose: callers rely on None (not an exception)
        # for every kind of connection failure.
        logging.error(f"Failed to connect to the database: {e}")
        return None
    logging.info("Connected to the database successfully.")
    return conn
def create_table(conn):
    """
    Create the dialogue table in the database.

    Does nothing (apart from a warning) when ``conn`` is None. Failures are
    logged, not raised.

    Args:
        conn (sqlite3.Connection): The SQLite database connection.
    """
    if not conn:
        logging.warning("No database connection; table 'dialogue' not created.")
        return
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS dialogue (
                        id INTEGER PRIMARY KEY,
                        user_ChatGPT_PAIR TEXT,
                        user_ChatGPT_PAIRb BLOB
                     )''')
        conn.commit()
        logging.info("Table 'dialogue' created successfully.")
    except Exception as e:
        logging.error(f"Failed to create table 'dialogue': {e}")
def insert_dialogue(conn, dialogue_data):
    """
    Insert dialogue data into the database.

    The text is stored twice: as-is in ``user_ChatGPT_PAIR`` and UTF-8
    encoded in ``user_ChatGPT_PAIRb``. Does nothing (apart from a warning)
    when ``conn`` is None. Failures are logged, not raised.

    Args:
        conn (sqlite3.Connection): The SQLite database connection.
        dialogue_data (str): The dialogue data to insert into the database.
    """
    if not conn:
        logging.warning("No database connection; dialogue not inserted.")
        return
    try:
        c = conn.cursor()
        c.execute(
            "INSERT INTO dialogue (user_ChatGPT_PAIR, user_ChatGPT_PAIRb) VALUES (?,?)",
            (dialogue_data, dialogue_data.encode('utf-8')),
        )
        conn.commit()
        logging.info("Dialogue inserted into the database successfully.")
    except Exception as e:
        logging.error(f"Failed to insert dialogue into the database: {e}")
# Define the file path
file_path = 'conversations.txt'
# Read the file and insert dialogue into the database
try:
    with open(file_path, "r", encoding="utf-8") as file:
        file_contents = file.read()
    # Each chunk starts at a CHAT_DIALOGUE marker.
    dialogue_parts = file_contents.split("CHAT_DIALOGUE")
    conn = connect_to_database('dialogueEXP2.db')
    if conn:
        try:
            create_table(conn)
            for dialogue_part in dialogue_parts:
                insert_dialogue(conn, dialogue_part.strip())
                print(".", end="-")
        finally:
            # Close the connection even if an insert loop iteration fails.
            conn.close()
except Exception as e:
    logging.error(f"An error occurred while reading or processing the file: {e}")