How can I delete or entirely overwrite the 16 key metadata on a conversation?

I want to replace the metadata of a conversation with a new set of 16 key/value pairs so I can recycle those 16 keys as needed, but this doesn’t seem supported by the API.

Is there a way to do this?

This is required until individual messages or items have their own metadata in the conversations API.

1 Like

Are you looking for search ability (only in the platform site UI), or for data storage?

I’m thinking start one key with a magic string like “gzip01”, and then you’ve got 384 bytes of base64 storage for the first chunk of your 6144 bytes of compressed JSON.
Then make an encoder for the rest of the key names for 32 * 16 more bytes. Base58 should work. (Guess that would be a write-once storage.)

.gz may be more compute-efficient than my packing of 18 bit token integers…

Not exactly, just want to store “attachment ids” associated with message IDs (the key) for the last 16 messages in my conversation. My intention is to just recycle the 16 keys for the last 16 messages with attachments so I can easily look them up by message ID to reconstruct my data, but it won’t let me change the keys after initializing.

I thought maybe deleting then re-adding, or clearing the entire thing and reconstructing would be the right move, but it seems like I can’t make any changes to keys after creating them with the API.

I can probably get that number much higher for you.

Here’s a utility for packing the 1,516 characters of its demonstration’s test string into three keys and 1,208 string characters.

The encoding function provides you a parameter for the maximum number of its own keys it should create.

Against an unknown count of other keys on a conversation, I would either poll the existing key count, know your application’s count, verify the success after storage - or just use this exclusively.

import logging

logger = logging.getLogger(__name__)
import sys
import base64
from typing import Any

try:
    import httpx  # required by stream_response/async_stream_response and non-stream POSTs
except Exception as e:
    raise ImportError("Missing dependency: install with `pip install httpx`") from e

def get_api_key_headers(
    printing: bool = False,
    dotenv_override: bool = True,
    env_path: str | None = None,
) -> dict[str, str]:
    """
    Returns OpenAI API auth headers.

    Behavior:
      - Attempts to load variables from a .env file (if python-dotenv is available).
      - By default, .env VALUES OVERRIDE existing os.environ entries (dotenv_override=True).
      - You can point to a specific file via env_path; otherwise we use find_dotenv(usecwd=True).
      - Each .env path is loaded at most once per process (cached).

    Notes:
      - This updates process environment variables; in multi-account, single-process scenarios,
        prefer passing explicit headers/keys per call rather than relying on global env.

    Usage: To avoid cross-call bleed, pass a copy before mutation is possible, e.g.:
    api_call(url, headers = {**get_api_key_headers(), "OpenAI-Beta": "responses=v99"}, ...)
    """
    # One-time cache per path
    if not hasattr(get_api_key_headers, "_dotenv_loaded_paths"):
        get_api_key_headers._dotenv_loaded_paths = set()

    # Best-effort .env loading (optional dependency)
    try:
        if env_path is None:
            from dotenv import load_dotenv, find_dotenv  # lazy import
            dotenv_path = find_dotenv(usecwd=True)
        else:
            from dotenv import load_dotenv  # lazy import
            dotenv_path = env_path

        if dotenv_path and dotenv_path not in get_api_key_headers._dotenv_loaded_paths:
            load_dotenv(dotenv_path=dotenv_path, override=dotenv_override)
            get_api_key_headers._dotenv_loaded_paths.add(dotenv_path)
    except Exception:
        # Missing python-dotenv or any load failure is a no-op.
        pass

    import os
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("ERROR: Set the OPENAI_API_KEY environment variable (or in your .env).")
    org_id = os.environ.get("OPENAI_ORG_ID")
    project_id = os.environ.get("OPENAI_PROJECT_ID")

    if printing:
        print(f"Using OPENAI_API_KEY {api_key[:10]}...{api_key[-4:]}")
        print(f"Using optional OPENAI_ORG_ID {org_id}")
        print(f"Using optional OPENAI_PROJECT_ID {project_id}")

    headers: dict[str, str] = {"Authorization": f"Bearer {api_key}"}
    if org_id:
        headers["OpenAI-Organization"] = org_id
    if project_id:
        headers["OpenAI-Project"] = project_id
    return headers

