Openai gpt-realtime halucinates like crazy and don't uses tools properly

I am trying to build an app with openai realtime api, using gpt-realtime as model. It works pretty ok so far how ever i have the problem that the modell is completely halucinating. I tried to implement some kind of video streaming like the gpt app did it. The app is soposed to work with the users screen so it uses the users screen as video (sending frames of it to the ai regularely). how ever when i then ask the ai what’s on screen it makes up all kinds of things but is almost never right. Also i have a little tool included just to test but the ai never uses the tool although it’s registered.

This is my code.

“”"
Minimal Realtime voice call client.

What this script does

  • Captures microphone audio and streams it to OpenAI Realtime.
  • Optionally streams live screen frames (periodic images) while recording.
  • Plays the model’s audio responses through your speakers.
  • Includes a simple tool “say_hello” that, when called by the model, prints “hi”.
  • Push-to-talk UX: press Enter to start, Enter again to stop and send.

Requirements

  • Environment variable OPENAI_API_KEY set to your key.
  • Optional: OPENAI_REALTIME_MODEL to override the default model.
  • PortAudio installed on your system (sounddevice dependency). On macOS: brew install portaudio.

Run
python forumfile.py [–verbose]
“”"

import argparse
import asyncio
import base64
import json
import logging
import os
import queue
import threading
from typing import Optional

from dotenv import load_dotenv
import sounddevice as sd
from websockets.client import connect as ws_connect
from websockets.exceptions import ConnectionClosed

try:
from screenshot_tool import capture_screenshot_png_base64
except Exception:
capture_screenshot_png_base64 = None # type: ignore

Logging setup

logger = logging.getLogger(“realtime_forum”)

def setup_logger(path: str = “log_forum.txt”) → None:
logger.setLevel(logging.INFO)
logger.handlers.clear()
fh = logging.FileHandler(path, mode=“a”, encoding=“utf-8”)
fmt = logging.Formatter(“%(asctime)s %(levelname)s %(message)s”)
fh.setFormatter(fmt)
logger.addHandler(fh)

def log_send_event(evt: dict) → None:
t = evt.get(“type”)
if t == “input_audio_buffer.append”:
audio_b64 = evt.get(“audio”, “”)
logger.info(“SEND: %s len=%d”, t, len(audio_b64))
else:
try:
logger.info(“SEND: %s”, json.dumps(evt, ensure_ascii=False))
except Exception:
logger.info(“SEND: %s (unserializable)”, t)

def log_recv_event(evt: dict) → None:
t = evt.get(“type”)
if t in (“response.output_audio.delta”, “output_audio.delta”, “response.audio.delta”):
audio_b64 = evt.get(“audio”) or evt.get(“delta”) or “”
logger.info(“RECV: %s len=%d”, t, len(audio_b64))
else:
try:
logger.info(“RECV: %s”, json.dumps(evt, ensure_ascii=False))
except Exception:
logger.info(“RECV: %s (unserializable)”, t)

Audio config

INPUT_SAMPLE_RATE = int(os.getenv(“OPENAI_INPUT_SAMPLE_RATE”, “16000”)) # Hz
OUTPUT_SAMPLE_RATE = int(os.getenv(“OPENAI_OUTPUT_SAMPLE_RATE”, “24000”)) # Hz
CHANNELS = 1
SAMPLE_WIDTH_BYTES = 2 # int16
BLOCK_DURATION_SEC = 0.2

def _b64encode_pcm(pcm_bytes: bytes) → str:
return base64.b64encode(pcm_bytes).decode(“ascii”)

class AudioPlayer:
“”“Simple buffered PCM16 player using sounddevice RawOutputStream.”“”

def __init__(self, samplerate: int = OUTPUT_SAMPLE_RATE):
    self._samplerate = samplerate
    self._q: "queue.Queue[bytes]" = queue.Queue(maxsize=128)
    self._buffer = bytearray()
    self._stream: Optional[sd.RawOutputStream] = None

