Massive Token Overestimation in Multiple Tool Calls due to Internal Unicode Escaping

When submitting Chat Completions requests containing multiple tool messages (e.g., when an agent uses multiple tool calls), the OpenAI backend significantly overcharges prompt tokens if the tool outputs contain non-ASCII characters (such as Korean, Japanese, or Chinese).

Based on extensive payload analysis and isolated testing, it appears the backend serialization process forces Unicode escaping (similar to json.dumps(..., ensure_ascii=True)) on the messages array when len(tool_calls) > 1.

Consequently, a short non-ASCII string (e.g., the Korean word "ํ…Œ์ŠคํŠธ", normally 1-2 tokens) is converted into its literal escape sequence (e.g., \ud14c\uc2a4\ud2b8) before tokenization. The tokenizer then evaluates these 18 ASCII characters individually, resulting in a 10x to 15x token bloat for the exact same payload.

This bug does not occur with single tool calls, nor does it occur with pure English (ASCII) tool outputs.

Environment

  • Models Tested: gpt-4.1, gpt-5.2
  • Endpoints: Both Public OpenAI API and Azure OpenAI Service exhibit the exact same behavior.

Evidence & Token Calculation

In a production environment, we observed severe discrepancies between the physical payload size and the billed tokens:

  • Actual Payload Bytes: ~130 KB
  • Expected Tokens (via tiktoken o200k_base): ~42,384 tokens
  • Billed Prompt Tokens: 117,071 tokens (Approx. 2.8x inflation)

In controlled dummy tests using pure strings ("ํ…Œ์ŠคํŠธ " repeated 1,000 times vs "test " repeated 1,000 times):

  1. 1 Tool Call (Korean): Billed 1,121 tokens (Matches tiktoken calculation).
  2. 3 Multiple Tool Calls (English): Billed 3,191 tokens (Normal linear scaling).
  3. 3 Multiple Tool Calls (Korean): Billed 39,235 tokens (Expected ~3,300 tokens. 11.8x inflation).

Reproduction Code

The following script uses LangGraph to construct the agentic workflow.

import os
import tiktoken
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from dotenv import load_dotenv
from langfuse.langchain import CallbackHandler

# Load API keys/endpoints from a local .env file, overriding any values
# already present in the process environment.
load_dotenv(override=True)
# Langfuse tracing callback: records per-run token usage for later inspection.
langfuse_handler = CallbackHandler()

# DUMMY_1000_TOKENS = "test " * 1000

@tool
def get_dummy_data(data_id: str) -> str:
    """
    Returns dummy data for the given ID.
    """
    # NOTE: the docstring doubles as the tool description sent to the model,
    # so it is kept verbatim. The body pads the ID into ~1,000 repetitions to
    # simulate a large tool result.
    padded_payload = (data_id + " ") * 1000
    return "Data for " + data_id + ":\n" + padded_payload

class AgentState(TypedDict):
    # Conversation history; add_messages appends new messages instead of
    # replacing the list on each state update.
    messages: Annotated[Sequence[BaseMessage], add_messages]

llm = AzureChatOpenAI(
    model_name="gpt-4.1",
    api_key=os.getenv("AZURE_API_KEY"),
    api_version="2025-04-01-preview",
    azure_endpoint=os.getenv("AZURE_API_ENDPOINT"),
    temperature=0,
)

tools = [get_dummy_data]
# parallel_tool_calls=True is required to reproduce the multi-call inflation.
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=True)

def agent_node(state: AgentState):
    """Invoke the tool-enabled model on the history and append its reply."""
    ai_message = llm_with_tools.invoke(state["messages"])
    return {"messages": [ai_message]}

def should_continue(state: AgentState):
    last_msg = state["messages"][-1]
    if last_msg.tool_calls:
        return "tools"
    return END

# Two-node ReAct-style loop: the agent proposes tool calls, the tool node
# executes them, and control returns to the agent until no calls remain.
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))

workflow.add_edge(START, "agent")

# workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
workflow.add_conditional_edges("agent", tools_condition)
workflow.add_edge("tools", "agent")

graph = workflow.compile()

def run_test(test_name: str, system_prompt: str, user_prompt: str):
    """Run one scenario through the graph and print server-billed token usage."""
    banner = "=" * 50
    print(f"\n{banner}\n[TEST] {test_name}\n{banner}")

    initial_state = {
        "messages": [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt)
        ]
    }
    run_config = {"callbacks": [langfuse_handler], "run_name": test_name, "recursion_limit": 100}
    outcome = graph.invoke(initial_state, config=run_config)
    last_message = outcome["messages"][-1]

    print(f"Total messages in state: {len(outcome['messages'])}")
    metadata = getattr(last_message, 'response_metadata', None)
    if metadata is not None and 'token_usage' in metadata:
        usage = metadata['token_usage']
        print(f"Total tokens billed by server: {usage.get('total_tokens')}")
        print(f" - Prompt tokens: {usage.get('prompt_tokens')}")
        print(f" - Completion tokens: {usage.get('completion_tokens')}")
    else:
        print("Token usage information not found.")

