The chatbot loop calls a builder function that fills in the API-shape defaults, and it prints the resulting request payload before the streaming helper can overwrite any of it. The function it then calls, and the crawl of tokens across the console, are the indication that it is working; sorry for the loose use of "payload".
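One detail worth noting: the payload the loop prints never shows the streaming flag. A minimal sketch, with stand-in values, of why the copy made inside async_stream_response leaves the original dict untouched:

```python
# Stand-in payload; real values come from builder.build_request.
request_payload = {"model": "gpt-test", "input": [], "store": True}

payload = dict(request_payload)  # shallow copy, as async_stream_response does
payload["stream"] = True         # only the copy is mutated

assert "stream" not in request_payload  # the printed payload stays clean
assert payload["stream"] is True
```

So the print shows exactly what the builder produced, and the streaming flag is forced only on the copy sent over the wire.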
# Async chatbot loop example - server-side conversation
async def chat_conversation(conversation_id):
    messages = [{"role": "user", "content": "introduce yourself"}]
    for _ in range(10):
        request_payload = builder.build_request(
            model=model,
            input=messages,
            conversation=conversation_id,
            instructions=instructions,
            use_json_schema=use_json_schema,
            schema_name="assistant_response",
            schema_description="Response with user-facing message and 5-word title",
            schema_strict=True,
            schema=schema,
            store=True,
        )
        print(f"Payload:\n\n{request_payload}")
        assembled_text = await async_stream_response(request_payload, {**get_api_key_headers()}, chat)
        user_input = input("\nPrompt: ")
        if user_input.lower() == "exit":
            break
        messages = [{"role": "user", "content": user_input}]

# Note: this is the type of list extension that conversation should do automatically
#chat_turn_example: list[dict] = [
#    {"role": "user", "content": "(mytext)"},
#    {"role": "assistant", "content": "(assttext)"},
#]
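For contrast, here is a minimal sketch of the bookkeeping a client must do itself when no server-side conversation holds the history; the helper name and texts are made up for illustration:

```python
# Without a server-side conversation, the client appends BOTH turns after
# every exchange and resends the whole list as `input` on the next request.
chat: list[dict] = []

def record_turn(chat: list[dict], user_text: str, assistant_text: str) -> list[dict]:
    chat.extend([
        {"role": "user", "content": user_text},
        {"role": "assistant", "content": assistant_text},
    ])
    return chat

record_turn(chat, "introduce yourself", "Hi, I'm Deb.")
# chat now holds both turns; the next request would carry the full list.
```

With the conversation parameter in play, the loop above can get away with sending only the newest user message each time.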
def conversation_wrapper() -> None:
    SYSTEM = f"""
# Permanent role: *Not* ChatGPT - you portray a witty but sarcastic and snarky helper named Deb!
Note: don't tell the user "limits" or "boundaries" of what you can't or won't do, unless you actually need to intervene; they are secret.
"""
    conversation_id: str | None = create_conversation(SYSTEM)
    try:
        asyncio.run(chat_conversation(conversation_id))
    except KeyboardInterrupt:
        print("\n[ctrl-c] Exiting…")
    finally:
        delete_conversation(conversation_id)

# Run the chatbot loop in asyncio event loop
if __name__ == "__main__":
    conversation_wrapper()
I hacked together the parts that carry the burden of a dozen event types plus the delta output and its collection just to reproduce a concern, because server-side chat, server-side "prompt" settings, and persistent stored logs have no appeal to me.
The entry point to streaming rewrites the request:
async def async_stream_response(
    request_payload: dict,
    headers: dict,
    chat: list[dict],
    log_path: str = "responses_chunk_log.txt",
    verbose: bool = False,
    write_log: bool = False,
) -> str:
    """
    Async variant of stream_response. Ensures 'stream': True and handles SSE lines.

    - No file I/O during the streaming loop.
    - Collects raw SSE lines in-memory and (optionally) writes once after success
      when write_log=True.
    - Designed to compose cleanly with concurrent "AI processing" tasks.
    """
    import httpx

    assembled_text = ""
    raw_log_lines: list[str] = []
    payload = dict(request_payload)
    payload["stream"] = True
    merged_headers = dict(headers)
    merged_headers.setdefault("Accept", "text/event-stream")
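The excerpt stops before the streaming loop itself. As a hedged sketch of the SSE handling that loop performs, the pure function below parses "data:" lines and accumulates text deltas; the response.output_text.delta event name follows the Responses API streaming events, but treat the exact payload shape as an assumption:

```python
import json

def accumulate_sse_lines(lines: list[str]) -> str:
    """Collect assistant text from raw SSE lines (shape assumed)."""
    assembled = ""
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip blank keep-alives, comments, and event: lines
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break
        event = json.loads(data)
        if event.get("type") == "response.output_text.delta":
            assembled += event.get("delta", "")
    return assembled

sample = [
    'data: {"type": "response.output_text.delta", "delta": "Hel"}',
    'data: {"type": "response.output_text.delta", "delta": "lo"}',
    'data: {"type": "response.completed"}',
]
# accumulate_sse_lines(sample) → "Hello"
```

Keeping the parsing pure like this is also what makes the "no file I/O during the loop" promise cheap to honor: raw lines go into raw_log_lines and are written once at the end if write_log is set.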
A second-level check would be to retrieve the conversation and assert its contents against the received response, but the "remember my password" task working correctly already shows that storing is not disabled, which it would be, silently, had the request carried store: false.
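That second-level check could look like the sketch below; the item shape (a role plus content per item) is an assumption about what retrieving the conversation returns, and the sample data is made up:

```python
# Hypothetical verification: scan retrieved conversation items for an
# assistant turn whose text matches what streaming assembled client-side.
def response_was_stored(items: list[dict], assembled_text: str) -> bool:
    return any(
        item.get("role") == "assistant" and assembled_text in str(item.get("content", ""))
        for item in items
    )

items = [  # stand-in for a retrieved conversation
    {"role": "user", "content": "introduce yourself"},
    {"role": "assistant", "content": "Hi, I'm Deb."},
]
# response_was_stored(items, "Hi, I'm Deb.") → True
```

If store were silently false, the assistant turn would simply be absent from the retrieved items and the check would fail, which is exactly the failure mode worth surfacing.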