here is a complete script for people if they need for a stand alone test. you can adjust the buffer on it etc.
import os
import requests
import openai
from pydub import AudioSegment
from pydub.playback import play
import io
from dotenv import load_dotenv
from queue import Queue
import threading
import time
Load environment variables from .env file
load_dotenv()
Set OpenAI API key
openai.api_key = os.getenv(âOPENAI_API_KEYâ)
def stream_audio_from_openai(api_url, params, headers, audio_queue):
response = requests.post(api_url, json=params, headers=headers, stream=True)
response.raise_for_status()
try:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
audio_queue.put(chunk)
except Exception as e:
print(f"Failed to stream audio: {e}")
finally:
audio_queue.put(None) # Sentinel to indicate the end of stream
def play_audio_from_queue(audio_queue, initial_buffer_size=32768):
audio_buffer = io.BytesIO()
buffer_size = 0
playback_started = False
while True:
chunk = audio_queue.get()
if chunk is None:
break
if chunk:
audio_buffer.write(chunk)
buffer_size += len(chunk)
if buffer_size >= initial_buffer_size and not playback_started:
playback_started = True
audio_buffer.seek(0)
try:
audio_segment = AudioSegment.from_file(audio_buffer, format="mp3")
play(audio_segment)
audio_buffer.seek(0)
audio_buffer.truncate(0)
buffer_size = 0
except Exception as e:
print(f"Error processing initial buffer: {e}")
audio_buffer.seek(0)
audio_buffer.truncate(0)
buffer_size = 0
# Play any remaining buffered audio
if buffer_size > 0:
audio_buffer.seek(0)
try:
audio_segment = AudioSegment.from_file(audio_buffer, format="mp3")
play(audio_segment)
except Exception as e:
print(f"Error processing remaining buffer: {e}")
def generate_and_play_speech(text):
api_url = âhttps://api.openai.com/v1/audio/speechâ
params = {
âmodelâ: âtts-1â,
âvoiceâ: âalloyâ,
âinputâ: text
}
headers = {
âAuthorizationâ: f"Bearer {os.getenv(âOPENAI_API_KEYâ)}",
âContent-Typeâ: âapplication/jsonâ
}
audio_queue = Queue()
stream_thread = threading.Thread(target=stream_audio_from_openai, args=(api_url, params, headers, audio_queue))
play_thread = threading.Thread(target=play_audio_from_queue, args=(audio_queue, 32768)) # Adjust buffer size here
stream_thread.start()
play_thread.start()
stream_thread.join()
audio_queue.put(None) # Ensure play thread finishes if streaming ends early
play_thread.join()
Example usage
generate_and_play_speech(âHello world! This is a streaming test.â)