if __name__ == "__main__":
    SYSTEM_PROMPT = "You are a helpful assistant."

    # Scenarios 1-2 use a Korean ID, 3-4 an ASCII ID; only the multi-call
    # Korean scenario is expected to show inflated prompt-token billing.

    # [Scenario 1] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID 'ํ…Œ์ŠคํŠธ' and summarize it."
    ) # ํ…Œ์ŠคํŠธ means 'Test'

    # [Scenario 2] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', and summarize each of them."
    )

    # [Scenario 3] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID '1' and summarize it."
    )

    # [Scenario 4] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: '1', '2', '3', and summarize each of them."
    )

Impact

This backend serialization bug creates a severe financial and performance penalty for global developers building Agentic workflows (ReAct, function calling loops) in non-English languages. A single execution of multiple tool calls can silently drain an API budget 10x faster than expected due to phantom tokens generated by Unicode escaping.

We kindly request the engineering team to review the serialization pipeline for ToolMessage array parsing and ensure that Unicode characters are not escaped when len(tool_calls) > 1.

2 Likes

I went ahead and escalated this one. Not sure whether it is a bug or a feature.

2 Likes

Thank you for escalating this issue.

To further isolate the root cause and prove this is not a third-party framework (LangChain) issue, I benchmarked the exact same payload across multiple LLM providers.

The results show that this Unicode escaping bloat is strictly isolated to OpenAIโ€™s backend.

Tool Call Payload OpenAI gpt-4.1 OpenAI gpt-5.2 grok-4 Kimi-K2.5 gemini-2.5-pro
โ€œtestโ€ (1 call) 1,168 1,208 1,274 1,152 1,330
โ€œtestโ€ x 3 calls 3,329 3,283 3,901 3,270 3,727
โ€œํ…Œ์ŠคํŠธโ€ (1 call) 1,121 1,211 1,665 1,155 1,087
โ€œํ…Œ์ŠคํŠธโ€ x 3 calls 39,235 39,325 3,936 3,272 3,146

I hope this helps the engineering team deploy a hotfix quickly.

Can you give a simple code snippet that shows this (no API keys please). Having a hard time recreating this.

Sure, here it is.

This reproduction script isolates the issue by testing both English (test) and Korean (ํ…Œ์ŠคํŠธ) payloads with single vs. multiple tool calls. If you run this script with OpenAI models, you will observe the prompt tokens exponentially inflate only on Scenario 2 (Multiple Korean Tool Calls).

versions

# langchain 1.2.10
# langgraph 1.0.9
# langchain-openai 1.1.10
# langchain_google_genai 4.2.1

GPT, grok, Kimi

import os
import tiktoken
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

@tool
def get_dummy_data(data_id: str) -> str:
    """
    Returns dummy data for the given ID.
    """
    # NOTE: the docstring doubles as the tool description sent to the model,
    # so it is kept verbatim. The body pads the ID into ~1,000 repetitions to
    # simulate a large tool result.
    padded_payload = (data_id + " ") * 1000
    return "Data for " + data_id + ":\n" + padded_payload

class AgentState(TypedDict):
    # Conversation history; add_messages appends new messages instead of
    # replacing the list on each state update.
    messages: Annotated[Sequence[BaseMessage], add_messages]

llm = AzureChatOpenAI(
    model_name="gpt-4.1", # grok-4, Kimi-K2.5
    # api_key="...",
    # azure_endpoint="...",
    # api_version="2025-04-01-preview",
    temperature=0,
)

# llm = ChatOpenAI(
#     model="gpt-4.1",
#     api_key="",
#     temperature=0,
# )

tools = [get_dummy_data]
# parallel_tool_calls=True is required to reproduce the multi-call inflation.
llm_with_tools = llm.bind_tools(tools, parallel_tool_calls=True)

def agent_node(state: AgentState):
    """Invoke the tool-enabled model on the history and append its reply."""
    ai_message = llm_with_tools.invoke(state["messages"])
    return {"messages": [ai_message]}

def should_continue(state: AgentState):
    last_msg = state["messages"][-1]
    if last_msg.tool_calls:
        return "tools"
    return END

# Agent <-> tool loop: run tools while the agent keeps requesting them.
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", tools_condition)
workflow.add_edge("tools", "agent")

