"""Batch-translate Hebrew .docx files to English using the OpenAI chat API."""
import os
import sys
import json
import time
import requests
import subprocess
from tqdm import tqdm
from openai import OpenAI
from dotenv import load_dotenv
import docx
def debug_print(message):
    """Emit a DEBUG-prefixed line and flush stdout so it appears immediately."""
    sys.stdout.write(f"DEBUG: {message}\n")
    sys.stdout.flush()
# Load environment variables (expects OPENAI_API_KEY in .env or the environment)
load_dotenv()
# Set up OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Maximum chunk size passed to split_into_chunks — measured in characters,
# not model tokens, despite the name.
MAX_TOKENS = 2000
INITIAL_DELAY = 10 # Initial delay between API calls in seconds
def debug_print(message):
    """Print a DEBUG-prefixed line and flush stdout immediately.

    Bug fix: this redefinition shadows the earlier debug_print but had
    silently dropped the stdout flush, so debug output could be buffered
    and lost on crash. The flush is restored to match the original.
    """
    print(f"DEBUG: {message}")
    sys.stdout.flush()
# NOTE(review): load_dotenv() was already called above; this second call is
# redundant but harmless.
load_dotenv()
# Raw API key, used directly by the curl/requests connectivity probes below.
api_key = os.getenv("OPENAI_API_KEY")
def curl_test():
    """Probe the OpenAI API by listing models with curl.

    Returns:
        True if curl exits successfully, False otherwise.
    """
    debug_print("Testing API connection with curl...")
    # Security fix: pass argv as a list with shell=False so the API key is
    # never interpolated into a shell command line (no injection, and the
    # key is not exposed to shell parsing).
    curl_command = [
        "curl",
        "https://api.openai.com/v1/models",
        "-H", f"Authorization: Bearer {api_key}",
    ]
    try:
        result = subprocess.run(curl_command, check=True, capture_output=True, text=True)
        debug_print("Curl command executed successfully.")
        debug_print(f"Curl output: {result.stdout[:200]}...")  # Print first 200 characters
        return True
    except FileNotFoundError:
        # Without shell=True a missing curl binary raises instead of failing
        # the subprocess; treat it as a failed probe rather than crashing.
        debug_print("Curl command failed: curl executable not found on PATH.")
        return False
    except subprocess.CalledProcessError as e:
        debug_print(f"Curl command failed: {e}")
        debug_print(f"Curl error output: {e.stderr}")
        return False
def requests_test():
    """Probe the OpenAI API by listing models with the requests library.

    Returns:
        True if the request succeeds (2xx), False on any request error.
    """
    debug_print("Testing API connection with requests...")
    headers = {"Authorization": f"Bearer {api_key}"}
    try:
        # Bug fix: the original call had no timeout, so a stalled connection
        # would hang the script forever. 30s is generous for a models list.
        response = requests.get("https://api.openai.com/v1/models", headers=headers, timeout=30)
        response.raise_for_status()
        debug_print("Requests test successful.")
        debug_print(f"Response status: {response.status_code}")
        debug_print(f"Response content: {response.text[:200]}...")  # Print first 200 characters
        return True
    except requests.exceptions.RequestException as e:
        # Timeout is a subclass of RequestException, so it is handled here too.
        debug_print(f"Requests test failed: {str(e)}")
        return False
def api_connection_test():
    """Run both connectivity probes (curl, then requests).

    Returns:
        True only when both probes succeed; otherwise logs which stage
        failed and returns False.
    """
    if not curl_test():
        debug_print("Curl test failed. There might be a network or API key issue.")
        return False
    debug_print("Curl test passed. Now trying with Python requests...")
    if not requests_test():
        debug_print("Curl test passed, but requests test failed. Issue might be in Python environment.")
        return False
    debug_print("Both curl and requests tests passed. API connection is working.")
    return True
