Streaming an assistant's response in PHP

Hello everyone,

I would like to know whether it is possible to stream the content of an assistant's message from PHP code.

On the one hand, I have code that streams the API response in “completion” mode.

On the other hand, I have code that retrieves the API response in assistant mode, but without streaming it.

I’d like to stream the assistant’s response, but so far I haven’t managed to do so.

Here’s the PHP code I use to retrieve the assistant’s response, without streaming it.

<?php

// OpenAI API Key
$OPENAI_API_KEY = '[my API key]'; // placeholder: the key must be a quoted string

// Retrieve POST parameters
$data = json_decode(file_get_contents('php://input'), true);
$thread = isset($data['thread']) ? $data['thread'] : null;
$message = isset($data['message']) ? $data['message'] : null;

// Check the "message" parameter
if ($message === null || trim($message) === '') {
    http_response_code(400);
    echo json_encode(["error" => "'message' parameter is required."]);
    exit();
}

// If no thread ID was provided (missing or empty), create a new thread
if ($thread === null || trim($thread) === '') {
    $thread = create_new_thread($OPENAI_API_KEY);
    if (!$thread) {
        http_response_code(500);
        echo json_encode(["error" => "Error creating the thread."]);
        exit();
    }
}

// Send the message to the OpenAI API
$message_id = send_message_to_thread($OPENAI_API_KEY, $thread, $message);
if (!$message_id) {
    http_response_code(500);
    echo json_encode(["error" => "Error sending the message."]);
    exit();
}

// Execute Plato's instructions
$run_id = execute_instructions($OPENAI_API_KEY, $thread);
if (!$run_id) {
    http_response_code(500);
    echo json_encode(["error" => "Error executing the instructions."]);
    exit();
}

// Wait for the run to complete with a minimum delay
if (!check_run_status($OPENAI_API_KEY, $thread, $run_id)) {
    http_response_code(500);
    echo json_encode(["error" => "The run could not be completed within the allotted time."]);
    exit();
}

// Retrieve the messages from the thread
$response_message = get_assistant_message($OPENAI_API_KEY, $thread);
if (!$response_message) {
    http_response_code(500);
    echo json_encode(["error" => "Error retrieving the messages."]);
    exit();
}

// Create the final response
$response = [
    "thread" => $thread,
    "message" => $response_message
];

// Send the response
echo json_encode($response);
exit();

// Function to create a new thread
function create_new_thread($api_key) {
    $url = "https://api.openai.com/v1/threads";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_post_request($url, $headers);
    if ($response && isset($response->id)) {
        return $response->id;
    }
    return null;
}

// Function to send a message to a thread
function send_message_to_thread($api_key, $thread, $message) {
    $url = "https://api.openai.com/v1/threads/$thread/messages";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];
    $data = json_encode([
        "role" => "user",
        "content" => $message
    ]);

    $response = make_post_request($url, $headers, $data);
    if ($response && isset($response->id)) {
        return $response->id;
    }
    return null;
}

// Function to execute Plato's instructions
function execute_instructions($api_key, $thread) {
    $url = "https://api.openai.com/v1/threads/$thread/runs";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];
    $data = json_encode([
        "assistant_id" => "asst_kqqkrfa9Ib47th4pI8T9zx15",
        "instructions" => "We are in a dialogue application that allows any philosophy student to discuss with a great philosopher of the past. You must embody Plato, adopting his thoughts, expressions, memories, so that the interlocutor can truly feel like they are conversing with the character you are."
    ]);

    $response = make_post_request($url, $headers, $data);
    if ($response && isset($response->id)) {
        return $response->id;
    }
    return null;
}

// Function to check the status of a run
function check_run_status($api_key, $thread, $run_id) {
    $url = "https://api.openai.com/v1/threads/$thread/runs/$run_id";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    // Polling interval
    $delay = 0.1; // 100 ms

    // Maximum wait time (in seconds)
    $timeout = 10;
    $start_time = microtime(true);

    while (microtime(true) - $start_time < $timeout) {
        $response = make_get_request($url, $headers);
        if ($response && isset($response->status)) {
            if ($response->status === "completed") {
                return true;
            }
            // Stop early if the run has ended without completing
            if (in_array($response->status, ["failed", "cancelled", "expired", "incomplete"], true)) {
                return false;
            }
        }
        usleep((int) ($delay * 1000000)); // Delay in microseconds
    }
    return false;
}

// Function to retrieve messages from the thread and find the assistant's response
function get_assistant_message($api_key, $thread) {
    $url = "https://api.openai.com/v1/threads/$thread/messages";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_get_request($url, $headers);
    if ($response && isset($response->data)) {
        // Messages are listed newest first by default (order=desc), so the first assistant message found is the latest reply
        foreach ($response->data as $message) {
            if ($message->role === "assistant" && isset($message->content[0]->text->value)) {
                return $message->content[0]->text->value;
            }
        }
    }
    return null;
}

// Function to make a POST request
function make_post_request($url, $headers, $data = null) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
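    // Always send a POST body (an empty JSON object if there is no payload);
    // without CURLOPT_POSTFIELDS, cURL would issue a GET request instead
    curl_setopt($ch, CURLOPT_POSTFIELDS, $data ?? '{}');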
    $response = curl_exec($ch);
    curl_close($ch);
    return json_decode($response);
}

// Function to make a GET request
function make_get_request($url, $headers) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
    $response = curl_exec($ch);
    curl_close($ch);
    return json_decode($response);
}

?>

All I have to do is send it a JSON body via POST, structured as follows:
{
    "thread": "[the thread ID, if one already exists]",
    "message": "[my prompt]"
}

Thanks for your help.


This modified version of your code implements streaming-like behavior using Server-Sent Events (SSE). Here’s a breakdown of the changes:

  1. We set the appropriate headers for SSE at the beginning of the script.
  2. We’ve introduced a new stream_response function that polls the run about once per second and sends new content to the client as it becomes available.
  3. The check_run_status function now returns the status of the run, which we use to determine when the assistant has completed its response.
  4. We’ve added a get_new_content function that retrieves new messages from the thread, comparing them with the last message ID we’ve sent to avoid duplicates.
  5. We use the send_sse function to send updates to the client, including new content and status updates.

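On the client side, you’d need an EventSource (or a fetch()-based stream reader) to receive the SSE updates. Note that EventSource can only issue GET requests, so because this script reads a JSON body sent via POST, the client would have to read the stream with fetch() instead. Here is the modified server code: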

<?php

// OpenAI API Key
$OPENAI_API_KEY = '[your API key]'; // placeholder: the key must be a quoted string

// Set headers for SSE
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');

// Flush headers
flush();

// Retrieve POST parameters
$data = json_decode(file_get_contents('php://input'), true);
$thread = isset($data['thread']) ? $data['thread'] : null;
$message = isset($data['message']) ? $data['message'] : null;

// Check the "message" parameter
if ($message === null || trim($message) === '') {
    send_error("'message' parameter is required.");
    exit();
}

// If no thread ID was provided (missing or empty), create a new thread
if ($thread === null || trim($thread) === '') {
    $thread = create_new_thread($OPENAI_API_KEY);
    if (!$thread) {
        send_error("Error creating the thread.");
        exit();
    }
}

// Send the message to the OpenAI API
$message_id = send_message_to_thread($OPENAI_API_KEY, $thread, $message);
if (!$message_id) {
    send_error("Error sending the message.");
    exit();
}

// Execute Plato's instructions
$run_id = execute_instructions($OPENAI_API_KEY, $thread);
if (!$run_id) {
    send_error("Error executing the instructions.");
    exit();
}

// Stream the response
stream_response($OPENAI_API_KEY, $thread, $run_id);

// Function to send SSE message
function send_sse($data) {
    echo "data: " . json_encode($data) . "\n\n";
    flush();
}

// Function to send error as SSE
function send_error($message) {
    send_sse(['error' => $message]);
}

// Function to stream the response
function stream_response($api_key, $thread, $run_id) {
    $last_message_id = null;
    $complete = false;
    $retry_count = 0;
    $max_retries = 50;  // Maximum number of polls, about 1 second apart; adjust as needed
    $terminal_failures = ['failed', 'cancelled', 'expired', 'incomplete'];

    while (!$complete && $retry_count < $max_retries) {
        $retry_count++;  // Count every poll so the loop always terminates
        $run_status = check_run_status($api_key, $thread, $run_id);

        if ($run_status === 'completed') {
            $complete = true;
        } elseif ($run_status === false) {
            sleep(1);
            continue;
        } elseif (in_array($run_status, $terminal_failures, true)) {
            break;  // The run ended without completing; stop polling
        }

        $new_content = get_new_content($api_key, $thread, $last_message_id);

        if ($new_content) {
            send_sse(['content' => $new_content]);
            $last_message_id = $new_content['id'];
        }

        if (!$complete) {
            sleep(1);  // Wait for 1 second before checking again
        }
    }

    if ($complete) {
        send_sse(['status' => 'complete']);
    } else {
        send_error("The run failed or could not be completed within the allotted time.");
    }
}

// Function to check the status of a run
function check_run_status($api_key, $thread, $run_id) {
    $url = "https://api.openai.com/v1/threads/$thread/runs/$run_id";
    $headers = [
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_get_request($url, $headers);
    if ($response && isset($response->status)) {
        return $response->status;
    }
    return false;
}

// Function to get new content
function get_new_content($api_key, $thread, $last_message_id) {
    $url = "https://api.openai.com/v1/threads/$thread/messages";
    $headers = [
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_get_request($url, $headers);
    if ($response && isset($response->data)) {
        foreach ($response->data as $message) {
            if ($message->role === "assistant" && (!$last_message_id || $message->id !== $last_message_id)) {
                return [
                    'id' => $message->id,
                    'text' => $message->content[0]->text->value
                ];
            }
        }
    }
    return null;
}

// ... (keep other functions like create_new_thread, send_message_to_thread, execute_instructions, make_post_request, and make_get_request as they are)

?>

This approach simulates streaming: it polls the thread about once per second and sends each new assistant message as soon as it appears. It’s not a true streaming implementation like the one available for the completion API, because each message is sent as a whole rather than token by token.
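For comparison, true streaming delivers the reply as a Server-Sent Events stream, and a PHP consumer has to reassemble events from network chunks that can end anywhere, even mid-line. A minimal sketch of that framing step, assuming standard SSE syntax (`event:` and `data:` lines, events separated by a blank line); `take_sse_events()` is a hypothetical helper, and comment lines and `id:`/`retry:` fields are ignored.

```php
<?php
// Sketch: pull complete SSE events out of a growing buffer (hypothetical helper).
// Anything after the last blank line is an unfinished event and stays in
// $buffer until more bytes arrive.
function take_sse_events(string &$buffer): array
{
    $events = [];
    $buffer = str_replace("\r\n", "\n", $buffer);
    while (($pos = strpos($buffer, "\n\n")) !== false) {
        $block = substr($buffer, 0, $pos);
        $buffer = substr($buffer, $pos + 2);
        $name = 'message'; // SSE default when no "event:" line is present
        $data = [];
        foreach (explode("\n", $block) as $line) {
            if (strncmp($line, 'event:', 6) === 0) {
                $name = trim(substr($line, 6));
            } elseif (strncmp($line, 'data:', 5) === 0) {
                $value = substr($line, 5);
                // The SSE format strips exactly one leading space
                $data[] = ($value !== '' && $value[0] === ' ') ? substr($value, 1) : $value;
            }
        }
        $events[] = ['event' => $name, 'data' => implode("\n", $data)];
    }
    return $events;
}
```

Each call returns only finished events, so the function can be fed from any chunked reader (for example a cURL write callback) without losing an event that straddles two chunks.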

Remember to adjust the max_retries and sleep durations in the stream_response function based on your specific needs and the expected response time of the assistant.

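Also, note that this implementation keeps the connection open until the assistant completes its response or the maximum number of retries is reached. Make sure your server allows long-running PHP scripts (max_execution_time, or set_time_limit()) if you expect lengthy responses, and that nothing between PHP and the browser buffers the output: PHP output buffering, zlib.output_compression, or a reverse proxy such as nginx can hold the events back until the script ends.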


Thank you very much for this interesting proposal. Unfortunately, I don’t get the desired result: the last message is still transmitted in a single block (as with my previous code) instead of in several chunks, as I would like. This is possible in Python and Node.js, but perhaps not in PHP (unlike “completion” mode)…

Here’s the reworked code, after making your changes:

<?php

// OpenAI API Key
$OPENAI_API_KEY = '[my API key]'; // placeholder: the key must be a quoted string

// Set headers for SSE
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');

// Flush headers
flush();

// Retrieve POST parameters
$data = json_decode(file_get_contents('php://input'), true);
$thread = isset($data['thread']) ? $data['thread'] : null;
$message = isset($data['message']) ? $data['message'] : null;

// Check the "message" parameter
if ($message === null || trim($message) === '') {
    http_response_code(400);
    echo json_encode(["error" => "'message' parameter is required."]);
    exit();
}

// If the thread is null or empty, create a new thread
if ($thread === null || trim($thread) === '') {
    $thread = create_new_thread($OPENAI_API_KEY);
    if (!$thread) {
        http_response_code(500);
        echo json_encode(["error" => "Error creating the thread."]);
        exit();
    }
}

// Send the message to the OpenAI API
$message_id = send_message_to_thread($OPENAI_API_KEY, $thread, $message);
if (!$message_id) {
    http_response_code(500);
    echo json_encode(["error" => "Error sending the message."]);
    exit();
}

// Execute Plato's instructions
$run_id = execute_instructions($OPENAI_API_KEY, $thread);
if (!$run_id) {
    http_response_code(500);
    echo json_encode(["error" => "Error executing the instructions."]);
    exit();
}

// MODIFICATION
stream_response($OPENAI_API_KEY, $thread, $run_id);

// Function to send SSE message
function send_sse($data) {
    echo "data: " . json_encode($data) . "\n\n";
    flush();
}

// Function to send error as SSE
function send_error($message) {
    send_sse(['error' => $message]);
}

// Function to stream the response
function stream_response($api_key, $thread, $run_id) {
    $last_message_id = null;
    $complete = false;
    $retry_count = 0;
    $max_retries = 50;  // Maximum number of polls, about 1 second apart; adjust as needed
    $terminal_failures = ['failed', 'cancelled', 'expired', 'incomplete'];

    while (!$complete && $retry_count < $max_retries) {
        $retry_count++;  // Count every poll so the loop always terminates
        $run_status = check_run_status($api_key, $thread, $run_id);

        if ($run_status === 'completed') {
            $complete = true;
        } elseif ($run_status === false) {
            sleep(1);
            continue;
        } elseif (in_array($run_status, $terminal_failures, true)) {
            break;  // The run ended without completing; stop polling
        }

        $new_content = get_new_content($api_key, $thread, $last_message_id);

        if ($new_content) {
            send_sse(['content' => $new_content]);
            $last_message_id = $new_content['id'];
        }

        if (!$complete) {
            sleep(1);  // Wait for 1 second before checking again
        }
    }

    if ($complete) {
        send_sse(['status' => 'complete']);
    } else {
        send_error("The run failed or could not be completed within the allotted time.");
    }
}

// Function to check the status of a run
function check_run_status($api_key, $thread, $run_id) {
    $url = "https://api.openai.com/v1/threads/$thread/runs/$run_id";
    $headers = [
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_get_request($url, $headers);
    if ($response && isset($response->status)) {
        return $response->status;
    }
    return false;
}

// Function to get new content
function get_new_content($api_key, $thread, $last_message_id) {
    $url = "https://api.openai.com/v1/threads/$thread/messages";
    $headers = [
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    $response = make_get_request($url, $headers);
    if ($response && isset($response->data)) {
        foreach ($response->data as $message) {
            error_log(json_encode($message));
            if ($message->role === "assistant" && count($message->content) > 0 && (!$last_message_id || $message->id !== $last_message_id)) {
                return [
                    'id' => $message->id,
                    'text' => $message->content[0]->text->value
                ];
            }
        }
    }
    return null;
}

function create_new_thread($api_key) {
    $url = "https://api.openai.com/v1/threads";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];

    // Call make_post_request to create a new thread
    $response = make_post_request($url, $headers);

    if ($response) {
        if (isset($response->id)) {
            return $response->id;
        } else {
            // Log the error response
            error_log("Error creating the thread: " . json_encode($response));
        }
    } else {
        // Log that no response was received
        error_log("No response received when creating the thread.");
    }

    return null;
}

// Function to send a message to a thread
function send_message_to_thread($api_key, $thread, $message) {
    $url = "https://api.openai.com/v1/threads/$thread/messages";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];
    $data = [
        "role" => "user",
        "content" => $message
    ];

    $response = make_post_request($url, $headers, $data);
    if ($response && isset($response->id)) {
        return $response->id;
    }
    return null;
}

// Function to execute Plato's instructions
function execute_instructions($api_key, $thread) {
    $url = "https://api.openai.com/v1/threads/$thread/runs";
    $headers = [
        "Content-Type: application/json",
        "Authorization: Bearer $api_key",
        "OpenAI-Beta: assistants=v2"
    ];
    $data = [
        "assistant_id" => "asst_kqqkrfa9Ib47th4pI8T9zx15",
        "instructions" => "We are in a dialogue application that allows any philosophy student to discuss with a great philosopher of the past. You must embody Plato, adopting his thoughts, expressions, memories, so that the interlocutor can truly feel like they are conversing with the character you are."
    ];

    $response = make_post_request($url, $headers, $data);
    if ($response && isset($response->id)) {
        return $response->id;
    }
    return null;
}

// Function to make a POST request
function make_post_request($url, $headers, $data = null) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST"); // Make sure the method is POST
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);

    if ($data) {
        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data)); // Add the data as the POST body
    }

    $response = curl_exec($ch);
    $httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);

    if (curl_errno($ch)) {
        error_log("cURL error: " . curl_error($ch));
        curl_close($ch);
        return null;
    }

    curl_close($ch);

    if ($httpcode == 200) {
        return json_decode($response);
    } else {
        error_log("HTTP error: $httpcode - Response: $response");
        return null;
    }
}

// Function to make a GET request
function make_get_request($url, $headers) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
    $response = curl_exec($ch);
    curl_close($ch);
    return json_decode($response);
}

?>

I send this POST request:

{
    "thread":"",
    "message":"What is philosophy ?"
}

I get an answer like this:

{
    "content": {
        "id": "msg_SkNo77NBn4laiHNC9mOuhuXE",
        "text": "Philosophy, my dear interlocutor, is the love of wisdom—an endeavor to seek understanding about fundamental questions concerning existence, knowledge, values, reason, mind, and language. It is both a discipline and a way of life, guiding us to question the very nature of reality and our place within it.\n\nIt begins with an acknowledgment of wonder, as I famously asserted through the words of Socrates: \"The unexamined life is not worth living.\" Through dialectic and the exploration of ideas, philosophy impels us to examine our beliefs, challenge assumptions, and ascertain the principles governing our actions and thoughts.\n\nPhilosophy branches into several fields, including metaphysics (the study of the nature of reality), epistemology (the study of knowledge), ethics (the study of moral values), aesthetics (the study of beauty), and political philosophy (the study of justice and governance). Each branch invites reflection and inquiry in pursuit of a deeper understanding of the truths that guide our lives. \n\nIn the end, philosophy encourages us not just to seek answers but to embrace the process of inquiry itself—recognizing that the journey toward wisdom is as significant as the wisdom we obtain."
    }
}
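On the question of whether this is possible in PHP: as far as I know, the Assistants API (v2) streams a run when the run is created with "stream": true, replying with Server-Sent Events in which text fragments arrive as `thread.message.delta` events. A minimal sketch under those assumptions (PHP 8+, one JSON object per `data:` line); `delta_text()` and `stream_assistant_run()` are hypothetical helper names, not the thread's code, and the "instructions" field, tool calls, and detailed error handling are left out. It relies on cURL's CURLOPT_WRITEFUNCTION callback, which receives each network chunk as it arrives instead of waiting for the whole response.

```php
<?php
// Sketch: relay an Assistants run fragment by fragment (PHP 8+, assumptions above).

// Concatenate the text fragments carried by one thread.message.delta payload.
function delta_text(string $json): string
{
    $payload = json_decode($json, true);
    $text = '';
    foreach ($payload['delta']['content'] ?? [] as $part) {
        if (($part['type'] ?? '') === 'text') {
            $text .= $part['text']['value'] ?? '';
        }
    }
    return $text;
}

// Create a streaming run and call $on_text for every text fragment received.
function stream_assistant_run(string $api_key, string $thread, string $assistant_id, callable $on_text): bool
{
    $buffer = ''; // bytes received that do not yet form a complete line
    $event = '';  // name from the most recent "event:" line
    $ch = curl_init("https://api.openai.com/v1/threads/$thread/runs");
    curl_setopt_array($ch, [
        CURLOPT_HTTPHEADER => [
            "Content-Type: application/json",
            "Authorization: Bearer $api_key",
            "OpenAI-Beta: assistants=v2",
        ],
        CURLOPT_POSTFIELDS => json_encode([
            "assistant_id" => $assistant_id,
            "stream" => true,
        ]),
        // cURL calls this for each network chunk as soon as it arrives
        CURLOPT_WRITEFUNCTION => function ($ch, string $chunk) use (&$buffer, &$event, $on_text): int {
            $buffer .= $chunk;
            // Handle complete lines only; a partial line waits for the next chunk
            while (($pos = strpos($buffer, "\n")) !== false) {
                $line = rtrim(substr($buffer, 0, $pos), "\r");
                $buffer = substr($buffer, $pos + 1);
                if (str_starts_with($line, 'event:')) {
                    $event = trim(substr($line, 6));
                } elseif (str_starts_with($line, 'data:') && $event === 'thread.message.delta') {
                    $text = delta_text(trim(substr($line, 5)));
                    if ($text !== '') {
                        $on_text($text);
                    }
                }
            }
            return strlen($chunk); // report the whole chunk as consumed
        },
    ]);
    $ok = curl_exec($ch) !== false;
    // Error bodies are plain JSON, not SSE, so they trigger no callbacks;
    // the status code tells the caller whether the run actually streamed
    $ok = $ok && curl_getinfo($ch, CURLINFO_HTTP_CODE) === 200;
    curl_close($ch);
    return $ok;
}
```

In the SSE endpoint, this would replace the create-run-then-poll steps, with a callback that echoes each fragment as a `data:` frame and calls flush() immediately, so the browser receives the reply piece by piece.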