Disposition: Verified, replicated
- The Chat Completions API is providing the model an altered version of the context and significantly overbilling when the input of parallel tool call return text includes multi-byte UTF-8 languages.
Half the world's population is paying 5x as much when asking about their documents.
I sat down and created much cleaner replication code that returns the result of one, two, or three tool calls having been 'made' in parallel (when there is more than one tool return, that implies the calls were made via the multi_tool_use wrapper that makes parallel function calls possible).
The effect is dramatic for a 252-token tool 'response' for each and every one of the tool calls:
USAGE (1 tool call(s)): input: 321 (uncached 321, cached 0); output: 264 (non-reasoning 264, reasoning 0)
USAGE (2 tool call(s)): input: 2473 (uncached 2473, cached 0); output: 185 (non-reasoning 185, reasoning 0)
USAGE (3 tool call(s)): input: 3675 (uncached 3675, cached 0); output: 205 (non-reasoning 205, reasoning 0)
What should have been a difference of not much more than the 252 tokens of the return message is 1200 tokens — a five-fold amplification, as if we were sending the text as "\uBBF8" or b'\xeb\xaf\xb8' instead of "미".
That the AI model input is being affected can also be seen in the response to an English question, replicated in repeated API calls:
One tool return: Korean language
์ฐ๋ฆฌ ํ์ฌ์ ํต์ฌ ์ ์ฑ
์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
1. ํ๊ฒฝ ์ํฅ ์ ๊ทน ๊ฐ์ ๋ฐ ์ง์๊ฐ๋ฅ์ฑ ๊ฐ์ํ: ๋ชจ๋ ๊ธฐ์ ๊ณ์ธต
Two or three: English produced by the AI
Our company's core policies are:
1. Environmental Responsibility: We aim to minimize environmental impact and build sustainable AI systems.
Replication code — uses aiohttp to make direct RESTful calls, so there is no third-party AI library to blame.
import os
import json
import asyncio
from typing import Any, AsyncIterator, Literal, TypedDict, NotRequired, cast
import aiohttp
# Request configuration for the replication runs.
OPENAI_BASE_URL = "https://api.openai.com/v1"
MODEL = "gpt-4.1"  # model under test
STREAM = True  # stream responses (usage arrives via stream_options when True)
class TextContentPart(TypedDict):
    """One text part of a message's content array."""
    type: Literal["text"]
    text: str

# Only text parts are used in this replication.
ContentPart = TextContentPart
# Chat Completions message content, always sent as a list of typed parts.
MessageContent = list[ContentPart]

class FunctionToolCall(TypedDict):
    """The function name/arguments pair inside a tool call."""
    name: str
    arguments: str  # JSON string in the OpenAI API

class ToolCall(TypedDict):
    """One tool call emitted by the assistant."""
    id: str
    type: Literal["function"]
    function: FunctionToolCall

class SystemMessage(TypedDict):
    role: Literal["system", "developer"]
    content: MessageContent

class UserMessage(TypedDict):
    role: Literal["user"]
    content: MessageContent

class AssistantMessage(TypedDict):
    role: Literal["assistant"]
    content: MessageContent
    # Absent, None, or the list of calls the assistant made in this turn.
    tool_calls: NotRequired[list[ToolCall] | None]

class ToolMessage(TypedDict):
    role: Literal["tool"]
    content: MessageContent
    # Must match the id of a ToolCall in the preceding assistant message.
    tool_call_id: str

