How we save the conversational history of a session while using Real-time API.? We need to populate the conversational history to newer sessions. So how can we save and pass the history from a previous session?
Save and populate conversational history across sessions:
Store History: Saves session history, e.g., messages and responses, in a database or persistent storage, like Redis, MongoDB, or SQL.
Retrieve on New Session: Retrieves the saved history when a new session begins and hands it to the API request’s messages parameter.
Format Your Data: Ensure the returned data is in the OpenAI format (e.g., [{role: “user”, content: “message”}]).
This provides continuity between sessions.
Hi I am facing the same issue and need help to resolve it. I am trying to have a realtime conversation where I send speech to openai realtime api.
Can someone please help me on this
My flow is like this
1st I send system_message (as a text) on the websocket. and after sending I’ll check for the acknowledgement.
Next I run a loop to send conversation (text of the history) 1 by 1 and call websocket acknowledgement for each message.
However, for the 1st message I get’s proper acknowledgment response. form this code ack = await asyncio.wait_for(self.websocket.receive_json(), 5)
For the second message in the conversation, there is no error while sending the message. But, next for the acknowledgement “ack = await asyncio.wait_for(self.websocket.receive_json(), 5)” it throughs some exception "‘Exception.with_traceback(tb) –\n set self.traceback to tb and return self.’
"
And next the socket closes the exception.
async def send_raw_audio_stream(
self,
raw_audio_data: bytes,
system_prompt: str,
conversation_history: list = None,
chunk_callback: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
) → Optional[bytes]:
if not self.websocket:
print(ERROR_NO_ACTIVE_CONNECTION)
return None
try:
# Import the utility functions
from src.utils.audio_utils import ensure_valid_audio, raw_to_float32, base64_encode_audio
# Ensure audio is valid (long enough)
raw_audio_data = ensure_valid_audio(raw_audio_data, min_duration_ms=100, sample_rate=24000)
# Step 1: Send the system prompt
system_message = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "system",
"content": [
{
"type": "input_text",
"text": system_prompt
}
]
}
}
print("Sending system prompt...")
await self.websocket.send_json(system_message)
print("System prompt sent")
#Wait for acknowledgment
try:
response = await asyncio.wait_for(self.websocket.receive_json(), 10)
print(f"System message acknowledgment: {response}")
except asyncio.TimeoutError:
print("No acknowledgment for system message, but continuing")
except Exception as e:
print(f"Warning: Error receiving acknowledgment for system message: {e}")
# Step 2: Send conversation history if provided
if conversation_history and len(conversation_history) > 0:
print(f"Sending conversation history ({len(conversation_history)} messages)...")
for idx, message in enumerate(conversation_history):
role = message.get("role", "user")
content = message.get("content", "")
history_message = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": role,
"content": [
{
"type": "input_text",
"text": content
}
]
}
}
await self.websocket.send_json(history_message)
print(f"Sent {role} message {idx+1}/{len(conversation_history)}")
# Brief pause between messages
await asyncio.sleep(0.1)
# Try to get acknowledgment
try:
ack = await asyncio.wait_for(self.websocket.receive_json(), 5)
print(f"Message acknowledgment received")
except (asyncio.TimeoutError, Exception):
print("No immediate acknowledgment for history message, continuing")
# Step 3: Create a user message with empty text - this will be associated with the audio
user_message = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": "" # Empty text since we're using audio
}
]
}
}
await self.websocket.send_json(user_message)
print("Empty user message sent (will be associated with audio)")
# Wait for acknowledgment
user_message_id = None
try:
response = await asyncio.wait_for(self.websocket.receive_json(), 5)
print(f"User message acknowledgment: {response}")
# Extract the message ID if available (for future reference)
if "item" in response and "id" in response["item"]:
user_message_id = response["item"]["id"]
print(f"User message ID: {user_message_id}")
except (asyncio.TimeoutError, Exception) as e:
print(f"No acknowledgment for user message, continuing: {e}")
# Step 4: Convert the raw audio to float32, then to base64
# First, check if it's already in the right format
try:
# Convert raw PCM to float32 array
float_audio = raw_to_float32(raw_audio_data)
# Encode to base64
base64_audio = base64_encode_audio(float_audio)
# Log for debugging
print(f"Converted {len(raw_audio_data)} bytes to {len(base64_audio)} base64 characters")
# Send the audio
audio_append_message = {
"type": "input_audio_buffer.append",
"audio": base64_audio
}
await self.websocket.send_json(audio_append_message)
print("Audio data sent")
# Step 5: Commit the audio buffer
commit_message = {
"type": "input_audio_buffer.commit"
}
await self.websocket.send_json(commit_message)
print("Audio buffer committed")
except Exception as e:
print(f"Error processing audio data: {e}")
import traceback
print(traceback.format_exc())
return None
# Step 6: Request a response
response_message = {
"type": "response.create"
}
await self.websocket.send_json(response_message)
print("Response requested")
# Step 7: Collect the response
audio_data = bytearray()
collection_complete = False
timeout_seconds = 60
# Set an absolute end time
end_time = asyncio.get_event_loop().time() + timeout_seconds
print("Collecting response...")
while not collection_complete:
# Calculate remaining time
remaining = end_time - asyncio.get_event_loop().time()
if remaining <= 0:
print(f"Collection timed out after {timeout_seconds} seconds")
break
try:
# Use the remaining time as the timeout
message = await asyncio.wait_for(self.websocket.receive(), min(remaining, 5))
# Handle different message types
if message.type == aiohttp.WSMsgType.TEXT:
data = json.loads(message.data)
message_type = data.get("type", "unknown")
print(f"Received message type: {message_type}")
# Process different message types
if message_type == "audio":
audio_chunk = base64.b64decode(data.get("audio", ""))
if chunk_callback:
await chunk_callback(audio_chunk)
audio_data.extend(audio_chunk)
print(f"Received audio chunk: {len(audio_chunk)} bytes")
elif message_type == "response.audio.delta":
if "delta" in data:
delta_data = data["delta"]
audio_chunk = base64.b64decode(delta_data)
if chunk_callback:
await chunk_callback(audio_chunk)
audio_data.extend(audio_chunk)
print(f"Received audio delta chunk: {len(audio_chunk)} bytes")
elif message_type in ("audio.final", "response.audio.done", "response.done"):
print(f"Final audio marker received: {message_type}")
collection_complete = True
elif message_type == "error":
print(f"Error from OpenAI: {data.get('error')}")
if isinstance(data.get('error'), dict):
error_message = data.get('error', {}).get('message', 'Unknown error')
print(f"Error details: {error_message}")
collection_complete = True
elif message_type == "session.closed":
print("Session closed by server")
collection_complete = True
elif message.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING):
print(f"WebSocket closed with type: {message.type}, data: {message.data}")
collection_complete = True
elif message.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocket error: {message.data}")
collection_complete = True
except asyncio.TimeoutError:
print(f"Waiting for more data... {int(remaining)} seconds remaining")
except Exception as e:
print(f"Error processing response: {e}")
print(traceback.format_exc())
collection_complete = True
# Return the collected audio data if any
if len(audio_data) > 0:
total_size = len(audio_data)
print(f"Successfully received {total_size} bytes of audio response")
return bytes(audio_data)
else:
print("No audio response received")
return None
except Exception as e:
print(f"{ERROR_DURING_AUDIO_STREAMING}: {e}")
print(traceback.format_exc())
return None