def translate_chunk(chunk, delay=INITIAL_DELAY, max_retries=5):
    """Translate one Hebrew text chunk to English via the OpenAI API.

    Args:
        chunk: Hebrew text to translate.
        delay: Seconds to sleep before each API call (pacing / backoff).
        max_retries: Maximum number of attempts before giving up.

    Returns:
        A (translation, delay) tuple. translation is None on failure; delay
        is the possibly-increased pacing delay for subsequent calls.
    """
    debug_print(f"Sending chunk of length {len(chunk)} to OpenAI for translation")
    for attempt in range(max_retries):
        try:
            time.sleep(delay)  # Pace requests to stay under the rate limit
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a translator. Translate the following Hebrew text to English."},
                    {"role": "user", "content": chunk}
                ]
            )
            translated_chunk = response.choices[0].message.content
            debug_print("Translation received from OpenAI")
            return translated_chunk, delay  # Return the current delay for future use
        except Exception as e:
            error_message = str(e)
            debug_print(f"Error during translation (attempt {attempt + 1}/{max_retries}): {error_message}")
            if "rate_limit_exceeded" in error_message.lower():
                delay *= 2  # Exponential backoff on rate limiting
                debug_print(f"Rate limit exceeded. Increasing delay to {delay} seconds.")
            elif "insufficient_quota" in error_message.lower():
                # Quota errors will not resolve by retrying; bail out now.
                debug_print("Quota exceeded. Please check your OpenAI account status and billing details.")
                return None, delay
            elif attempt < max_retries - 1:
                debug_print(f"Retrying in {delay} seconds...")
            else:
                debug_print("Max retries reached. Translation failed.")
                return None, delay
    # Bug fix: if the FINAL attempt hit the rate-limit branch, the original
    # fell off the end of the loop and returned bare None, making the
    # caller's tuple unpack raise TypeError. Always return the pair.
    debug_print("Max retries reached. Translation failed.")
    return None, delay
def split_into_chunks(text, max_tokens):
    """Split text into sentence-aligned chunks of at most ~max_tokens characters.

    NOTE: despite the parameter name, size is measured in characters, not
    model tokens. Sentences are delimited by '.', and a '.' is re-appended
    to each sentence, so chunks always end with a period. A single sentence
    longer than max_tokens still becomes one (oversized) chunk.

    Args:
        text: The text to split.
        max_tokens: Approximate maximum characters per chunk.

    Returns:
        A list of non-empty, stripped chunk strings.
    """
    chunks = []
    current_chunk = ""
    for sentence in text.split('.'):
        # Bug fix: only flush a chunk when there is accumulated text —
        # the original appended an empty-string chunk whenever the very
        # first sentence alone exceeded max_tokens.
        if current_chunk and len(current_chunk) + len(sentence) > max_tokens:
            chunks.append(current_chunk.strip())
            current_chunk = ""
        current_chunk += sentence + '.'
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
def read_docx_file(file_path):
    """Return the plain text of a .docx file (paragraphs joined by newlines).

    Returns None (after logging) on any read/parse error — callers treat
    None as "skip this file".
    """
    try:
        document = docx.Document(file_path)
        return '\n'.join(paragraph.text for paragraph in document.paragraphs)
    except Exception as e:
        debug_print(f"Error reading file {file_path}: {str(e)}")
        return None
def get_progress_file_path(source_path):
    """Return the sidecar progress-file path for *source_path*."""
    return f"{source_path}.progress"