# Any message that can appear in a Chat Completions conversation.
ChatMessage = SystemMessage | UserMessage | AssistantMessage | ToolMessage
## 252 o200k_base tokens of Korean sent as tool response
# NOTE(review): multi-byte UTF-8 text is the variable under test — it is used
# verbatim as the content of every tool message below. Do not re-encode it.
tool_response: str = """
๋ฏธ์
์ ์ธ๋ฌธ
์ฐ๋ฆฌ์ ๋ฏธ์
ํ๊ฒฝ ์ํฅ์ ์ ๊ทน์ ์ผ๋ก ๊ฐ์์ํค๊ณ ์ ์ธ๊ณ ์ง์๊ฐ๋ฅ์ฑ์ ๊ฐ์ํํ๋ ์ธ๊ณต์ง๋ฅ ์์คํ
์ ์ค๊ณํ๊ณ ๊ตฌ์ถํฉ๋๋ค.
๋ฐ์ดํฐ ์์ง, ๋ชจ๋ธ ํ์ต, ์ธํ๋ผ, ๋ฐฐํฌ์ ์ด๋ฅด๊ธฐ๊น์ง ๊ธฐ์ ์ ๋ชจ๋ ๊ณ์ธต์ ์ํ์ ์ฑ
์์ ๋ด์ฌํํฉ๋๋ค.
๋ชจ๋ ์ฐ๊ตฌ, ์์ง๋์ด๋ง ๋ฐ ์ด์ ์์ฌ๊ฒฐ์ ์์ ์๋์ง ํจ์จ์ฑ, ํ์ ๊ฐ์ถ, ์์ ์ต์ ํ๋ฅผ ์ต์ฐ์ ์ผ๋ก ํฉ๋๋ค.
๊ธฐํ ํ๋ณต๋ ฅ, ์ํ ๊ฒฝ์ , ์ฌ์์ ์ค์ฒ, ์ฅ๊ธฐ์ ์ง๊ตฌ ๊ฑด๊ฐ์ ์ง์ํ๋ AI ์๋ฃจ์
์ ๊ฐ๋ฐํฉ๋๋ค.
ํฌ๋ช
์ฑ๊ณผ ์ธก์ ๊ฐ๋ฅํ ํ๊ฒฝ์ ์ฑ
์, ๊ณผํ ๊ธฐ๋ฐ ์ง์๊ฐ๋ฅ์ฑ ๋ชฉํ๋ฅผ ๋ฐํ์ผ๋ก ์ด์ํฉ๋๋ค.
์ ๋ถ, ๊ธฐ์
, ์ฐ๊ตฌ๊ธฐ๊ด ๋ฐ ์ง์ญ์ฌํ์ ํ๋ ฅํ์ฌ ๊ธ์ ์ ์ธ ์ํ์ ์ํฅ์ ํ์ฅํฉ๋๋ค.
์ฐ๋ฆฌ์ ๊ธฐ์ ์ด ์ค๋ฆฌ์ ์ผ๋ก ์ค๊ณ๋๊ณ ์ฌํ์ ์ฑ
์์ ๋คํ๋ฉฐ, ์ธ๋ ๊ฐ ๋ณต์ง์ ์กฐํ๋ฅผ ์ด๋ฃจ๋๋ก ๋ณด์ฅํฉ๋๋ค.
์ฐ๋ฆฌ๊ฐ ์๋นํ๋ ๊ฒ๋ณด๋ค ๋ ๋ง์ด ์ง๊ตฌ์ ํ์ํ๋ ์ํ๊ฒฝ์ ๊ธฐ์ฌ๋ฅผ ๋ชฉํ๋ก ์ง์์ ์ผ๋ก ํ์ ํฉ๋๋ค.
""".strip()
### or uncomment to make the tool return message 1 token
#tool_response:str = "null"
# Base conversation: system + user, then an assistant turn that made three
# parallel calls to search_user_documents, then the three tool-return messages.
# make_chat_history_with_n_tool_pairs() trims this to 1, 2, or 3 pairs per run.
chat_history: list[ChatMessage] = [
    {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": (
                    "You answer briefly, helpfully, accurately, and truthfully. "
                    "Answer questions by first using search_user_documents for specialized knowledge."
                )
            }
        ]
    },
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "What are our company's core policies?"
            }
        ]
    },
    {
        # Assistant turn: no text content, only the three parallel tool calls.
        "role": "assistant",
        "content": [],
        "tool_calls": [
            {
                "id": "call_mmsUvjRiiSy6ogY0FD9tI1j4",
                "type": "function",
                "function": {
                    "name": "search_user_documents",
                    "arguments": "{\"query\": \"core company policies\"}"
                }
            },
            {
                "id": "call_JtDnxpgHepJzx07TLtKeicWW",
                "type": "function",
                "function": {
                    "name": "search_user_documents",
                    "arguments": "{\"query\": \"employee handbook policies\"}"
                }
            },
            {
                "id": "call_dKkEiweLNwgWkuZ96zeOYuv6",
                "type": "function",
                "function": {
                    "name": "search_user_documents",
                    "arguments": "{\"query\": \"company code of conduct\"}"
                }
            }
        ]
    },
    # Each tool message carries the identical 252-token Korean payload and is
    # matched to its call by tool_call_id.
    {
        "role": "tool",
        "content": [
            {
                "type": "text",
                "text": tool_response
            }
        ],
        "tool_call_id": "call_mmsUvjRiiSy6ogY0FD9tI1j4"
    },
    {
        "role": "tool",
        "content": [
            {
                "type": "text",
                "text": tool_response
            }
        ],
        "tool_call_id": "call_JtDnxpgHepJzx07TLtKeicWW"
    },
    {
        "role": "tool",
        "content": [
            {
                "type": "text",
                "text": tool_response
            }
        ],
        "tool_call_id": "call_dKkEiweLNwgWkuZ96zeOYuv6"
    },
]
# Module-level side channels written by _chat_text_chunks: the raw API payload
# (or list of stream events), and the most recent usage dict when reported.
chat_response: Any = None
chat_usage: dict[str, Any] | None = None
def _make_openai_headers(api_key: str) -> dict[str, str]:
    """Build the bearer-auth and JSON content-type headers for OpenAI calls."""
    auth_value = f"Bearer {api_key}"
    return {"Authorization": auth_value, "Content-Type": "application/json"}
