I hit this bug — it looks like somebody forgot to make these f-strings. Also, this duration range differs from the API reference's documented 2-10s:
Response body: {
  "error": {
    "message": "Known speaker references has duration {duration_s} seconds, but must be between 1.2 and 10.0 seconds",
    "type": "invalid_request_error",
    "param": "known_speaker_references",
    "code": "invalid_value"
  }
}
So to prevent ever seeing that, I truncate the length of any input references in code instead of going back and editing my speaker samples. I then convert them to WAV, so there is no need for any MIME-type guessing when sending base64 data URLs.
The returned "done" transcription in stream mode does not include the speaker labels; you have to capture the undocumented SSE `data:` events, which I document here.
Here is this application's log of events when using speaker diarization, as saved to JSON. There are ONLY "transcript.text.segment" events, without the additional "delta" events the docs describe.
{"type": "transcript.text.segment", "text": " Hi, we're back. You're listening to Car Talk with us,", "speaker": "Ray", "start": 3.412, "end": 5.312, "id": "seg_0"}
{"type": "transcript.text.segment", "text": " Click and Clack, the Tappet Brothers.", "speaker": "Ray", "start": 5.362, "end": 6.612, "id": "seg_1"}
{"type": "transcript.text.segment", "text": " And we're here to talk about cars, car repair,", "speaker": "Ray", "start": 6.7620000000000005, "end": 9.362, "id": "seg_2"}
{"type": "transcript.text.segment", "text": " and the answer to last week's puzzler.", "speaker": "Ray", "start": 9.612, "end": 12.362000000000002, "id": "seg_3"}
{"type": "transcript.text.segment", "text": " Now, this was sort of an anthropological, geological,", "speaker": "Ray", "start": 12.962, "end": 16.712, "id": "seg_4"}
{"type": "transcript.text.segment", "text": " algebraic, and above all,", "speaker": "Ray", "start": 16.862000000000002, "end": 18.962, "id": "seg_5"}
{"type": "transcript.text.segment", "text": " obfuscational puzzle.", "speaker": "Ray", "start": 19.712, "end": 21.462, "id": "seg_6"}
{"type": "transcript.text.segment", "text": " Yeah?", "speaker": "Tom", "start": 21.462, "end": 21.812, "id": "seg_7"}
...
{"type": "transcript.text.done", "text": "Hi, we're back. You're listening to Car Talk with us, Click and Clack, the Tappet Brothers. And we're here to talk about cars, car repair, and the answer to last week's puzzler. Now, this was sort of an anthropological, geological, algebraic, and above all, obfuscational puzzle. Yeah? Yeah, I think so. I don't remember any of that. This was from the Blind Them with Footwork collection.\nYeah?...", "usage": {"type": "tokens", "total_tokens": 14073, "input_tokens": 6188, "input_token_details": {"text_tokens": 407, "audio_tokens": 5781}, "output_tokens": 7885}}
So essentially, the assembled stream is the format you need, having
"speaker", "start", "end", and "id": "seg_0" for your use. A JSONL — a per-line JSON file — would be a good output.
Here's code to make your diarize request (and get billed for input text tokens you never sent), doing all the observable heavy lifting for you. It raises no errors on the input, and the function's server_vad settings have defaults that you can override with extreme values via function parameters if you must.
import os
import asyncio
import base64 # needed for data URLs
import json
import io
import wave
from datetime import datetime
import httpx
import aiofiles
try:
    import av  # type: ignore
except ModuleNotFoundError as exc:
    # PyAV decodes/resamples the speaker reference audio. Fail at import time
    # with an actionable message instead of deep inside a request.
    raise ModuleNotFoundError(
        "Speaker reference conversion/truncation requires PyAV. Install with: pip install av"
    ) from exc