graph = workflow.compile()

def run_test(test_name: str, system_prompt: str, user_prompt: str):
    """Run one scenario through the graph and print server-billed token usage."""
    banner = "=" * 50
    print(f"\n{banner}\n[TEST] {test_name}\n{banner}")

    initial_state = {
        "messages": [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt)
        ]
    }
    outcome = graph.invoke(initial_state, config={"run_name": test_name, "recursion_limit": 100})
    last_message = outcome["messages"][-1]
    print(f"Total messages in state: {len(outcome['messages'])}")
    metadata = getattr(last_message, 'response_metadata', None)
    if metadata is not None and 'token_usage' in metadata:
        usage = metadata['token_usage']
        print(f"Total tokens billed by server: {usage.get('total_tokens')}")
        print(f" - Prompt tokens: {usage.get('prompt_tokens')}")
        print(f" - Completion tokens: {usage.get('completion_tokens')}")
    else:
        print("Token usage information not found.")

if __name__ == "__main__":
    SYSTEM_PROMPT = "You are a helpful assistant."

    # Scenarios 1-2 use a Korean ID, 3-4 an ASCII ID; only the multi-call
    # Korean scenario is expected to show inflated prompt-token billing.

    # [Scenario 1] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID 'ํ…Œ์ŠคํŠธ' and summarize it."
    ) # ํ…Œ์ŠคํŠธ means 'Test'

    # [Scenario 2] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', and summarize each of them."
    )

    # [Scenario 3] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID '1' and summarize it."
    )

    # [Scenario 4] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: '1', '2', '3', and summarize each of them."
    )

Gemini

import os
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_google_genai import ChatGoogleGenerativeAI

@tool
def get_dummy_data(data_id: str) -> str:
    """
    Returns dummy data for the given ID.
    """
    # NOTE: the docstring doubles as the tool description sent to the model,
    # so it is kept verbatim. The body pads the ID into ~1,000 repetitions to
    # simulate a large tool result.
    padded_payload = (data_id + " ") * 1000
    return "Data for " + data_id + ":\n" + padded_payload

class AgentState(TypedDict):
    # Conversation history; add_messages appends new messages instead of
    # replacing the list on each state update.
    messages: Annotated[Sequence[BaseMessage], add_messages]


# Fill in credentials/endpoint before running.
base_url = ""
api_key = ""
model_name = "gemini-2.5-pro"


client_options = {}
if base_url:
    # Route through a custom endpoint only when one is configured.
    client_options["api_endpoint"] = f"{base_url}/gemini"

llm = ChatGoogleGenerativeAI(
    model=model_name,
    google_api_key=api_key,
    temperature=0,
    client_options=client_options if client_options else None
)

tools = [get_dummy_data]
llm_with_tools = llm.bind_tools(tools)

def agent_node(state: AgentState):
    """Invoke the tool-enabled model on the history and append its reply."""
    ai_message = llm_with_tools.invoke(state["messages"])
    return {"messages": [ai_message]}

def should_continue(state: AgentState):
    last_msg = state["messages"][-1]
    if hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
        return "tools"
    return END

# Agent <-> tool loop: run tools while the agent keeps requesting them.
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", tools_condition)
workflow.add_edge("tools", "agent")

graph = workflow.compile()


def run_test(test_name: str, system_prompt: str, user_prompt: str):
    """Run one scenario through the graph and print server-billed token usage."""
    banner = "=" * 50
    print(f"\n{banner}\n[TEST] {test_name}\n{banner}")

    initial_state = {
        "messages": [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt)
        ]
    }

    outcome = graph.invoke(initial_state, config={"run_name": test_name, "recursion_limit": 100})
    last_message = outcome["messages"][-1]

    print(f"Total messages in state: {len(outcome['messages'])}")

    # Prefer the provider-agnostic usage_metadata; fall back to the
    # OpenAI-style response_metadata['token_usage'] shape.
    usage = getattr(last_message, 'usage_metadata', None)
    if usage:
        print(f"Total tokens billed by server: {usage.get('total_tokens')}")
        print(f" - Prompt tokens: {usage.get('input_tokens')}")
        print(f" - Completion tokens: {usage.get('output_tokens')}")
        return

    metadata = getattr(last_message, 'response_metadata', None)
    if metadata is not None and 'token_usage' in metadata:
        usage = metadata['token_usage']
        print(f"Total tokens billed by server: {usage.get('total_tokens')}")
        print(f" - Prompt tokens: {usage.get('prompt_tokens')}")
        print(f" - Completion tokens: {usage.get('completion_tokens')}")
    else:
        print("Token usage information not found.")