def create_conversation(system: str = "") -> str:
    """
    Create a conversation containing one developer message that sets the tone.

    Parameters
    ----------
    system : str
        Developer message text; surrounding whitespace is stripped before
        sending.

    Returns
    -------
    str
        The server-generated conversation ID.

    Raises
    ------
    httpx.HTTPStatusError
        If the POST to /v1/conversations fails.
    """
    # Fix: removed an unused local `from pathlib import Path` import.
    payload: dict[str, object] = {
        "items": [
            {
                "type": "message",
                "role": "developer",
                "content": system.strip(),
            }
        ],
    }

    with httpx.Client(timeout=20) as client:
        response = client.post(
            "https://api.openai.com/v1/conversations",
            headers=get_api_key_headers(),
            json=payload,
        )
        response.raise_for_status()
        conversation_id: str = response.json()["id"]

    print(f"Conversation created {conversation_id}")
    return conversation_id

def delete_conversation(conversation_id: str) -> None:
    """
    Delete the conversation so the demo doesn’t leave stray server objects.
    On 2xx or 400, it's treated as success. Errors are logged to stderr.

    Error-handling layers:
      - 400 responses are treated as "already gone" and logged as a warning.
      - Other HTTP errors are printed, logged, and re-raised; the outermost
        handler then downgrades everything to a stderr warning so cleanup is
        always best-effort and never crashes the caller.
    """
    # Fix: removed a redundant local `import sys` (sys is imported at module level).
    try:
        with httpx.Client(timeout=20) as client:
            response = client.delete(
                f"https://api.openai.com/v1/conversations/{conversation_id}",
                headers=get_api_key_headers(),
            )

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            import json  # lazy import; only needed to pretty-print error bodies
            resp = exc.response
            # Attempt to parse JSON, fall back to raw text
            try:
                body_content = resp.json()
            except ValueError:
                body_content = resp.text

            # Format body for logging/printing
            if isinstance(body_content, (dict, list)):
                formatted_body = json.dumps(body_content, indent=2)
            else:
                formatted_body = str(body_content)

            if resp.status_code == 400:
                logger.warning(
                    "Conversation wasn't deleted or didn't exist: %s\nResponse body:\n%s",
                    conversation_id,
                    formatted_body,
                )
            else:
                print(
                    f"Error deleting conversation {conversation_id}: "
                    f"status {resp.status_code}\nResponse body:\n{formatted_body}"
                )
                logger.error(
                    "HTTP status error deleting conversation %s: %s",
                    conversation_id,
                    formatted_body,
                )
                raise
        else:
            logger.info("Conversation deleted: %s", conversation_id)
    except Exception as exc:
        print(f"[warn] Couldn’t delete {conversation_id}: {exc}", file=sys.stderr)


# Reference-style encoder/decoder and conversation helpers for metadata blobs.
# Single path: LZMA (xz) + latin-1 (raw). Minimal overhead: only a fixed
# plaintext sentinel appended BEFORE compression to validate integrity.

# Capacity is max_keys * chunk_size characters of latin-1 "wire" text
# (see encode_metadata_blob).
class MetadataCapacityError(Exception):
    """Raised when the encoded payload exceeds the available metadata capacity."""


