Python integration of the Realtime API?

Let's create a base Python template for microphone audio in, processing (with function calls), and audio output/playback.
Let's figure it out: this code does not work, maybe someone can make it work? We need a template.

import asyncio
import json
import os
import base64
from dotenv import load_dotenv
import websockets
from pydub import AudioSegment
import io
import numpy as np

# Load environment variables
load_dotenv()

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    print('Please set your OPENAI_API_KEY in the .env file')
    exit(1)

URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"


def save_to_file(data, filename, mode='w'):
    """
    Save data to a file.

    :param data: The data to save. Can be string, bytes, or any serializable object.
    :param filename: The name of the file to save to.
    :param mode: The file mode. 'w' for text, 'wb' for binary.
    """
    try:
        if mode == 'w':
            with open(filename, mode, encoding='utf-8') as file:
                if isinstance(data, str):
                    file.write(data)
                else:
                    json.dump(data, file, indent=2)
        elif mode == 'wb':
            with open(filename, mode) as file:
                file.write(data)
        print(f"Data saved to {filename}")
    except Exception as e:
        print(f"Error saving to file: {e}")


async def send_event(websocket, event):
    await websocket.send(json.dumps(event))


def audio_to_item_create_event(audio_bytes: bytes) -> dict:
    # Load the audio file from the byte stream
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))

    # Resample to 24kHz mono pcm16
    pcm_audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2).raw_data

    # Encode to base64 string
    pcm_base64 = base64.b64encode(pcm_audio).decode()

    event = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [{
                "type": "input_audio",
                "audio": pcm_base64
            }]
        }
    }
    return event


async def handle_function_call(websocket, function_call_item):
    print('Function call:', function_call_item)
    # Here you would typically call the actual function and send the result back.
    # The output item must reference the call_id of the original function_call item.
    function_result_event = {
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": function_call_item["call_id"],
            "output": json.dumps({
                "temperature": 22,
                "unit": "celsius",
                "description": "Partly cloudy"
            })
        }
    }
    await send_event(websocket, function_result_event)

    # Request another response after sending function result
    response_create_event = {
        "type": "response.create",
        "response": {
            "modalities": ["text", "audio"],
        }
    }
    await send_event(websocket, response_create_event)


async def handle_audio_response(audio_base64):
    audio_bytes = base64.b64decode(audio_base64)
    print(f"Received audio response of {len(audio_bytes)} bytes")
    # The decoded bytes are raw PCM16 (24 kHz, mono), not a WAV container,
    # so save them as .pcm (or wrap them in a WAV header before playing them back)
    save_to_file(audio_bytes, "assistant_response.pcm", mode='wb')


def float_to_16bit_pcm(float32_array):
    """Convert Float32Array to 16-bit PCM."""
    int16_array = (float32_array * 32767).astype(np.int16)
    return int16_array.tobytes()


def base64_encode_audio(float32_array):
    """Convert Float32Array to base64-encoded PCM16 data."""
    pcm_data = float_to_16bit_pcm(float32_array)
    return base64.b64encode(pcm_data).decode()


async def stream_audio_files(websocket, file_paths):
    """Stream audio files to the API."""
    for file_path in file_paths:
        audio = AudioSegment.from_file(file_path)
        audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2)

        # Convert to numpy array
        samples = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0

        # Encode and send
        base64_chunk = base64_encode_audio(samples)
        await send_event(websocket, {
            "type": "input_audio_buffer.append",
            "audio": base64_chunk
        })

    # Commit the audio buffer
    await send_event(websocket, {"type": "input_audio_buffer.commit"})


async def main():
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "OpenAI-Beta": "realtime=v1",
    }

    # Note: newer releases of the websockets library renamed this parameter to additional_headers
    async with websockets.connect(URL, extra_headers=headers) as websocket:
        print('Connected to OpenAI Realtime API')

        # Set up the session
        session_update_event = {
            "type": "session.update",
            "session": {
                "instructions": "You are a helpful AI assistant. Respond concisely.",
                "tools": [
                    {
                        "type": "function",
                        "name": "get_current_weather",
                        "description": "Get the current weather in a given location",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "location": {
                                    "type": "string",
                                    "description": "The city and state, e.g. San Francisco, CA",
                                },
                                "unit": {
                                    "type": "string",
                                    "enum": ["celsius", "fahrenheit"]
                                },
                            },
                            "required": ["location", "unit"],
                        },
                    },
                ],
                "voice": "alloy",
                "turn_detection": "server_vad",
            },
        }
        await send_event(websocket, session_update_event)

        # Send a user text message
        user_message_event = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": "What's the weather like in New York?"
                    }
                ]
            }
        }
        await send_event(websocket, user_message_event)

        # Stream audio files
        audio_files = [
            './path/to/sample1.wav',
            './path/to/sample2.wav',
            './path/to/sample3.wav'
        ]
        await stream_audio_files(websocket, audio_files)

        # Request a response
        response_create_event = {
            "type": "response.create",
            "response": {
                "modalities": ["text", "audio"],
            }
        }
        await send_event(websocket, response_create_event)

        try:
            while True:
                message = await websocket.recv()
                event = json.loads(message)
                print('Received event:', event['type'])

                if event['type'] == 'conversation.item.created':
                    if event['item']['type'] == 'message' and event['item']['role'] == 'assistant':
                        for content in event['item']['content']:
                            if content['type'] == 'text':
                                print('Assistant:', content['text'])
                                save_to_file(content['text'], "assistant_response.txt")
                            elif content['type'] == 'audio':
                                await handle_audio_response(content['audio'])
                    elif event['item']['type'] == 'function_call':
                        # The item itself is the function_call item (it carries call_id, name, and arguments)
                        await handle_function_call(websocket, event['item'])
                elif event['type'] == 'error':
                    print('Error:', event['error'])
                    save_to_file(event['error'], "error_log.json")
                elif event['type'] == 'input_audio_buffer.speech_started':
                    print('Speech started')
                elif event['type'] == 'input_audio_buffer.speech_stopped':
                    print('Speech stopped')
                elif event['type'] == 'input_audio_buffer.committed':
                    print('Audio buffer committed')

        except websockets.exceptions.ConnectionClosed:
            print('Disconnected from OpenAI Realtime API')


if __name__ == "__main__":
    asyncio.run(main())

@xfluids GitHub - openai/openai-realtime-console: React App for inspecting, building and debugging with the Realtime API


It's not a Python integration, but anyway:
here is a txt summary of
GitHub - openai/openai-realtime-console: React App for inspecting, building and debugging with the Realtime API

and here is a Python script to summarise whole code bases:

you can use Gemini with its long context to process it:
Untitled prompt | Google AI Studio
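
The summariser script itself isn't pasted here, but the idea is simple: walk the repo, concatenate every source file into one big text file, and paste that into a long-context model such as Gemini. A rough sketch of that approach (dump_codebase and the extension list are my own illustrative names, not the original script):

import os

SOURCE_EXTENSIONS = {".py", ".js", ".ts", ".tsx", ".md", ".json"}  # adjust to your code base

def dump_codebase(root_dir: str, output_file: str = "codebase.txt") -> None:
    """Concatenate all source files under root_dir into a single text file."""
    with open(output_file, "w", encoding="utf-8") as out:
        for dirpath, _dirnames, filenames in os.walk(root_dir):
            for name in sorted(filenames):
                if os.path.splitext(name)[1] not in SOURCE_EXTENSIONS:
                    continue
                path = os.path.join(dirpath, name)
                out.write(f"\n\n===== {path} =====\n")
                try:
                    with open(path, "r", encoding="utf-8", errors="replace") as f:
                        out.write(f.read())
                except OSError as e:
                    out.write(f"[could not read file: {e}]")

if __name__ == "__main__":
    dump_codebase("./openai-realtime-console")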

import openai
import os
import asyncio
import json
import websockets

# Set your API key.  Best practice is to store it in an environment variable
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    print("API key not set. Please set OPENAI_API_KEY environment variable.")
    exit(1)  # Exit if the API key is not found


MODEL = "gpt-4o-realtime-preview-2024-10-01"
URL = f"wss://api.openai.com/v1/realtime?model={MODEL}"

async def test_realtime_api():
    try:
        async with websockets.connect(URL, extra_headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "OpenAI-Beta": "realtime=v1"}) as websocket:
            print("Connected to Realtime API")

            message = {
                "type": "response.create",
                "response": {
                    "modalities": ["text"],
                    "instructions": "Say hello"
                }
            }
            await websocket.send(json.dumps(message))

            try:
                response_str = await websocket.recv()
                response = json.loads(response_str)
                print(f"Received response: {response}")

            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
            except websockets.exceptions.ConnectionClosedError as e:
                print(f"Connection closed unexpectedly: {e}")
            except Exception as e:
                print(f"An error occurred during receive: {e}")

    except websockets.exceptions.InvalidHandshake as e:
        print(f"Invalid handshake (check API key and headers): {e}")

    except Exception as e:  # Catching connection errors
        print(f"Error connecting to the Realtime API: {e}")



async def main():
    await test_realtime_api()



if __name__ == "__main__":
    asyncio.run(main())

Invalid handshake (check API key and headers): server rejected WebSocket connection: HTTP 403

Process finished with exit code 0

Wait for access


@RonaldGRuckus lol

this is the way


But we cannot wait; robots are waiting, people are dying…


This code worked for me. It's not fully realtime, but it at least connects to the right endpoint and sends and receives audio or text.

(To implement it, I just copied the documentation into Claude, asked for a Python version, and iterated a few times.)


import asyncio
import websockets
import json
import pyaudio
import wave
import base64
import logging
import os
#from dotenv import load_dotenv

# Load environment variables
#load_dotenv()

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Audio configuration
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 24000

# WebSocket configuration
WS_URL = "wss://api.openai.com/v1/realtime"
MODEL = "gpt-4o-realtime-preview-2024-10-01"
OPENAI_API_KEY = "..."

"""
This important message is lost with all log information:
Enter 't' for text, 'a' for audio, or 'q' to quit:
"""

class RealtimeClient:
    def __init__(self):
        logger.info("Initializing RealtimeClient")
        self.ws = None
        self.p = pyaudio.PyAudio()
        self.stream = None
        self.audio_buffer = b''

    async def connect(self):
        logger.info(f"Connecting to WebSocket: {WS_URL}")
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "OpenAI-Beta": "realtime=v1"
        }
        self.ws = await websockets.connect(f"{WS_URL}?model={MODEL}", extra_headers=headers)
        logger.info("Successfully connected to OpenAI Realtime API")

    async def send_event(self, event):
        logger.debug(f"Sending event: {event}")
        await self.ws.send(json.dumps(event))
        logger.debug("Event sent successfully")

    async def receive_events(self):
        logger.info("Starting to receive events")
        async for message in self.ws:
            logger.debug(f"Received raw message: {message}")
            event = json.loads(message)
            await self.handle_event(event)

    async def handle_event(self, event):
        event_type = event.get("type")
        logger.info(f"Handling event of type: {event_type}")

        if event_type == "error":
            logger.error(f"Error event received: {event['error']['message']}")
        elif event_type == "response.text.delta":
            logger.debug(f"Text delta received: {event['delta']}")
            print(event["delta"], end="", flush=True)
        elif event_type == "response.audio.delta":
            logger.debug(f"Audio delta received, length: {len(event['delta'])}")
            audio_data = base64.b64decode(event["delta"])
            self.audio_buffer += audio_data
        elif event_type == "response.audio.done":
            logger.info("Audio response complete, playing audio")
            self.play_audio(self.audio_buffer)
            self.audio_buffer = b''
        else:
            logger.info(f"Received other event type: {event_type}")

    def start_audio_stream(self):
        logger.info("Starting audio input stream")
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=True,
                                  frames_per_buffer=CHUNK)
        logger.debug("Audio input stream started successfully")

    def stop_audio_stream(self):
        logger.info("Stopping audio input stream")
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        logger.debug("Audio input stream stopped successfully")

    def record_audio(self, duration):
        logger.info(f"Recording audio for {duration} seconds")
        frames = []
        for i in range(0, int(RATE / CHUNK * duration)):
            data = self.stream.read(CHUNK)
            frames.append(data)
            if i % 10 == 0:  # Log every 10th frame
                logger.debug(f"Recorded frame {i}")
        audio_data = b''.join(frames)
        logger.info(f"Audio recording complete, total size: {len(audio_data)} bytes")
        return audio_data

    def play_audio(self, audio_data):
        logger.info(f"Playing audio, size: {len(audio_data)} bytes")
        stream = self.p.open(format=FORMAT,
                             channels=CHANNELS,
                             rate=RATE,
                             output=True)
        stream.write(audio_data)
        stream.stop_stream()
        stream.close()
        logger.debug("Audio playback complete")

    async def send_text(self, text):
        logger.info(f"Sending text message: {text}")
        event = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{
                    "type": "input_text",
                    "text": text
                }]
            }
        }
        await self.send_event(event)
        logger.debug("Text message sent, creating response")
        await self.send_event({"type": "response.create"})

    async def send_audio(self, duration):
        logger.info(f"Preparing to send audio of duration: {duration} seconds")
        self.start_audio_stream()
        audio_data = self.record_audio(duration)
        self.stop_audio_stream()

        base64_audio = base64.b64encode(audio_data).decode('utf-8')
        logger.debug(f"Audio encoded to base64, length: {len(base64_audio)}")
        
        event = {
            "type": "input_audio_buffer.append",
            "audio": base64_audio
        }
        await self.send_event(event)
        logger.debug("Audio buffer appended, committing buffer")
        await self.send_event({"type": "input_audio_buffer.commit"})
        logger.debug("Audio buffer committed, creating response")
        await self.send_event({"type": "response.create"})

    async def run(self):
        logger.info("Starting RealtimeClient run")
        await self.connect()
        
        # Create a task for receiving events
        receive_task = asyncio.create_task(self.receive_events())
        
        logger.info("Sending initial message to start the conversation")
        await self.send_event({
            "type": "response.create",
            "response": {
                "modalities": ["text", "audio"],
                "instructions": "You are a helpful AI assistant. Respond to the user's messages.",
            }
        })

        try:
            while True:
                command = await asyncio.get_event_loop().run_in_executor(None, input, "Enter 't' for text, 'a' for audio, or 'q' to quit: ")
                if command == 't':
                    text = await asyncio.get_event_loop().run_in_executor(None, input, "Enter your message: ")
                    await self.send_text(text)
                elif command == 'a':
                    logger.info("Audio input selected")
                    print("Recording for 5 seconds...")
                    await self.send_audio(5)
                elif command == 'q':
                    logger.info("Quit command received")
                    break
                else:
                    logger.warning(f"Invalid command received: {command}")

                # Give some time for the response to be processed
                await asyncio.sleep(0.1)

        except Exception as e:
            logger.error(f"An error occurred: {e}")
        finally:
            logger.info("Ending conversation and closing connection")
            receive_task.cancel()
            try:
                await receive_task
            except asyncio.CancelledError:
                pass
            await self.ws.close()

async def main():
    logger.info("Starting main function")
    client = RealtimeClient()
    try:
        await client.run()
    except Exception as e:
        logger.error(f"An error occurred in main: {e}")
    finally:
        logger.info("Main function completed")

if __name__ == "__main__":
    logger.info("Script started")
    asyncio.run(main())
    logger.info("Script completed")

…I just sent two “hello, how are you?” messages to test this code and spent $0.50… I don't know if this “realtime” is too expensive or if I (or the code) did something wrong…

I was able to get this to work as well. A short audio conversation (4 or 5 exchanges) about Domino's pizza cost me $0.27.

I think the issue is that it counts time even when there is silence. Maybe only record audio if the volume is above some threshold, and don't use the realtime input stream, just send audio in. Ideas?
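
For example, something like this could gate the microphone locally so silence is never appended to the input buffer (a rough sketch; SILENCE_THRESHOLD is a value you would have to tune, and is_speech is just my own helper name):

import numpy as np

SILENCE_THRESHOLD = 500  # RMS level for 16-bit PCM; adjust for your mic and room

def is_speech(chunk: bytes) -> bool:
    """Return True if a raw PCM16 chunk is louder than the silence threshold."""
    samples = np.frombuffer(chunk, dtype=np.int16)
    rms = np.sqrt(np.mean(samples.astype(np.float64) ** 2))
    return rms > SILENCE_THRESHOLD

# Inside the recording loop (e.g. RealtimeClient.record_audio above), skip quiet chunks:
#     data = self.stream.read(CHUNK)
#     if is_speech(data):
#         frames.append(data)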

Thanks for the code… Helped me after 2 days wandering through the wilderness.

Here is a thought. I have multiple app users that I want to use this with. So we need to keep conversations separate so they flow and also have the proper “backstory”. How does this work in this architecture??

Do we need multiple threads running this code? Is there some way to specify a unique session for each?

Thanks and I look forward to any assistance in direction.
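
Or would something like this work: one WebSocket connection (one RealtimeClient from the code above) per app user, each with its own “backstory” sent in that connection's session.update? A rough sketch of what I mean (SessionManager and the user_id keying are my own guesses, not something from the code above):

import asyncio

class SessionManager:
    def __init__(self):
        self.clients = {}  # user_id -> RealtimeClient

    async def get_client(self, user_id: str, backstory: str) -> "RealtimeClient":
        """Return this user's client, opening a dedicated connection on first use."""
        if user_id not in self.clients:
            client = RealtimeClient()
            await client.connect()
            # Give this user's session its own instructions ("backstory")
            await client.send_event({
                "type": "session.update",
                "session": {"instructions": backstory},
            })
            self.clients[user_id] = client
        return self.clients[user_id]

# Usage: each user gets an isolated conversation on their own connection, e.g.
# manager = SessionManager()
# alice = await manager.get_client("alice", "You are Alice's cooking assistant.")
# bob = await manager.get_client("bob", "You are Bob's travel planner.")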

HERE IS THE LINK * the video only covers the GEMINI INTEGRATION, but we could use it to create a hybrid solution

  1. Gemini-based solution:
  • Processes video frames and transcribes speech from a microphone.
  • Sends transcription to Gemini model for generating audio output.
  • Avoids sending unnecessary silence, optimizing efficiency.
  • Main advantage: Cost-effective compared to OpenAI.
  2. OpenAI advantage:
  • Provides high-quality audio output that can simulate emotions.
  • Main value: Real-time conversational quality and emotional simulation in audio.
  3. Proposed hybrid solution:
  • Step 1: Use Google for transcription (lower cost).
  • Step 2: Send the transcription to OpenAI for high-quality emotional audio generation (a rough sketch of this step follows after the list).
  • Goal: Retain OpenAI’s premium audio output while minimizing costs by outsourcing transcription.
  4. Drawbacks:
  • Slightly longer wait time due to integrating two services.
  • Loss of some emotional nuance due to the delayed, two-step audio generation.
  5. Benefit:
  • Significant cost reduction while maintaining the superior quality of OpenAI’s audio.
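
A rough sketch of step 2 of that hybrid idea, assuming you already have a transcript string from the cheaper transcription service and an open Realtime websocket as in the posts above (speak_transcript is an illustrative name):

import json

async def speak_transcript(websocket, transcript: str):
    """Send an externally produced transcript and request an audio response."""
    await websocket.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": transcript}],
        },
    }))
    await websocket.send(json.dumps({
        "type": "response.create",
        "response": {"modalities": ["audio", "text"]},
    }))
    # The audio comes back as response.audio.delta events (see handle_event above)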


I don't know. I used Claude because I don't know much about websockets, so I can't answer your questions.