if __name__ == "__main__":
    SYSTEM_PROMPT = "You are a helpful assistant."

    # Scenarios 1-2 use a Korean ID, 3-4 an ASCII ID; Gemini is expected to
    # bill linearly in all four cases (control for the OpenAI comparison).

    # [Scenario 1] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID 'ํ…Œ์ŠคํŠธ' and summarize it."
    ) # ํ…Œ์ŠคํŠธ means 'Test'

    # [Scenario 2] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', 'ํ…Œ์ŠคํŠธ', and summarize each of them."
    )

    # [Scenario 3] Single Tool Call
    run_test(
        "Single Tool Call",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for ID '1' and summarize it."
    )

    # [Scenario 4] Multiple Tool Calls (3 items)
    run_test(
        "Multiple Tool Calls (3 items)",
        SYSTEM_PROMPT,
        "Using the get_dummy_data tool, fetch the data for the following 3 IDs simultaneously: '1', '2', '3', and summarize each of them."
    )

Disposition: Verified, replicated

  • The Chat Completions API is providing the model an altered version of the context and significantly overbilling when the input of parallel tool call return text includes multi-byte UTF-8 languages.

Half the worldโ€™s population is paying 5x as much for asking about their documents

I sat down and created much cleaner replication code that returns the result of one, two, or three tool calls having been โ€œmadeโ€ in parallel (when there is more than one tool return, that implicit return implies the call was via triggering the multi_tool_use wrapper that makes parallel function calls possible).

The effect is dramatic, for a 252 token tool โ€œresponseโ€ for each and all of the tool calls:

USAGE (1 tool call(s)): input: 321 (uncached 321, cached 0); output: 264 (non-reasoning 264, reasoning 0)
USAGE (2 tool call(s)): input: 2473 (uncached 2473, cached 0); output: 185 (non-reasoning 185, reasoning 0)
USAGE (3 tool call(s)): input: 3675 (uncached 3675, cached 0); output: 205 (non-reasoning 205, reasoning 0)

What should have been a difference not much more than the 252 tokens of return message is 1200 tokens. A five-fold amplification - as if we were sending the text as "\uBBF8" or b'\xeb\xaf\xb8' instead of "๋ฏธ".


That the AI model input is being affected can also be seen in the response to an English question, replicated in repeated API calls:

One tool return: Korean language

์šฐ๋ฆฌ ํšŒ์‚ฌ์˜ ํ•ต์‹ฌ ์ •์ฑ…์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค:

1. ํ™˜๊ฒฝ ์˜ํ–ฅ ์ ๊ทน ๊ฐ์†Œ ๋ฐ ์ง€์†๊ฐ€๋Šฅ์„ฑ ๊ฐ€์†ํ™”: ๋ชจ๋“  ๊ธฐ์ˆ  ๊ณ„์ธต

Two or three: English produced by the AI

Our company's core policies are:

1. Environmental Responsibility: We aim to minimize environmental impact and build sustainable AI systems.

Replication code

  • uses aiohttp to make direct RESTful calls - no foreign AI library to blame.
import os
import json
import asyncio
from typing import Any, AsyncIterator, Literal, TypedDict, NotRequired, cast

import aiohttp

# Endpoint/model configuration; STREAM selects SSE streaming vs. one-shot JSON.
OPENAI_BASE_URL = "https://api.openai.com/v1"
MODEL = "gpt-4.1"
STREAM = True

# --- Minimal typed mirror of the Chat Completions message schema ---

class TextContentPart(TypedDict):
    # One text part inside a message "content" array.
    type: Literal["text"]
    text: str

# Only text parts are used in this replication.
ContentPart = TextContentPart
MessageContent = list[ContentPart]

class FunctionToolCall(TypedDict):
    name: str
    arguments: str  # JSON string in the OpenAI API

class ToolCall(TypedDict):
    # One entry in an assistant message's "tool_calls" array.
    id: str
    type: Literal["function"]
    function: FunctionToolCall

class SystemMessage(TypedDict):
    role: Literal["system", "developer"]
    content: MessageContent

class UserMessage(TypedDict):
    role: Literal["user"]
    content: MessageContent

class AssistantMessage(TypedDict):
    role: Literal["assistant"]
    content: MessageContent
    tool_calls: NotRequired[list[ToolCall] | None]  # absent for plain replies

class ToolMessage(TypedDict):
    # A tool's return value, matched to its originating call via tool_call_id.
    role: Literal["tool"]
    content: MessageContent
    tool_call_id: str

ChatMessage = SystemMessage | UserMessage | AssistantMessage | ToolMessage