def encode_metadata_blob(
    data: str,
    *,
    max_keys: int = 16,
    chunk_size: int = 512,
    key_prefix: str = "datastorage",
    sentinel: str = "÷÷ÿÿ",
) -> dict[str, str]:
    """
    Pack `data` into sequentially numbered metadata keys
    (e.g., datastorage01..datastorageNN) using LZMA (xz) compression followed
    by a 1:1 latin-1 decoding of the compressed bytes.

    The only overhead is `sentinel`, appended to the plaintext BEFORE
    compression; readers require it at the end of the decoded string to
    validate integrity.

    Only the keys needed for the current payload are emitted — nothing is
    pre-cleared or padded, and keys above `max_keys` are never touched. If the
    payload would need more than `max_keys` keys, MetadataCapacityError is
    raised instead.

    Returns
    -------
    dict[str, str]
        Just the needed chunk keys, each value at most `chunk_size` chars.

    Raises
    ------
    MetadataCapacityError
        If the encoded wire exceeds `max_keys * chunk_size` characters.
    """
    import lzma  # lazy import

    # plaintext + sentinel -> UTF-8 -> LZMA -> latin-1 text (1 char per byte,
    # fully reversible)
    compressed = lzma.compress((data + sentinel).encode("utf-8"), preset=9)
    wire = compressed.decode("latin-1")

    capacity = max_keys * chunk_size
    if len(wire) > capacity:
        raise MetadataCapacityError(
            f"Encoded payload ({len(wire)} chars) exceeds capacity ({capacity}). "
            f"Reduce input size or increase max_keys."
        )

    # One dict entry per chunk_size-sized slice, numbered from 01.
    return {
        f"{key_prefix}{ordinal:02d}": wire[offset : offset + chunk_size]
        for ordinal, offset in enumerate(range(0, len(wire), chunk_size), start=1)
    }


def decode_metadata_blob(
    metadata: dict[str, str],
    *,
    # Size/length params are accepted only for API symmetry with the encoder;
    # the decoder ignores them.
    max_keys: int = 16,              # unused
    chunk_size: int = 512,           # unused
    key_prefix: str = "datastorage",
    sentinel: str = "÷÷ÿÿ",
) -> str:
    """
    Reassemble and decompress a blob written by `encode_metadata_blob`.

    Tolerant of:
      - variable chunk counts,
      - orphan/trailing chunk keys left by older, longer writes,
      - unrelated metadata keys (e.g., {"title": "..."}).

    Approach: gather every key matching f"{key_prefix}<digits>", order them
    numerically, and grow a candidate wire string one chunk at a time. After
    each chunk, attempt a full LZMA decompression; the first complete stream
    whose decoded text ends with `sentinel` wins and is returned with the
    sentinel stripped.

    Parameters
    ----------
    metadata : dict[str, str]
        A metadata dict possibly containing many keys; only matching chunk
        keys are considered.
    key_prefix : str
        Prefix of the chunk keys (default "datastorage").
    sentinel : str
        Plaintext suffix that must be present after decompression.

    Returns
    -------
    str
        The original string (without the sentinel).

    Raises
    ------
    ValueError
        If `metadata` is not a dict, no chunk keys exist, or no complete
        compressed stream with the expected sentinel is found.
    """
    import lzma  # lazy import
    import re    # lazy import

    if not isinstance(metadata, dict):
        raise ValueError("metadata must be a dict of key->string")

    # Collect keys shaped like f"{key_prefix}01", f"{key_prefix}02", ...
    key_pattern = re.compile(rf"^{re.escape(key_prefix)}(\d+)$")
    numbered: list[tuple[int, str]] = []
    for name, value in metadata.items():
        if not isinstance(value, str):
            continue
        hit = key_pattern.match(name)
        if hit is not None:
            # Empty chunk values are kept on purpose; they may be boundaries.
            numbered.append((int(hit.group(1)), value))

    if not numbered:
        raise ValueError(f"No chunk keys found for prefix '{key_prefix}'")

    numbered.sort(key=lambda pair: pair[0])

    # Grow the candidate wire and re-attempt a full decode after each chunk.
    candidate = ""
    for _, chunk in numbered:
        candidate += chunk
        decompressor = lzma.LZMADecompressor()  # fresh per attempt
        try:
            payload = decompressor.decompress(candidate.encode("latin-1"))
        except lzma.LZMAError:
            # Invalid or truncated so far; append the next chunk and retry.
            continue
        if not decompressor.eof:
            # Stream not complete yet; need more bytes.
            continue
        text = payload.decode("utf-8")
        if text.endswith(sentinel):
            return text[: -len(sentinel)]
        # Complete stream but no sentinel: keep scanning (rare; allows
        # forward-compat with earlier writers).

    raise ValueError(
        "Unable to reconstruct a complete LZMA stream with the expected end sentinel. "
        "Check that the encoder used xz+latin-1 and appended the matching sentinel."
    )


