- When we ask a question about math and ask it to elaborate, we get around 16000 tokens of \n\t
Did anybody experience something similar, or have any suggestions?
Each model has a different limit for input and output tokens.
In the case of gpt-4o the output limit is around 16k tokens. There are other models with a higher output limit.
https://platform.openai.com/docs/models/compare?model=gpt-4o
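If you suspect you are hitting that output cap, here is a minimal sketch with the current openai Python SDK that makes it visible via finish_reason (the max_tokens value is just an example, and OPENAI_API_KEY is assumed to be set):

```python
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain the derivation of the quadratic formula in detail."}],
    max_tokens=4000,  # example cap; must stay under the model's output limit (~16k for gpt-4o)
)

choice = resp.choices[0]
if choice.finish_reason == "length":
    print("Output was cut off at the token limit.")
print(choice.message.content)
```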
The problem is that it doesn't output anything but control characters.
This can be a symptom when you use the API parameter response_format, specify json_object as the type, but then don't give the AI a very detailed description of the mandatory JSON response style and the keys you wish to receive.
Do not use this parameter if you don't want JSON. If you do want JSON, it is much better to use json_schema as the response format type and provide a JSON schema to constrain the output.
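As a rough sketch with the current openai Python SDK (the schema name and fields below are invented placeholders, not anything from this thread):

```python
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain the quadratic formula and elaborate."}],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "math_answer",  # placeholder schema name
            "strict": True,
            "schema": {
                "type": "object",
                "properties": {
                    "explanation": {"type": "string"},
                    "formula": {"type": "string"},
                },
                "required": ["explanation", "formula"],
                "additionalProperties": False,
            },
        },
    },
)

print(resp.choices[0].message.content)  # JSON text conforming to the schema
```

With json_schema the model is constrained to the keys you declare, so you don't end up with an empty object or a stream of whitespace tokens.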
What did you expect? That you could get unlimited answers?
Create a software solution, something like:
import openai
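# NOTE: this sketch targets the legacy openai<1.0 Python SDK
# (openai.ChatCompletion, openai.error); newer SDK versions expose a different client-based interface.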
import time
import os
import tiktoken
from datetime import datetime
import json
import backoff
import logging
import argparse
from concurrent.futures import ThreadPoolExecutor
import threading
import signal
import sys
import hashlib
import asyncio
import aiohttp
import re
from typing import Dict, List, Optional, Tuple, Union, Any
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("api_requests.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("openai_generator")
class TokenBudget:
"""Manages token budgeting and cost estimation for OpenAI API calls"""
# Approximate costs per 1K tokens as of March 2025 (may need updating)
COST_PER_1K_TOKENS = {
"gpt-4": {"prompt": 0.03, "completion": 0.06},
"gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
"gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002},
}
def __init__(self, model: str):
self.model = model
self.prompt_tokens = 0
self.completion_tokens = 0
self.cost = 0.0
# Use default costs if model not found
default_model = "gpt-4"
self.prompt_cost_per_1k = self.COST_PER_1K_TOKENS.get(
model, self.COST_PER_1K_TOKENS[default_model]
)["prompt"]
self.completion_cost_per_1k = self.COST_PER_1K_TOKENS.get(
model, self.COST_PER_1K_TOKENS[default_model]
)["completion"]
def add_prompt_tokens(self, tokens: int) -> None:
"""Add prompt tokens and update cost"""
self.prompt_tokens += tokens
self.cost += (tokens / 1000) * self.prompt_cost_per_1k
def add_completion_tokens(self, tokens: int) -> None:
"""Add completion tokens and update cost"""
self.completion_tokens += tokens
self.cost += (tokens / 1000) * self.completion_cost_per_1k
def get_stats(self) -> Dict[str, Union[int, float]]:
"""Get current token usage and cost statistics"""
return {
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"total_tokens": self.prompt_tokens + self.completion_tokens,
"estimated_cost_usd": round(self.cost, 4)
}
class RateLimiter:
"""Thread-safe rate limiter for API requests"""
def __init__(self, requests_per_minute: int, tokens_per_minute: Optional[int] = None):
self.requests_per_minute = requests_per_minute
self.tokens_per_minute = tokens_per_minute
self.min_time_between_requests = 60.0 / requests_per_minute
self.last_request_time = 0
self.tokens_used_in_current_minute = 0
self.minute_start_time = time.time()
self.lock = threading.RLock()
def _reset_token_counter_if_needed(self) -> None:
"""Reset token counter if a minute has passed"""
current_time = time.time()
if current_time - self.minute_start_time >= 60:
self.tokens_used_in_current_minute = 0
self.minute_start_time = current_time
def wait_if_needed(self, tokens: Optional[int] = None) -> float:
"""
Wait if necessary to respect rate limits
Returns the time waited in seconds
"""
with self.lock:
current_time = time.time()
# Check time-based rate limit
time_since_last_request = current_time - self.last_request_time
time_to_wait = max(0, self.min_time_between_requests - time_since_last_request)
# Check token-based rate limit if applicable
if self.tokens_per_minute is not None and tokens is not None:
self._reset_token_counter_if_needed()
# If adding these tokens would exceed the limit, calculate wait time
if self.tokens_used_in_current_minute + tokens > self.tokens_per_minute:
# Wait until the minute rolls over
token_wait_time = 60 - (current_time - self.minute_start_time)
time_to_wait = max(time_to_wait, token_wait_time)
if time_to_wait > 0:
time.sleep(time_to_wait)
# Update state
self.last_request_time = time.time()
if self.tokens_per_minute is not None and tokens is not None:
self.tokens_used_in_current_minute += tokens
return time_to_wait
class RequestCache:
"""Simple cache for API requests to avoid redundant calls"""
def __init__(self, cache_dir: str = ".request_cache"):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def _get_cache_key(self, messages: List[Dict[str, str]], model: str, temperature: float) -> str:
"""Generate a unique key for the request"""
# Create a string representation of the request
request_str = json.dumps({
"messages": messages,
"model": model,
"temperature": temperature
}, sort_keys=True)
# Hash it to create a filename-safe key
return hashlib.md5(request_str.encode()).hexdigest()
def get(self, messages: List[Dict[str, str]], model: str, temperature: float) -> Optional[str]:
"""Get a cached response if it exists"""
key = self._get_cache_key(messages, model, temperature)
cache_file = os.path.join(self.cache_dir, key + ".json")
if os.path.exists(cache_file):
try:
with open(cache_file, "r", encoding="utf-8") as f:
data = json.load(f)
logger.info(f"Cache hit for request with key {key[:8]}")
return data.get("content")
except Exception as e:
logger.warning(f"Error reading cache: {str(e)}")
return None
def save(self, messages: List[Dict[str, str]], model: str, temperature: float, content: str) -> None:
"""Save a response to the cache"""
key = self._get_cache_key(messages, model, temperature)
cache_file = os.path.join(self.cache_dir, key + ".json")
try:
with open(cache_file, "w", encoding="utf-8") as f:
json.dump({
"messages": messages,
"model": model,
"temperature": temperature,
"content": content,
"timestamp": datetime.now().isoformat()
}, f, indent=2)
except Exception as e:
logger.warning(f"Error writing to cache: {str(e)}")
class OpenAILargeOutputGenerator:
def __init__(
self,
api_key: Optional[str] = None,
model: str = "gpt-4",
requests_per_minute: int = 50,
tokens_per_minute: Optional[int] = None,
cache_enabled: bool = True,
async_mode: bool = False,
max_concurrent_requests: int = 1,
base_url: Optional[str] = None,
organization: Optional[str] = None
):
# Set API key from parameter or environment variable
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
if not self.api_key:
raise ValueError("OpenAI API key is required. Provide it as a parameter or set OPENAI_API_KEY environment variable.")
# API configuration
self.model = model
self.base_url = base_url
self.organization = organization
# Configure OpenAI client
self.client_config = {
"api_key": self.api_key,
}
if base_url:
self.client_config["base_url"] = base_url
if organization:
self.client_config["organization"] = organization
openai.api_key = self.api_key
if base_url:
openai.api_base = base_url
if organization:
openai.organization = organization
# Initialize tokenizer
try:
self.encoder = tiktoken.encoding_for_model(model)
except KeyError:
logger.warning(f"Tokenizer not found for model {model}, using cl100k_base instead")
self.encoder = tiktoken.get_encoding("cl100k_base")
# Rate limiting
self.rate_limiter = RateLimiter(requests_per_minute, tokens_per_minute)
# Token budget tracking
self.token_budget = TokenBudget(model)
# Request cache
self.cache_enabled = cache_enabled
if cache_enabled:
self.cache = RequestCache()
# Async mode settings
self.async_mode = async_mode
self.max_concurrent_requests = max_concurrent_requests
self.async_session = None
# Concurrency control
self.executor = ThreadPoolExecutor(max_workers=max_concurrent_requests)
# Interrupt handling
self.stop_event = threading.Event()
signal.signal(signal.SIGINT, self._handle_interrupt)
# Output directory
self.output_dir = f"output_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(self.output_dir, exist_ok=True)
# Metadata tracking
self.metadata = {
"model": model,
"api_base": base_url or "default",
"organization": organization,
"start_time": datetime.now().isoformat(),
"chunks": [],
"token_usage": self.token_budget.get_stats(),
"api_calls": 0,
"errors": 0,
"rate_limit_waits": 0,
"total_wait_time": 0.0,
"settings": {
"requests_per_minute": requests_per_minute,
"tokens_per_minute": tokens_per_minute,
"cache_enabled": cache_enabled,
"async_mode": async_mode,
"max_concurrent_requests": max_concurrent_requests
}
}
logger.info(f"Initialized OpenAI generator with model {model}")
logger.info(f"Rate limits: {requests_per_minute} requests/min, {tokens_per_minute or 'unlimited'} tokens/min")
logger.info(f"Output directory: {self.output_dir}")
def _handle_interrupt(self, sig, frame):
"""Handle interrupt signal (Ctrl+C)"""
logger.warning("Interrupt received, stopping gracefully...")
self.stop_event.set()
# Allow current operations to finish by not exiting immediately
def _count_tokens(self, text: str) -> int:
"""Count the number of tokens in the text using the model's tokenizer"""
return len(self.encoder.encode(text))
def _count_message_tokens(self, messages: List[Dict[str, str]]) -> int:
"""Count tokens in a list of chat messages"""
token_count = 0
for message in messages:
# Add tokens for message content
token_count += self._count_tokens(message.get("content", ""))
# Add tokens for role (approximation)
token_count += 4 # Approximate overhead per message
# Add tokens for message formatting overhead (approximation)
token_count += 3 # Overhead for the overall structure
return token_count
def _sanitize_continuation_content(self, text: str) -> str:
"""
Clean up content for continuation to avoid instruction repetition
and other issues that can occur when chunking
"""
# Remove any continuation instructions that might have leaked into the output
patterns_to_remove = [
r"(?i)Continue\s+from\s+where\s+you\s+left\s+off\.?",
r"(?i)Do\s+not\s+restart\s+or\s+summarize\s+what\s+you've\s+said\s+already\.?",
r"(?i)I'll\s+continue\s+from\s+where\s+I\s+left\s+off\.?",
r"(?i)Continuing\s+from\s+my\s+previous\s+response\.?",
r"(?i)Let\s+me\s+continue\.?"
]
for pattern in patterns_to_remove:
text = re.sub(pattern, "", text)
return text.strip()
async def _init_async_session(self):
"""Initialize the async session if not already done"""
if self.async_session is None:
self.async_session = aiohttp.ClientSession()
logger.debug("Initialized async session")
async def _close_async_session(self):
"""Close the async session if it exists"""
if self.async_session is not None:
await self.async_session.close()
self.async_session = None
logger.debug("Closed async session")
@backoff.on_exception(
backoff.expo,
(openai.error.RateLimitError, openai.error.APIConnectionError,
openai.error.ServiceUnavailableError, openai.error.APIError,
aiohttp.ClientError, asyncio.TimeoutError),
max_tries=8,
factor=2,
jitter=backoff.full_jitter
)
async def _generate_completion_async(self,
messages: List[Dict[str, str]],
max_tokens: int = 4000,
temperature: float = 0.7) -> str:
"""Generate a completion asynchronously with OpenAI API"""
await self._init_async_session()
# Check cache first
if self.cache_enabled:
cached_response = self.cache.get(messages, self.model, temperature)
if cached_response:
return cached_response
prompt_tokens = self._count_message_tokens(messages)
self.token_budget.add_prompt_tokens(prompt_tokens)
# Apply rate limiting
wait_time = self.rate_limiter.wait_if_needed(prompt_tokens)
if wait_time > 0:
self.metadata["rate_limit_waits"] += 1
self.metadata["total_wait_time"] += wait_time
logger.info(f"Rate limit applied: waited {wait_time:.2f}s")
self.metadata["api_calls"] += 1
logger.info(f"Making async API call with {prompt_tokens} prompt tokens")
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
if self.organization:
headers["OpenAI-Organization"] = self.organization
base_url = self.base_url or "https://api.openai.com/v1"
url = f"{base_url}/chat/completions"
payload = {
"model": self.model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": True
}
async with self.async_session.post(url, headers=headers, json=payload) as response:
response.raise_for_status()
chunk_data = ""
async for line in response.content:
if self.stop_event.is_set():
logger.warning("Stopping async generation due to interrupt")
break
line = line.strip()
if not line or line == b'data: [DONE]':
continue
if line.startswith(b'data: '):
try:
chunk = json.loads(line[6:])
delta = chunk['choices'][0].get('delta', {})
text = delta.get('content', '')
chunk_data += text
print(text, end="", flush=True)
except Exception as e:
logger.error(f"Error parsing chunk: {str(e)}")
completion_tokens = self._count_tokens(chunk_data)
self.token_budget.add_completion_tokens(completion_tokens)
# Update token usage stats in metadata
self.metadata["token_usage"] = self.token_budget.get_stats()
logger.info(f"Generated {completion_tokens} tokens asynchronously")
# Save to cache if enabled
if self.cache_enabled and chunk_data:
self.cache.save(messages, self.model, temperature, chunk_data)
return chunk_data
@backoff.on_exception(
backoff.expo,
(openai.error.RateLimitError, openai.error.APIConnectionError,
openai.error.ServiceUnavailableError, openai.error.APIError),
max_tries=8,
factor=2,
jitter=backoff.full_jitter
)
def _generate_completion(self,
messages: List[Dict[str, str]],
max_tokens: int = 4000,
temperature: float = 0.7) -> str:
"""Generate a completion with exponential backoff for API errors"""
# Check cache first
if self.cache_enabled:
cached_response = self.cache.get(messages, self.model, temperature)
if cached_response:
return cached_response
prompt_tokens = self._count_message_tokens(messages)
self.token_budget.add_prompt_tokens(prompt_tokens)
# Apply rate limiting
wait_time = self.rate_limiter.wait_if_needed(prompt_tokens)
if wait_time > 0:
self.metadata["rate_limit_waits"] += 1
self.metadata["total_wait_time"] += wait_time
logger.info(f"Rate limit applied: waited {wait_time:.2f}s")
try:
self.metadata["api_calls"] += 1
logger.info(f"Making API call with {prompt_tokens} prompt tokens")
response = openai.ChatCompletion.create(
model=self.model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=True
)
chunk_data = ""
for chunk in response:
if self.stop_event.is_set():
logger.warning("Stopping generation due to interrupt")
break
delta = chunk['choices'][0].get('delta', {})
text = delta.get('content', '')
chunk_data += text
print(text, end="", flush=True)
completion_tokens = self._count_tokens(chunk_data)
self.token_budget.add_completion_tokens(completion_tokens)
# Update token usage stats in metadata
self.metadata["token_usage"] = self.token_budget.get_stats()
logger.info(f"Generated {completion_tokens} tokens")
# Save to cache if enabled
if self.cache_enabled and chunk_data:
self.cache.save(messages, self.model, temperature, chunk_data)
return chunk_data
except Exception as e:
if not isinstance(e, (openai.error.RateLimitError, openai.error.APIConnectionError,
openai.error.ServiceUnavailableError, openai.error.APIError)):
# These will be handled by the backoff decorator
self.metadata["errors"] += 1
logger.error(f"Unexpected error: {str(e)}")
raise e
def _save_chunk(self, chunk_count: int, chunk_data: str, tokens: int) -> str:
"""Save a chunk to disk and update metadata"""
# Sanitize the content
chunk_data = self._sanitize_continuation_content(chunk_data)
# Save this chunk
chunk_filename = os.path.join(self.output_dir, f"chunk_{chunk_count:03d}.txt")
with open(chunk_filename, "w", encoding="utf-8") as f:
f.write(chunk_data)
# Update metadata
self.metadata["chunks"].append({
"chunk_number": chunk_count,
"tokens": tokens,
"filename": chunk_filename,
"timestamp": datetime.now().isoformat()
})
# Save metadata after each chunk
with open(os.path.join(self.output_dir, "metadata.json"), "w", encoding="utf-8") as f:
json.dump(self.metadata, f, indent=2)
return chunk_filename
def _create_continuation_messages(self,
original_prompt: str,
last_chunk: str,
max_context_tokens: int = 1500) -> List[Dict[str, str]]:
"""Create a well-structured continuation prompt"""
context_text = last_chunk
# Ensure we're not exceeding the token limit for context
if self._count_tokens(context_text) > max_context_tokens:
token_ids = self.encoder.encode(context_text)[-max_context_tokens:]
context_text = self.encoder.decode(token_ids)
# Clean up the context to remove any continuation instructions
context_text = self._sanitize_continuation_content(context_text)
return [
{"role": "system", "content": "You are continuing a response. Maintain the same style, tone, and format. "
"Do not repeat information, do not restart, and do not summarize what you've already said. "
"Pick up exactly where the previous response ended and continue developing the content."},
{"role": "user", "content": original_prompt},
{"role": "assistant", "content": context_text},
{"role": "user", "content": "Continue from where you left off, maintaining the same quality and depth."}
]
async def get_large_output_async(self,
prompt: str,
max_tokens_per_request: int = 4000,
total_tokens: int = 100000,
temperature: float = 0.7,
checkpoint_interval: int = 1) -> Tuple[str, Dict[str, Any]]:
"""
Asynchronously generate a large output by making multiple API calls
Args:
prompt: The initial prompt
max_tokens_per_request: Maximum tokens per API request
total_tokens: Target number of tokens for the entire output
temperature: Temperature setting for generation
checkpoint_interval: How often to save metadata (in chunks)
Returns:
Tuple of (full output, metadata)
"""
if not self.async_mode:
logger.warning("Async mode not enabled in constructor, falling back to sync mode")
return self.get_large_output(prompt, max_tokens_per_request, total_tokens, temperature, checkpoint_interval)
output = ""
token_count = 0
chunk_count = 0
current_messages = [{"role": "user", "content": prompt}]
logger.info(f"Starting async large output generation. Target: {total_tokens} tokens")
logger.info(f"Output will be saved in: {self.output_dir}")
try:
while token_count < total_tokens and not self.stop_event.is_set():
logger.info(f"Generating chunk {chunk_count+1} asynchronously...")
chunk_data = await self._generate_completion_async(
current_messages,
max_tokens_per_request,
temperature
)
# Append data
output += chunk_data
chunk_tokens = self._count_tokens(chunk_data)
token_count += chunk_tokens
chunk_count += 1
# Save this chunk
self._save_chunk(chunk_count, chunk_data, chunk_tokens)
# Save full metadata at checkpoint intervals
if chunk_count % checkpoint_interval == 0:
# Update token usage stats
self.metadata["token_usage"] = self.token_budget.get_stats()
with open(os.path.join(self.output_dir, "metadata.json"), "w") as f:
json.dump(self.metadata, f, indent=2)
# If we've reached the target, break
if token_count >= total_tokens or self.stop_event.is_set():
break
# Create continuation
current_messages = self._create_continuation_messages(
prompt, chunk_data, max_context_tokens=1500
)
logger.info(f"Progress: {token_count}/{total_tokens} tokens ({(token_count/total_tokens)*100:.1f}%)")
except KeyboardInterrupt:
logger.info("Process interrupted by user")
except Exception as e:
logger.error(f"Unexpected error in async generation: {str(e)}")
self.metadata["errors"] += 1
finally:
# Close the async session
await self._close_async_session()
# Final writes
self._finalize_output(output, token_count, chunk_count)
return output, self.metadata
def get_large_output(self,
prompt: str,
max_tokens_per_request: int = 4000,
total_tokens: int = 100000,
temperature: float = 0.7,
checkpoint_interval: int = 1) -> Tuple[str, Dict[str, Any]]:
"""
Generate a large output by making multiple API calls
Args:
prompt: The initial prompt
max_tokens_per_request: Maximum tokens per API request
total_tokens: Target number of tokens for the entire output
temperature: Temperature setting for generation
checkpoint_interval: How often to save metadata (in chunks)
Returns:
Tuple of (full output, metadata)
"""
# If async mode is enabled, use the async version
if self.async_mode:
return asyncio.run(self.get_large_output_async(
prompt, max_tokens_per_request, total_tokens, temperature, checkpoint_interval
))
output = ""
token_count = 0
chunk_count = 0
current_messages = [{"role": "user", "content": prompt}]
logger.info(f"Starting large output generation. Target: {total_tokens} tokens")
logger.info(f"Output will be saved in: {self.output_dir}")
try:
while token_count < total_tokens and not self.stop_event.is_set():
logger.info(f"Generating chunk {chunk_count+1}...")
chunk_data = self._generate_completion(
current_messages,
max_tokens_per_request,
temperature
)
# Append data
output += chunk_data
chunk_tokens = self._count_tokens(chunk_data)
token_count += chunk_tokens
chunk_count += 1
# Save this chunk
self._save_chunk(chunk_count, chunk_data, chunk_tokens)
# Save full metadata at checkpoint intervals
if chunk_count % checkpoint_interval == 0:
# Update token usage stats
self.metadata["token_usage"] = self.token_budget.get_stats()
with open(os.path.join(self.output_dir, "metadata.json"), "w") as f:
json.dump(self.metadata, f, indent=2)
# If we've reached the target, break
if token_count >= total_tokens or self.stop_event.is_set():
break
# Create continuation
current_messages = self._create_continuation_messages(
prompt, chunk_data, max_context_tokens=1500
)
logger.info(f"Progress: {token_count}/{total_tokens} tokens ({(token_count/total_tokens)*100:.1f}%)")
except KeyboardInterrupt:
logger.info("Process interrupted by user")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
self.metadata["errors"] += 1
finally:
# Final writes
self._finalize_output(output, token_count, chunk_count)
return output, self.metadata
def _finalize_output(self, output: str, token_count: int, chunk_count: int) -> None:
"""Finalize the output with metadata and summary"""
self.metadata["end_time"] = datetime.now().isoformat()
self.metadata["token_usage"] = self.token_budget.get_stats()
# Calculate duration
start_time = datetime.fromisoformat(self.metadata["start_time"])
end_time = datetime.fromisoformat(self.metadata["end_time"])
duration_seconds = (end_time - start_time).total_seconds()
self.metadata["duration_seconds"] = duration_seconds
# Save the full output
full_output_path = os.path.join(self.output_dir, "full_output.txt")
with open(full_output_path, "w", encoding="utf-8") as final_file:
final_file.write(output)
# Final metadata update
self.metadata["final_token_count"] = token_count
self.metadata["output_path"] = full_output_path
self.metadata["average_tokens_per_second"] = round(token_count / max(1, duration_seconds), 2)
with open(os.path.join(self.output_dir, "metadata.json"), "w") as f:
json.dump(self.metadata, f, indent=2)
# Create a summary file
summary = {
"total_tokens": token_count,
"chunks": chunk_count,
"api_calls": self.metadata["api_calls"],
"errors": self.metadata["errors"],
"rate_limit_waits": self.metadata["rate_limit_waits"],
"total_wait_time": self.metadata["total_wait_time"],
"estimated_cost_usd": self.metadata["token_usage"]["estimated_cost_usd"],
"duration": f"{duration_seconds:.1f} seconds",
"tokens_per_second": self.metadata["average_tokens_per_second"],
"output_path": full_output_path
}