## 252 o200k_base tokens of Korean sent as tool response
# NOTE: this payload is behavior-critical fixture data - do not reformat.
tool_response:str = """
๋ฏธ์…˜ ์„ ์–ธ๋ฌธ

์šฐ๋ฆฌ์˜ ๋ฏธ์…˜

ํ™˜๊ฒฝ ์˜ํ–ฅ์„ ์ ๊ทน์ ์œผ๋กœ ๊ฐ์†Œ์‹œํ‚ค๊ณ  ์ „ ์„ธ๊ณ„ ์ง€์†๊ฐ€๋Šฅ์„ฑ์„ ๊ฐ€์†ํ™”ํ•˜๋Š” ์ธ๊ณต์ง€๋Šฅ ์‹œ์Šคํ…œ์„ ์„ค๊ณ„ํ•˜๊ณ  ๊ตฌ์ถ•ํ•ฉ๋‹ˆ๋‹ค.

๋ฐ์ดํ„ฐ ์ˆ˜์ง‘, ๋ชจ๋ธ ํ•™์Šต, ์ธํ”„๋ผ, ๋ฐฐํฌ์— ์ด๋ฅด๊ธฐ๊นŒ์ง€ ๊ธฐ์ˆ ์˜ ๋ชจ๋“  ๊ณ„์ธต์— ์ƒํƒœ์  ์ฑ…์ž„์„ ๋‚ด์žฌํ™”ํ•ฉ๋‹ˆ๋‹ค.

๋ชจ๋“  ์—ฐ๊ตฌ, ์—”์ง€๋‹ˆ์–ด๋ง ๋ฐ ์šด์˜ ์˜์‚ฌ๊ฒฐ์ •์—์„œ ์—๋„ˆ์ง€ ํšจ์œจ์„ฑ, ํƒ„์†Œ ๊ฐ์ถ•, ์ž์› ์ตœ์ ํ™”๋ฅผ ์ตœ์šฐ์„ ์œผ๋กœ ํ•ฉ๋‹ˆ๋‹ค.

๊ธฐํ›„ ํšŒ๋ณต๋ ฅ, ์ˆœํ™˜ ๊ฒฝ์ œ, ์žฌ์ƒ์  ์‹ค์ฒœ, ์žฅ๊ธฐ์  ์ง€๊ตฌ ๊ฑด๊ฐ•์„ ์ง€์›ํ•˜๋Š” AI ์†”๋ฃจ์…˜์„ ๊ฐœ๋ฐœํ•ฉ๋‹ˆ๋‹ค.

ํˆฌ๋ช…์„ฑ๊ณผ ์ธก์ • ๊ฐ€๋Šฅํ•œ ํ™˜๊ฒฝ์  ์ฑ…์ž„, ๊ณผํ•™ ๊ธฐ๋ฐ˜ ์ง€์†๊ฐ€๋Šฅ์„ฑ ๋ชฉํ‘œ๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ์šด์˜ํ•ฉ๋‹ˆ๋‹ค.

์ •๋ถ€, ๊ธฐ์—…, ์—ฐ๊ตฌ๊ธฐ๊ด€ ๋ฐ ์ง€์—ญ์‚ฌํšŒ์™€ ํ˜‘๋ ฅํ•˜์—ฌ ๊ธ์ •์ ์ธ ์ƒํƒœ์  ์˜ํ–ฅ์„ ํ™•์žฅํ•ฉ๋‹ˆ๋‹ค.

์šฐ๋ฆฌ์˜ ๊ธฐ์ˆ ์ด ์œค๋ฆฌ์ ์œผ๋กœ ์„ค๊ณ„๋˜๊ณ  ์‚ฌํšŒ์  ์ฑ…์ž„์„ ๋‹คํ•˜๋ฉฐ, ์„ธ๋Œ€ ๊ฐ„ ๋ณต์ง€์™€ ์กฐํ™”๋ฅผ ์ด๋ฃจ๋„๋ก ๋ณด์žฅํ•ฉ๋‹ˆ๋‹ค.

์šฐ๋ฆฌ๊ฐ€ ์†Œ๋น„ํ•˜๋Š” ๊ฒƒ๋ณด๋‹ค ๋” ๋งŽ์ด ์ง€๊ตฌ์— ํ™˜์›ํ•˜๋Š” ์ˆœํ™˜๊ฒฝ์  ๊ธฐ์—ฌ๋ฅผ ๋ชฉํ‘œ๋กœ ์ง€์†์ ์œผ๋กœ ํ˜์‹ ํ•ฉ๋‹ˆ๋‹ค.
""".strip()

### or uncomment to make the tool return message 1 token
#tool_response:str = "null"