def update_conversation_metadata_blob(
    conversation_id: str,
    data: str,
    *,
    max_keys: int = 16,
    chunk_size: int = 512,
    key_prefix: str = "datastorage",
    sentinel: str = "÷÷ÿÿ",
) -> dict[str, int | str]:
    """
    Write `data` into `conversation_id` metadata using xz+raw across sequential keys.

    Only the needed keys are written (no blind clearing). Keys above `max_keys`
    are never overwritten. Existing unrelated metadata keys are preserved by
    GET-ing the current metadata and merging before the POST.

    Returns
    -------
    dict
        Summary: {"mode": "raw_xz", "stored_chars": int, "chunk_count": int, "keys_used": int}

    Raises
    ------
    MetadataCapacityError
        If the encoded payload does not fit in `max_keys * chunk_size` chars.
    httpx.HTTPStatusError
        If either the GET or the POST request fails.
    """
    # Fix: removed a redundant local `import httpx` (already imported at module level).

    # Encode into just the necessary keys
    payload = encode_metadata_blob(
        data,
        max_keys=max_keys,
        chunk_size=chunk_size,
        key_prefix=key_prefix,
        sentinel=sentinel,
    )
    wire_len = sum(len(v) for v in payload.values())
    keys_used = len(payload)
    chunk_count = keys_used  # each value is a chunk

    # Merge with existing metadata; overwrite only the keys we emit.
    with httpx.Client(timeout=20) as client:
        get_resp = client.get(
            f"https://api.openai.com/v1/conversations/{conversation_id}",
            headers=get_api_key_headers(),
        )
        get_resp.raise_for_status()
        existing = get_resp.json().get("metadata", {}) or {}
        if not isinstance(existing, dict):
            existing = {}

        merged = {**existing, **payload}

        post_resp = client.post(
            f"https://api.openai.com/v1/conversations/{conversation_id}",
            headers=get_api_key_headers(),
            json={"metadata": merged},
        )
        post_resp.raise_for_status()

    return {
        "mode": "raw_xz",
        "stored_chars": wire_len,
        "chunk_count": chunk_count,
        "keys_used": keys_used,
    }


def read_conversation_metadata_blob(
    conversation_id: str,
    *,
    # Retained for symmetry with update_conversation_metadata_blob; the
    # decoder ignores the size-related params.
    max_keys: int = 16,
    chunk_size: int = 512,
    key_prefix: str = "datastorage",
    sentinel: str = "÷÷ÿÿ",
) -> str:
    """
    Read and decode string data from `conversation_id` metadata using the
    smart, incremental decoder. Orphan keys and unrelated metadata are ignored.

    Raises
    ------
    ValueError
        If the conversation's metadata field is not a dict, or no complete
        blob with the expected sentinel can be reconstructed.
    httpx.HTTPStatusError
        If the GET request fails.
    """
    # Fix: removed a redundant local `import httpx` (already imported at module level).

    with httpx.Client(timeout=20) as client:
        resp = client.get(
            f"https://api.openai.com/v1/conversations/{conversation_id}",
            headers=get_api_key_headers(),
        )
        resp.raise_for_status()
        md = resp.json().get("metadata", {}) or {}
        if not isinstance(md, dict):
            raise ValueError("Conversation metadata is not a dict.")

    return decode_metadata_blob(
        md,
        # size-related args are ignored by the decoder; keeping API stable
        max_keys=max_keys,
        chunk_size=chunk_size,
        key_prefix=key_prefix,
        sentinel=sentinel,
    )


