I can probably get that number much higher for you.
Here's a utility that packs the 1,516 characters of its demonstration's test string into three metadata keys holding 1,208 string characters total.
The encoding function takes a parameter for the maximum number of its own keys it is allowed to create.
Since a conversation may already carry an unknown number of other keys, you would either poll the existing key count, track your application's own count, verify success after storage, or simply reserve the metadata for this use exclusively.
import sys
import logging

logger = logging.getLogger(__name__)

try:
    import httpx  # required for every HTTP call in this module
except Exception as e:
    raise ImportError("Missing dependency: install with `pip install httpx`") from e
def get_api_key_headers(
printing: bool = False,
dotenv_override: bool = True,
env_path: str | None = None,
) -> dict[str, str]:
"""
Returns OpenAI API auth headers.
Behavior:
- Attempts to load variables from a .env file (if python-dotenv is available).
- By default, .env VALUES OVERRIDE existing os.environ entries (dotenv_override=True).
- You can point to a specific file via env_path; otherwise we use find_dotenv(usecwd=True).
- Each .env path is loaded at most once per process (cached).
Notes:
- This updates process environment variables; in multi-account, single-process scenarios,
prefer passing explicit headers/keys per call rather than relying on global env.
    Usage: to avoid cross-call bleed, build a fresh dict before adding headers, e.g.:
api_call(url, headers = {**get_api_key_headers(), "OpenAI-Beta": "responses=v99"}, ...)
"""
# One-time cache per path
if not hasattr(get_api_key_headers, "_dotenv_loaded_paths"):
get_api_key_headers._dotenv_loaded_paths = set()
# Best-effort .env loading (optional dependency)
try:
if env_path is None:
from dotenv import load_dotenv, find_dotenv # lazy import
dotenv_path = find_dotenv(usecwd=True)
else:
from dotenv import load_dotenv # lazy import
dotenv_path = env_path
if dotenv_path and dotenv_path not in get_api_key_headers._dotenv_loaded_paths:
load_dotenv(dotenv_path=dotenv_path, override=dotenv_override)
get_api_key_headers._dotenv_loaded_paths.add(dotenv_path)
except Exception:
# Missing python-dotenv or any load failure is a no-op.
pass
import os
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("ERROR: Set the OPENAI_API_KEY environment variable (or in your .env).")
org_id = os.environ.get("OPENAI_ORG_ID")
project_id = os.environ.get("OPENAI_PROJECT_ID")
if printing:
print(f"Using OPENAI_API_KEY {api_key[:10]}...{api_key[-4:]}")
print(f"Using optional OPENAI_ORG_ID {org_id}")
print(f"Using optional OPENAI_PROJECT_ID {project_id}")
headers: dict[str, str] = {"Authorization": f"Bearer {api_key}"}
if org_id:
headers["OpenAI-Organization"] = org_id
if project_id:
headers["OpenAI-Project"] = project_id
return headers
def create_conversation(system: str = "") -> str:
"""
Create a conversation containing one developer message that sets the tone.
Returns the server-generated conversation ID.
"""
payload: dict[str, object] = {
"items": [
{
"type": "message",
"role": "developer",
"content": system.strip(),
}
],
}
with httpx.Client(timeout=20) as client:
response = client.post(
"https://api.openai.com/v1/conversations",
headers=get_api_key_headers(),
json=payload,
)
response.raise_for_status()
conversation_id: str = response.json()["id"]
print(f"Conversation created {conversation_id}")
return conversation_id
def delete_conversation(conversation_id: str) -> None:
"""
Delete the conversation so the demo doesn’t leave stray server objects.
On 2xx or 400, it's treated as success. Errors are logged to stderr.
"""
try:
with httpx.Client(timeout=20) as client:
response = client.delete(
f"https://api.openai.com/v1/conversations/{conversation_id}",
headers=get_api_key_headers(),
)
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
import json
resp = exc.response
# Attempt to parse JSON, fall back to raw text
try:
body_content = resp.json()
except ValueError:
body_content = resp.text
# Format body for logging/printing
if isinstance(body_content, (dict, list)):
formatted_body = json.dumps(body_content, indent=2)
else:
formatted_body = str(body_content)
if resp.status_code == 400:
logger.warning(
"Conversation wasn't deleted or didn't exist: %s\nResponse body:\n%s",
conversation_id,
formatted_body,
)
else:
print(
f"Error deleting conversation {conversation_id}: "
f"status {resp.status_code}\nResponse body:\n{formatted_body}"
)
logger.error(
"HTTP status error deleting conversation %s: %s",
conversation_id,
formatted_body,
)
raise
else:
logger.info("Conversation deleted: %s", conversation_id)
except Exception as exc:
print(f"[warn] Couldn’t delete {conversation_id}: {exc}", file=sys.stderr)
# Reference-style encoder/decoder and conversation helpers for metadata blobs.
# Single path: LZMA (xz) + latin-1 (raw). Minimal overhead: only a fixed
# plaintext sentinel appended BEFORE compression to validate integrity.
class MetadataCapacityError(Exception):
"""Raised when the encoded payload exceeds the available metadata capacity."""
def encode_metadata_blob(
data: str,
*,
max_keys: int = 16,
chunk_size: int = 512,
key_prefix: str = "datastorage",
sentinel: str = "÷÷ÿÿ",
) -> dict[str, str]:
"""
Encode `data` into a metadata dict spread across sequential keys
(e.g., datastorage01..datastorageNN), using xz (LZMA) compression and a
1:1 latin-1 encoding of compressed bytes.
The only overhead is `sentinel`, which is appended BEFORE compression and
must be present at the end of the decoded string during readback.
This encoder writes ONLY the keys required for the current data length.
It does not pre-clear or pad extra keys. Keys above `max_keys` are never
touched; if the payload would require more than `max_keys` keys, a
MetadataCapacityError is raised.
Returns
-------
dict[str, str]
Mapping of just the needed metadata keys to chunk strings.
Raises
------
MetadataCapacityError
If the encoded wire exceeds `max_keys * chunk_size` characters.
"""
import lzma # lazy import
def _key(i: int) -> str:
return f"{key_prefix}{i:02d}"
def _chunk(s: str, n: int) -> list[str]:
return [s[i : i + n] for i in range(0, len(s), n)]
# Build wire: (data + sentinel) -> UTF-8 -> LZMA -> latin-1 text
raw = (data + sentinel).encode("utf-8")
    zbytes = lzma.compress(raw, preset=9)  # preset 9: maximum compression
wire = zbytes.decode("latin-1") # 1 char per byte, reversible
capacity = max_keys * chunk_size
if len(wire) > capacity:
raise MetadataCapacityError(
f"Encoded payload ({len(wire)} chars) exceeds capacity ({capacity}). "
f"Reduce input size or increase max_keys."
)
parts = _chunk(wire, chunk_size)
# Only emit the keys actually used
return { _key(i + 1): parts[i] for i in range(len(parts)) }
def decode_metadata_blob(
metadata: dict[str, str],
*,
# NOTE: The decoder does not require size/length params; these remain for API
# compatibility only and are ignored.
max_keys: int = 16, # unused
chunk_size: int = 512, # unused
key_prefix: str = "datastorage",
sentinel: str = "÷÷ÿÿ",
) -> str:
"""
Smart decoder for blobs produced by `encode_metadata_blob`, tolerant of:
- variable chunk counts,
- orphan/trailing keys left by older, longer writes,
- unrelated metadata keys (e.g., {"title": "..."}).
Strategy:
- Collect all keys matching f"{key_prefix}\\d+" (e.g., datastorage01..).
- Sort by numeric index and append chunk values in order.
- After each append, attempt LZMA decompression; continue until:
• a full stream is reached and the decoded text ends with `sentinel`,
in which case return the text without the sentinel; or
• no valid stream with sentinel is found after all chunks (error).
Parameters
----------
metadata : dict[str, str]
A metadata dict possibly containing many keys; only matching chunk keys
are considered.
key_prefix : str
Prefix of the chunk keys (default "datastorage").
sentinel : str
Plaintext suffix that must be present after decompression.
Returns
-------
str
The original string (without the sentinel).
Raises
------
ValueError
If no valid compressed stream with the expected sentinel is found.
"""
import re # lazy import
import lzma # lazy import
if not isinstance(metadata, dict):
raise ValueError("metadata must be a dict of key->string")
# Find chunk keys like f"{key_prefix}01", f"{key_prefix}02", ...
pattern = re.compile(rf"^{re.escape(key_prefix)}(\d+)$")
chunks: list[tuple[int, str]] = []
for k, v in metadata.items():
if not isinstance(v, str):
continue
m = pattern.match(k)
if m:
idx = int(m.group(1))
# Keep empty strings too; they may be valid chunk boundaries.
chunks.append((idx, v))
if not chunks:
raise ValueError(f"No chunk keys found for prefix '{key_prefix}'")
chunks.sort(key=lambda kv: kv[0])
# Incrementally attempt to decode after each concatenation step.
wire_accum = ""
for _, piece in chunks:
wire_accum += piece
try:
# Use a fresh decompressor each attempt to test completeness.
dec = lzma.LZMADecompressor()
out_bytes = dec.decompress(wire_accum.encode("latin-1"))
if not dec.eof:
# Stream not complete yet; need more bytes.
continue
text = out_bytes.decode("utf-8")
if text.endswith(sentinel):
return text[: -len(sentinel)]
# Complete LZMA stream but missing sentinel; keep scanning. This
# allows forward-compat with earlier writers (rare).
except lzma.LZMAError:
# Incomplete or invalid so far; append more and retry.
continue
raise ValueError(
"Unable to reconstruct a complete LZMA stream with the expected end sentinel. "
"Check that the encoder used xz+latin-1 and appended the matching sentinel."
)
def update_conversation_metadata_blob(
conversation_id: str,
data: str,
*,
max_keys: int = 16,
chunk_size: int = 512,
key_prefix: str = "datastorage",
sentinel: str = "÷÷ÿÿ",
) -> dict[str, int | str]:
"""
Write `data` into `conversation_id` metadata using xz+raw across sequential keys.
Only the needed keys are written (no blind clearing). Keys above `max_keys`
are never overwritten.
Returns
-------
dict
Summary: {"mode": "raw_xz", "stored_chars": int, "chunk_count": int, "keys_used": int}
"""
# Encode into just the necessary keys
payload = encode_metadata_blob(
data,
max_keys=max_keys,
chunk_size=chunk_size,
key_prefix=key_prefix,
sentinel=sentinel,
)
wire_len = sum(len(v) for v in payload.values())
keys_used = len(payload)
chunk_count = keys_used # each value is a chunk
# Merge with existing metadata; overwrite only the keys we emit.
with httpx.Client(timeout=20) as client:
get_resp = client.get(
f"https://api.openai.com/v1/conversations/{conversation_id}",
headers=get_api_key_headers(),
)
get_resp.raise_for_status()
existing = get_resp.json().get("metadata", {}) or {}
if not isinstance(existing, dict):
existing = {}
merged = {**existing, **payload}
post_resp = client.post(
f"https://api.openai.com/v1/conversations/{conversation_id}",
headers=get_api_key_headers(),
json={"metadata": merged},
)
post_resp.raise_for_status()
return {
"mode": "raw_xz",
"stored_chars": wire_len,
"chunk_count": chunk_count,
"keys_used": keys_used,
}
def read_conversation_metadata_blob(
conversation_id: str,
*,
# Retain these kwargs for symmetry with update; decoder ignores size params.
max_keys: int = 16,
chunk_size: int = 512,
key_prefix: str = "datastorage",
sentinel: str = "÷÷ÿÿ",
) -> str:
"""
Read and decode string data from `conversation_id` metadata using the
smart, incremental decoder. Orphan keys and unrelated metadata are ignored.
"""
with httpx.Client(timeout=20) as client:
resp = client.get(
f"https://api.openai.com/v1/conversations/{conversation_id}",
headers=get_api_key_headers(),
)
resp.raise_for_status()
md = resp.json().get("metadata", {}) or {}
if not isinstance(md, dict):
raise ValueError("Conversation metadata is not a dict.")
return decode_metadata_blob(
md,
# size-related args are ignored by the decoder; keeping API stable
max_keys=max_keys,
chunk_size=chunk_size,
key_prefix=key_prefix,
sentinel=sentinel,
)
def demo_metadata_roundtrip() -> None:
"""
Create → store → retrieve → verify → delete.
    Alternately stores a large multilingual sample and a tiny one, verifying each round trip.
"""
sample = r"""ASCII/punct: !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
Latin: café naïve coöperate soufflé señor piñata
Decomposed accents: é ñ å (e+U+0301, n+U+0303, a+U+030A)
Greek: Ελληνικά — Αθήνα
Cyrillic: Привет мир
Hebrew (RTL): שלום עולם
Arabic (RTL, with diacritics): السَّلامُ عَلَيْكُمْ
Mixed bidi: ABC עברית DEF
Devanagari: नमस्ते दुनिया
Thai: สวัสดีโลก
CJK: 你好,世界;こんにちは世界;안녕하세요 세계
Rare CJK: 𠮷 野家
Math letters: 𝒜𝓑𝔠 𝔸𝕓𝕔 𝖆𝖇𝖈
Music symbol: 𝄞
Emoji: 😀 😁 😂 🤣 😜 🥳
Emoji ZWJ: 👨👩👧👦 🧑💻 👩🏽🚀
Skin tones: 👍 👍🏻 👍🏽 👍🏿
Keycaps: 1️⃣ 2️⃣ #️⃣ *️⃣
Flags: 🇺🇸 🇯🇵 🇪🇺
Variation selector: ☺️
Zero‑width joiner: AB (U+200D between)
Zero‑width non‑joiner: क्ष (U+200C)
Zero‑width space: XY (U+200B)
Directional marks: LRM RLM (U+200E U+200F)
FSI/RLI/PDI: a<U+2067>b<U+2069> a<U+2066>b<U+2069> (U+2067/U+2066/U+2069)
Quotes/backslashes: "double" 'single' backslash \ and JSON {} []
Combining stack: a̷̲̐ b̸̛͓ c͜͟͟͡͞
Here's a JSON also to test in this long data storage.
{
"type": "json_schema",
"json_schema": {
"name": "output",
"strict": true,
"schema": {
"type": "object",
"properties": {
"output_to_user": {
"type": "string",
"description": "The main message or output intended for the user."
},
"title_for_chat": {
"type": "string",
"description": "A brief, descriptive title suitable for chat summaries or UI headings."
}
},
"required": [
"output_to_user",
"title_for_chat"
],
"additionalProperties": false
}
}
}
We can do that too!
""".strip() # can cap this with [:500] slice or similar
    sample2 = "hello"
conv_id = create_conversation(system="Demo: metadata blob storage")
for test in [sample, sample2, sample, sample2]:
try:
info = {}
print(f"-- 1. Storing {len(test)} char:\n{test}")
info = update_conversation_metadata_blob(conv_id, test)
print(f"-- 1a. Storage Report:\n{info}")
print("-- 2. Retrieving")
recovered = read_conversation_metadata_blob(conv_id)
print("-- 2a. Retrieved Back:\n", recovered)
assert recovered == test, "3. BAD/ERROR: Round-trip mismatch"
print(f"-- 3. Round-trip OK.\n{('*'*50)}")
except Exception as e:
print(e)
raise
print(read_conversation_metadata_blob(conv_id))
delete_conversation(conv_id)
if __name__ == "__main__":
demo_metadata_roundtrip()
I had o3 read the code through for you, since the forum has no “cookbook” format for walking through code.
Quick tour of the “metadata-blob” helper set
───────────────────────────────────────────
The file is a self-contained toolkit that lets you hide long text (up to 8 KB after compression, so often tens of KB of compressible plaintext) inside the 16 metadata key/value slots that the conversation REST API gives you, while still being able to overwrite the data later and leave unrelated metadata untouched.
Below is a cheat-sheet for every public-facing helper (skipping the API-key header routine).
-
create_conversation(system="")
• POST /v1/conversations with one developer message.
• Returns the conversation_id so you have a server object to pin your metadata to.
-
delete_conversation(conversation_id)
• Best-effort DELETE on that conversation.
• Treats 2xx and 400 (“already gone”) as success; everything else is logged or raised.
• Lets the demo clean up its conversation after a successful run.
-
MetadataCapacityError
• Raised when you try to write more than 16 × 512 = 8192 characters after compression/encoding.
• Signals to the caller that they must shorten the payload or bump the policy.
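The arithmetic behind that ceiling can be checked standalone; the `chatty`/`noise` strings below are made-up test data, not part of the toolkit:

```python
import base64
import lzma
import os

MAX_KEYS, CHUNK_SIZE = 16, 512
capacity = MAX_KEYS * CHUNK_SIZE             # 8192 wire characters total

# Repetitive text compresses far below the ceiling...
chatty = "the quick brown fox " * 2000       # 40,000 plaintext chars
wire = lzma.compress(chatty.encode("utf-8"), preset=9).decode("latin-1")
print(len(wire) <= capacity)                 # True

# ...but random data cannot shrink, so ~12 KB of it must overflow the slots.
noise = base64.b64encode(os.urandom(12_000)).decode("ascii")
wire2 = lzma.compress(noise.encode("utf-8"), preset=9).decode("latin-1")
print(len(wire2) > capacity)                 # True: encoder would raise MetadataCapacityError
```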
-
encode_metadata_blob(…)
• Core writer:
(plaintext + sentinel, default "÷÷ÿÿ") → UTF-8 bytes → LZMA (xz) → decode as latin-1 (1 char = 1 byte)
• Splits that “wire” string into 512-char segments and maps them to sequential keys
datastorage01, datastorage02 … up to 16.
• Only the keys actually needed are returned; no pre-clearing of the rest.
• Raises MetadataCapacityError if the payload would overflow the allowed slots.
• Returns a plain dict, so the resulting key map can be merged into the metadata payload of any endpoint that accepts one.
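A condensed, standalone sketch of that pipeline (`encode_blob`, `SENTINEL`, and `CHUNK_SIZE` are illustrative names here, not the file's API):

```python
import lzma

SENTINEL = "÷÷ÿÿ"   # plaintext integrity marker, appended before compression
CHUNK_SIZE = 512    # per-key character budget

def encode_blob(data: str, max_keys: int = 16) -> dict[str, str]:
    # (data + sentinel) -> UTF-8 -> xz -> latin-1 text (1 char per byte)
    wire = lzma.compress((data + SENTINEL).encode("utf-8"), preset=9).decode("latin-1")
    if len(wire) > max_keys * CHUNK_SIZE:
        raise ValueError(f"needs {len(wire)} chars, capacity {max_keys * CHUNK_SIZE}")
    # emit only the keys actually needed: datastorage01, datastorage02, ...
    return {
        f"datastorage{i // CHUNK_SIZE + 1:02d}": wire[i : i + CHUNK_SIZE]
        for i in range(0, len(wire), CHUNK_SIZE)
    }

blob = encode_blob("hello metadata " * 200)  # 3,000 chars of compressible text
print(sorted(blob), sum(len(v) for v in blob.values()))
```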
-
decode_metadata_blob(…)
• Robust reader of a passed metadata field that works even if:
– Some old, longer chunks are still present.
– Unrelated metadata keys are mixed in.
• Collects datastorageNN keys, concatenates them in numeric order, and after every
append tries to fully decompress.
• Once a complete LZMA stream ending in the sentinel is seen, it strips the sentinel and
returns the original text.
• If no valid stream is found, raises ValueError with a helpful message.
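The incremental strategy can be sketched standalone; `decode_blob` here is an illustrative condensation, not the file's function, but it shows how a stale orphan chunk and unrelated keys are skipped:

```python
import lzma
import re

SENTINEL = "÷÷ÿÿ"

def decode_blob(metadata: dict[str, str], prefix: str = "datastorage") -> str:
    pat = re.compile(rf"^{re.escape(prefix)}(\d+)$")
    chunks = sorted(
        (int(m.group(1)), v)
        for k, v in metadata.items()
        if isinstance(v, str) and (m := pat.match(k))
    )
    acc = ""
    for _, piece in chunks:
        acc += piece
        try:
            dec = lzma.LZMADecompressor()
            out = dec.decompress(acc.encode("latin-1"))
        except lzma.LZMAError:
            continue  # truncated or invalid so far; append the next chunk
        if dec.eof:
            text = out.decode("utf-8")
            if text.endswith(SENTINEL):
                return text[: -len(SENTINEL)]  # early stop: orphans never read
    raise ValueError("no complete stream with the expected sentinel")

# One fresh chunk, plus a stale orphan from an older longer write, plus junk:
wire = lzma.compress(("hi" + SENTINEL).encode("utf-8"), preset=9).decode("latin-1")
meta = {"datastorage01": wire, "datastorage02": "stale-orphan", "title": "unrelated"}
print(decode_blob(meta))  # prints "hi"
```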
-
update_conversation_metadata_blob(conversation_id, data, …)
• High-level “write” operation:
– Calls encode_metadata_blob() → gets only the chunks it needs.
– GETs the current conversation to fetch existing metadata.
– Merges the new chunk keys into that dict (leaving everything else intact).
– POSTs the merged dict back.
• Returns a tiny status dict, e.g.
{"mode": "raw_xz", "stored_chars": 1208, "chunk_count": 3, "keys_used": 3}
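The merge itself is just dict spreading; a toy illustration (all keys and values here are made up):

```python
existing = {
    "title": "My chat",        # unrelated key: must survive the write
    "datastorage01": "OLD-1",
    "datastorage02": "OLD-2",  # becomes an orphan after a shorter rewrite
}
new_chunks = {"datastorage01": "NEW-1"}  # this rewrite needs only one key

merged = {**existing, **new_chunks}      # overwrite only the keys we emit
print(merged)
```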
-
read_conversation_metadata_blob(conversation_id, …)
• High-level “read” companion:
– GETs the conversation, hands the metadata dict to decode_metadata_blob(),
and returns your original string.
-
demo_metadata_roundtrip()
• Smoke-test / example usage.
• Creates a conversation, alternately stores a multi-KB multilingual sample and a short
“hello”, reads it back each time to prove the overwrite works, then deletes the conversation.
Take-aways for your own projects
────────────────────────────────
• You can treat the 16 metadata slots as a mini file-system: write big compressed blobs, overwrite selectively, and recover gracefully even if older, longer writes left trailing chunk keys.
• The writer only ever emits the keys a payload needs, so later writes of any size just work; the reader tolerates leftover history and ignores junk keys.
• LZMA compression + latin-1 encoding is 100 % reversible and stays within the “must be valid JSON string” constraint of the metadata API.
• Thanks to the sentinel and incremental decompression, the reader stops as soon as the real end of the stream is found; if only 3 of the 16 chunk keys are in use, the orphaned rest are never read.
Drop-in pattern
───────────────
conv_id = create_conversation()
update_conversation_metadata_blob(conv_id, your_long_text)
text_back = read_conversation_metadata_blob(conv_id)
assert text_back == your_long_text # raises if not a match
That’s all that’s needed to piggy-back sizeable, structured data onto a conversation object without any extra storage infrastructure.