# Pre-baked conversation: system + user + one assistant turn that "made"
# three parallel tool calls, followed by three tool return messages all
# carrying the same Korean payload. Scenarios trim this to 1/2/3 pairs.
chat_history: list[ChatMessage] = [
    {
      "role": "system",
      "content": [
        {
          "type": "text",
          "text": (
              "You answer briefly, helpfully, accurately, and truthfully. "
              "Answer questions by first using search_user_documents for specialized knowledge."
          )
        }
      ]
    },
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": "What are our company's core policies?"
        }
      ]
    },
    {
      "role": "assistant",
      "content": [],
      "tool_calls": [
        {
          "id": "call_mmsUvjRiiSy6ogY0FD9tI1j4",
          "type": "function",
          "function": {
            "name": "search_user_documents",
            "arguments": "{\"query\": \"core company policies\"}"
          }
        },
        {
          "id": "call_JtDnxpgHepJzx07TLtKeicWW",
          "type": "function",
          "function": {
            "name": "search_user_documents",
            "arguments": "{\"query\": \"employee handbook policies\"}"
          }
        },
        {
          "id": "call_dKkEiweLNwgWkuZ96zeOYuv6",
          "type": "function",
          "function": {
            "name": "search_user_documents",
            "arguments": "{\"query\": \"company code of conduct\"}"
          }
        }
      ]
    },
    {
      "role": "tool",
      "content": [
        {
          "type": "text",
          "text": tool_response
        }
      ],
      "tool_call_id": "call_mmsUvjRiiSy6ogY0FD9tI1j4"
    },
    {
      "role": "tool",
      "content": [
        {
          "type": "text",
          "text": tool_response
        }
      ],
      "tool_call_id": "call_JtDnxpgHepJzx07TLtKeicWW"
    },
    {
      "role": "tool",
      "content": [
        {
          "type": "text",
          "text": tool_response
        }
      ],
      "tool_call_id": "call_dKkEiweLNwgWkuZ96zeOYuv6"
    },
  ]

# Filled in by _chat_text_chunks after each request (raw response + usage).
chat_response: Any = None
chat_usage: dict[str, Any] | None = None

def _make_openai_headers(api_key: str) -> dict[str, str]:
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


async def _raise_for_status(resp: aiohttp.ClientResponse) -> None:
    """Raise a RuntimeError carrying the error body for any non-2xx response."""
    status = resp.status
    if 200 <= status < 300:
        return
    body = await resp.text()
    raise RuntimeError(f"OpenAI API error {status}: {body}")


async def _iter_sse_json(resp: aiohttp.ClientResponse) -> AsyncIterator[dict[str, Any]]:
    """Yield each JSON object from an SSE (``text/event-stream``) response.

    Buffers raw *bytes* and only decodes complete lines, so a multi-byte
    UTF-8 character split across two network chunks is reassembled correctly.
    (The previous implementation decoded each chunk independently with
    ``errors="replace"``, which could corrupt CJK text at chunk boundaries --
    exactly the kind of payload under test here.)

    Ignores non-``data:`` lines and empty payloads; stops at the ``[DONE]``
    sentinel.
    """
    buffer = b""
    async for chunk in resp.content.iter_any():
        buffer += chunk
        # Newlines are single-byte in UTF-8, so splitting the byte buffer
        # can never cut a multi-byte character in half.
        while b"\n" in buffer:
            raw_line, buffer = buffer.split(b"\n", 1)
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line.startswith("data:"):
                continue

            data = line[5:].strip()
            if not data or data == "[DONE]":
                if data == "[DONE]":
                    return
                continue

            yield json.loads(data)


def post_to_openai(
    session: aiohttp.ClientSession,
    base_url: str,
    path: str,
    body: dict[str, Any],
    *,
    stream: bool = False,
) -> AsyncIterator[dict[str, Any]]:
    """POST *body* as JSON and yield response objects.

    Non-streaming: yields exactly one parsed JSON payload.
    Streaming: yields each SSE event as a dict until the stream ends.
    Returns a lazy async generator, so the request is only sent when the
    caller starts iterating.
    """
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"

    async def gen() -> AsyncIterator[dict[str, Any]]:
        # The response stays open only while this generator is being consumed.
        async with session.post(url, json=body) as resp:
            await _raise_for_status(resp)
            if not stream:
                # content_type=None: parse JSON regardless of the Content-Type header.
                yield await resp.json(content_type=None)
                return
            async for event in _iter_sse_json(resp):
                yield event

    return gen()


async def _one(it: AsyncIterator[dict[str, Any]]) -> dict[str, Any]:
    async for item in it:
        return item
    raise RuntimeError("Expected exactly one item, got none.")