def demo_metadata_roundtrip() -> None:
    """
    Create → store → retrieve → verify → delete.
    Stores a megasample vs a mini sample repeatedly and verifies.

    The alternation (big, small, big, small) demonstrates that a shorter
    overwrite still decodes correctly even when longer chunk keys from a
    previous write remain in the conversation's metadata.
    """

    # Stress-test payload: mixed scripts, RTL text, combining marks,
    # zero-width characters, emoji ZWJ sequences, and embedded JSON.
    sample = r"""ASCII/punct: !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
Latin: café naïve coöperate soufflé señor piñata
Decomposed accents: é ñ å (e+U+0301, n+U+0303, a+U+030A)
Greek: Ελληνικά — Αθήνα
Cyrillic: Привет мир
Hebrew (RTL): שלום עולם
Arabic (RTL, with diacritics): السَّلامُ عَلَيْكُمْ
Mixed bidi: ABC ‏עברית‏ DEF
Devanagari: नमस्ते दुनिया
Thai: สวัสดีโลก
CJK: 你好,世界;こんにちは世界;안녕하세요 세계
Rare CJK: 𠮷 野家
Math letters: 𝒜𝓑𝔠 𝔸𝕓𝕔 𝖆𝖇𝖈
Music symbol: 𝄞
Emoji: 😀 😁 😂 🤣 😜 🥳
Emoji ZWJ: 👨‍👩‍👧‍👦 🧑‍💻 👩🏽‍🚀
Skin tones: 👍 👍🏻 👍🏽 👍🏿
Keycaps: 1️⃣ 2️⃣ #️⃣ *️⃣
Flags: 🇺🇸 🇯🇵 🇪🇺
Variation selector: ☺️
Zero‑width joiner: A‍B (U+200D between)
Zero‑width non‑joiner: क्‌ष (U+200C)
Zero‑width space: X​Y (U+200B)
Directional marks: LRM‎ RLM‏ (U+200E U+200F)
FSI/RLI/PDI: a<U+2067>b<U+2069> a<U+2066>b<U+2069> (U+2067/U+2066/U+2069)
Quotes/backslashes: "double" 'single' backslash \ and JSON {} []
Combining stack: a̷̲̐ b̸̛͓ c͜͟͟͡͞

Here's a JSON also to test in this long data storage.
{
  "type": "json_schema",
  "json_schema": {
    "name": "output",
    "strict": true,
    "schema": {
      "type": "object",
      "properties": {
        "output_to_user": {
          "type": "string",
          "description": "The main message or output intended for the user."
        },
        "title_for_chat": {
          "type": "string",
          "description": "A brief, descriptive title suitable for chat summaries or UI headings."
        }
      },
      "required": [
        "output_to_user",
        "title_for_chat"
      ],
      "additionalProperties": false
    }
  }
}
We can do that too!
""".strip()  # can cap this with [:500] slice or similar
    # Tiny payload to prove short overwrites of a longer blob still decode.
    sample2="hello"

    conv_id = create_conversation(system="Demo: metadata blob storage")
    # Alternate big/small to exercise overwrite-with-orphan-chunks behavior.
    for test in [sample, sample2, sample, sample2]:
        try:
            info = {}
            print(f"-- 1. Storing {len(test)} char:\n{test}")
            info = update_conversation_metadata_blob(conv_id, test)
            print(f"-- 1a. Storage Report:\n{info}")
            print("-- 2. Retrieving")
            recovered = read_conversation_metadata_blob(conv_id)
            print("-- 2a. Retrieved Back:\n", recovered)
            assert recovered == test, "3. BAD/ERROR: Round-trip mismatch"
            print(f"-- 3. Round-trip OK.\n{('*'*50)}")
        except Exception as e:
            print(e)
            raise
    # Final read outside the loop, then best-effort cleanup.
    print(read_conversation_metadata_blob(conv_id))
    delete_conversation(conv_id)


# Script entry point: run the full create→store→read→verify→delete demo.
if __name__ == "__main__":
    demo_metadata_roundtrip()

