I am trying to build an app with openai realtime api, using gpt-realtime as model. It works pretty ok so far how ever i have the problem that the modell is completely halucinating. I tried to implement some kind of video streaming like the gpt app did it. The app is soposed to work with the users screen so it uses the users screen as video (sending frames of it to the ai regularely). how ever when i then ask the ai what’s on screen it makes up all kinds of things but is almost never right. Also i have a little tool included just to test but the ai never uses the tool although it’s registered.
This is my code.
“”"
Minimal Realtime voice call client.
What this script does
- Captures microphone audio and streams it to OpenAI Realtime.
- Optionally streams live screen frames (periodic images) while recording.
- Plays the model’s audio responses through your speakers.
- Includes a simple tool “say_hello” that, when called by the model, prints “hi”.
- Push-to-talk UX: press Enter to start, Enter again to stop and send.
Requirements
- Environment variable
OPENAI_API_KEYset to your key. - Optional:
OPENAI_REALTIME_MODELto override the default model. - PortAudio installed on your system (sounddevice dependency). On macOS:
brew install portaudio.
Run
python forumfile.py [–verbose]
“”"
import argparse
import asyncio
import base64
import json
import logging
import os
import queue
import threading
from typing import Optional
from dotenv import load_dotenv
import sounddevice as sd
from websockets.client import connect as ws_connect
from websockets.exceptions import ConnectionClosed
try:
from screenshot_tool import capture_screenshot_png_base64
except Exception:
capture_screenshot_png_base64 = None # type: ignore
Logging setup
logger = logging.getLogger(“realtime_forum”)
def setup_logger(path: str = “log_forum.txt”) → None:
logger.setLevel(logging.INFO)
logger.handlers.clear()
fh = logging.FileHandler(path, mode=“a”, encoding=“utf-8”)
fmt = logging.Formatter(“%(asctime)s %(levelname)s %(message)s”)
fh.setFormatter(fmt)
logger.addHandler(fh)
def log_send_event(evt: dict) → None:
t = evt.get(“type”)
if t == “input_audio_buffer.append”:
audio_b64 = evt.get(“audio”, “”)
logger.info(“SEND: %s len=%d”, t, len(audio_b64))
else:
try:
logger.info(“SEND: %s”, json.dumps(evt, ensure_ascii=False))
except Exception:
logger.info(“SEND: %s (unserializable)”, t)
def log_recv_event(evt: dict) → None:
t = evt.get(“type”)
if t in (“response.output_audio.delta”, “output_audio.delta”, “response.audio.delta”):
audio_b64 = evt.get(“audio”) or evt.get(“delta”) or “”
logger.info(“RECV: %s len=%d”, t, len(audio_b64))
else:
try:
logger.info(“RECV: %s”, json.dumps(evt, ensure_ascii=False))
except Exception:
logger.info(“RECV: %s (unserializable)”, t)
Audio config
INPUT_SAMPLE_RATE = int(os.getenv(“OPENAI_INPUT_SAMPLE_RATE”, “16000”)) # Hz
OUTPUT_SAMPLE_RATE = int(os.getenv(“OPENAI_OUTPUT_SAMPLE_RATE”, “24000”)) # Hz
CHANNELS = 1
SAMPLE_WIDTH_BYTES = 2 # int16
BLOCK_DURATION_SEC = 0.2
def _b64encode_pcm(pcm_bytes: bytes) → str:
return base64.b64encode(pcm_bytes).decode(“ascii”)
class AudioPlayer:
“”“Simple buffered PCM16 player using sounddevice RawOutputStream.”“”
def __init__(self, samplerate: int = OUTPUT_SAMPLE_RATE):
self._samplerate = samplerate
self._q: "queue.Queue[bytes]" = queue.Queue(maxsize=128)
self._buffer = bytearray()
self._stream: Optional[sd.RawOutputStream] = None
def _callback(self, outdata, frames, time_info, status): # type: ignore[override]
if status:
pass
needed = frames * SAMPLE_WIDTH_BYTES * CHANNELS
while len(self._buffer) < needed:
try:
chunk = self._q.get_nowait()
except queue.Empty:
break
else:
self._buffer.extend(chunk)
if len(self._buffer) >= needed:
outdata[:needed] = bytes(self._buffer[:needed])
del self._buffer[:needed]
else:
out = bytes(self._buffer) + b"\x00" * (needed - len(self._buffer))
outdata[:needed] = out
self._buffer.clear()
def start(self):
if self._stream is not None:
return
self._stream = sd.RawOutputStream(
samplerate=self._samplerate,
channels=CHANNELS,
dtype="int16",
callback=self._callback,
blocksize=int(self._samplerate * BLOCK_DURATION_SEC),
)
self._stream.start()
def stop(self):
if self._stream is not None:
self._stream.stop()
self._stream.close()
self._stream = None
with self._q.mutex:
self._q.queue.clear()
self._buffer.clear()
def write(self, pcm_bytes: bytes):
try:
self._q.put_nowait(pcm_bytes)
except queue.Full:
try:
_ = self._q.get_nowait()
except queue.Empty:
pass
try:
self._q.put_nowait(pcm_bytes)
except queue.Full:
pass
def mic_capture_thread(loop: asyncio.AbstractEventLoop, send_q: “asyncio.Queue[dict]”, stop_evt: threading.Event):
“”“Capture mic audio and enqueue realtime input events until stopped.”“”
blocksize = int(INPUT_SAMPLE_RATE * BLOCK_DURATION_SEC)
def on_audio(indata, frames, time_info, status): # type: ignore[override]
if stop_evt.is_set():
raise sd.CallbackStop
pcm = bytes(indata)
evt = {"type": "input_audio_buffer.append", "audio": _b64encode_pcm(pcm)}
loop.call_soon_threadsafe(send_q.put_nowait, evt)
try:
with sd.RawInputStream(
samplerate=INPUT_SAMPLE_RATE,
channels=CHANNELS,
dtype="int16",
callback=on_audio,
blocksize=blocksize,
):
while not stop_evt.is_set():
sd.sleep(int(BLOCK_DURATION_SEC * 1000))
except sd.PortAudioError as e:
print(f"Microphone error: {e}")
finally:
# Finalize the current input buffer; caller will issue response.create
loop.call_soon_threadsafe(send_q.put_nowait, {"type": "input_audio_buffer.commit"})
def screen_capture_thread(
loop: asyncio.AbstractEventLoop,
send_q: “asyncio.Queue[dict]”,
stop_evt: threading.Event,
fps: float,
):
“”“Capture screen frames periodically and enqueue as conversation items.”“”
if capture_screenshot_png_base64 is None:
return
if fps <= 0:
return
import time
interval = 1.0 / float(fps)
next_ts = time.perf_counter()
while not stop_evt.is_set():
try:
img_b64 = capture_screenshot_png_base64(region=None)
except Exception as e:
if VERBOSE:
print(f"Screen capture failed: {e}")
break
evt = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{"type": "input_image", "image": img_b64, "mime_type": "image/png"}
],
},
}
loop.call_soon_threadsafe(send_q.put_nowait, evt)
next_ts += interval
now = time.perf_counter()
sleep_for = max(0.0, next_ts - now)
if sleep_for:
time.sleep(sleep_for)
else:
next_ts = now
async def ws_sender(ws, send_q: “asyncio.Queue[dict]”):
while True:
evt = await send_q.get()
try:
log_send_event(evt)
if VERBOSE:
t = evt.get(“type”)
if t == “input_audio_buffer.append”:
sz = len(evt.get(“audio”, “”))
print(f"SEND: {t} (+{sz} b64 chars)“)
else:
print(“SEND:”, t)
await ws.send(json.dumps(evt))
except Exception as e:
print(f"Send error: {e}”)
return
VERBOSE = False
class ToolCallManager:
“”“Accumulates function call arguments and dispatches tool outputs.”“”
def __init__(self, send_q: "asyncio.Queue[dict]"):
self._send_q = send_q
self._calls: dict[str, dict] = {}
def handle_output_item_added(self, item: dict):
if item.get("type") == "function_call":
call_id = item.get("id")
name = (item.get("name") or "").strip()
if call_id:
self._calls[call_id] = {"name": name, "args_buf": ""}
try:
logger.info("FUNC_CALL_ADDED id=%s name=%s", call_id, name)
except Exception:
pass
def handle_args_delta(self, evt: dict):
call_id = evt.get("id") or evt.get("item_id") or evt.get("call_id")
delta = evt.get("delta") or ""
if call_id and call_id in self._calls:
self._calls[call_id]["args_buf"] += str(delta)
if VERBOSE:
print(f"FUNC_ARGS_DELTA id={call_id} +{len(str(delta))} chars")
try:
logger.info("FUNC_ARGS_DELTA id=%s +%d", call_id, len(str(delta)))
except Exception:
pass
async def maybe_dispatch_on_done(self, item: dict):
if item.get("type") != "function_call":
return
call_id = item.get("id")
if not call_id or call_id not in self._calls:
return
name = self._calls[call_id].get("name")
# args are optional for this tool; parse if provided
args_buf = self._calls[call_id].get("args_buf") or "{}"
try:
_ = json.loads(args_buf) if isinstance(args_buf, str) and args_buf.strip() else {}
except Exception:
_ = {}
if name == "say_hello":
await self._run_say_hello(call_id)
self._calls.pop(call_id, None)
async def _run_say_hello(self, call_id: str):
print("hi")
try:
logger.info("TOOL_RUN name=say_hello status=ok")
except Exception:
pass
await self._send_q.put(
{
"type": "tool_output.create",
"tool_output": {
"tool_call_id": call_id,
"content": [
{"type": "output_text", "text": "Hello said."},
],
},
}
)
# Optionally ask the model to continue
await self._send_q.put({"type": "response.create"})
async def ws_receiver(ws, player: AudioPlayer, send_q: “asyncio.Queue[dict]”, session_ready: asyncio.Event | None = None):
try:
async for msg in ws:
try:
evt = json.loads(msg)
except Exception:
continue
et = evt.get(“type”)
log_recv_event(evt)
if VERBOSE:
print(“RECV:”, et)
if et == “session.created”:
if session_ready is not None:
session_ready.set()
elif et in (“response.output_audio.delta”, “response.audio.delta”, “output_audio.delta”):
audio_b64 = evt.get(“audio”) or evt.get(“delta”)
if audio_b64:
player.write(base64.b64decode(audio_b64))
elif et == “response.output_text.delta”:
delta = evt.get(“delta”) or “”
if delta:
print(delta, end=“”, flush=True)
elif et == “response.output_item.added”:
item = evt.get(“item”) or {}
tool_mgr.handle_output_item_added(item)
elif et in (
“response.function_call_arguments.delta”,
“response.function_call.arguments.delta”,
“function_call.arguments.delta”,
):
tool_mgr.handle_args_delta(evt)
elif et in (
“response.function_call_arguments.done”,
“response.function_call.arguments.done”,
):
pass
elif et == “response.output_item.done”:
item = evt.get(“item”) or {}
await tool_mgr.maybe_dispatch_on_done(item)
elif et in (“response.completed”, “response.done”):
print(“”)
elif et in (“response.error”, “error”):
print(“Realtime error:”, evt)
elif VERBOSE:
print(“EVT:”, et, evt)
except ConnectionClosed:
print(“Connection closed”)
async def main():
load_dotenv()
setup_logger(“log_forum.txt”)
parser = argparse.ArgumentParser(description=“OpenAI Realtime voice call (forum copy)”)
parser.add_argument(“–verbose”, action=“store_true”, help=“Enable verbose event logging”)
args = parser.parse_args()
global VERBOSE
VERBOSE = bool(args.verbose)
def get_api_key() -> Optional[str]:
candidates = ["OPENAI_API_KEY", "OPENAI_KEY", "OPENAI_TOKEN", "API_KEY"]
for name in candidates:
val = os.getenv(name)
if val:
return val
val = os.getenv(name.lower())
if val:
return val
return None
api_key = get_api_key()
if not api_key:
raise SystemExit("OpenAI API key not found. Set OPENAI_API_KEY.")
model = os.getenv("OPENAI_REALTIME_MODEL")
if not model:
candidate = os.getenv("OPENAI_MODEL", "")
model = candidate if "realtime" in candidate else "gpt-realtime"
uri = f"wss://api.openai.com/v1/realtime?model={model}"
headers = [("Authorization", f"Bearer {api_key}"), ("OpenAI-Beta", "realtime=v1")]
print(f"Connecting to OpenAI Realtime... (model={model})")
async with ws_connect(uri, extra_headers=headers, max_size=None) as ws:
send_q: "asyncio.Queue[dict]" = asyncio.Queue(maxsize=1024)
sender_task = asyncio.create_task(ws_sender(ws, send_q))
player = AudioPlayer(samplerate=OUTPUT_SAMPLE_RATE)
player.start()
global tool_mgr
tool_mgr = ToolCallManager(send_q)
session_ready = asyncio.Event()
recv_task = asyncio.create_task(ws_receiver(ws, player, send_q, session_ready))
voice = os.getenv("OPENAI_REALTIME_VOICE", "alloy")
tools = [
{
"type": "function",
"name": "say_hello",
"description": "Prints 'hi' to the console.",
"parameters": {"type": "object", "strict": True, "properties": {}, "required": []},
}
]
try:
await asyncio.wait_for(session_ready.wait(), timeout=3.0)
except asyncio.TimeoutError:
if VERBOSE:
print("session.created not seen within 3s; sending session.update anyway")
await send_q.put(
{
"type": "session.update",
"session": {
"type": "realtime",
"modalities": ["text", "audio"],
"voice": voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_sample_rate_hz": INPUT_SAMPLE_RATE,
"output_audio_sample_rate_hz": OUTPUT_SAMPLE_RATE,
"instructions": "You are a helpful voice assistant. Use the say_hello tool when appropriate.",
"tools": tools,
"tool_choice": "auto",
},
}
)
tool_names = [t.get("name") for t in tools]
if VERBOSE:
print("Registered tools:", tool_names)
try:
logger.info("TOOLS_REGISTERED %s", ",".join(tool_names))
except Exception:
pass
print("Connected. Press Enter to talk; 'q' + Enter to quit.")
loop = asyncio.get_running_loop()
async def ainput(prompt: str = "") -> str:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: input(prompt))
try:
while True:
cmd = (await ainput("\nPress Enter to start recording (q=quit): ")).strip().lower()
if cmd == "q":
break
stop_evt = threading.Event()
mic_t = threading.Thread(
target=mic_capture_thread, args=(loop, send_q, stop_evt), daemon=True
)
mic_t.start()
# Optionally start screen streaming at configured FPS
try:
screen_fps = float(os.getenv("OPENAI_SCREEN_FPS", os.getenv("SCREEN_FPS", "1.0")))
except Exception:
screen_fps = 1.0
screen_t: threading.Thread | None = None
if capture_screenshot_png_base64 is not None and screen_fps > 0:
screen_t = threading.Thread(
target=screen_capture_thread,
args=(loop, send_q, stop_evt, screen_fps),
daemon=True,
)
screen_t.start()
if VERBOSE:
print(f"Screen streaming enabled at {screen_fps:.2f} fps")
await ainput("Recording... press Enter to stop and send.")
stop_evt.set()
if screen_t is not None:
screen_t.join(timeout=1.0)
mic_t.join()
# Ask for a response using the audio captured
await send_q.put({"type": "response.create"})
print("Sent. Listening for reply...")
finally:
player.stop()
sender_task.cancel()
recv_task.cancel()
try:
await ws.close()
except Exception:
pass
if name == “main”:
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Any help would bed be apreciated.