How to save the history on a session while using Real-time API

How do we save the conversational history of a session while using the Realtime API? We need to populate the conversational history into newer sessions. So how can we save and pass the history from a previous session?

1 Like

Save and populate conversational history across sessions:

Store History: Save the session history, e.g., messages and responses, in a database or persistent storage such as Redis, MongoDB, or SQL.
Retrieve on New Session: Retrieve the saved history when a new session begins and pass it to the API request’s messages parameter.
Format Your Data: Ensure the retrieved data is in the OpenAI format (e.g., [{"role": "user", "content": "message"}]).
This provides continuity between sessions.

1 Like

Hi, I am facing the same issue and need help resolving it. I am trying to have a realtime conversation where I send speech to the OpenAI Realtime API.
Can someone please help me with this?

My flow is like this
First, I send the system_message (as text) over the websocket, and after sending it I check for the acknowledgement.
Next, I run a loop to send the conversation (the text of the history) one message at a time, waiting for a websocket acknowledgement for each message.
However, for the first message I get a proper acknowledgment response from this code: ack = await asyncio.wait_for(self.websocket.receive_json(), 5)
For the second message in the conversation, there is no error while sending the message. But then, when waiting for the acknowledgement with "ack = await asyncio.wait_for(self.websocket.receive_json(), 5)", it throws an exception: "Exception.with_traceback(tb) -- set self.__traceback__ to tb and return self."

And after that exception, the socket closes.