class OpenAITransport:
    """Thin aiohttp wrapper owning one authenticated ClientSession."""

    def __init__(self, *, base_url: str = OPENAI_BASE_URL, api_key: str | None = None) -> None:
        # Fall back to the OPENAI_API_KEY environment variable.
        key = api_key or os.environ.get("OPENAI_API_KEY") or ""
        if not key:
            raise RuntimeError("OPENAI_API_KEY is not set.")

        self.base_url = base_url
        self._headers = _make_openai_headers(key)
        self._session: aiohttp.ClientSession | None = None

    async def open(self) -> None:
        """Create the underlying session if absent or closed (idempotent)."""
        if self._session is None or self._session.closed:
            timeout = aiohttp.ClientTimeout(total=60)
            self._session = aiohttp.ClientSession(headers=self._headers, timeout=timeout)

    async def aclose(self) -> None:
        """Close the session if one is open."""
        if self._session is not None and not self._session.closed:
            await self._session.close()

    @property
    def session(self) -> aiohttp.ClientSession:
        """The live session; raises if open() has not been called yet."""
        if self._session is None or self._session.closed:
            raise RuntimeError("OpenAITransport is not open.")
        return self._session

    def post(self, path: str, body: dict[str, Any], *, stream: bool = False) -> AsyncIterator[dict[str, Any]]:
        """POST via the shared session; see post_to_openai for semantics."""
        return post_to_openai(self.session, self.base_url, path, body, stream=stream)


# Process-wide transport singleton, created lazily by _get_openai().
_openai: OpenAITransport | None = None


async def _get_openai() -> OpenAITransport:
    """Return the shared transport, creating and/or (re)opening it as needed."""
    global _openai
    if _openai is None:
        _openai = OpenAITransport()
    await _openai.open()
    return _openai

async def _chat_text_chunks(
    openai: OpenAITransport,
    msg_list: list[ChatMessage],
    *,
    stream: bool,
) -> AsyncIterator[str]:
    """Yield the assistant's text for one chat completion request.

    Side effects: stores the raw response (a dict, or the list of SSE events
    when streaming) in module-level ``chat_response`` and the usage object in
    ``chat_usage``.
    """
    global chat_response
    global chat_usage

    chat_usage = None

    params_template: dict[str, Any] = {"model": MODEL, "max_tokens": 2000, "top_p": 0.1}
    messages = msg_list

    request: dict[str, Any] = {**params_template, "messages": messages}
    if stream:
        request["stream"] = True
        # include_usage so the final SSE event carries the billed token counts.
        request["stream_options"] = {
            "include_obfuscation": False,
            "include_usage": True,
        }

        events: list[dict[str, Any]] = []
        async for event in openai.post("/chat/completions", request, stream=True):
            events.append(event)

            usage = event.get("usage")
            if isinstance(usage, dict):
                chat_usage = usage

            choices = event.get("choices") or []
            if not choices:
                continue
            delta = choices[0].get("delta") or {}
            content = delta.get("content")
            if isinstance(content, str) and content:
                yield content

        chat_response = events
        return

    # Non-streaming: a single JSON payload with the full message and usage.
    payload = await _one(openai.post("/chat/completions", request, stream=False))
    chat_response = payload

    usage = payload.get("usage")
    if isinstance(usage, dict):
        chat_usage = usage

    yield payload["choices"][0]["message"]["content"]


async def get_chat_response(msg_list: list[ChatMessage], *, stream: bool = False) -> AsyncIterator[str]:
    """Yield the assistant's reply text for *msg_list*, streamed or whole."""
    transport = await _get_openai()
    async for text_piece in _chat_text_chunks(transport, msg_list, stream=stream):
        yield text_piece