I had o3 do the work of reading it for you, since there is no “cookbook format” for code on the forum for a walk-through.

Quick tour of the “metadata-blob” helper set
───────────────────────────────────────────
The file is a self-contained toolkit that lets you hide arbitrarily long text (dozens of KB in practice) inside the 16 metadata key/value slots that the conversation REST API gives you, while still being able to overwrite the data later and leave unrelated metadata untouched.
Below is a cheat-sheet for every public-facing helper (skipping the API-key header routine).

  1. create_conversation(system=“”)
    • POST /v1/conversations with one developer message.
    • Returns the conversation_id so you have a server object to pin your metadata to.

  2. delete_conversation(conversation_id)
    • Best-effort DELETE on that conversation.
    • Treats 2xx and 400 (“already gone”) as success; everything else is logged/raised.
    • Lets the script clean up the conversation after a successful demo.

  3. MetadataCapacityError
    • Raised when you try to write more than 16 × 512 = 8192 characters after compression/encoding.
    • Signals to the caller that they must shorten the payload or bump the policy.

  4. encode_metadata_blob(…)
    • Core writer:
    (plaintext + sentinel, “÷÷ÿÿ” by default) → UTF-8 bytes → LZMA (xz) → decode as latin-1 (1 char = 1 byte)
    • Splits that “wire” string into 512-char segments and maps them to sequential keys
    datastorage01, datastorage02 … up to 16.
    • Only the keys actually needed are returned; no pre-clearing of the rest.
    • Raises MetadataCapacityError if the payload would overflow the allowed slots.
    • Creates a metadata object to update any API endpoint that gives you the key map.

  5. decode_metadata_blob(…)
    • Robust reader of a passed metadata field that works even if:
    – Some old, longer chunks are still present.
    – Unrelated metadata keys are mixed in.
    • Collects datastorageNN keys, concatenates them in numeric order, and after every
    append tries to fully decompress.
    • Once a complete LZMA stream ending in the sentinel (“÷÷ÿÿ” by default) is seen, it strips the
    sentinel and returns the original text.
    • If no valid stream is found, raises ValueError with a helpful message.

  6. update_conversation_metadata_blob(conversation_id, data, …)
    • High-level “write” operation:
    – Calls encode_metadata_blob() → gets only the chunks it needs.
    – GETs the current conversation to fetch existing metadata.
    – Merges the new chunk keys into that dict (leaving everything else intact).
    – POSTs the merged dict back.
    • Returns a tiny status dict, e.g.
    {“mode”:“raw_xz”,“stored_chars”:7412,“chunk_count”:3,“keys_used”:3}

  7. read_conversation_metadata_blob(conversation_id, …)
    • High-level “read” companion:
    – GETs the conversation, hands the metadata dict to decode_metadata_blob(),
    and returns your original string.

  8. demo_metadata_roundtrip()
    • Smoke-test / example usage.
    • Creates a conversation, alternately stores a multi-KB multilingual sample and a short
    “hello”, reads it back each time to prove the overwrite works, then deletes the conversation.

Take-aways for your own projects
────────────────────────────────
• You can treat the 16 metadata slots as a mini file-system: write big compressed blobs, overwrite selectively, and recover gracefully even if older, longer writes left trailing chunk keys.

• The writer is one-way compatible with future larger/shorter payloads; the reader is tolerant of history and ignores junk.

• LZMA compression + latin-1 encoding is 100 % reversible and stays within the “must be valid JSON string” constraint of the metadata API.

• Thanks to the sentinel and incremental decompression, the reader can stop as soon as the real end of stream is found — no need to concatenate all 16 possible chunks when only 3 are in use.

Drop-in pattern
───────────────

conv_id = create_conversation()
update_conversation_metadata_blob(conv_id, your_long_text)
text_back = read_conversation_metadata_blob(conv_id)
assert text_back == your_long_text  # raises if not a match

That’s all that’s needed to piggy-back sizeable, structured data onto a conversation object without any extra storage infrastructure.

