I have been using the Chat Completions API endpoint for audio-input, audio-output processing in my apps. But now I need to use it on a device with very low memory, which cannot handle a long (500-600 KB) string (base64-encoded audio).
So I am looking for a way to directly receive MP3 (or WAV) formatted audio.
I handle audio streams (gpt-realtime) on a Raspberry Pi using the normal base64 encoding… are you on a footprint smaller than that? It's ARM with a few GB. Could just be a matter of setting the chunk size low so you have lots of small audio clips instead of the large ones you mention. Check out chattyfriend[dot]com (redirects to git, all py).
Yes, a much smaller footprint. It is an ESP32-S3 with very little memory and some PSRAM. Unfortunately, JSON serialization libraries all use heap memory; I would have to write my own JSON parser to work with PSRAM. That can be done, but why? Why does OpenAI prefer to send base64 instead of a proper audio file? Btw, an audio file would be much smaller than base64.
Base64 is a 3-to-4 binary encoding: when you've received four characters, you've received three bytes. "Much smaller" is not that much smaller. Transport compression (gzip) can act on this. But your challenge is device storage, where you wish for (but don't actually want) a file.
It seems you will want not a "file", especially not an MP3 that would require decompression across frames and thus a large audio buffer and compute, but indeed a stream that can be directly buffered and consumed in "packets".
The Chat Completions API can do that. And it is completely undocumented: "stream": true. It only allows "pcm16" as a format for you to receive - headerless audio.
This delivers interleaved SSE events of transcript or audio. The transcription delta stream comes in semantic packets, sentences of token chunk deltas, before and between the audio events.
The transcript can be immediately yielded like you would a streaming AI conversation.
What one discovers: continuous audio events have a "data" field with a maximum of 16,000 characters.
What 16,000 base64 characters mean:
Underlying bytes: 16,000 × 3/4 = 12,000 bytes
PCM samples: 12,000 / 2 = 6,000 samples
Duration: 6,000 samples / 24,000 Hz = 0.25 seconds = 250 ms
Therefore:
A single "max" audio.data event of 16,000 base64 characters represents 250 ms of audio (but an event can carry less; the first chunk in particular may be shorter than the rest).
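To make that concrete, here is a tiny sketch (illustrative names, nothing from the API itself) of the same arithmetic, which you could use to size buffers from the base64 length alone:
# Illustrative: audio carried by one base64 "data" payload (pcm16, 24 kHz mono)
SAMPLE_RATE_HZ = 24_000
BYTES_PER_SAMPLE = 2  # int16

def chunk_duration_ms(base64_len: int) -> float:
    """Milliseconds of audio represented by a base64 payload of this length."""
    raw_bytes = base64_len * 3 // 4           # base64 packs 3 bytes into 4 characters
    samples = raw_bytes // BYTES_PER_SAMPLE   # 2 bytes per PCM16 sample
    return 1000.0 * samples / SAMPLE_RATE_HZ

print(chunk_duration_ms(16_000))  # 250.0 -> a "max" event is 250 ms of audio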
You have little left to do but decode each base64 segment, buffer, and play.
I had a close inspection of the stream, even down to the transport chunking, and here's what I found:
HTTP transport chunks can contain multiple events per streamed chunk, in the case of transcript JSON;
on larger chunks, the transport boundary always falls at the linefeed between events, and data is not carried arbitrarily across JSON;
the larger audio events are understandably broken across transport chunks into packets (around 1300 bytes), but they still finish with the transport aligned to the event end.
But essentially, you can ignore the underlying layer, as the JSON events are a consumable size with little latency compared to the AI itself.
I also did a close inspection of how the binary contents are split across JSON events. Each event is independently decodable base64, and no two-byte sample is ever split across events. Thus it is a clean stream of individual audio segments that doesn't need a reservoir or a remainder caught and reassembled (unlike MP3 frames).
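If you want to verify those properties on a captured stream yourself, a small check like this (assuming you have collected the base64 strings from each audio.data event into a list) confirms that every event decodes standalone and never splits a sample:
import base64

def check_audio_events(b64_chunks: list[str]) -> None:
    """Confirm each audio.data payload decodes on its own and keeps 2-byte alignment."""
    for i, b64 in enumerate(b64_chunks):
        pcm = base64.b64decode(b64)  # raises binascii.Error if a chunk is not standalone base64
        assert len(pcm) % 2 == 0, f"event {i} splits an int16 sample"
    print(f"{len(b64_chunks)} events: all independently decodable and sample-aligned")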
The audio for your device (hopefully native):
little-endian PCM (Intel LSB/MSB)
sample_rate: int = 24_000, channels: int = 1, bits_per_sample: int = 16
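For reference, interpreting those bytes as samples on a host takes only a few lines with the standard library (this sketch assumes a little-endian host, matching the wire format); on the microcontroller you would instead hand the raw bytes straight to the I2S/DAC driver:
import array

def pcm16_to_samples(pcm: bytes) -> array.array:
    """Interpret little-endian PCM16 mono bytes as signed 16-bit samples."""
    samples = array.array("h")  # signed 16-bit integers
    samples.frombytes(pcm)      # assumes a little-endian host; call samples.byteswap() otherwise
    return samples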
Here is Python code worked up to demonstrate such streaming, extracting and displaying the parallel stream of transcription as it might flow to a UI (or even a 24-character display). It goes all the way to simulating the rate of a playback device, while writing the audio stream to a file (since how to play audio on a microcontroller is slightly outside the knowledge transferable from example code written in Python). The only thing it can't anticipate is a learned prebuffer needed if the network is slower than playback, which is difficult because you don't know the ultimate length ahead of time.
"""
Async SSE -> PCM16 reference with event-based logging and optional real-time throttling.
- Requests a streaming Chat Completions response with audio (pcm16) + transcript.
- Parses SSE frames, logs each event line to 'sse-log.txt' with a monotonic timestamp.
- Decodes each audio.data base64 payload per event (no residual buffer needed).
- Packs PCM into 20 ms frames (960 bytes at 24 kHz mono int16).
- Two modes via a single boolean toggle:
THROTTLE_REALTIME = True:
* Queue is bounded (~0.5 s). Producer blocks when full (backpressure).
* Consumer paces at audio rate with a 0.5 s prebuffer.
* End-to-end rate is "no faster than playback".
THROTTLE_REALTIME = False:
* Queue is unbounded. Producer never blocks; consumes network as fast as delivered.
* Consumer writes frames immediately (no pacing).
- Prints transcript/content deltas immediately upon event reception.
- Writes PCM to 'audio-24kHz-mono.raw' and wraps it to WAV at the end.
- On HTTP error, prints the JSON body for diagnostics.
Environment:
export OPENAI_API_KEY=...
Dependencies:
pip install httpx
"""
import asyncio
import base64
import json
import os
import shutil
import struct
import time
from pathlib import Path
import httpx
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
URL = "https://api.openai.com/v1/chat/completions"
# gzip compresses base64 well with modest CPU; keep it simple and effective
ACCEPT_ENCODING = "gzip"
# Audio facts
SAMPLE_RATE = 24_000 # Hz
BYTES_PER_SAMPLE = 2 # int16 mono
BYTES_PER_SEC = SAMPLE_RATE * BYTES_PER_SAMPLE
FRAME_MS = 20 # per frame
FRAME_BYTES = BYTES_PER_SEC * FRAME_MS // 1000 # 960 bytes per 20 ms
# Real-time throttling (backpressure) toggle
# - True -> bound queue (~0.5 s), consumer paces at 24 kHz, producer backpressures the network
# - False -> unbounded queue, consumer writes immediately with no pacing
THROTTLE_REALTIME = True
# Managed prebuffer target for throttled mode
PREBUFFER_SECONDS = 0.5
PREBUFFER_FRAMES = max(1, int((PREBUFFER_SECONDS * 1000) // FRAME_MS)) # ~0.5 s => 25 frames
# Queue size:
# - When throttled, bind to the prebuffer size (keeps memory bounded and rate-controlled)
# - When not throttled, use an unbounded queue (consume as fast as delivered)
QUEUE_MAX_FRAMES = PREBUFFER_FRAMES if THROTTLE_REALTIME else 0
# SSE event logging
SSE_EVENT_LOG = True
SSE_LOG_PATH = Path("sse-log.txt")
# Output files
RAW_OUT = Path("audio-24kHz-mono.raw")
WAV_OUT = Path("audio-24kHz-mono.wav")
# Example prompt
messages = [
{"role": "developer", "content": "You are a cheerful Australian speaker"},
{
"role": "user",
"content": [{"type": "text", "text": "Greetings to you in the down under!"}],
},
]
params = {
"model": "gpt-audio-mini",
"modalities": ["text", "audio"],
"audio": {"voice": "sage", "format": "pcm16"},
"max_completion_tokens": 800,
"temperature": 0.6,
"stream": True,
}
HEADERS = {
"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
"Accept": "text/event-stream",
"Accept-Encoding": ACCEPT_ENCODING,
"Connection": "keep-alive",
}
# -----------------------------------------------------------------------------
# Framing helpers
# -----------------------------------------------------------------------------
class PCMFramePacker:
"""Collect PCM bytes and yield fixed-size frames used for pacing and queuing."""
def __init__(self, frame_bytes: int):
self.frame_bytes = frame_bytes
self._buf = bytearray()
def feed(self, pcm: bytes) -> list[bytes]:
out: list[bytes] = []
if pcm:
self._buf.extend(pcm)
while len(self._buf) >= self.frame_bytes:
out.append(bytes(self._buf[:self.frame_bytes]))
del self._buf[:self.frame_bytes]
return out
def flush(self) -> bytes:
if not self._buf:
return b""
tail = bytes(self._buf)
self._buf.clear()
return tail
def _extract_sse_payload(frame: bytes) -> bytes | None:
"""Return the joined 'data:' payload as bytes, or None if no data lines exist."""
data_lines = []
for raw_line in frame.splitlines():
if raw_line.startswith(b"data:"):
line = raw_line[5:]
if line.startswith(b" "):
line = line[1:]
data_lines.append(line)
if not data_lines:
return None
return b"\n".join(data_lines)
def _log_sse_event(logf, payload: bytes) -> None:
"""Write a single 'data:' line prefixed by a monotonic timestamp."""
if not logf:
return
ts = time.monotonic()
try:
s = payload.decode("utf-8")
except UnicodeDecodeError:
s = payload.decode("utf-8", errors="replace")
logf.write(f"{ts:.6f} data: {s}\n\n")
logf.flush()
# -----------------------------------------------------------------------------
# Producer / Consumer
# -----------------------------------------------------------------------------
async def sse_audio_producer(response: httpx.Response, q: asyncio.Queue, logf):
"""
Read SSE bytes, parse events, log them immediately, print transcripts,
decode per-event audio base64 to PCM, frame it, and enqueue.
"""
packer = PCMFramePacker(FRAME_BYTES)
buf = bytearray()
async for chunk in response.aiter_bytes():
buf.extend(chunk)
# Parse frames separated by a blank line
while True:
sep = buf.find(b"\n\n")
if sep == -1:
break
frame = bytes(buf[:sep])
del buf[: sep + 2]
if not frame.strip():
continue # heartbeat
payload = _extract_sse_payload(frame)
if payload is None:
continue
# Log event reception time before any processing
_log_sse_event(logf, payload)
if payload == b"[DONE]":
# Flush any remaining PCM from the packer
tail = packer.flush()
if tail:
await _enqueue_frame_bytes(q, tail)
await _enqueue_end(q)
return
# Parse JSON
try:
obj = json.loads(payload)
except json.JSONDecodeError:
continue
try:
delta = obj["choices"][0]["delta"]
except Exception:
continue
# 1) Transcript/content: print immediately (no pacing here)
audio = delta.get("audio")
if audio:
t = audio.get("transcript")
if t:
print(t, end="", flush=True)
content = delta.get("content")
if content:
print(content, end="", flush=True)
# 2) Audio: decode per-event base64 and frame
if audio:
b64 = audio.get("data")
if b64:
try:
pcm = base64.b64decode(b64, validate=False)
except Exception:
# Skip malformed payload in this production build
continue
frames = packer.feed(pcm)
for fr in frames:
await _enqueue_frame(q, fr)
# In case the async iteration ends unexpectedly, try to flush and end
tail = packer.flush()
if tail:
await _enqueue_frame_bytes(q, tail)
await _enqueue_end(q)
async def _enqueue_frame(q: asyncio.Queue, frame: bytes) -> None:
"""Enqueue a single frame (bounded or unbounded depending on THROTTLE_REALTIME)."""
# Using await q.put for both modes keeps the code simple:
# - bounded queue blocks (backpressure) in throttled mode
# - unbounded queue returns immediately in unthrottled mode
await q.put(frame)
async def _enqueue_frame_bytes(q: asyncio.Queue, b: bytes) -> None:
"""Enqueue arbitrary bytes as frame-sized slices."""
for i in range(0, len(b), FRAME_BYTES):
await _enqueue_frame(q, b[i : i + FRAME_BYTES])
async def _enqueue_end(q: asyncio.Queue) -> None:
"""Send sentinel to signal end-of-stream to the consumer."""
# Always ensure the sentinel is enqueued
await q.put(None)
async def playback_consumer(q: asyncio.Queue, outf):
"""
Consume frames and write to file.
- In throttled mode: wait for ~0.5 s prebuffer, then enforce real-time pacing.
- In unthrottled mode: write immediately with no sleep (consume-as-delivered).
"""
if THROTTLE_REALTIME:
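        # Prime the ~0.5 s prebuffer before pacing begins. Note: this assumes the
        # clip is longer than the prebuffer; a very short clip would need a
        # producer-done signal here to cut the wait short.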
while q.qsize() < PREBUFFER_FRAMES:
await asyncio.sleep(0.005)
while True:
item = await q.get()
if item is None:
# Drain any remaining frames
while not q.empty():
more = q.get_nowait()
if more is None:
break
outf.write(more)
outf.flush()
return
outf.write(item)
if THROTTLE_REALTIME:
# Real-time pacing: enforce 24 kHz consumption
await asyncio.sleep(len(item) / BYTES_PER_SEC)
# -----------------------------------------------------------------------------
# WAV wrapper utility
# -----------------------------------------------------------------------------
def raw_to_wav(
raw_path: str | Path,
wav_path: str | Path,
sample_rate: int = 24_000,
channels: int = 1,
bits_per_sample: int = 16,
) -> None:
"""Wrap a raw PCM16 mono 24 kHz file into a standard RIFF/WAVE file."""
raw_path = Path(raw_path)
wav_path = Path(wav_path)
data_size = raw_path.stat().st_size
block_align = channels * (bits_per_sample // 8)
byte_rate = sample_rate * block_align
riff_chunk_size = 36 + data_size
with raw_path.open("rb") as src, wav_path.open("wb") as dst:
dst.write(b"RIFF")
dst.write(struct.pack("<I", riff_chunk_size))
dst.write(b"WAVE")
dst.write(b"fmt ")
dst.write(struct.pack("<I", 16)) # fmt chunk size
dst.write(struct.pack("<H", 1)) # PCM
dst.write(struct.pack("<H", channels))
dst.write(struct.pack("<I", sample_rate))
dst.write(struct.pack("<I", byte_rate))
dst.write(struct.pack("<H", block_align))
dst.write(struct.pack("<H", bits_per_sample))
dst.write(b"data")
dst.write(struct.pack("<I", data_size))
shutil.copyfileobj(src, dst, length=1024 * 1024)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
async def main():
out_f = RAW_OUT.open("wb")
logf = SSE_LOG_PATH.open("w", encoding="utf-8") if SSE_EVENT_LOG else None
# Create queue according to throttling mode
q: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=QUEUE_MAX_FRAMES)
async with httpx.AsyncClient(timeout=180) as client:
async with client.stream(
"POST",
URL,
headers=HEADERS,
json={**params, "messages": messages},
) as resp:
# Error path: read entire body, print JSON if available
if resp.is_error:
body = await resp.aread()
try:
txt = body.decode()
except UnicodeDecodeError:
txt = repr(body)
print(f"Request failed ({resp.status_code}):\n{txt}")
raise httpx.HTTPStatusError(
f"{resp.status_code} Error",
request=resp.request,
response=resp,
)
consumer_task = asyncio.create_task(playback_consumer(q, out_f))
try:
await sse_audio_producer(resp, q, logf)
await consumer_task
finally:
out_f.close()
if logf:
logf.close()
if __name__ == "__main__":
try:
asyncio.run(main())
print("\n[done] stream complete")
raw_to_wav(RAW_OUT, WAV_OUT)
print(f"[done] wrote {WAV_OUT.name}")
except httpx.RequestError as exc:
print(f"Transport error: {exc}")
raise
except httpx.HTTPStatusError as exc:
print(f"HTTPStatusError: {exc}")
raise
This expects your OPENAI_API_KEY as an environment variable, but you'll note there is no SDK module hiding what's happening.
As configured, and for tuning the globals:
For robust streaming with real-time playback behavior, leave THROTTLE_REALTIME = True.
The queue is capped to ~0.5 s. The consumer starts after priming that prebuffer and then sleeps per frame to maintain 24 kHz.
The producer blocks when the queue is full, which backs up through httpx to the TCP layer, naturally rate-limiting ingress to the pace of playback.
It writes to a fixed audio-24kHz-mono.raw in real time; then I added a call to a rewriter to give you a WAV with a header.
For "consume as fast as delivered" (no HTTP stream backpressure), set THROTTLE_REALTIME = False.
The queue becomes unbounded and the consumer writes immediately with no sleep. The script finishes as quickly as the network and server can deliver.
sse-log.txt
Each event is logged before any processing with a monotonic timestamp. This captures the timing as delivered over the network, which you can analyze to understand transcript pacing independent of your audio pipeline and the nature of the SSE stream from the endpoint.
Base64 handling
Events are decoded as complete base64 units. No cross-event reservoir seems needed, and here we aren't trying to break down the single continuous base64 string that a non-streaming request would deliver.
Tech Notes:
A simulated playback rate?
Yes, the consumer intentionally enforces a "real-time" audio rate even though it writes to a file.
After writing each frame (960 bytes = 20 ms), the consumer performs await asyncio.sleep(len(item) / BYTES_PER_SEC), which is 20 ms per frame.
That sleep is the pacer; it makes the consumer behave like a real audio device with a fixed consumption rate.
Because of the pacer, the program's wall-clock duration tends to match the audio duration (plus brief startup and teardown), even if the server could send faster.
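Put concretely, using the script's constants:
FRAME_BYTES = 960                   # one 20 ms frame of PCM16 mono at 24 kHz
BYTES_PER_SEC = 24_000 * 2          # 48,000 bytes of audio per second
print(FRAME_BYTES / BYTES_PER_SEC)  # 0.02 -> the consumer sleeps 20 ms per frame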
This shows networking you can code up so your device memory isn't overwhelmed with input.
Async + http backpressure
Because the producer isn't reading while "paused", httpx doesn't pull more data from the socket. The OS receive buffer fills, the TCP receive window shrinks, and standard TCP flow control tells the server to slow down or pause.
Net effect: the slow consumer propagates backpressure all the way to the remote sender at the TCP layer, staying in close step with the HTTPS event stream.
What happens in this code if the server is faster than real-time?
The producer decodes frames faster than the consumer can sleep-drain them.
The queue eventually fills to QUEUE_MAX_FRAMES. At that point q.put blocks, which stops the producer from reading the socket, and TCP backpressure throttles the server. The pipeline becomes strictly "real-time."
What happens if the server is slower than real-time (the full-size audio models, wifi)?
The consumer starts with a prebuffer (about PREBUFFER_FRAMES frames). That aims to absorb jitter and avoid underruns.
If the server's production falls behind for a sustained period, the queue will drain toward empty; the consumer will block on q.get waiting for the next frame, and will write frames as they arrive.
You will have device constraints to optimize (how much extra safety you want to prebuffer), but you aren't decoding full-blown audio from MP3, so you have a lot more freedom to increase resilience.
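To put that prebuffer trade-off in memory terms (illustrative numbers, not a recommendation): at 24 kHz mono PCM16, each extra second of safety costs about 48 KB of buffer, which is what you would weigh against the PSRAM you can spare:
BYTES_PER_SEC = 24_000 * 2  # PCM16 mono at 24 kHz = 48,000 bytes per second

def prebuffer_bytes(seconds: float) -> int:
    """RAM needed to hold a given prebuffer duration of raw PCM."""
    return int(seconds * BYTES_PER_SEC)

print(prebuffer_bytes(0.5))  # 24000 -> ~24 KB for the script's 0.5 s default
print(prebuffer_bytes(2.0))  # 96000 -> ~96 KB if you want more headroom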
Probably I don't understand your issue well; in that case, discard my answer… but…
How do you get base64 audio in/out?
have a look at this.
function convertiTestoInAudio(response) {
const apikey = localStorage.getItem("openaikey");
console.log(apikey);
const prompt = response;
const selectedvoice = "nova";
if (prompt) {
fetch("https://api.openai.com/v1/audio/speech", {
method: "POST",
headers: {
Authorization: `Bearer ${apikey}`,
"Content-Type": "application/json"
},
body: JSON.stringify({
model: "gpt-4o-mini-tts",
input: prompt,
response_format: "wav",
voice: selectedvoice
})
})
.then((response) => response.blob())
.then((blob) => {
const audioUrl = URL.createObjectURL(blob);
const audioPlayer = document.getElementById("audioPlayer");
audioPlayer.src = audioUrl;
audioPlayer.play();
      // Add an event to restart recognition after playback ends
audioPlayer.addEventListener("ended", () => {
        recognition.start(); // Restart voice recognition
});
})
.catch((error) => {
console.error("Error while converting TTS: ", error);
});
} else {
alert("Please insert a text prompt before converting.");
}
}
This routine takes a prompt (response) coming from a speech-recognition transcript, generates audio output in WAV form, plays it, and then restarts recognition.
I know it is not realtime, but the output has a delay of 2-3 seconds maximum; I think that's acceptable.
You may try something like this, customized for your needs…
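If it is easier to prototype in Python first, a rough equivalent of the same non-streaming idea (speech endpoint, WAV out, saved to disk instead of played in a browser) might look like this; the helper name and output path are just placeholders:
import os
import requests  # pip install requests

def text_to_wav(prompt: str, out_path: str = "tts-output.wav") -> None:
    """Ask the speech endpoint for WAV audio and save it to disk."""
    resp = requests.post(
        "https://api.openai.com/v1/audio/speech",
        headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
        json={
            "model": "gpt-4o-mini-tts",
            "input": prompt,
            "voice": "nova",
            "response_format": "wav",
        },
        timeout=60,
    )
    resp.raise_for_status()
    with open(out_path, "wb") as f:
        f.write(resp.content)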