async def _raise_for_status(resp: aiohttp.ClientResponse) -> None:
    """Raise RuntimeError carrying the response body for any non-2xx status."""
    if resp.status // 100 == 2:
        return
    body = await resp.text()
    raise RuntimeError(f"OpenAI API error {resp.status}: {body}")
async def _iter_sse_json(resp: aiohttp.ClientResponse) -> AsyncIterator[dict[str, Any]]:
    """Parse a server-sent-events stream, yielding each JSON `data:` payload.

    Stops at the `[DONE]` sentinel; non-data lines and empty payloads are
    skipped. Bytes are accumulated so events split across chunks still parse.
    """
    pending = ""
    async for raw_chunk in resp.content.iter_any():
        pending += raw_chunk.decode("utf-8", errors="replace")
        while "\n" in pending:
            current, pending = pending.split("\n", 1)
            current = current.strip()
            if not current.startswith("data:"):
                continue
            payload = current[len("data:"):].strip()
            if payload == "[DONE]":
                return
            if payload:
                yield json.loads(payload)
def post_to_openai(
    session: aiohttp.ClientSession,
    base_url: str,
    path: str,
    body: dict[str, Any],
    *,
    stream: bool = False,
) -> AsyncIterator[dict[str, Any]]:
    """POST *body* as JSON to *path* under *base_url*.

    Returns an async iterator: every SSE event when *stream* is True,
    otherwise a single decoded JSON payload. Raises RuntimeError on a
    non-2xx status via _raise_for_status.
    """
    endpoint = f"{base_url.rstrip('/')}/{path.lstrip('/')}"

    async def _request() -> AsyncIterator[dict[str, Any]]:
        async with session.post(endpoint, json=body) as response:
            await _raise_for_status(response)
            if stream:
                async for event in _iter_sse_json(response):
                    yield event
            else:
                # content_type=None: accept any content type the server sends.
                yield await response.json(content_type=None)

    return _request()
async def _one(it: AsyncIterator[dict[str, Any]]) -> dict[str, Any]:
async for item in it:
return item
raise RuntimeError("Expected exactly one item, got none.")
class OpenAITransport:
    """Owns one aiohttp ClientSession preconfigured with auth headers.

    Lifecycle: construct, await open(), use post(), await aclose().
    """

    def __init__(self, *, base_url: str = OPENAI_BASE_URL, api_key: str | None = None) -> None:
        # Fall back to the environment when no key is passed explicitly.
        key = api_key or os.environ.get("OPENAI_API_KEY") or ""
        if not key:
            raise RuntimeError("OPENAI_API_KEY is not set.")
        self.base_url = base_url
        self._headers = _make_openai_headers(key)
        # Created lazily in open(); None until then.
        self._session: aiohttp.ClientSession | None = None

    async def open(self) -> None:
        """Create the session if it does not exist or was closed (idempotent)."""
        if self._session is None or self._session.closed:
            timeout = aiohttp.ClientTimeout(total=60)
            self._session = aiohttp.ClientSession(headers=self._headers, timeout=timeout)

    async def aclose(self) -> None:
        """Close the session if it is currently open."""
        if self._session is not None and not self._session.closed:
            await self._session.close()

    @property
    def session(self) -> aiohttp.ClientSession:
        """The live session; raises RuntimeError if open() was not called."""
        if self._session is None or self._session.closed:
            raise RuntimeError("OpenAITransport is not open.")
        return self._session

    def post(self, path: str, body: dict[str, Any], *, stream: bool = False) -> AsyncIterator[dict[str, Any]]:
        """POST *body* to *path*; see post_to_openai for iterator semantics."""
        return post_to_openai(self.session, self.base_url, path, body, stream=stream)
