Intermittent Silence in Real-Time API Conversations

We’ve noticed that the real-time API occasionally stops mid-conversation: there is a 10–30 second pause with no response. For instance, it cut off after “Could you ask your compa…” (it should have completed as “Could you tell me your company name?”) and stayed silent until the caller spoke again, even though there was no external noise in the environment.

Because each conversation only lasts two to three minutes, the 15-minute session timeout doesn’t seem to be the cause.

Do you have any ideas on how we can prevent this issue?

The relevant code is shown below; we are wondering whether we need to tune the WebSocket settings.

async def keep_openai_connection_alive(ws, call_sid="unknown"):
    """Ping the OpenAI WebSocket every 30 seconds until it closes or a ping fails.

    Each cycle sleeps first, then checks ``is_ws_open(ws)``. The task ends as
    soon as the socket is reported closed or ``ws.ping()`` raises. Failures are
    logged here rather than re-raised to the caller.

    Args:
        ws: WebSocket connection to keep alive
        call_sid: Call SID for logging
    """
    # NOTE(review): only the result of sending the ping is checked here; the
    # value returned by ``ws.ping()`` is discarded, so this loop presumably does
    # not verify that a pong ever arrives -- confirm against the websockets docs.
    ping_counter = 0
    try:
        while True:
            await asyncio.sleep(30)  # Interval between pings

            # Guard clause: nothing left to keep alive once the socket is closed.
            if not is_ws_open(ws):
                logger.warning(f"WebSocket connection to OpenAI is closed for call {call_sid}, stopping ping task")
                return

            ping_counter += 1
            try:
                await ws.ping()
                # Every 5th ping (every 2.5 minutes) is logged at INFO, the rest at DEBUG.
                log_ping = logger.info if ping_counter % 5 == 0 else logger.debug
                log_ping(f"Sent ping #{ping_counter} to keep OpenAI connection alive for call {call_sid}")
            except Exception as e:
                # A failed ping ends the task; the connection is not reopened here.
                logger.warning(f"Failed to send ping to OpenAI for call {call_sid}: {e}")
                return
    except websockets.exceptions.ConnectionClosed:
        logger.warning(f"OpenAI WebSocket connection closed for call {call_sid} while sending ping")
    except Exception as e:
        logger.error(f"Error in keep_openai_connection_alive for call {call_sid}: {e}")
        logger.error(traceback.format_exc())


async def connect_to_openai(attempt=1, max_attempts=3, call_sid="unknown") -> Optional[websockets.WebSocketClientProtocol]:
    """Connect to OpenAI WebSocket API with retry logic and detailed error handling.

    The API key is validated before anything else. A missing key is a
    configuration error that retrying cannot fix, so it returns ``None``
    immediately instead of going through the retry/backoff cycle.

    Args:
        attempt: Attempt number to start from (1-based)
        max_attempts: Maximum number of attempts
        call_sid: Call SID for logging

    Returns:
        WebSocket connection or None if connection fails
    """
    # Validate settings BEFORE touching the key. Slicing a ``None`` key for the
    # log line below would raise TypeError, which the generic handler would
    # swallow and then pointlessly retry.
    api_key = settings.OPENAI_API_KEY_AGENT
    if not api_key:
        logger.error("OpenAI API key is missing or empty")
        return None

    url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v1",
    }

    # Iterative retry loop (replaces the previous self-recursion).
    while True:
        try:
            # Log detailed connection attempt
            logger.info(f"Attempting to connect to OpenAI WebSocket API for call {call_sid} (attempt {attempt}/{max_attempts})...")
            logger.info(f"Using API key starting with: {api_key[:4]}...")
            logger.info(f"Connecting to OpenAI at: {url}")

            # Attempt connection with timeout
            connection = await asyncio.wait_for(
                websockets.connect(
                    url,
                    additional_headers=headers,
                    ping_interval=30,  # Send pings every 30 seconds to keep connection alive
                    ping_timeout=10,   # Wait 10 seconds for pong response
                    close_timeout=5,   # Wait 5 seconds for closing handshake
                ),
                timeout=15  # 15 second timeout for connection
            )

            logger.info(f"Successfully connected to OpenAI WebSocket API for call {call_sid}")
            return connection

        except asyncio.TimeoutError:
            logger.error(f"Timeout while connecting to OpenAI for call {call_sid}")

        except websockets.exceptions.InvalidStatusCode as e:
            logger.error(f"Invalid status code connecting to OpenAI: {e.status_code} for call {call_sid}")
            logger.error(f"Response headers: {e.headers}")

        except Exception as e:
            logger.error(f"Error connecting to OpenAI for call {call_sid}: {type(e).__name__}: {str(e)}")
            logger.error(traceback.format_exc())

        # Retry logic
        if attempt >= max_attempts:
            logger.error(f"Failed to connect to OpenAI after {max_attempts} attempts for call {call_sid}")
            return None

        # NOTE(review): the delay is CONNECTION_RETRY_DELAY raised to the attempt
        # number, so it only grows if that setting is greater than 1 -- confirm
        # the configured value.
        wait_time = settings.CONNECTION_RETRY_DELAY ** attempt  # Exponential backoff
        logger.warning(f"Retrying OpenAI connection in {wait_time} seconds for call {call_sid}")
        await asyncio.sleep(wait_time)
        attempt += 1


