The number of output tokens CANNOT be obtained by counting the chunks of the stream. The delta chunks are transmitted, at a minimum, in representable glyphs. Let's say I request a bunch of emoji: many are multi-byte, sometimes several tokens combine to represent one code-point glyph, and some glyphs are even combinations of multiple code points.
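A quick illustration of why chunks and tokens diverge (a minimal sketch, not from the run below; it assumes tiktoken's o200k_base encoding approximates the serving tokenizer, which is not guaranteed for any given API model):

import tiktoken  # pip install tiktoken

enc = tiktoken.get_encoding("o200k_base")  # assumption: close to the model's real tokenizer
for glyph in ["🙂", "🇨🇦", "👩‍🚀"]:  # single code point, flag pair, ZWJ sequence
    tokens = enc.encode(glyph)
    print(f"{glyph}: {len(glyph)} code point(s) -> {len(tokens)} token(s)")

One visible glyph can cost several tokens, yet the stream hands it to you as (at least) a whole glyph per delta, so chunk counts and token counts drift apart immediately.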
Responses API call:
=== Assembled Output (verbatim) ===
[assembled emoji output not reproduced here]
=== Streaming & Tokenization Report ===
Delta text chunks counted: 122
output_tokens: 245
reasoning_tokens: 0
(output_tokens - reasoning_tokens): 245
Observation: The number of streamed delta chunks does NOT equal the number of output tokens.
However, what if I set max_output_tokens too low (the minimum being 25, for some reason), terminating that output early, so that I never get a final response.completed event with the usage report? The code has nothing to report on:
=== Assembled Output (verbatim) ===
[assembled emoji output not reproduced here]
=== Streaming & Tokenization Report ===
Delta text chunks counted: 17
Final usage was not returned in 'response.completed'; cannot report token counts for this run.
The delta chunks we already know to be unreliable; besides, I requested a cutoff at 25 tokens.
However, if you use the API parameter store: True (the default on the Responses API endpoint), the API calls are logged, essentially forever if you don't go cleaning them up per call.
This can also deliver the usage of incomplete responses. The response ID arrives in the very first event, so we can store it immediately. Chaining the incomplete call with a follow-up call to retrieve the model response (as you might also do in background mode):
[Fallback retrieval used] GET /v1/responses/{response_id} to obtain usage.
response_id: resp_68b9d2689e4c81969f729d5290cd6e6508195443e4383370
=== Assembled Output (verbatim) ===
[assembled emoji output not reproduced here]
=== Streaming & Tokenization Report ===
Delta text chunks counted: 16
output_tokens: 25
reasoning_tokens: 0
(output_tokens - reasoning_tokens): 25
Usage source: retrieval
Observation: The number of streamed delta chunks does NOT equal the number of output tokens.
Code showing this walked through, sped up by having gpt-5 re-engineer my code for the purpose:
#!/usr/bin/env python3
"""
Streaming + fallback retrieval demo for OpenAI Responses API using httpx.

Scenario:
- We stream a response with max_output_tokens=25 (forced premature termination).
- Count 'response.output_text.delta' chunks and assemble the streamed text.
- Capture the server-assigned response_id from 'response.created' immediately.
- If the stream ends without a 'response.completed' usage object,
  we issue a GET /v1/responses/{response_id} to retrieve the stored response,
  extract the usage, and produce the same report.

This demonstrates:
1) Counting streamed delta chunks is not a proxy for token usage.
2) Even if a streaming run ends without a usage object in-stream (e.g., stopped by length),
   the persisted response_id can be used to retrieve the final usage.

Requirements:
    pip install httpx
    export OPENAI_API_KEY="sk-..."
"""
from __future__ import annotations

import json
import os
import sys
from typing import Any, Optional

import httpx

API_URL = "https://api.openai.com/v1/responses"
MODEL = "gpt-4.1"  # Use a model available to your org.
TIMEOUT = httpx.Timeout(30.0, connect=10.0, read=30.0, write=30.0)


def require_api_key() -> str:
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("ERROR: OPENAI_API_KEY is not set in the environment.", file=sys.stderr)
        sys.exit(2)
    return api_key


def build_payload_for_truncated_stream() -> dict[str, Any]:
    """
    Build a streaming payload that will be truncated by max_output_tokens=25.
    We request ONLY simple, single-codepoint emoji, in one line, no spaces, no punctuation,
    and no multi-codepoint constructs (ZWJ, skin tones, VS-16).
    """
    emoji_request = (
        "Output exactly 500 emoji characters in one single line.\n"
        "Rules:\n"
        "- Use ONLY simple, single-codepoint emoji (no zero-width joiners, no skin-tone modifiers, no variation selectors like VS-16).\n"
        "- No spaces, punctuation, or line breaks.\n"
        "- Do not include text or anything besides the emoji characters.\n"
        "- Avoid any emoji that rely on ZWJ or variation selectors (e.g., ☁️, ☃️, ❤️, family/flag sequences, skin tones).\n"
        "Return ONLY the emoji characters and nothing else."
    )
    return {
        "model": MODEL,
        "instructions": "You are a careful formatter who strictly follows rules.",
        "input": emoji_request,
        "stream": True,
        "store": True,  # ensure response is persisted server-side
        "max_output_tokens": 25,  # force premature termination by length
    }


def process_event(event: str, data: str, state: dict[str, Any]) -> None:
    """
    Handle an SSE event from the Responses API.
    - On 'response.created'/'response.in_progress': capture response_id early.
    - On 'response.output_text.delta': increment delta chunk count, append to assembled text.
    - On 'response.completed': capture final usage.
    """
    try:
        payload = json.loads(data)
    except json.JSONDecodeError:
        return

    etype = payload.get("type")

    if etype in ("response.created", "response.in_progress"):
        # Capture the response_id as early as possible.
        response_obj = payload.get("response") or {}
        rid = response_obj.get("id")
        if rid and not state.get("response_id"):
            state["response_id"] = rid
    elif etype == "response.output_text.delta":
        delta = payload.get("delta", "")
        if isinstance(delta, str) and delta:
            state["assembled_text"].append(delta)
            state["delta_chunk_count"] += 1
    elif etype == "response.completed":
        response_obj = payload.get("response") or {}
        rid = response_obj.get("id")
        if rid and not state.get("response_id"):
            state["response_id"] = rid
        usage = response_obj.get("usage") or {}
        out_tokens = usage.get("output_tokens")
        details = usage.get("output_tokens_details") or {}
        reasoning_tokens = details.get("reasoning_tokens", 0)
        state["completed"] = True
        state["output_tokens"] = out_tokens
        state["reasoning_tokens"] = reasoning_tokens
        state["usage_source"] = "stream"


def stream_request(client: httpx.Client, api_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """
    Post the streaming request and iterate SSE lines, parsing 'event:' and 'data:'.
    Returns a state dict with assembled text, delta chunk count, response_id, and usage if present.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    state: dict[str, Any] = {
        "assembled_text": [],
        "delta_chunk_count": 0,
        "completed": False,
        "output_tokens": None,
        "reasoning_tokens": 0,
        "usage_source": None,
        "response_id": None,
    }
    with client.stream("POST", API_URL, headers=headers, json=payload) as resp:
        if resp.is_error:
            resp.read()  # read the error body so raise_for_status() can report its text
        resp.raise_for_status()
        current_event: Optional[str] = None
        data_lines: list[str] = []
        for raw_line in resp.iter_lines():
            line = raw_line.strip()
            if not line:
                # Blank line terminates one SSE event; dispatch it and reset.
                if current_event and data_lines:
                    data = "\n".join(data_lines)
                    process_event(current_event, data, state)
                current_event = None
                data_lines.clear()
                continue
            if line.startswith("event:"):
                current_event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
            else:
                # ignore other SSE fields (e.g., id:, comments)
                pass
    return state


def fetch_response_by_id(client: httpx.Client, api_key: str, response_id: str) -> dict[str, Any]:
    """
    GET the persisted response object by id.
    Returns the parsed JSON body.
    """
    url = f"{API_URL}/{response_id}"
    headers = {
        "Authorization": f"Bearer {api_key}",
    }
    r = client.get(url, headers=headers, timeout=TIMEOUT)
    r.raise_for_status()
    return r.json()


def print_report(
    assembled: str,
    delta_chunk_count: int,
    output_tokens: Optional[int],
    reasoning_tokens: int,
    usage_source: Optional[str],
    retrieval_status: Optional[str] = None,
) -> None:
    print("\n=== Assembled Output (verbatim) ===")
    print(assembled)
    print("\n=== Streaming & Tokenization Report ===")
    print(f"Delta text chunks counted: {delta_chunk_count}")
    if output_tokens is None:
        print("Final usage was not returned; cannot report token counts for this run.")
        if retrieval_status:
            print(f"Retrieved response status: {retrieval_status}")
        return
    diff = output_tokens - (reasoning_tokens or 0)
    print(f"output_tokens: {output_tokens}")
    print(f"reasoning_tokens: {reasoning_tokens}")
    print(f"(output_tokens - reasoning_tokens): {diff}")
    if usage_source:
        print(f"Usage source: {usage_source}")
    if delta_chunk_count != diff:
        print("\nObservation: The number of streamed delta chunks does NOT equal the number of output tokens.")
    else:
        print("\nNote: In this particular run, delta chunk count happened to equal the reported token measure; "
              "this is coincidental and not reliable for token accounting.")


def main() -> None:
    api_key = require_api_key()
    payload = build_payload_for_truncated_stream()
    try:
        with httpx.Client(timeout=TIMEOUT) as client:
            # Part 1: Stream with max_output_tokens=25 (forced premature stop)
            state = stream_request(client, api_key, payload)
            assembled = "".join(state["assembled_text"])
            delta_chunk_count = state["delta_chunk_count"]
            output_tokens = state["output_tokens"]
            reasoning_tokens = state["reasoning_tokens"]
            usage_source = state["usage_source"]
            response_id = state.get("response_id")

            # If usage wasn't delivered during stream, Part 2: fallback GET by response_id
            if output_tokens is None:
                if not response_id:
                    print("ERROR: Stream ended without usage and without a response_id; cannot retrieve usage.", file=sys.stderr)
                    print_report(assembled, delta_chunk_count, None, 0, None)
                    sys.exit(1)
                # Retrieve persisted response and extract usage/output for reporting
                retrieved = fetch_response_by_id(client, api_key, response_id)
                # Extract usage if present
                usage = retrieved.get("usage") or {}
                output_tokens = usage.get("output_tokens")
                details = usage.get("output_tokens_details") or {}
                reasoning_tokens = details.get("reasoning_tokens", 0)
                usage_source = "retrieval"
                status = retrieved.get("status")
                # If the streamed assembled text is very short (due to early truncation),
                # it can be helpful to re-assemble from the persisted output too (optional).
                # We'll prefer the streamed assembly we already captured; but if it is empty,
                # try to reconstruct from the retrieved object.
                if not assembled:
                    # Best-effort reassembly of assistant text from the retrieved object.
                    out_items = retrieved.get("output") or []
                    buf: list[str] = []
                    for item in out_items:
                        if item.get("type") == "message":
                            for part in item.get("content") or []:
                                if part.get("type") == "output_text":
                                    t = part.get("text", "")
                                    if t:
                                        buf.append(t)
                    if buf:
                        assembled = "".join(buf).strip()
                # Print final report using retrieved usage
                print("\n[Fallback retrieval used] GET /v1/responses/{response_id} to obtain usage.")
                print(f"response_id: {response_id}")
                print_report(assembled, delta_chunk_count, output_tokens, reasoning_tokens, usage_source, retrieval_status=status)
            else:
                # We had usage in-stream; just report
                print_report(assembled, delta_chunk_count, output_tokens, reasoning_tokens, usage_source)
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code} - {e.response.text}", file=sys.stderr)
        sys.exit(1)
    except httpx.RequestError as e:
        print(f"Request error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
The input (prompt) token count is also available there (not what I cared about in this case).
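For reference, a sketch of the usage block that comes back on retrieval (field names follow the Responses API usage object; the values are illustrative, not from the run above):

# retrieved["usage"] from GET /v1/responses/{response_id} looks roughly like this:
usage = {
    "input_tokens": 96,
    "input_tokens_details": {"cached_tokens": 0},
    "output_tokens": 25,
    "output_tokens_details": {"reasoning_tokens": 0},
    "total_tokens": 121,
}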
It does seem feasible to have a preliminary usage object event for measuring the input, but you know what you sent. If you want counting you don't do yourself, and don't like storing user data, send the call again to the cheapest model with the same features, like gpt-5-nano, shutting it off at max_output_tokens so you mostly just pay for the input (this just doesn't work if you use the 'conversations' mechanism).
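A sketch of that trick (the helper name and 30-second timeout are my own; it also presumes the cheap model tokenizes your prompt the same way as your target model, which is not guaranteed across model families):

import os
import httpx

def count_input_tokens_cheaply(input_text: str) -> int | None:
    """Re-send the same input to a cheap model and read usage.input_tokens from the body."""
    payload = {
        "model": "gpt-5-nano",      # cheapest model with the same features, per the text above
        "input": input_text,
        "store": False,             # don't persist this throwaway call
        "max_output_tokens": 25,    # cut generation off early; you mostly pay for the input
    }
    headers = {
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        "Content-Type": "application/json",
    }
    with httpx.Client(timeout=30.0) as client:
        r = client.post("https://api.openai.com/v1/responses", headers=headers, json=payload)
        r.raise_for_status()
        return (r.json().get("usage") or {}).get("input_tokens")

In a non-streaming call like this, the response body should still carry the usage block even though the output is cut off by length.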
Note that with the internal tools iterator, what you send as input may be far less than what you are billed for as input, because of the internal calling and re-calling that keeps adding to the context window.