# Process-wide shared transport, created lazily on first use.
_openai: OpenAITransport | None = None

async def _get_openai() -> OpenAITransport:
    """Return the shared transport, creating and opening it on first use."""
    global _openai
    if _openai is None:
        _openai = OpenAITransport()
        # NOTE(review): open() is idempotent, so hoisting this call out of the
        # `if` (always re-ensuring an open session) would also be safe.
        await _openai.open()
    return _openai
async def _chat_text_chunks(
    openai: OpenAITransport,
    msg_list: list[ChatMessage],
    *,
    stream: bool,
) -> AsyncIterator[str]:
    """Yield the assistant's text for one chat completion request.

    Side effects: sets module-level ``chat_response`` (the raw payload, or the
    full list of stream events) and ``chat_usage`` (the usage dict, when the
    API reports one; reset to None at the start of every call).
    """
    global chat_response
    global chat_usage
    chat_usage = None
    params_template: dict[str, Any] = {"model": MODEL, "max_tokens": 2000, "top_p": 0.1}
    messages = msg_list
    request: dict[str, Any] = {**params_template, "messages": messages}
    if stream:
        request["stream"] = True
        # include_usage makes the final SSE chunk carry the usage object.
        request["stream_options"] = {
            "include_obfuscation": False,
            "include_usage": True,
        }
        events: list[dict[str, Any]] = []
        async for event in openai.post("/chat/completions", request, stream=True):
            events.append(event)
            usage = event.get("usage")
            if isinstance(usage, dict):
                chat_usage = usage
            choices = event.get("choices") or []
            if not choices:
                # e.g. the usage-only final chunk has an empty choices list.
                continue
            delta = choices[0].get("delta") or {}
            content = delta.get("content")
            if isinstance(content, str) and content:
                yield content
        chat_response = events
        return
    # Non-streaming: a single JSON payload with the whole message and usage.
    payload = await _one(openai.post("/chat/completions", request, stream=False))
    chat_response = payload
    usage = payload.get("usage")
    if isinstance(usage, dict):
        chat_usage = usage
    yield payload["choices"][0]["message"]["content"]
async def get_chat_response(msg_list: list[ChatMessage], *, stream: bool = False) -> AsyncIterator[str]:
    """Yield assistant text chunks for *msg_list* using the shared transport."""
    transport = await _get_openai()
    async for piece in _chat_text_chunks(transport, msg_list, stream=stream):
        yield piece
def pretty_usage_table(usage_data: dict[str, Any], one_line: bool = False) -> str:
    """Return a printable usage summary: a multi-line table, or one line.

    Accepts a usage object from either the Responses API (``input_*``/
    ``output_*`` keys) or the Chat Completions API (``prompt_*``/
    ``completion_*`` keys); prints only useful information.

    Fix: the ``*_tokens_details`` keys can be present with an explicit None
    value, which previously crashed the ``.get`` chain — treat None the same
    as absent. Also drops two walrus bindings that were never read.
    """
    # Normalize Responses-style keys to Chat Completions form.
    normalized_usage = {
        key.replace("input_", "prompt_").replace("output_", "completion_"): value
        for key, value in usage_data.items()
    }
    # Totals and detail breakdowns; any of these may be absent or None.
    total_prompt_tokens = normalized_usage.get("prompt_tokens") or 0
    total_completion_tokens = normalized_usage.get("completion_tokens") or 0
    prompt_detail = normalized_usage.get("prompt_tokens_details") or {}
    completion_detail = normalized_usage.get("completion_tokens_details") or {}
    cached_prompt_tokens = prompt_detail.get("cached_tokens", 0)
    audio_prompt_tokens = prompt_detail.get("audio_tokens", 0)
    reasoning_completion_tokens = completion_detail.get("reasoning_tokens", 0)
    audio_completion_tokens = completion_detail.get("audio_tokens", 0)
    uncached = total_prompt_tokens - cached_prompt_tokens
    nonreasoning = total_completion_tokens - reasoning_completion_tokens
    prompt_column: list[str] = [
        f"input tokens: {total_prompt_tokens}",
        f"uncached: {uncached}",
        f"cached: {cached_prompt_tokens}",
    ]
    completion_column: list[str] = [
        f"output tokens: {total_completion_tokens}",
        f"non-reasoning: {nonreasoning}",
        f"reasoning: {reasoning_completion_tokens}",
    ]
    # Include the audio breakdown only when any audio tokens are present.
    if audio_prompt_tokens or audio_completion_tokens:
        prompt_column.append(f"non-audio: {total_prompt_tokens - audio_prompt_tokens}")
        prompt_column.append(f"audio: {audio_prompt_tokens}")
        completion_column.append(f"non-audio: {total_completion_tokens - audio_completion_tokens}")
        completion_column.append(f"audio: {audio_completion_tokens}")
    # Pad each column to its widest cell so the table aligns.
    prompt_width = max(len(cell) for cell in prompt_column)
    completion_width = max(len(cell) for cell in completion_column)
    table_lines: list[str] = [
        f"| {'-' * prompt_width} | {'-' * completion_width} |",
        f"| {prompt_column[0].ljust(prompt_width)} | {completion_column[0].ljust(completion_width)} |",
        f"| {'-' * prompt_width} | {'-' * completion_width} |",
    ]
    for left_cell, right_cell in zip(prompt_column[1:], completion_column[1:]):
        table_lines.append(f"| {left_cell.ljust(prompt_width)} | {right_cell.ljust(completion_width)} |")
    # Compact single-line form, with audio counts appended only when present.
    prompt_audio_str = f", audio {audio_prompt_tokens}" if audio_prompt_tokens else ""
    completion_audio_str = f", audio {audio_completion_tokens}" if audio_completion_tokens else ""
    single_line = (
        f"input: {total_prompt_tokens} (uncached {uncached}, cached {cached_prompt_tokens}{prompt_audio_str}); "
        f"output: {total_completion_tokens} (non-reasoning {nonreasoning}, "
        f"reasoning {reasoning_completion_tokens}{completion_audio_str})"
    )
    return "\n" + single_line if one_line else "\n" + "\n".join(table_lines)