def process_files(source_folder, destination_folder):
    """Translate every not-yet-translated .docx file in source_folder.

    Output files are named "<index>_Haim_Hadashim_Translated.txt" in
    destination_folder, where <index> is the part of the source filename
    before the first underscore. Source files whose index already has an
    output file are skipped, so the run is resumable.

    Args:
        source_folder: Directory containing Hebrew .docx files.
        destination_folder: Directory for translated .txt output (created
            if missing).
    """
    debug_print(f"Starting process_files function")
    debug_print(f"Source folder: {source_folder}")
    debug_print(f"Destination folder: {destination_folder}")
    os.makedirs(destination_folder, exist_ok=True)
    # Skip Word lock files ("~$...") and anything that is not a regular file.
    files = [f for f in os.listdir(source_folder) if f.endswith('.docx') and not f.startswith('~$') and os.path.isfile(os.path.join(source_folder, f))]
    debug_print(f"Found {len(files)} valid .docx files")
    files.sort()
    translated_files = set(f.split('_')[0] for f in os.listdir(destination_folder) if f.endswith('_Haim_Hadashim_Translated.txt'))
    debug_print(f"Found {len(translated_files)} already translated files")
    files_to_process = [f for f in files if f.split('_')[0] not in translated_files]
    debug_print(f"{len(files_to_process)} files left to process")
    progress_bar = tqdm(total=len(files_to_process), unit="file")
    current_delay = INITIAL_DELAY
    for filename in files_to_process:
        source_path = os.path.join(source_folder, filename)
        index = filename.split('_')[0]
        dest_filename = f"{index}_Haim_Hadashim_Translated.txt"
        dest_path = os.path.join(destination_folder, dest_filename)
        try:
            success, current_delay = process_file(source_path, dest_path, current_delay)
            if success:
                progress_bar.update(1)
                # Bug fix: the following log messages printed the literal
                # placeholder "(unknown)" instead of the filename.
                debug_print(f"Successfully processed {filename}")
                debug_print(f"Waiting for {current_delay} seconds before next file")
                time.sleep(current_delay)
            else:
                debug_print(f"Failed to process {filename}")
        except Exception as e:
            # Keep going on unexpected per-file errors; log enough to diagnose.
            debug_print(f"Unexpected error processing {filename}: {str(e)}")
            debug_print(f"Error type: {type(e).__name__}")
            debug_print(f"Error args: {e.args}")
    progress_bar.close()
    debug_print("File processing and translation completed")
def process_file(source_path, dest_path, current_delay):
    """Translate one .docx file chunk-by-chunk and write the result.

    Args:
        source_path: Path to the Hebrew .docx source file.
        dest_path: Path of the UTF-8 .txt file to write.
        current_delay: Current pacing delay (seconds) between API calls.

    Returns:
        A (success, delay) tuple; delay is the possibly-updated pacing
        delay for subsequent API calls.
    """
    debug_print(f"Processing file: {os.path.basename(source_path)}")
    content = read_docx_file(source_path)
    if content is None:
        return False, current_delay

    pieces = split_into_chunks(content, MAX_TOKENS)
    total = len(pieces)
    debug_print(f"Split content into {total} chunks")

    results = []
    for number, piece in enumerate(pieces, start=1):
        debug_print(f"Translating chunk {number}/{total}")
        translated, current_delay = translate_chunk(piece, delay=current_delay)
        if translated is None:
            debug_print(f"Translation failed for chunk {number}")
            return False, current_delay
        results.append(translated)

    full_translation = "\n\n".join(results)
    try:
        debug_print(f"Writing translated content to {os.path.basename(dest_path)}")
        with open(dest_path, 'w', encoding='utf-8') as out:
            out.write(full_translation)
    except Exception as exc:
        debug_print(f"Error writing file {os.path.basename(dest_path)}: {str(exc)}")
        return False, current_delay
    return True, current_delay
def save_progress(source_path, translated_chunks):
    """Persist translated chunks next to the source file as pretty-printed JSON."""
    progress_path = get_progress_file_path(source_path)
    with open(progress_path, 'w', encoding='utf-8') as handle:
        json.dump(translated_chunks, handle, ensure_ascii=False, indent=2)
def load_progress(source_path):
    """Return previously saved translated chunks, or [] when no progress file exists."""
    progress_path = get_progress_file_path(source_path)
    if not os.path.exists(progress_path):
        return []
    with open(progress_path, 'r', encoding='utf-8') as handle:
        return json.load(handle)
if __name__ == "__main__":
    debug_print("Script started")
    # Abort early if neither the curl nor the requests probe can reach the API.
    if not api_connection_test():
        debug_print("Exiting due to API connection or test failure.")
        sys.exit(1)
    # NOTE(review): folders are hard-coded absolute Windows paths; consider
    # taking them from argv or environment variables for portability.
    source_folder = r"C:\Users\cxj05\Documents\KabU Chatbot\Haim Hadashim\haim-hadashim-files\he"
    destination_folder = r"C:\Users\cxj05\Documents\KabU Chatbot\Haim Hadashim\haim-hadashim-files\Translated Texts"
    process_files(source_folder, destination_folder)
    debug_print("Script completed")