#!/home/jack/miniconda3/envs/cloned_base/bin/python
import json
import logging
import os
import glob
import subprocess
import os
import string
"""
Create a working directory and, inside it, a directory called CHATGPT.
In the CHATGPT folder, unzip the downloaded zip file of the ChatGPT dataset.
Run this script from the project folder; it will create three folders:
directory1 = "CHATGPT/JSON"
make_path_exist(directory1)
directory2 = "CHATGPT/HTML"
make_path_exist(directory2)
directory3 = "CHATGPT/TEXT"
make_path_exist(directory3)
It will convert the data into three forms: txt, html and json.
It will create a database called CHATGPT_files.db.
It will create a table called files.
It will insert the data into the table.
It will close the connection.
"""
def clean_title(title):
    """Turn a conversation title into a string that is safe to use as a file stem.

    Only ASCII letters, digits and whitespace are kept. Surrounding whitespace
    is stripped *before* the remaining whitespace is turned into underscores,
    so a padded title does not end up with leading/trailing underscores, and
    tabs/newlines never survive into the file name.

    Args:
        title: The raw conversation title. May be ``None`` or empty.

    Returns:
        The cleaned title, or ``"Unknown_Title"`` when nothing usable remains.
    """
    if not title:
        return "Unknown_Title"
    valid_chars = set(string.ascii_letters + string.digits + string.whitespace)
    kept = "".join(char for char in str(title) if char in valid_chars).strip()
    # Replace every remaining whitespace character with an underscore.
    cleaned_title = "".join("_" if char.isspace() else char for char in kept)
    return cleaned_title or "Unknown_Title"
