On this forum, the post editor gives you a formatting bar.
The </> button encloses your code in a fence container so that it isn't reformatted and damaged.
Here's how that looks. In the tidy box below is Responses code to make the call; after the AI answers the initial question, you can keep chatting at the command line.
```python
'''Example code for OpenAI's "Responses" API inference endpoint
- streaming
- catches minimal 'content' events
- non-SDK (plain httpx)
- sends an initial message, then runs a command-line chatbot
'''
import os
import json
import httpx
model = "gpt-5-chat-latest"
dev_instructions = """Knowledge cutoff: 2024-09
You are a helpful conversational AI assistant powered by OpenAI's gpt-5.
"""
chat = [
{
"type": "message", "role": "user",
"content": [
{"type": "input_text", "text": "What's your AI model?"},
],
},
]
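# Note: the Responses "input" list also accepts plain {"role": ..., "content": "<string>"}
# messages; the chat loop at the bottom appends follow-up turns in that shorter form.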
def extract_event(event_lines: list[str]) -> dict | None:
"""
Extract an SSE event from a list of lines.
Expected SSE format:
event: <event-type>
data: <JSON string>
(blank line)
Returns:
A dictionary with keys "type", "data", and "raw_data" (the original JSON string),
or None if extraction fails.
"""
event_type = None
data_str = ""
for line in event_lines:
if line.startswith("event:"):
event_type = line[len("event:"):].strip()
elif line.startswith("data:"):
data_str = line[len("data:"):].strip()
if event_type is None or not data_str:
return None
try:
data_json = json.loads(data_str)
except json.JSONDecodeError:
data_json = None
return {"type": event_type, "data": data_json, "raw_data": data_str}
def process_event(event: dict, assembled_text: str, chat: list[dict]) -> str:
"""
Process a single SSE event chunk.
Args:
event: The event dictionary containing "type" and "data".
assembled_text: The text assembled so far from delta events.
chat: The chat history list to update with the final assistant response.
Returns:
The updated assembled_text.
"""
if event["type"] == "response.output_text.delta":
delta = event["data"].get("delta", "")
print(delta, end="", flush=True)
assembled_text += delta
elif event["type"] == "response.output_text.done":
final_piece = event["data"].get("text", "")
if not assembled_text:
print(final_piece, end="", flush=True)
assembled_text = final_piece
elif event["type"] == "response.completed":
comp = event["data"].get("response", {})
usage = comp.get("usage")
if isinstance(usage, dict):
in_tokens = usage.get("input_tokens")
out_tokens = usage.get("output_tokens")
if in_tokens is not None and out_tokens is not None:
print(f"\n[[Tokens in: {in_tokens}, out: {out_tokens}]]", flush=True)
final_text = ""
if comp.get("output"):
out_item = comp["output"][0]
for content in out_item.get("content", []):
if content.get("type") == "output_text":
final_text += content.get("text", "")
if not final_text:
final_text = assembled_text
chat.append({"role": "assistant", "content": final_text})
elif event["type"] == "error":
error_data = event["data"].get("error", {})
message = error_data.get("message", "")
print(f"\nError: {message}", flush=True)
else:
#print(f' - Note: {event["type"]} not parsed.', flush=True)
pass
return assembled_text
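# For a plain text response, the stream typically arrives in this order; only
# the starred events are handled above, the rest fall through to `pass`:
#     response.created
#     response.output_item.added / response.content_part.added
#   * response.output_text.delta   (repeated, one text fragment per event)
#   * response.output_text.done
#     response.content_part.done / response.output_item.done
#   * response.completed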
def stream_response(request_payload: dict, headers: dict, chat: list[dict]) -> str:
    """Stream a POST to /v1/responses, printing deltas and returning the full text."""
    assembled_text = ""
try:
with httpx.Client(timeout=600) as client:
with client.stream("POST", "https://api.openai.com/v1/responses",
headers=headers, json=request_payload) as response:
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
                    # Fully consume the streaming body so the error text can be shown.
                    error_body = response.read()
                    error_text = error_body.decode("utf-8") if error_body else "No error message"
print(f"HTTP error occurred: {exc.response.status_code} - {error_text}")
raise
with open("responses_chunk_log.txt", "w", encoding="utf-8") as log_file:
event_buffer: list[str] = []
for line in response.iter_lines():
# Log each received line.
log_file.write(line + "\n")
log_file.flush()
if line == "":
if not event_buffer:
continue
event = extract_event(event_buffer)
event_buffer = []
if event is None:
continue
# Uncomment next line for detailed event debugging:
# debug_print_event(event)
assembled_text = process_event(event, assembled_text, chat)
else:
event_buffer.append(line)
if event_buffer:
event = extract_event(event_buffer)
if event:
# Uncomment next line for detailed event debugging:
# debug_print_event(event)
assembled_text = process_event(event, assembled_text, chat)
except Exception as e:
print(f"Error: {e}")
return assembled_text
# Reasoning models (o-series and gpt-5, except the chat variant) take a
# reasoning-effort setting; non-reasoning chat models take sampling parameters.
reasoning_params = (
    {"reasoning": {"effort": "low"}}
    if (model.startswith("o") or (model.startswith("gpt-5")
                                  and model != "gpt-5-chat-latest"))
    else {"top_p": 0.5, "temperature": 0.9}
)
# API request template.
params_template = {
"model": model,
"max_output_tokens": 16000, # cost limit: reasoning + output, forces termination
"stream": True, # SSE as the model generates are two-line events
"previous_response_id": None, # use past conversation state, 30 day expiry
"include": None, # reproduce search results, user image URLs, computer-call images
"instructions": dev_instructions, # insertion of "system" role before input
"store": False, # disable default server-side storing responses
"metadata": {}, # key-value pairs for store evals/distillation
"text": None, # for structured responses, verbosity
**reasoning_params, # dynamically set reasoning params
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
}
# Up to ten chat turns; type "exit" to quit.
for _ in range(10):
request_payload = {**params_template, "input": chat}
assembled_text = stream_response(request_payload, headers, chat)
user_input = input("\nPrompt: ")
if user_input.lower() == "exit":
break
chat.append({"role": "user", "content": user_input})