You can update metadata by sending a new dict with the changes.
If a key is new, it will be added and if you want to delete you can set an existing key value to None.

In the example below you will add a new key and delete another (if it exists). The changes apply over the existing metadata and will return the updated information:

updated = client.conversations.update(
  "conv_123",
  metadata={"new_key": "project-x", 
            "key_to_delete": None}
)
print(updated.to_dict())

Source

1 Like

Yep - as RESTful API interaction it would be “null”. Tested that it is gone, not that you can do this 100 times without exhausting keys.

(metadata empty)

Choose an action:
  [1] Set metadata key/value
  [2] List metadata
  [3] Delete metadata key (send null)
  [4] Delete conversation and exit
>1
Key name: test_key
Value: Hello key, I want to delete you!
Updated metadata: test_key = Hello key, I want to delete you!

Choose an action:
  [1] Set metadata key/value
  [2] List metadata
  [3] Delete metadata key (send null)
  [4] Delete conversation and exit
>2
test_key: Hello key, I want to delete you!

Choose an action:
  [1] Set metadata key/value
  [2] List metadata
  [3] Delete metadata key (send null)
  [4] Delete conversation and exit
>3
Key to delete (send null): test_key
Sent null for key: test_key

Choose an action:
  [1] Set metadata key/value
  [2] List metadata
  [3] Delete metadata key (send null)
  [4] Delete conversation and exit
>2
(metadata empty)

Choose an action:
  [1] Set metadata key/value
  [2] List metadata...
Demo Python code, no SDK

Just AI with a bunch of input training already, needing the black formatter to be presentable.

#!/usr/bin/env python3
"""
Console utility to experiment with conversation metadata updates.

Environment:
  OPENAI_API_KEY  (required)

Actions:
  1) Set key/value in metadata (POST {"metadata": {key: value}})
  2) List metadata (GET ... -> print one key/value per line)
  3) Delete key (best-effort) by sending null (POST {"metadata": {key: null}})
  4) Delete conversation and exit (guaranteed cleanup)

This script creates a temporary conversation on start and cleans it up on exit.
"""

from __future__ import annotations

import os
import sys
from typing import Any

import httpx


API_BASE = "https://api.openai.com/v1"


# --------------------------
# HTTP helpers
# --------------------------


def _headers() -> dict[str, str]:
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set.")
    return {"Authorization": f"Bearer {api_key}"}


def _print_http_error(exc: httpx.HTTPStatusError) -> None:
    """Print a one-line summary (status code + body) of an HTTP error to stderr."""
    response = exc.response
    try:
        payload = response.json()
    except Exception:
        # Non-JSON body: fall back to the raw text.
        payload = response.text
    print(f"[HTTP {response.status_code}] {payload}", file=sys.stderr)


# --------------------------
# Conversations API helpers
# --------------------------


def create_conversation(client: httpx.Client) -> str:
    """
    POST /conversations -> returns conversation id
    """
    body = {
        "items": [
            {
                "type": "message",
                "role": "developer",
                "content": "Temporary conversation for metadata experiments.",
            }
        ]
    }
    response = client.post(f"{API_BASE}/conversations", headers=_headers(), json=body)
    response.raise_for_status()
    return response.json()["id"]


def get_conversation_metadata(
    client: httpx.Client, conversation_id: str
) -> dict[str, Any]:
    """
    GET /conversations/{id} -> returns metadata dict (possibly empty)
    """
    response = client.get(
        f"{API_BASE}/conversations/{conversation_id}", headers=_headers()
    )
    response.raise_for_status()
    metadata = response.json().get("metadata") or {}
    # Guard against a non-dict metadata field in the response.
    if isinstance(metadata, dict):
        return metadata
    return {}