# Make a function to create a folder if it doesn't exist.
def make_path_exist(directory):
    """Create ``directory`` if it does not already exist.

    The path is joined onto the current working directory, so a relative
    ``directory`` is created under the folder the script is run from (an
    absolute ``directory`` is used as-is). Missing parent directories are
    created as well.

    Args:
        directory: Directory path to create.
    """
    path = os.path.join(os.getcwd(), directory)
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(path, exist_ok=True)
def split_and_save_and_convert(conversations_file):
    """Split an exported conversations file into per-conversation files.

    For every conversation in ``conversations_file`` a JSON file is written to
    ``CHATGPT/JSON`` and then converted to ``CHATGPT/HTML`` and
    ``CHATGPT/TEXT`` under the same cleaned title. The three output folders
    are created if needed. Errors are logged, not raised.

    NOTE(review): conversations that share the same cleaned title write to the
    same file names, so later ones overwrite earlier ones.

    Args:
        conversations_file: Path to the JSON file holding a list of
            conversations.
    """
    directory1 = "CHATGPT/JSON"
    make_path_exist(directory1)
    directory2 = "CHATGPT/HTML"
    make_path_exist(directory2)
    directory3 = "CHATGPT/TEXT"
    make_path_exist(directory3)

    try:
        with open(conversations_file, "r", encoding="utf-8") as file:
            data = json.load(file)

        for conversation in data:
            # `or` also covers a title that is present but null/empty.
            title = conversation.get('title') or 'Unknown_Title'
            title_with_underscores = clean_title(title) or 'Unknown_Title'
            chapter_filename = f"{title_with_underscores}.json"
            chapter_filepath = os.path.join(directory1, chapter_filename)

            logging.info(f"Saving data for conversation '{title}' to {chapter_filepath}")

            with open(chapter_filepath, 'w', encoding='utf-8') as chapter_file:
                json.dump([conversation], chapter_file, indent=2)

            # Convert JSON to HTML
            html_output_file = os.path.join(directory2, f"{title_with_underscores}.html")
            convert_to_html(chapter_filepath, html_output_file)

            # Convert JSON to TXT
            txt_output_file = os.path.join(directory3, f"{title_with_underscores}.txt")
            convert_to_txt(chapter_filepath, txt_output_file)

    except FileNotFoundError:
        logging.error(f"File not found: {conversations_file}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON in file: {conversations_file}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
def convert_to_html(json_file, html_output_file):
    """Render the conversations stored in ``json_file`` as a simple HTML file.

    The text produced by ``get_conversation_result`` is HTML-escaped and each
    newline becomes ``<br />`` followed by a newline.

    Args:
        json_file: Path to a JSON file containing a list of conversations.
        html_output_file: Path of the HTML file to write (overwritten).
    """
    with open(json_file, "r", encoding="utf-8") as file:
        json_data = json.load(file)

    result_str = get_conversation_result(json_data)

    # NOTE(review): the literal two-character sequence "/n" is turned into a
    # newline, as the original code did -- confirm this is intended.
    result_html = result_str.replace("/n", "\n")
    # Escape "&" first so the entities produced below are not double-escaped.
    result_html = result_html.replace("&", "&amp;")
    result_html = result_html.replace("<", "&lt;")
    result_html = result_html.replace(">", "&gt;")
    result_html = result_html.replace("\n", "<br />\n")

    with open(html_output_file, "w", encoding='utf-8') as html_output:
        html_output.write(result_html)
def convert_to_txt(json_file, txt_output_file):
    """Render the conversations stored in ``json_file`` as plain text.

    Args:
        json_file: Path to a JSON file containing a list of conversations.
        txt_output_file: Path of the text file to write (overwritten).
    """
    with open(json_file, "r", encoding="utf-8") as file:
        json_data = json.load(file)

    result_str = get_conversation_result(json_data)

    # NOTE(review): the literal two-character sequence "/n" is turned into a
    # newline, as the original code did -- confirm this is intended.
    result_txt = result_str.replace("/n", "\n")

    with open(txt_output_file, "w", encoding='utf-8') as txt_output:
        txt_output.write(result_txt)
def get_conversation_result(json_data):
    """Flatten a list of conversations into one text string.

    Each conversation contributes its title on one line, then for every
    message the author on one line and the message text on the next, and
    finally a blank line.

    Args:
        json_data: Iterable of conversation dicts.

    Returns:
        The concatenated text for all conversations.
    """
    pieces = []
    for conversation in json_data:
        # `or` also covers a title that is present but null.
        title = conversation.get("title") or ""
        pieces.append(title + '\n')
        for message in get_conversation_messages(conversation):
            pieces.append(message['author'] + '\n' + message['text'] + '\n')
        pieces.append('\n')
    return "".join(pieces)
def get_conversation_messages(conversation):
    """Collect the visible text messages of one conversation, oldest first.

    Starting at ``conversation["current_node"]``, the ``parent`` links in
    ``conversation["mapping"]`` are followed upwards. A message is kept when
    its content type is ``"text"``, its first part is a non-empty string, and
    it is not a system message (unless its metadata flags it as
    ``is_user_system_message``). The ``assistant`` role is reported as
    ``"ChatGPT"`` and flagged system messages as ``"Custom user info"``.

    Args:
        conversation: A conversation dict with ``current_node`` and ``mapping``.

    Returns:
        A list of ``{"author": ..., "text": ...}`` dicts in root-to-leaf order.
    """
    messages = []
    mapping = conversation.get("mapping") or {}
    current_node = conversation.get("current_node")

    while current_node:
        node = mapping.get(current_node)
        if node is None:
            # Dangling parent reference: nothing further up to walk.
            break

        message = node.get("message")
        content = (message.get("content") if message else None) or {}
        parts = content.get("parts") or []
        first_part = parts[0] if parts else None

        if (message and content.get("content_type") == "text"
                and isinstance(first_part, str) and first_part):
            metadata = message.get("metadata") or {}
            is_user_system_message = metadata.get("is_user_system_message")
            author = message["author"]["role"]

            if author != "system" or is_user_system_message:
                if author == "assistant":
                    author = "ChatGPT"
                elif author == "system":
                    author = "Custom user info"
                messages.append({"author": author, "text": first_part})

        current_node = node.get("parent")

    # The walk goes leaf -> root; reverse to get chronological order.
    return messages[::-1]
# Example usage
conversations_file_path = "CHATGPT/conversations.json"
# output_folder = "CHATDPT/output_txt_html_json"
# Ensure the output folder exists
# os.makedirs(output_folder, exist_ok=True)
# Configure logging
logging.basicConfig(level=logging.INFO)
# Call the split, save, and convert function
split_and_save_and_convert(conversations_file_path)
import sqlite3
import os
import hashlib
# Connect to SQLite database (creates a new database if it doesn't exist)
db_path2 = "CHATGPT_files.db"
conn = sqlite3.connect(db_path2)
try:
    cursor = conn.cursor()
    # Create a table to store file information
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY,
            filename TEXT NOT NULL,
            content BLOB NOT NULL,
            text_content TEXT NOT NULL,
            hash_value TEXT NOT NULL,
            format TEXT NOT NULL
        )
    """)
    # Commit changes
    conn.commit()
finally:
    # Close the connection even if creating the table failed
    conn.close()
# Function to calculate SHA-256 hash of a file
def calculate_hash(file_path):
    """Return the SHA-256 hex digest of the file at ``file_path``.

    The file is read in binary mode in 8 KB chunks so large files are not
    loaded into memory at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hexadecimal SHA-256 digest as a string.
    """
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as file:
        while chunk := file.read(8192):  # Read in 8KB chunks
            sha256.update(chunk)
    return sha256.hexdigest()
# Function to insert a file into the database
def insert_file(filename, content, text_content, hash_value, file_format, db_path=None):
    """Insert one row describing a file into the ``files`` table.

    A new connection is opened for the insert and always closed afterwards,
    including when the insert fails.

    Args:
        filename: Name of the file.
        content: Raw file content (stored in the BLOB column).
        text_content: File content as text.
        hash_value: Hash of the file content.
        file_format: Value for the ``format`` column, e.g. ``'txt'``.
        db_path: Database file to write to. Defaults to the module-level
            ``db_path2``.
    """
    if db_path is None:
        db_path = db_path2
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT INTO files (filename, content, text_content, hash_value, format) VALUES (?, ?, ?, ?, ?)",
            (filename, content, text_content, hash_value, file_format),
        )
        conn.commit()
    finally:
        conn.close()
# Function to insert the .txt files of a directory (not recursive)
def insert_text_files(directory):
    """Insert every ``.txt`` file found directly in ``directory`` into the database.

    Only the top level of ``directory`` is listed; sub-directories are not
    visited. Each file is stored with its raw bytes, its UTF-8 decoded text
    (undecodable bytes are dropped) and its SHA-256 hash.

    Args:
        directory: Folder containing the text files.
    """
    for filename in os.listdir(directory):
        if not filename.endswith(".txt"):
            continue
        file_path = os.path.join(directory, filename)  # Construct full file path
        print(file_path)
        with open(file_path, "rb") as file:
            file_content = file.read()
        text_content = file_content.decode('utf-8', errors='ignore')  # Convert bytes to string
        hash_value = calculate_hash(file_path)
        insert_file(filename, file_content, text_content, hash_value, 'txt')
        print(f"Inserted: {filename}")
# Example: insert the text files from the specified directory
input_folder = "CHATGPT/TEXT"
insert_text_files(input_folder)
print("Insertion process completed.")
#---------------------------------------------------
def clean_title(title):
    """Turn a conversation title into a string that is safe to use as a file stem.

    Only ASCII letters, digits and whitespace are kept. Surrounding whitespace
    is stripped *before* the remaining whitespace is turned into underscores,
    so a padded title does not end up with leading/trailing underscores, and
    tabs/newlines never survive into the file name.

    Args:
        title: The raw conversation title. May be ``None`` or empty.

    Returns:
        The cleaned title, or ``"Unknown_Title"`` when nothing usable remains.
    """
    if not title:
        return "Unknown_Title"
    valid_chars = set(string.ascii_letters + string.digits + string.whitespace)
    kept = "".join(char for char in str(title) if char in valid_chars).strip()
    # Replace every remaining whitespace character with an underscore.
    cleaned_title = "".join("_" if char.isspace() else char for char in kept)
    return cleaned_title or "Unknown_Title"
# Make a function to create a folder if it doesn't exist.
def make_path_exist(directory):
    """Create ``directory`` if it does not already exist.

    The path is joined onto the current working directory, so a relative
    ``directory`` is created under the folder the script is run from (an
    absolute ``directory`` is used as-is). Missing parent directories are
    created as well.

    Args:
        directory: Directory path to create.
    """
    path = os.path.join(os.getcwd(), directory)
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(path, exist_ok=True)
def split_and_save_and_convert(conversations_file):
    """Split an exported conversations file into per-conversation files.

    For every conversation in ``conversations_file`` a JSON file is written to
    ``CHATGPT/JSON`` and then converted to ``CHATGPT/HTML`` and
    ``CHATGPT/TEXT`` under the same cleaned title. The three output folders
    are created if needed. Errors are logged, not raised.

    NOTE(review): conversations that share the same cleaned title write to the
    same file names, so later ones overwrite earlier ones.

    Args:
        conversations_file: Path to the JSON file holding a list of
            conversations.
    """
    directory1 = "CHATGPT/JSON"
    make_path_exist(directory1)
    directory2 = "CHATGPT/HTML"
    make_path_exist(directory2)
    directory3 = "CHATGPT/TEXT"
    make_path_exist(directory3)

    try:
        with open(conversations_file, "r", encoding="utf-8") as file:
            data = json.load(file)

        for conversation in data:
            # `or` also covers a title that is present but null/empty.
            title = conversation.get('title') or 'Unknown_Title'
            title_with_underscores = clean_title(title) or 'Unknown_Title'
            chapter_filename = f"{title_with_underscores}.json"
            chapter_filepath = os.path.join(directory1, chapter_filename)

            logging.info(f"Saving data for conversation '{title}' to {chapter_filepath}")

            with open(chapter_filepath, 'w', encoding='utf-8') as chapter_file:
                json.dump([conversation], chapter_file, indent=2)

            # Convert JSON to HTML
            html_output_file = os.path.join(directory2, f"{title_with_underscores}.html")
            convert_to_html(chapter_filepath, html_output_file)

            # Convert JSON to TXT
            txt_output_file = os.path.join(directory3, f"{title_with_underscores}.txt")
            convert_to_txt(chapter_filepath, txt_output_file)

    except FileNotFoundError:
        logging.error(f"File not found: {conversations_file}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON in file: {conversations_file}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
def convert_to_html(json_file, html_output_file):
    """Render the conversations stored in ``json_file`` as a simple HTML file.

    The text produced by ``get_conversation_result`` is HTML-escaped and each
    newline becomes ``<br />`` followed by a newline.

    Args:
        json_file: Path to a JSON file containing a list of conversations.
        html_output_file: Path of the HTML file to write (overwritten).
    """
    with open(json_file, "r", encoding="utf-8") as file:
        json_data = json.load(file)

    result_str = get_conversation_result(json_data)

    # NOTE(review): the literal two-character sequence "/n" is turned into a
    # newline, as the original code did -- confirm this is intended.
    result_html = result_str.replace("/n", "\n")
    # Escape "&" first so the entities produced below are not double-escaped.
    result_html = result_html.replace("&", "&amp;")
    result_html = result_html.replace("<", "&lt;")
    result_html = result_html.replace(">", "&gt;")
    result_html = result_html.replace("\n", "<br />\n")

    with open(html_output_file, "w", encoding='utf-8') as html_output:
        html_output.write(result_html)
def convert_to_txt(json_file, txt_output_file):
    """Render the conversations stored in ``json_file`` as plain text.

    Args:
        json_file: Path to a JSON file containing a list of conversations.
        txt_output_file: Path of the text file to write (overwritten).
    """
    with open(json_file, "r", encoding="utf-8") as file:
        json_data = json.load(file)

    result_str = get_conversation_result(json_data)

    # NOTE(review): the literal two-character sequence "/n" is turned into a
    # newline, as the original code did -- confirm this is intended.
    result_txt = result_str.replace("/n", "\n")

    with open(txt_output_file, "w", encoding='utf-8') as txt_output:
        txt_output.write(result_txt)
def get_conversation_result(json_data):
    """Flatten a list of conversations into one text string.

    Each conversation contributes its title on one line, then for every
    message the author on one line and the message text on the next, and
    finally a blank line.

    Args:
        json_data: Iterable of conversation dicts.

    Returns:
        The concatenated text for all conversations.
    """
    pieces = []
    for conversation in json_data:
        # `or` also covers a title that is present but null.
        title = conversation.get("title") or ""
        pieces.append(title + '\n')
        for message in get_conversation_messages(conversation):
            pieces.append(message['author'] + '\n' + message['text'] + '\n')
        pieces.append('\n')
    return "".join(pieces)
def get_conversation_messages(conversation):
    """Collect the visible text messages of one conversation, oldest first.

    Starting at ``conversation["current_node"]``, the ``parent`` links in
    ``conversation["mapping"]`` are followed upwards. A message is kept when
    its content type is ``"text"``, its first part is a non-empty string, and
    it is not a system message (unless its metadata flags it as
    ``is_user_system_message``). The ``assistant`` role is reported as
    ``"ChatGPT"`` and flagged system messages as ``"Custom user info"``.

    Args:
        conversation: A conversation dict with ``current_node`` and ``mapping``.

    Returns:
        A list of ``{"author": ..., "text": ...}`` dicts in root-to-leaf order.
    """
    messages = []
    mapping = conversation.get("mapping") or {}
    current_node = conversation.get("current_node")

    while current_node:
        node = mapping.get(current_node)
        if node is None:
            # Dangling parent reference: nothing further up to walk.
            break

        message = node.get("message")
        content = (message.get("content") if message else None) or {}
        parts = content.get("parts") or []
        first_part = parts[0] if parts else None

        if (message and content.get("content_type") == "text"
                and isinstance(first_part, str) and first_part):
            metadata = message.get("metadata") or {}
            is_user_system_message = metadata.get("is_user_system_message")
            author = message["author"]["role"]

            if author != "system" or is_user_system_message:
                if author == "assistant":
                    author = "ChatGPT"
                elif author == "system":
                    author = "Custom user info"
                messages.append({"author": author, "text": first_part})

        current_node = node.get("parent")

    # The walk goes leaf -> root; reverse to get chronological order.
    return messages[::-1]
# Example usage
conversations_file_path = "CHATGPT/conversations.json"
# output_folder = "CHATDPT/output_txt_html_json"
# Ensure the output folder exists
# os.makedirs(output_folder, exist_ok=True)
# Configure logging
logging.basicConfig(level=logging.INFO)
# Call the split, save, and convert function
# split_and_save_and_convert(conversations_file_path)
json_file = "CHATGPT/conversations.json"
txt_output_file = "conversations_2_text.txt"
convert_to_txt(json_file, txt_output_file)

# The text file above is written as UTF-8, so read it back the same way.
with open("conversations_2_text.txt", "r", encoding="utf-8") as data:
    lines = data.read()

# Mark the start of each user turn so the text can later be split into
# dialogue chunks on "CHAT_DIALOGUE".
# NOTE(review): this also matches any line that merely ends in "user".
line = lines.replace("user\n", "CHAT_DIALOGUEuser\n")

# Append mode: re-running the script adds the text again.
with open("conversations.txt", "a", encoding="utf-8") as Insert:
    Insert.write(line)
import sqlite3
import logging
# Configure logging
# NOTE(review): logging.basicConfig() was already called earlier in this file
# with level INFO; a second call does nothing once the root logger has
# handlers, so DEBUG presumably never takes effect here -- confirm intent.
logging.basicConfig(level=logging.DEBUG)
def connect_to_database(database_name):
    """
    Connect to the SQLite database.

    Args:
        database_name (str): The name of the SQLite database file.

    Returns:
        sqlite3.Connection or None: The database connection or None if connection fails.
    """
    try:
        conn = sqlite3.connect(database_name)
        logging.info("Connected to the database successfully.")
        return conn
    except Exception as e:
        # Failures are logged and reported through the None return value.
        logging.error(f"Failed to connect to the database: {e}")
        return None
def create_table(conn):
    """
    Create the dialogue table in the database.

    Does nothing when ``conn`` is falsy. Errors are logged, not raised.

    Args:
        conn (sqlite3.Connection): The SQLite database connection.
    """
    try:
        if conn:
            c = conn.cursor()
            c.execute("""CREATE TABLE IF NOT EXISTS dialogue (
                            id INTEGER PRIMARY KEY,
                            user_ChatGPT_PAIR TEXT,
                            user_ChatGPT_PAIRb BLOB
                         )""")
            conn.commit()
            logging.info("Table 'dialogue' created successfully.")
    except Exception as e:
        logging.error(f"Failed to create table 'dialogue': {e}")
def insert_dialogue(conn, dialogue_data):
    """
    Insert dialogue data into the database.

    The text is stored twice: as text and as its UTF-8 encoded bytes. Does
    nothing when ``conn`` is falsy. Errors are logged, not raised.

    Args:
        conn (sqlite3.Connection): The SQLite database connection.
        dialogue_data (str): The dialogue data to insert into the database.
    """
    try:
        if conn:
            c = conn.cursor()
            c.execute(
                "INSERT INTO dialogue (user_ChatGPT_PAIR, user_ChatGPT_PAIRb) VALUES (?,?)",
                (dialogue_data, dialogue_data.encode("utf-8")),
            )
            conn.commit()
            logging.info("Dialogue inserted into the database successfully.")
    except Exception as e:
        logging.error(f"Failed to insert dialogue into the database: {e}")
# Define the file path
file_path = "conversations.txt"

# Read the file and insert dialogue into the database
try:
    with open(file_path, "r", encoding="utf-8") as file:
        file_contents = file.read()

    dialogue_parts = file_contents.split("CHAT_DIALOGUE")
    conn = connect_to_database("dialogueEXP2.db")
    if conn:
        try:
            create_table(conn)
            for dialogue_part in dialogue_parts:
                dialogue_text = dialogue_part.strip()
                if not dialogue_text:
                    # Nothing between two markers: skip instead of storing an empty row.
                    continue
                insert_dialogue(conn, dialogue_text)
                print(".", end="-")
        finally:
            # Close the connection even if processing a part fails.
            conn.close()
except Exception as e:
    logging.error(f"An error occurred while reading or processing the file: {e}")