# Set True to dump each converted speaker-reference WAV to the working
# directory for debugging (filenames carry a millisecond timestamp).
DEBUG_DUMP_SPEAKER_REFERENCE_WAVS = False
def _convert_audio_file_to_wav_24khz_16bit_mono_bytes(
input_file_path: str,
*,
max_duration_s: float = 9.9,
target_sample_rate_hz: int = 24_000,
) -> bytes:
"""
Decode an audio file and return WAV bytes (PCM 16-bit, 24kHz, mono),
truncated to at most max_duration_s seconds.
This returns a complete .wav FILE payload (RIFF header + PCM), suitable
for base64 embedding in: data:audio/wav;base64,...
Requires: pip install av
"""
if max_duration_s <= 1.2:
raise ValueError("max_duration_s must be > 1.2 seconds, up to 10s")
target_channels = 1
target_sample_width_bytes = 2 # s16 => 2 bytes/sample
max_samples = int(max_duration_s * target_sample_rate_hz)
samples_written = 0
pcm = bytearray()
def as_frame_list(resampled) -> list:
if resampled is None:
return []
if isinstance(resampled, list):
return resampled
return [resampled]
def append_s16_mono_frame(out_frame) -> bool:
"""
Append only the valid audio bytes for this frame (no padding),
and respect max_samples truncation.
Returns True if we've reached the truncation limit.
"""
nonlocal samples_written
if out_frame.samples <= 0:
return samples_written >= max_samples
# Enforce the output we asked the resampler for.
if out_frame.layout.nb_channels != target_channels:
raise ValueError(
f"Unexpected channel count after resample for '{input_file_path}': "
f"{out_frame.layout.nb_channels} (expected {target_channels})"
)
if out_frame.format.name not in ("s16", "s16p"):
raise ValueError(
f"Unexpected sample format after resample for '{input_file_path}': "
f"{out_frame.format.name} (expected s16)"
)
frame_sample_rate = getattr(out_frame, "sample_rate", 0) or 0
if frame_sample_rate and frame_sample_rate != target_sample_rate_hz:
raise ValueError(
f"Unexpected sample rate after resample for '{input_file_path}': "
f"{frame_sample_rate} (expected {target_sample_rate_hz})"
)
if len(out_frame.planes) != 1:
raise ValueError(
f"Unexpected plane count after resample for '{input_file_path}': "
f"{len(out_frame.planes)} (expected 1 for mono)"
)
bytes_per_sample = out_frame.format.bytes
if bytes_per_sample != target_sample_width_bytes:
raise ValueError(
f"Unexpected bytes/sample after resample for '{input_file_path}': "
f"{bytes_per_sample} (expected {target_sample_width_bytes})"
)
# PyAV planes expose the entire allocated buffer; only the first
# (samples * bytes/sample * channels) bytes are valid audio samples.
full_expected_bytes = out_frame.samples * bytes_per_sample * target_channels
plane_bytes = bytes(out_frame.planes[0])
if len(plane_bytes) < full_expected_bytes:
raise ValueError(
f"Audio plane too small for '{input_file_path}': "
f"{len(plane_bytes)} bytes, expected at least {full_expected_bytes}"
)
# Drop any padding beyond the valid sample payload.
plane_bytes = plane_bytes[:full_expected_bytes]
remaining_samples = max_samples - samples_written
if remaining_samples <= 0:
return True
take_samples = min(out_frame.samples, remaining_samples)
take_bytes = take_samples * bytes_per_sample * target_channels
pcm.extend(plane_bytes[:take_bytes])
samples_written += take_samples
return samples_written >= max_samples
with av.open(input_file_path) as container:
audio_stream = next((s for s in container.streams if s.type == "audio"), None)
if audio_stream is None:
raise ValueError(f"No audio stream found in reference file: {input_file_path}")
resampler = av.audio.resampler.AudioResampler( # pylint: disable=no-member
format="s16", # packed s16
layout="mono", # downmix to mono
rate=target_sample_rate_hz,
)
done = False
for packet in container.demux(audio_stream):
for frame in packet.decode():
for out_frame in as_frame_list(resampler.resample(frame)):
done = append_s16_mono_frame(out_frame)
if done:
break
if done:
break
if done:
break
# Flush resampler if we didn't truncate already.
if not done:
for out_frame in as_frame_list(resampler.resample(None)):
done = append_s16_mono_frame(out_frame)
if done:
break
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, "wb") as wf:
# pylint: disable=no-member
wf.setnchannels(target_channels)
wf.setsampwidth(target_sample_width_bytes)
wf.setframerate(target_sample_rate_hz)
wf.writeframes(bytes(pcm))
wav_bytes = wav_buffer.getvalue()
if DEBUG_DUMP_SPEAKER_REFERENCE_WAVS:
stamp_ms = datetime.now().strftime("%Y%m%d-%H%M%S-%f")[:-3]
base = os.path.splitext(os.path.basename(input_file_path))[0]
debug_filename = f"{base}__ref__{stamp_ms}.wav"
with open(debug_filename, "wb") as f:
f.write(wav_bytes)
return wav_bytes
async def _reference_to_data_url(reference: str) -> str:
"""
Convert a local audio file reference to a WAV data URL (24kHz/16-bit/mono),
truncated to <= 9.9 seconds, to satisfy diarize known speaker reference limits.
If a data: URL is provided, it is passed through unchanged.
"""
if reference.startswith("data:"):
return reference
wav_bytes = await asyncio.to_thread(
_convert_audio_file_to_wav_24khz_16bit_mono_bytes,
reference,
max_duration_s=9.9,
target_sample_rate_hz=24_000,
)
encoded = base64.b64encode(wav_bytes).decode("ascii")
return f"data:audio/wav;base64,{encoded}"
async def async_transcribe_audio(
input_file_path: str,
segments_jsonl_file_path: str,
transcript_file_path: str,
*,
known_speaker_names: list[str] | None = None,
known_speaker_references: list[str] | None = None,
chunking_strategy: str = "server_vad",
prefix_padding_ms: int = 300,
silence_duration_ms: int = 200,
threshold: float = 0.5,
) -> None:
"""Diarize flac, mp3, mp4, mpeg, mpga, m4a, ogg, wav, or webm.
This uses:
- POST https://api.openai.com/v1/audio/transcriptions
- model=gpt-4o-transcribe-diarize
- response_format=diarized_json
- stream=true
chunking_strategy:
- "auto" (send the literal string "auto")
- otherwise send a server_vad JSON object built from:
prefix_padding_ms, silence_duration_ms, threshold
Outputs:
- segments_jsonl_file_path: JSONL of transcript.text.segment events (speaker-labeled segments)
- transcript_file_path: transcript.text.done["text"] (full transcript, no speaker labels)
"""
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY is not set in the environment.")
url = "https://api.openai.com/v1/audio/transcriptions"
headers = {
"Authorization": f"Bearer {api_key}",
"Accept": "text/event-stream",
}
if chunking_strategy == "auto":
chunking_strategy_field_value = "auto"
else:
chunking_strategy_field_value = json.dumps(
{
"type": "server_vad",
"prefix_padding_ms": prefix_padding_ms,
"silence_duration_ms": silence_duration_ms,
"threshold": threshold,
},
ensure_ascii=False,
)
known_speaker_names = known_speaker_names or []
known_speaker_references = known_speaker_references or []
print("--- Requesting diarized transcription (streaming SSE).")
print(f"--- Input audio: '{input_file_path}'")
print(f"--- Output diarized segments (JSONL): '{segments_jsonl_file_path}'")
print(f"--- Output transcript text: '{transcript_file_path}'")
final_usage: dict = {}
timeout = httpx.Timeout(600.0, read=None)
async with httpx.AsyncClient(timeout=timeout) as client:
with open(input_file_path, "rb") as audio_file:
files: list[tuple[str, tuple[object, ...]]] = [
(
"file",
(os.path.basename(input_file_path), audio_file),
),
("model", (None, "gpt-4o-transcribe-diarize")),
("language", (None, "en")),
("response_format", (None, "diarized_json")),
("chunking_strategy", (None, chunking_strategy_field_value)),
("stream", (None, "true")),
]
if known_speaker_names:
for name, ref in zip(known_speaker_names, known_speaker_references):
data_url = await _reference_to_data_url(ref)
files.append(("known_speaker_names[]", (None, name)))
files.append(("known_speaker_references[]", (None, data_url)))
async with client.stream("POST", url, headers=headers, files=files) as response:
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
error_body = await response.aread()
print(f"HTTP error occurred: {exc}")
print(f"Response body: {error_body.decode('utf-8', errors='replace')}")
return
print(f"--- API call succeeded (HTTP {response.status_code}). Streaming diarized events...")
pending_data_lines: list[str] = []
async with aiofiles.open(segments_jsonl_file_path, "w") as segments_file, aiofiles.open(
transcript_file_path, "w"
) as transcript_file:
async def handle_sse_data(data: str) -> bool:
nonlocal final_usage
if data == "[DONE]":
return False
event = json.loads(data)
event_type = event["type"]
if event_type == "transcript.text.segment":
await segments_file.write(json.dumps(event, ensure_ascii=False) + "\n")
await segments_file.flush()
return True
if event_type == "transcript.text.done":
await transcript_file.write(event["text"])
await transcript_file.flush()
final_usage = event["usage"]
return True
# transcript.text.delta is not used for diarize output.
return True
async for line in response.aiter_lines():
if not line:
if pending_data_lines:
data = "\n".join(pending_data_lines)
pending_data_lines.clear()
keep_going = await handle_sse_data(data)
if not keep_going:
break
continue
if line.startswith("data:"):
pending_data_lines.append(line[len("data:") :].strip())
continue
# Ignore non-data SSE fields (event:, id:, retry:, comments).
continue
if pending_data_lines:
data = "\n".join(pending_data_lines)
await handle_sse_data(data)
print(f"--- Diarized segments saved to '{segments_jsonl_file_path}'.")
print(f"--- Transcript saved to '{transcript_file_path}'.")
print("--- Usage:")
print(json.dumps(final_usage, ensure_ascii=False, indent=2))
async def no_references_diarize() -> None:
    """Example: diarize a file without any known speaker references."""
    source_audio = "audio1.mp3"
    segments_path = source_audio + "-segments.jsonl"
    transcript_path = source_audio + "-transcript.txt"
    await async_transcribe_audio(source_audio, segments_path, transcript_path)
async def main() -> None:
    """Example: diarize a Car Talk clip with three known speaker references."""
    source_audio = "cartalk_4min.mp3"
    segments_path = source_audio + "-segments.jsonl"
    transcript_path = source_audio + "-transcript.txt"
    # Names and reference files are paired positionally.
    speaker_names = ["Ray", "Tom", "Caller"]
    speaker_reference_files = [
        "cartalk_ray.mp3",
        "cartalk_tom.mp3",
        "cartalk_callers.mp3",
    ]
    await async_transcribe_audio(
        source_audio,
        segments_path,
        transcript_path,
        known_speaker_names=speaker_names,
        known_speaker_references=speaker_reference_files,
    )
if __name__ == "__main__":
    # Script entry point: run the example diarization request.
    asyncio.run(main())
At the end, replace my call's file and speakers — taken from a radio show — with your own. The function's output is the two files.
Requires: pip install av