def _callback(self, outdata, frames, time_info, status):  # type: ignore[override]
    if status:
        pass
    needed = frames * SAMPLE_WIDTH_BYTES * CHANNELS
    while len(self._buffer) < needed:
        try:
            chunk = self._q.get_nowait()
        except queue.Empty:
            break
        else:
            self._buffer.extend(chunk)

    if len(self._buffer) >= needed:
        outdata[:needed] = bytes(self._buffer[:needed])
        del self._buffer[:needed]
    else:
        out = bytes(self._buffer) + b"\x00" * (needed - len(self._buffer))
        outdata[:needed] = out
        self._buffer.clear()

def start(self):
    if self._stream is not None:
        return
    self._stream = sd.RawOutputStream(
        samplerate=self._samplerate,
        channels=CHANNELS,
        dtype="int16",
        callback=self._callback,
        blocksize=int(self._samplerate * BLOCK_DURATION_SEC),
    )
    self._stream.start()

def stop(self):
    if self._stream is not None:
        self._stream.stop()
        self._stream.close()
        self._stream = None
    with self._q.mutex:
        self._q.queue.clear()
    self._buffer.clear()

def write(self, pcm_bytes: bytes):
    try:
        self._q.put_nowait(pcm_bytes)
    except queue.Full:
        try:
            _ = self._q.get_nowait()
        except queue.Empty:
            pass
        try:
            self._q.put_nowait(pcm_bytes)
        except queue.Full:
            pass

def mic_capture_thread(loop: asyncio.AbstractEventLoop, send_q: “asyncio.Queue[dict]”, stop_evt: threading.Event):
“”“Capture mic audio and enqueue realtime input events until stopped.”“”
blocksize = int(INPUT_SAMPLE_RATE * BLOCK_DURATION_SEC)

def on_audio(indata, frames, time_info, status):  # type: ignore[override]
    if stop_evt.is_set():
        raise sd.CallbackStop
    pcm = bytes(indata)
    evt = {"type": "input_audio_buffer.append", "audio": _b64encode_pcm(pcm)}
    loop.call_soon_threadsafe(send_q.put_nowait, evt)

try:
    with sd.RawInputStream(
        samplerate=INPUT_SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
        callback=on_audio,
        blocksize=blocksize,
    ):
        while not stop_evt.is_set():
            sd.sleep(int(BLOCK_DURATION_SEC * 1000))
except sd.PortAudioError as e:
    print(f"Microphone error: {e}")
finally:
    # Finalize the current input buffer; caller will issue response.create
    loop.call_soon_threadsafe(send_q.put_nowait, {"type": "input_audio_buffer.commit"})

def screen_capture_thread(
loop: asyncio.AbstractEventLoop,
send_q: “asyncio.Queue[dict]”,
stop_evt: threading.Event,
fps: float,
):
“”“Capture screen frames periodically and enqueue as conversation items.”“”
if capture_screenshot_png_base64 is None:
return
if fps <= 0:
return
import time

interval = 1.0 / float(fps)
next_ts = time.perf_counter()
while not stop_evt.is_set():
    try:
        img_b64 = capture_screenshot_png_base64(region=None)
    except Exception as e:
        if VERBOSE:
            print(f"Screen capture failed: {e}")
        break

    evt = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [
                {"type": "input_image", "image": img_b64, "mime_type": "image/png"}
            ],
        },
    }
    loop.call_soon_threadsafe(send_q.put_nowait, evt)

    next_ts += interval
    now = time.perf_counter()
    sleep_for = max(0.0, next_ts - now)
    if sleep_for:
        time.sleep(sleep_for)
    else:
        next_ts = now