async def send_raw_audio_stream(
    self,
    raw_audio_data: bytes,
    system_prompt: str,
    conversation_history: Optional[list] = None,
    chunk_callback: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
) -> Optional[bytes]:
    """Send raw PCM audio over the realtime websocket and collect the audio reply.

    Flow: (1) send the system prompt, (2) replay any prior conversation
    history, (3) create an empty-text user message to associate with the
    audio, (4) append and commit the audio buffer, (5) request a response,
    and (6) collect streamed audio chunks until a final/done/error event.

    Args:
        raw_audio_data: Raw PCM bytes (assumes 24 kHz sample rate — TODO confirm
            against the audio_utils conversion helpers).
        system_prompt: Text sent as the "system" role message.
        conversation_history: Optional list of dicts shaped like
            {"role": ..., "content": ...}, replayed in order before the new turn.
        chunk_callback: Optional coroutine invoked with each received audio
            chunk as it arrives (for live playback/forwarding).

    Returns:
        The concatenated response audio as bytes, or None on error or when
        no audio was received.
    """
    if not self.websocket:
        print(ERROR_NO_ACTIVE_CONNECTION)
        return None

    # Import up front: the original only imported traceback inside one
    # except branch, so later traceback.format_exc() calls could raise
    # NameError if that branch never executed.
    import traceback

    try:
        # Import the utility functions
        from src.utils.audio_utils import ensure_valid_audio, raw_to_float32, base64_encode_audio

        # Ensure audio is valid (pad/validate to at least 100 ms)
        raw_audio_data = ensure_valid_audio(raw_audio_data, min_duration_ms=100, sample_rate=24000)

        # Step 1: Send the system prompt
        system_message = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "system",
                "content": [
                    {
                        "type": "input_text",
                        "text": system_prompt
                    }
                ]
            }
        }

        print("Sending system prompt...")
        await self.websocket.send_json(system_message)
        print("System prompt sent")

        # Wait for acknowledgment (best-effort: a missing ack is not fatal)
        try:
            response = await asyncio.wait_for(self.websocket.receive_json(), 10)
            print(f"System message acknowledgment: {response}")
        except asyncio.TimeoutError:
            print("No acknowledgment for system message, but continuing")
        except Exception as e:
            # NOTE(review): receive_json raises TypeError on non-text frames
            # (e.g. a close frame) — this also lands here.
            print(f"Warning: Error receiving acknowledgment for system message: {e}")

        # Step 2: Send conversation history if provided
        if conversation_history:
            print(f"Sending conversation history ({len(conversation_history)} messages)...")

            for idx, message in enumerate(conversation_history):
                role = message.get("role", "user")
                content = message.get("content", "")

                history_message = {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": role,
                        "content": [
                            {
                                "type": "input_text",
                                "text": content
                            }
                        ]
                    }
                }

                await self.websocket.send_json(history_message)
                print(f"Sent {role} message {idx+1}/{len(conversation_history)}")

                # Brief pause between messages
                await asyncio.sleep(0.1)

                # Try to get acknowledgment. TimeoutError is already an
                # Exception subclass, so a single handler suffices; the
                # redundant (TimeoutError, Exception) tuple is gone.
                try:
                    ack = await asyncio.wait_for(self.websocket.receive_json(), 5)
                    print(f"Message acknowledgment received")
                except Exception:
                    print("No immediate acknowledgment for history message, continuing")

        # Step 3: Create a user message with empty text - this will be associated with the audio
        # NOTE(review): some Realtime API versions reject empty input_text —
        # verify this empty-text item is accepted by the target API version.
        user_message = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": ""  # Empty text since we're using audio
                    }
                ]
            }
        }

        await self.websocket.send_json(user_message)
        print("Empty user message sent (will be associated with audio)")

        # Wait for acknowledgment and try to capture the created item's id
        user_message_id = None
        try:
            response = await asyncio.wait_for(self.websocket.receive_json(), 5)
            print(f"User message acknowledgment: {response}")
            # Extract the message ID if available (for future reference)
            if "item" in response and "id" in response["item"]:
                user_message_id = response["item"]["id"]
                print(f"User message ID: {user_message_id}")
        except Exception as e:
            print(f"No acknowledgment for user message, continuing: {e}")

        # Step 4: Convert the raw audio to float32, then to base64
        try:
            # Convert raw PCM to float32 array
            float_audio = raw_to_float32(raw_audio_data)

            # Encode to base64
            base64_audio = base64_encode_audio(float_audio)

            # Log for debugging
            print(f"Converted {len(raw_audio_data)} bytes to {len(base64_audio)} base64 characters")

            # Send the audio
            audio_append_message = {
                "type": "input_audio_buffer.append",
                "audio": base64_audio
            }

            await self.websocket.send_json(audio_append_message)
            print("Audio data sent")

            # Step 5: Commit the audio buffer
            commit_message = {
                "type": "input_audio_buffer.commit"
            }
            await self.websocket.send_json(commit_message)
            print("Audio buffer committed")

        except Exception as e:
            print(f"Error processing audio data: {e}")
            print(traceback.format_exc())
            return None

        # Step 6: Request a response
        response_message = {
            "type": "response.create"
        }
        await self.websocket.send_json(response_message)
        print("Response requested")

        # Step 7: Collect the response until a terminal event or timeout
        audio_data = bytearray()
        collection_complete = False
        timeout_seconds = 60

        # Set an absolute end time. get_running_loop() is the correct,
        # non-deprecated call inside a coroutine.
        loop = asyncio.get_running_loop()
        end_time = loop.time() + timeout_seconds

        print("Collecting response...")
        while not collection_complete:
            # Calculate remaining time
            remaining = end_time - loop.time()
            if remaining <= 0:
                print(f"Collection timed out after {timeout_seconds} seconds")
                break

            try:
                # Use the remaining time as the timeout (capped at 5 s so we
                # log progress periodically)
                message = await asyncio.wait_for(self.websocket.receive(), min(remaining, 5))

                # Handle different message types
                if message.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(message.data)
                    message_type = data.get("type", "unknown")
                    print(f"Received message type: {message_type}")

                    # Process different message types
                    if message_type == "audio":
                        audio_chunk = base64.b64decode(data.get("audio", ""))
                        if chunk_callback:
                            await chunk_callback(audio_chunk)
                        audio_data.extend(audio_chunk)
                        print(f"Received audio chunk: {len(audio_chunk)} bytes")

                    elif message_type == "response.audio.delta":
                        if "delta" in data:
                            delta_data = data["delta"]
                            audio_chunk = base64.b64decode(delta_data)
                            if chunk_callback:
                                await chunk_callback(audio_chunk)
                            audio_data.extend(audio_chunk)
                            print(f"Received audio delta chunk: {len(audio_chunk)} bytes")

                    elif message_type in ("audio.final", "response.audio.done", "response.done"):
                        print(f"Final audio marker received: {message_type}")
                        collection_complete = True

                    elif message_type == "error":
                        print(f"Error from OpenAI: {data.get('error')}")
                        if isinstance(data.get('error'), dict):
                            error_message = data.get('error', {}).get('message', 'Unknown error')
                            print(f"Error details: {error_message}")
                        collection_complete = True

                    elif message_type == "session.closed":
                        print("Session closed by server")
                        collection_complete = True

                elif message.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING):
                    print(f"WebSocket closed with type: {message.type}, data: {message.data}")
                    collection_complete = True

                elif message.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {message.data}")
                    collection_complete = True

            except asyncio.TimeoutError:
                print(f"Waiting for more data... {int(remaining)} seconds remaining")

            except Exception as e:
                print(f"Error processing response: {e}")
                print(traceback.format_exc())
                collection_complete = True

        # Return the collected audio data if any
        if len(audio_data) > 0:
            total_size = len(audio_data)
            print(f"Successfully received {total_size} bytes of audio response")
            return bytes(audio_data)
        else:
            print("No audio response received")
            return None

    except Exception as e:
        print(f"{ERROR_DURING_AUDIO_STREAMING}: {e}")
        print(traceback.format_exc())
        return None