def post_conversation_metadata_update(
    client: httpx.Client,
    conversation_id: str,
    patch: dict[str, Any],
) -> None:
    """
    POST /conversations/{id} with {"metadata": patch}
    (Server should merge/update keys present in patch; others unchanged.)
    """
    url = f"{API_BASE}/conversations/{conversation_id}"
    response = client.post(url, headers=_headers(), json={"metadata": patch})
    response.raise_for_status()

def delete_conversation(client: httpx.Client, conversation_id: str) -> None:
    """
    DELETE /conversations/{id} (best-effort cleanup)
    """
    url = f"{API_BASE}/conversations/{conversation_id}"
    # Treat 2xx as success; surface non-2xx for visibility
    response = client.delete(url, headers=_headers())
    response.raise_for_status()


# --------------------------
# Console UI
# --------------------------

# Menu text used as the input() prompt. The .rstrip() trims trailing
# whitespace so the cursor sits immediately after the ">" marker.
MENU = """
Choose an action:
  [1] Set metadata key/value
  [2] List metadata
  [3] Delete metadata key (send null)
  [4] Delete conversation and exit
> """.rstrip()


def main() -> None:
    """
    Console REPL: create a temporary conversation, then let the user set,
    list, and null-delete metadata keys until they delete the conversation
    (option 4) or interrupt with Ctrl-C/EOF.
    """
    try:
        with httpx.Client(timeout=20) as client:
            # Create temporary conversation
            try:
                conversation_id = create_conversation(client)
            except httpx.HTTPStatusError as exc:
                _print_http_error(exc)
                return
            except Exception as exc:
                print(f"Error creating conversation: {exc}", file=sys.stderr)
                return

            print(f"Conversation created: {conversation_id}")

            # REPL loop
            while True:
                try:
                    choice = input(MENU).strip()
                except (EOFError, KeyboardInterrupt):
                    # Graceful exit; NOTE: the conversation is NOT deleted here.
                    print("\nExiting…")
                    break

                if choice == "1":
                    # Set one metadata key to a string value.
                    key = input("Key name: ").strip()
                    value = input("Value: ").strip()
                    try:
                        post_conversation_metadata_update(
                            client, conversation_id, {key: value}
                        )
                        print(f"Updated metadata: {key} = {value}")
                    except httpx.HTTPStatusError as exc:
                        _print_http_error(exc)
                    except Exception as exc:
                        print(f"Error updating metadata: {exc}", file=sys.stderr)

                elif choice == "2":
                    # List current metadata, one key/value per line (sorted).
                    try:
                        meta = get_conversation_metadata(client, conversation_id)
                        if not meta:
                            print("(metadata empty)")
                        else:
                            for k in sorted(meta.keys()):
                                print(f"{k}: {meta[k]}")
                    except httpx.HTTPStatusError as exc:
                        _print_http_error(exc)
                    except Exception as exc:
                        print(f"Error reading metadata: {exc}", file=sys.stderr)

                elif choice == "3":
                    # Delete a key by POSTing {key: null} (JSON null = removal).
                    key = input("Key to delete (send null): ").strip()
                    try:
                        post_conversation_metadata_update(
                            client, conversation_id, {key: None}
                        )
                        print(f"Sent null for key: {key}")
                    except httpx.HTTPStatusError as exc:
                        _print_http_error(exc)
                    except Exception as exc:
                        print(f"Error deleting key: {exc}", file=sys.stderr)

                elif choice == "4":
                    # Delete the conversation and leave the loop regardless of
                    # whether the delete succeeded (finally: break).
                    try:
                        delete_conversation(client, conversation_id)
                        print(f"Conversation deleted: {conversation_id}")
                    except httpx.HTTPStatusError as exc:
                        _print_http_error(exc)
                    except Exception as exc:
                        print(f"Error deleting conversation: {exc}", file=sys.stderr)
                    finally:
                        break

                else:
                    print("Please enter 1, 2, 3, or 4.")

    except Exception as exc:
        # Last-resort guard so the console tool never dies with a traceback.
        print(f"Fatal error: {exc}", file=sys.stderr)


# Script entry point: start the interactive metadata console.
if __name__ == "__main__":
    main()