async def ws_sender(ws, send_q: “asyncio.Queue[dict]”):
while True:
evt = await send_q.get()
try:
log_send_event(evt)
if VERBOSE:
t = evt.get(“type”)
if t == “input_audio_buffer.append”:
sz = len(evt.get(“audio”, “”))
print(f"SEND: {t} (+{sz} b64 chars)“)
else:
print(“SEND:”, t)
await ws.send(json.dumps(evt))
except Exception as e:
print(f"Send error: {e}”)
return

VERBOSE = False

class ToolCallManager:
“”“Accumulates function call arguments and dispatches tool outputs.”“”

def __init__(self, send_q: "asyncio.Queue[dict]"):
    self._send_q = send_q
    self._calls: dict[str, dict] = {}

def handle_output_item_added(self, item: dict):
    if item.get("type") == "function_call":
        call_id = item.get("id")
        name = (item.get("name") or "").strip()
        if call_id:
            self._calls[call_id] = {"name": name, "args_buf": ""}
            try:
                logger.info("FUNC_CALL_ADDED id=%s name=%s", call_id, name)
            except Exception:
                pass

def handle_args_delta(self, evt: dict):
    call_id = evt.get("id") or evt.get("item_id") or evt.get("call_id")
    delta = evt.get("delta") or ""
    if call_id and call_id in self._calls:
        self._calls[call_id]["args_buf"] += str(delta)
        if VERBOSE:
            print(f"FUNC_ARGS_DELTA id={call_id} +{len(str(delta))} chars")
        try:
            logger.info("FUNC_ARGS_DELTA id=%s +%d", call_id, len(str(delta)))
        except Exception:
            pass

async def maybe_dispatch_on_done(self, item: dict):
    if item.get("type") != "function_call":
        return
    call_id = item.get("id")
    if not call_id or call_id not in self._calls:
        return
    name = self._calls[call_id].get("name")
    # args are optional for this tool; parse if provided
    args_buf = self._calls[call_id].get("args_buf") or "{}"
    try:
        _ = json.loads(args_buf) if isinstance(args_buf, str) and args_buf.strip() else {}
    except Exception:
        _ = {}

    if name == "say_hello":
        await self._run_say_hello(call_id)

    self._calls.pop(call_id, None)

async def _run_say_hello(self, call_id: str):
    print("hi")
    try:
        logger.info("TOOL_RUN name=say_hello status=ok")
    except Exception:
        pass
    await self._send_q.put(
        {
            "type": "tool_output.create",
            "tool_output": {
                "tool_call_id": call_id,
                "content": [
                    {"type": "output_text", "text": "Hello said."},
                ],
            },
        }
    )
    # Optionally ask the model to continue
    await self._send_q.put({"type": "response.create"})

async def ws_receiver(ws, player: AudioPlayer, send_q: “asyncio.Queue[dict]”, session_ready: asyncio.Event | None = None):
try:
async for msg in ws:
try:
evt = json.loads(msg)
except Exception:
continue
et = evt.get(“type”)
log_recv_event(evt)
if VERBOSE:
print(“RECV:”, et)
if et == “session.created”:
if session_ready is not None:
session_ready.set()
elif et in (“response.output_audio.delta”, “response.audio.delta”, “output_audio.delta”):
audio_b64 = evt.get(“audio”) or evt.get(“delta”)
if audio_b64:
player.write(base64.b64decode(audio_b64))
elif et == “response.output_text.delta”:
delta = evt.get(“delta”) or “”
if delta:
print(delta, end=“”, flush=True)
elif et == “response.output_item.added”:
item = evt.get(“item”) or {}
tool_mgr.handle_output_item_added(item)
elif et in (
“response.function_call_arguments.delta”,
“response.function_call.arguments.delta”,
“function_call.arguments.delta”,
):
tool_mgr.handle_args_delta(evt)
elif et in (
“response.function_call_arguments.done”,
“response.function_call.arguments.done”,
):
pass
elif et == “response.output_item.done”:
item = evt.get(“item”) or {}
await tool_mgr.maybe_dispatch_on_done(item)
elif et in (“response.completed”, “response.done”):
print(“”)
elif et in (“response.error”, “error”):
print(“Realtime error:”, evt)
elif VERBOSE:
print(“EVT:”, et, evt)
except ConnectionClosed:
print(“Connection closed”)

async def main():
load_dotenv()
setup_logger(“log_forum.txt”)
parser = argparse.ArgumentParser(description=“OpenAI Realtime voice call (forum copy)”)
parser.add_argument(“–verbose”, action=“store_true”, help=“Enable verbose event logging”)
args = parser.parse_args()
global VERBOSE
VERBOSE = bool(args.verbose)

def get_api_key() -> Optional[str]:
    candidates = ["OPENAI_API_KEY", "OPENAI_KEY", "OPENAI_TOKEN", "API_KEY"]
    for name in candidates:
        val = os.getenv(name)
        if val:
            return val
        val = os.getenv(name.lower())
        if val:
            return val
    return None

api_key = get_api_key()
if not api_key:
    raise SystemExit("OpenAI API key not found. Set OPENAI_API_KEY.")

model = os.getenv("OPENAI_REALTIME_MODEL")
if not model:
    candidate = os.getenv("OPENAI_MODEL", "")
    model = candidate if "realtime" in candidate else "gpt-realtime"
uri = f"wss://api.openai.com/v1/realtime?model={model}"
headers = [("Authorization", f"Bearer {api_key}"), ("OpenAI-Beta", "realtime=v1")]

print(f"Connecting to OpenAI Realtime... (model={model})")
async with ws_connect(uri, extra_headers=headers, max_size=None) as ws:
    send_q: "asyncio.Queue[dict]" = asyncio.Queue(maxsize=1024)
    sender_task = asyncio.create_task(ws_sender(ws, send_q))

    player = AudioPlayer(samplerate=OUTPUT_SAMPLE_RATE)
    player.start()
    global tool_mgr
    tool_mgr = ToolCallManager(send_q)
    session_ready = asyncio.Event()
    recv_task = asyncio.create_task(ws_receiver(ws, player, send_q, session_ready))

    voice = os.getenv("OPENAI_REALTIME_VOICE", "alloy")
    tools = [
        {
            "type": "function",
            "name": "say_hello",
            "description": "Prints 'hi' to the console.",
            "parameters": {"type": "object", "strict": True, "properties": {}, "required": []},
        }
    ]

    try:
        await asyncio.wait_for(session_ready.wait(), timeout=3.0)
    except asyncio.TimeoutError:
        if VERBOSE:
            print("session.created not seen within 3s; sending session.update anyway")

    await send_q.put(
        {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "modalities": ["text", "audio"],
                "voice": voice,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_sample_rate_hz": INPUT_SAMPLE_RATE,
                "output_audio_sample_rate_hz": OUTPUT_SAMPLE_RATE,
                "instructions": "You are a helpful voice assistant. Use the say_hello tool when appropriate.",
                "tools": tools,
                "tool_choice": "auto",
            },
        }
    )

    tool_names = [t.get("name") for t in tools]
    if VERBOSE:
        print("Registered tools:", tool_names)
    try:
        logger.info("TOOLS_REGISTERED %s", ",".join(tool_names))
    except Exception:
        pass

    print("Connected. Press Enter to talk; 'q' + Enter to quit.")
    loop = asyncio.get_running_loop()

    async def ainput(prompt: str = "") -> str:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: input(prompt))

    try:
        while True:
            cmd = (await ainput("\nPress Enter to start recording (q=quit): ")).strip().lower()
            if cmd == "q":
                break

            stop_evt = threading.Event()
            mic_t = threading.Thread(
                target=mic_capture_thread, args=(loop, send_q, stop_evt), daemon=True
            )
            mic_t.start()

            # Optionally start screen streaming at configured FPS
            try:
                screen_fps = float(os.getenv("OPENAI_SCREEN_FPS", os.getenv("SCREEN_FPS", "1.0")))
            except Exception:
                screen_fps = 1.0
            screen_t: threading.Thread | None = None
            if capture_screenshot_png_base64 is not None and screen_fps > 0:
                screen_t = threading.Thread(
                    target=screen_capture_thread,
                    args=(loop, send_q, stop_evt, screen_fps),
                    daemon=True,
                )
                screen_t.start()
                if VERBOSE:
                    print(f"Screen streaming enabled at {screen_fps:.2f} fps")

            await ainput("Recording... press Enter to stop and send.")
            stop_evt.set()
            if screen_t is not None:
                screen_t.join(timeout=1.0)
            mic_t.join()

            # Ask for a response using the audio captured
            await send_q.put({"type": "response.create"})
            print("Sent. Listening for reply...")

    finally:
        player.stop()
        sender_task.cancel()
        recv_task.cancel()
        try:
            await ws.close()
        except Exception:
            pass

if name == “main”:
try:
asyncio.run(main())
except KeyboardInterrupt:
pass

Any help would bed be apreciated.