Optimized WebSocket-Based OpenAI Real-Time Transcription
Overview
This code provides an optimized implementation of a WebSocket-based OpenAI real-time transcription system. The original version had a few issues, including inconsistent language detection, incomplete transcripts, and redundant reconnection logic. This refined version ensures:
- Stable WebSocket Connection: Properly handles errors and prevents excessive reconnection attempts.
- Accurate Audio Processing: Efficiently encodes and sends audio data.
- Robust Error Handling: Surfaces failures through a single onError callback and avoids redundant reconnection while maintaining stability.
Key Improvements
- Fixed Redundant Reconnection Issue
- Previously, multiple layers of reconnection logic caused infinite reconnection loops.
- Now, reconnection attempts are capped (three by default) and a manual stop no longer triggers a reconnect, preventing unnecessary connections.
- Improved Language Detection Stability
- Instead of relying on automatic detection, the language is explicitly set to English (`language: "en"`).
- This prevents OpenAI from incorrectly recognizing other languages when the user is speaking English.
- Optimized Audio Buffer Handling
- The previous implementation sometimes sent incomplete audio buffers.
- Now, it ensures that audio chunks are sent only when they contain sufficient data (100 ms threshold); the worklet that produces these chunks is sketched below.
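The refined class below loads an `audio-processor.js` worklet module that is not included here. A minimal sketch of what that file is assumed to do, namely convert each Float32 render quantum to 16-bit PCM and post it to the main thread as `audio_data`, could look like this (hypothetical implementation):

```js
// audio-processor.js — hypothetical sketch; the original worklet file is not shown.
// Converts each 128-frame Float32 render quantum to 16-bit PCM and posts it to
// the main thread, where OpenAITranscriber buffers it into 100 ms chunks.
class AudioProcessor extends AudioWorkletProcessor {
  process(inputs) {
    const channel = inputs[0]?.[0];
    if (channel) {
      const int16 = new Int16Array(channel.length);
      for (let i = 0; i < channel.length; i++) {
        // Clamp to [-1, 1], then scale to the signed 16-bit range.
        const s = Math.max(-1, Math.min(1, channel[i]));
        int16[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
      }
      // Transfer the buffer to avoid copying it across threads.
      this.port.postMessage({ audio_data: int16.buffer }, [int16.buffer]);
    }
    return true; // keep the processor alive
  }
}

registerProcessor('audio-processor', AudioProcessor);
```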
Refined Code
```typescript
interface TranscriberCallbacks {
  onInterimTranscript: (text: string) => void;
  onFinalTranscript: (text: string) => void;
  onError: (error: string) => void;
}
export default class OpenAITranscriber {
  private ws: WebSocket | null = null;
  private transcriptionContext: AudioContext | null = null;
  private workletNode: AudioWorkletNode | null = null;
  private isManualStop = false;
  private sessionTimeout: number;
  private userStream: MediaStream;
  private onInterimTranscript: (text: string) => void;
  private onFinalTranscript: (text: string) => void;
  private onError: (error: string) => void;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 3;

  constructor(sessionTimeout: number, userStream: MediaStream, callbacks: TranscriberCallbacks) {
    this.sessionTimeout = sessionTimeout;
    this.userStream = userStream;
    this.onInterimTranscript = callbacks.onInterimTranscript;
    this.onFinalTranscript = callbacks.onFinalTranscript;
    this.onError = callbacks.onError;
  }
  private async fetchAccessToken(): Promise<string> {
    try {
      const response = await fetch('/api/openai-token', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ expiresIn: Math.floor(this.sessionTimeout / 1000) })
      });
      if (!response.ok) throw new Error(await response.text());
      const data = await response.json();
      // The server is expected to return an ephemeral OpenAI client secret.
      return data.client_secret.value;
    } catch (error) {
      throw new Error(`Token fetch failed: ${error instanceof Error ? error.message : String(error)}`);
    }
  }
  public async start(): Promise<void> {
    if (this.ws) return;
    this.isManualStop = false; // allow restarting after a manual stop
    try {
      const token = await this.fetchAccessToken();
      // Browsers cannot set an Authorization header on a WebSocket, so the
      // ephemeral token is passed through the subprotocol list instead.
      this.ws = new WebSocket(`wss://api.openai.com/v1/realtime?intent=transcription`, [
        "realtime", `openai-insecure-api-key.${token}`, "openai-beta.realtime-v1"
      ]);
      this.ws.onopen = () => {
        this.reconnectAttempts = 0; // connection succeeded, reset the retry cap
        this.ws?.send(JSON.stringify({
          type: "transcription_session.update",
          session: {
            // Pinning the language avoids misdetection on short utterances.
            input_audio_transcription: { model: "gpt-4o-mini-transcribe", language: "en" },
            turn_detection: { prefix_padding_ms: 600, silence_duration_ms: 800, type: "server_vad", threshold: 0.5 }
          }
        }));
      };
      this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
      this.ws.onerror = () => this.onError("WebSocket error"); // was this.handleError, which did not exist
      this.ws.onclose = () => this.reconnect();
      await this.initAudioProcessing();
    } catch (error) {
      this.onError(error instanceof Error ? error.message : 'Unknown error');
      this.stop();
    }
  }
  private handleMessage(data: any): void {
    if (data.type === 'conversation.item.input_audio_transcription.delta') {
      // Streaming partial text; previously onInterimTranscript was never called.
      this.onInterimTranscript(data.delta);
    } else if (data.type === 'conversation.item.input_audio_transcription.completed') {
      this.onFinalTranscript(data.transcript);
    } else if (data.type === 'error') {
      this.onError(data.error.message);
    }
  }
  private async initAudioProcessing(): Promise<void> {
    if (this.transcriptionContext) return; // reuse the existing pipeline on reconnect
    this.transcriptionContext = new AudioContext({ sampleRate: 24000 });
    const source = this.transcriptionContext.createMediaStreamSource(this.userStream);
    await this.transcriptionContext.audioWorklet.addModule('audio-processor.js');
    this.workletNode = new AudioWorkletNode(this.transcriptionContext, 'audio-processor');
    source.connect(this.workletNode);
    this.workletNode.connect(this.transcriptionContext.destination);

    const sampleRate = this.transcriptionContext.sampleRate;
    const chunkSamples = Math.floor(sampleRate * 0.1); // 100 ms of 24 kHz PCM
    let audioBufferQueue = new Int16Array(0);
    this.workletNode.port.onmessage = (event) => {
      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
      const currentBuffer = new Int16Array(event.data.audio_data);
      audioBufferQueue = this.mergeBuffers(audioBufferQueue, currentBuffer);
      // Send only once at least 100 ms of audio has accumulated.
      if (audioBufferQueue.length >= chunkSamples) {
        const finalBuffer = audioBufferQueue.subarray(0, chunkSamples);
        audioBufferQueue = audioBufferQueue.subarray(chunkSamples);
        this.ws.send(JSON.stringify({
          type: 'input_audio_buffer.append',
          audio: this.encodeInt16ArrayToBase64(finalBuffer)
        }));
      }
    };
  }
  private mergeBuffers(lhs: Int16Array, rhs: Int16Array): Int16Array {
    const mergedBuffer = new Int16Array(lhs.length + rhs.length);
    mergedBuffer.set(lhs, 0);
    mergedBuffer.set(rhs, lhs.length);
    return mergedBuffer;
  }

  private encodeInt16ArrayToBase64(int16Array: Int16Array): string {
    // Respect byteOffset/byteLength: a subarray() view shares the full underlying
    // buffer, so encoding int16Array.buffer directly would leak extra samples.
    const bytes = new Uint8Array(int16Array.buffer, int16Array.byteOffset, int16Array.byteLength);
    // Build the binary string in chunks so large buffers cannot overflow
    // the argument limit of String.fromCharCode.
    let binary = '';
    const chunkSize = 0x8000;
    for (let i = 0; i < bytes.length; i += chunkSize) {
      binary += String.fromCharCode(...bytes.subarray(i, i + chunkSize));
    }
    return btoa(binary);
  }
  public async stop(): Promise<void> {
    this.isManualStop = true; // suppress the onclose reconnect below
    this.ws?.close();
    this.ws = null;
    this.workletNode?.disconnect();
    this.workletNode = null;
    await this.transcriptionContext?.close();
    this.transcriptionContext = null;
  }

  private async reconnect(): Promise<void> {
    // Never reconnect after an intentional stop, and cap retries so a
    // persistent failure cannot cause an infinite reconnection loop.
    if (this.isManualStop) return;
    if (++this.reconnectAttempts > this.maxReconnectAttempts) {
      this.onError('Max reconnection attempts reached');
      return;
    }
    this.ws = null; // start() is a no-op while a socket reference remains
    await this.start();
  }
}
```
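A minimal usage sketch (the getUserMedia constraints, the five-minute timeout, and the callback bodies are illustrative, not part of the original):

```typescript
// Hypothetical wiring; adapt the constraints and timeout to your app.
const stream = await navigator.mediaDevices.getUserMedia({ audio: true });

const transcriber = new OpenAITranscriber(5 * 60 * 1000, stream, {
  onInterimTranscript: (text) => console.log('partial:', text),
  onFinalTranscript: (text) => console.log('final:', text),
  onError: (err) => console.error('transcriber error:', err),
});

await transcriber.start();
// ... later, when the user ends the session:
await transcriber.stop();
```

The class also assumes a backend route at `/api/openai-token` that mints the ephemeral client secret. One possible shape, assuming a Node/Express server and OpenAI's REST endpoint for realtime transcription sessions (verify the endpoint and payload against the current API docs; the client's `expiresIn` hint is ignored in this sketch):

```typescript
import express from 'express';

const app = express();
app.use(express.json());

// Mints a short-lived client secret so the browser never sees the real API key.
app.post('/api/openai-token', async (_req, res) => {
  const response = await fetch('https://api.openai.com/v1/realtime/transcription_sessions', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      input_audio_transcription: { model: 'gpt-4o-mini-transcribe', language: 'en' },
    }),
  });
  if (!response.ok) {
    res.status(response.status).send(await response.text());
    return;
  }
  // The response body includes client_secret.value, which fetchAccessToken reads.
  res.json(await response.json());
});

app.listen(3000);
```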
This version of the transcriber ensures better reliability, optimized performance, and robust error handling. Let us know if you need further improvements!