def pretty_usage_table(usage_data: dict, one_line=False) -> str:
    '''Return a printable usage summary, optionally as a single line.

    Accepts usage objects from either the Responses API (``input_``/
    ``output_`` keys) or Chat Completions (``prompt_``/``completion_`` keys)
    and prints only the useful fields. The result starts with a newline:
    either a two-column table (default) or one summary line (one_line=True).
    '''
    # Normalize Responses-API key names to Chat Completions form.
    normalized = {
        key.replace("input_", "prompt_").replace("output_", "completion_"): value
        for key, value in usage_data.items()
    }

    prompt_total = normalized.get("prompt_tokens", 0)
    completion_total = normalized.get("completion_tokens", 0)

    prompt_detail = normalized.get("prompt_tokens_details", {})
    completion_detail = normalized.get("completion_tokens_details", {})

    cached = prompt_detail.get("cached_tokens", 0)
    audio_in = prompt_detail.get("audio_tokens", 0)
    reasoning = completion_detail.get("reasoning_tokens", 0)
    audio_out = completion_detail.get("audio_tokens", 0)

    uncached = prompt_total - cached
    nonreasoning = completion_total - reasoning

    if one_line:
        # Avoid building the table when only the summary line is wanted.
        audio_in_str = f", audio {audio_in}" if audio_in else ""
        audio_out_str = f", audio {audio_out}" if audio_out else ""
        return (
            f"\ninput: {prompt_total} (uncached {uncached}, cached {cached}{audio_in_str}); "
            f"output: {completion_total} (non-reasoning {nonreasoning}, reasoning {reasoning}{audio_out_str})"
        )

    prompt_column: list[str] = [
        f"input tokens: {prompt_total}",
        f"uncached: {uncached}",
        f"cached: {cached}",
    ]
    completion_column: list[str] = [
        f"output tokens: {completion_total}",
        f"non-reasoning: {nonreasoning}",
        f"reasoning: {reasoning}",
    ]

    # Audio rows only appear when any audio tokens were billed.
    if audio_in or audio_out:
        prompt_column.append(f"non-audio: {prompt_total - audio_in}")
        prompt_column.append(f"audio: {audio_in}")
        completion_column.append(f"non-audio: {completion_total - audio_out}")
        completion_column.append(f"audio: {audio_out}")

    prompt_width = max(len(cell) for cell in prompt_column)
    completion_width = max(len(cell) for cell in completion_column)

    rule = f"| {'-' * prompt_width} | {'-' * completion_width} |"
    table_lines: list[str] = [
        rule,
        f"| {prompt_column[0].ljust(prompt_width)} | {completion_column[0].ljust(completion_width)} |",
        rule,
    ]
    for left_cell, right_cell in zip(prompt_column[1:], completion_column[1:]):
        table_lines.append(f"| {left_cell.ljust(prompt_width)} | {right_cell.ljust(completion_width)} |")

    return "\n" + "\n".join(table_lines)

def make_chat_history_with_n_tool_pairs(
    base_history: list[ChatMessage],
    n_tool_pairs: int,
) -> list[ChatMessage]:
    """
    Build a scenario history keeping only the first N tool-call/response pairs.

    Expects base_history in this exact shape:
      [system, user, assistant_with_tool_calls, tool_msg_1, tool_msg_2, ...]
    and returns a new list:
      [system, user, assistant (first N tool_calls), first N tool messages].
    """
    if n_tool_pairs < 1:
        raise ValueError("n_tool_pairs must be >= 1")

    assistant_original = cast(AssistantMessage, base_history[2])
    available_calls = assistant_original.get("tool_calls") or []

    trimmed_assistant: AssistantMessage = {
        **assistant_original,
        "tool_calls": list(available_calls[:n_tool_pairs]),
    }

    scenario: list[ChatMessage] = [base_history[0], base_history[1], trimmed_assistant]
    scenario.extend(base_history[3:3 + n_tool_pairs])
    return scenario

async def main() -> None:
    """Run the 1-, 2-, and 3-tool-call scenarios and report billed usage."""
    global chat_history
    global chat_usage

    base_history = chat_history

    for n_tool_pairs in (1, 2, 3):
        # Same payload each run; only the number of parallel tool pairs varies.
        scenario_history = make_chat_history_with_n_tool_pairs(base_history, n_tool_pairs)

        print()
        print("=" * 80)
        print(f"RUN: {n_tool_pairs} tool call(s) + {n_tool_pairs} tool response message(s)")
        print("=" * 80)

        parts: list[str] = []
        async for chunk in get_chat_response(scenario_history, stream=STREAM):
            print(chunk, end="", flush=True)
            parts.append(chunk)
        print()

        # chat_usage is populated as a side effect of get_chat_response.
        if chat_usage is not None:
            # One-line usage summary, as requested
            usage_line = pretty_usage_table(chat_usage, one_line=True).strip()
            print(f"USAGE ({n_tool_pairs} tool call(s)): {usage_line}")
        else:
            print(f"USAGE ({n_tool_pairs} tool call(s)): <missing usage>")


async def _run() -> None:
    """Entry wrapper: guarantees the shared aiohttp session is closed on exit."""
    try:
        await main()
    finally:
        # Close even on failure so the event loop shuts down cleanly.
        if _openai is not None:
            await _openai.aclose()


if __name__ == "__main__":
    asyncio.run(_run())

Youโ€™ll get three runs at different tool call count, the AI streaming a response to each, and then the usage report.

5 Likes

Thank you for the excellent replication code. It perfectly demonstrates the exact issue I was pointing out.

3 Likes