# In openai_service.py, update the initialize_session function
async def initialize_session(
    openai_ws,
    call_sid="unknown",
    retry_count=0,
    max_retries=3,
    preferred_language="ja"
) -> bool:
    """Initialize OpenAI session with system message and settings.

    Sends three events in order: a ``session.update`` carrying the audio and
    turn-detection settings, a ``conversation.item.create`` carrying the
    greeting text, and a ``response.create``. Each send has a 10 second timeout.

    Timeouts and unexpected errors are retried with exponential backoff, and
    each retry re-sends all three events. A closed connection is not retried:
    the same socket object is reused for every attempt, so once it is closed
    further sends cannot succeed and the function returns ``False`` at once.

    Args:
        openai_ws: WebSocket connection to OpenAI
        call_sid: Call SID for logging
        retry_count: Number of retries already performed
        max_retries: Maximum number of retries
        preferred_language: Preferred language code ("ja" selects the Japanese
            greeting; any other value selects the English one)

    Returns:
        Boolean indicating if initialization was successful
    """
    if not openai_ws:
        logger.error(f"Cannot initialize session: WebSocket connection is None for call {call_sid}")
        return False

    async def send_event(event):
        # Serialize one event and send it, bounded by a 10 second timeout.
        await asyncio.wait_for(openai_ws.send(json.dumps(event)), timeout=10)

    # Iterative retry loop (replaces the previous self-recursion).
    while True:
        try:
            logger.info(f"Initializing OpenAI session for call {call_sid}")

            # Create session update without the speech_recognition parameter
            session_update = {
                "type": "session.update",
                "session": {
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.8,
                        "silence_duration_ms": 1500,
                    },
                    "input_audio_format": "g711_ulaw",
                    "output_audio_format": "g711_ulaw",
                    "voice": VOICE,
                    "instructions": SYSTEM_MESSAGE,
                    "modalities": ["text", "audio"],
                    "temperature": 0.8,
                },
            }

            logger.info(f"Sending session update for call {call_sid}")
            await send_event(session_update)

            # Determine greeting based on language
            from app.constants.messages import DEFAULT_GREETING_EN, DEFAULT_GREETING_JA
            greeting_text = DEFAULT_GREETING_JA if preferred_language == "ja" else DEFAULT_GREETING_EN

            # Log the greeting being sent
            logger.info(f"Sending initial greeting '{greeting_text}' for call {call_sid}")

            # The greeting text is submitted as a "user" message, followed by a
            # response.create request.
            initial_conversation_item = {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": greeting_text,
                        }
                    ],
                },
            }
            await send_event(initial_conversation_item)
            await send_event({"type": "response.create"})

            logger.info(f"Initial greeting sent for call {call_sid}")
            return True

        except asyncio.TimeoutError:
            logger.error(f"Timeout initializing OpenAI session for call {call_sid}")

        except websockets.exceptions.ConnectionClosed as e:
            # Retrying on the same closed socket cannot succeed; fail fast instead
            # of spending the backoff delays while the caller waits in silence.
            logger.error(f"OpenAI WebSocket closed while initializing session for call {call_sid}: {e}")
            return False

        except Exception as e:
            logger.error(f"Error initializing OpenAI session for call {call_sid}: {type(e).__name__}: {str(e)}")
            logger.error(traceback.format_exc())

        # Retry logic
        if retry_count >= max_retries:
            logger.error(f"Failed to initialize OpenAI session after {max_retries} attempts for call {call_sid}")
            return False

        retry_count += 1
        wait_time = 2 ** retry_count  # Exponential backoff
        logger.warning(f"Retrying session initialization (attempt {retry_count}/{max_retries}) in {wait_time} seconds for call {call_sid}")
        await asyncio.sleep(wait_time)