def make_chat_history_with_n_tool_pairs(
    base_history: list[ChatMessage],
    n_tool_pairs: int,
) -> list[ChatMessage]:
    """Build a trimmed copy of *base_history* with only N tool-call pairs.

    The result keeps the system and user messages, an assistant message
    whose tool_calls list is cut to the first N entries, and the first N
    tool-return messages.

    Assumes base_history is in this exact shape:
    [system, user, assistant_with_tool_calls, tool_msg_1, tool_msg_2, ...]
    """
    if n_tool_pairs < 1:
        raise ValueError("n_tool_pairs must be >= 1")
    system_msg, user_msg, assistant_raw, *tool_msgs = base_history
    assistant_msg = cast(AssistantMessage, assistant_raw)
    available_calls = assistant_msg.get("tool_calls") or []
    trimmed_assistant: AssistantMessage = {
        **assistant_msg,
        "tool_calls": list(available_calls[:n_tool_pairs]),
    }
    trimmed_history: list[ChatMessage] = [system_msg, user_msg, trimmed_assistant]
    trimmed_history.extend(tool_msgs[:n_tool_pairs])
    return trimmed_history
async def main() -> None:
    """Run the replication at 1, 2, and 3 parallel tool calls, printing the
    streamed answer and a one-line usage summary for each run.

    Fix: removed the useless ``global chat_history`` / ``global chat_usage``
    declarations (both names are only read here, never assigned) and the
    dead ``parts`` accumulator that was built but never used.
    """
    base_history = chat_history
    for n_tool_pairs in (1, 2, 3):
        scenario_history = make_chat_history_with_n_tool_pairs(base_history, n_tool_pairs)
        print()
        print("=" * 80)
        print(f"RUN: {n_tool_pairs} tool call(s) + {n_tool_pairs} tool response message(s)")
        print("=" * 80)
        # Echo the assistant's answer as chunks arrive (single chunk if not streaming).
        async for chunk in get_chat_response(scenario_history, stream=STREAM):
            print(chunk, end="", flush=True)
        print()
        # chat_usage is populated by _chat_text_chunks as a module-level side channel.
        if chat_usage is not None:
            usage_line = pretty_usage_table(chat_usage, one_line=True).strip()
            print(f"USAGE ({n_tool_pairs} tool call(s)): {usage_line}")
        else:
            print(f"USAGE ({n_tool_pairs} tool call(s)): <missing usage>")
async def _run() -> None:
    """Entry wrapper: run main() and always close the shared transport."""
    try:
        await main()
    finally:
        # Close the aiohttp session even if main() raised, so the event loop
        # shuts down without unclosed-session warnings.
        if _openai is not None:
            await _openai.aclose()

if __name__ == "__main__":
    asyncio.run(_run())
You'll get three runs at different tool call counts, the AI streaming a response to each, and then the usage report.