Displaying Assistants API output on the frontend - WebSockets needed?

Hi all,

Do I need WebSockets to display streaming output from the Assistants API? At the moment the output shows up in the backend log, but I can't seem to display it on the frontend.

I use Node.js and React by the way :slight_smile:

You would usually subscribe to the back end using SSE (Server-Sent Events), which works like a one-way WebSocket.

Thanks! So SSE is sufficient then? I will proceed with that approach - thanks again!

1 Like

Hey man,

I got streaming to work on the backend using SSE, but I can't display it on the frontend. Can you please explain how you display the SSE output?

SSEs are captured in the browser with the EventSource API:
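
A minimal sketch (the endpoint URL and event name are placeholders for whatever your backend exposes):

// Minimal sketch of consuming Server-Sent Events in the browser
// (the endpoint path and "textDelta" event name are placeholders)
const source = new EventSource("/api/your-stream-endpoint?threadId=abc123");

source.addEventListener("textDelta", (event) => {
  const payload = JSON.parse(event.data);
  console.log("Received chunk:", payload);
});

source.onerror = (err) => {
  console.error("SSE error:", err);
  source.close();
};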

There are also numerous guides & libraries available online to make it a bit easier. Good luck in your coding.

1 Like

I got it working, thanks a lot man - really appreciated!

1 Like

@yvoderooij - could you share how you did this? I am trying to figure out how to get the streaming working on the front end through websocket. Anything you can share? Thanks a bunch!

Hey @william.zebrowski ,

I have two code snippets I can share:

  1. Streaming output without conversation (works fine)
  2. Streaming output where the user can interact with the assistant (works partially; I'm having trouble displaying the messages from the API properly, as it keeps appending all previous messages whenever new output arrives)

Please let me know which one you would like to see

1 Like

Hi @yvoderooij - Appreciate the quick reply! I’d love to see the snippet to your first option from above and see where that can lead me.

const fetchOpenAIResponse = async (promptText) => {
  try {
    setLoading(true);
    console.log("Starting fetchOpenAIResponse");
    setDisplayContent("");
    setRawContent("");

    // Close the old EventSource connection if it exists
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
    }

    const response = await fetch(`${API_URL_NEW_BACKEND}/api/hero-completion-assistant-stream`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ userMessage: promptText }),
    });

    if (!response.ok) throw new Error("Network response was not ok");

    const data = await response.json();
    console.log("Received thread ID from server:", data.threadId);

    const newEventSource = new EventSource(`${API_URL_NEW_BACKEND}/api/hero-completion-assistant-stream?threadId=${data.threadId}`);

    newEventSource.addEventListener("textDelta", (event) => {
      const eventData = JSON.parse(event.data);
      console.log("Received textDelta event:", eventData);

      setRawContent((prevRawContent) => {
        const newRawContent = `${prevRawContent}${eventData.value}`;
        const html = DOMPurify.sanitize(marked(newRawContent));
        setDisplayContent(html);
        return newRawContent;
      });
    });

    newEventSource.onopen = () => console.log("SSE connection opened successfully.");
    newEventSource.onerror = (event) => {
      console.error("SSE connection encountered an error:", event);
      setLoading(false);
      setShowPopup(true);
      newEventSource.close();
    };

    // Store the new EventSource object in the useRef
    eventSourceRef.current = newEventSource;
  } catch (error) {
    console.error("An error occurred:", error);
    setLoading(false);
    setShowPopup(true);
  }
};
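
For reference, the function above assumes React state, refs and imports roughly like this (a sketch; the hook and variable names are taken from the snippet, the concrete values are placeholders):

// Sketch of the state/refs the snippet above relies on (not the exact original setup)
import { useRef, useState } from "react";
import DOMPurify from "dompurify";
import { marked } from "marked";

const API_URL_NEW_BACKEND = "https://example.com"; // placeholder base URL

function HeroCompletion() {
  const [loading, setLoading] = useState(false);
  const [displayContent, setDisplayContent] = useState("");
  const [rawContent, setRawContent] = useState("");
  const [showPopup, setShowPopup] = useState(false);
  const eventSourceRef = useRef(null);

  // ...fetchOpenAIResponse from above lives here...

  // Render the sanitized HTML built from the streamed markdown
  return <div dangerouslySetInnerHTML={{ __html: displayContent }} />;
}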

1 Like

Backend:
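
For completeness, the routes below assume some module-level setup along these lines (a rough sketch; assistantIdToUseSimone, apiLimiter, apiLimitReached and threadResponses are referenced in the snippet, the concrete values here are placeholders):

// Assumed module-level setup for the routes below (values are placeholders)
const express = require("express");
const OpenAI = require("openai");
const rateLimit = require("express-rate-limit");

const router = express.Router();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

const assistantIdToUseSimone = "asst_..."; // your Assistant ID
const apiLimiter = rateLimit({ windowMs: 60 * 1000, max: 30 });
let apiLimitReached = false; // flipped elsewhere when the limit is hit

// In-memory store of events and connected SSE clients per thread
const threadResponses = {};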

// POST route to initiate streaming and handle user message
router.post("/hero-completion-assistant-stream", apiLimiter, async (req, res) => {
  const userMessage = req.body.userMessage;

  if (apiLimitReached) {
    console.log("API limit reached, not starting a new stream.");
    return res.status(429).send("API limit reached, please try again later.");
  }

  try {
    // Step 1: Create a new thread for each interaction
    const threadResponse = await openai.beta.threads.create();
    console.log("New thread created with ID:", threadResponse.id);

    // Step 2: Add a message to the thread
    const messageResponse = await openai.beta.threads.messages.create(threadResponse.id, {
      role: "user",
      content: userMessage,
    });
    console.log("User message added to the thread:", messageResponse);

    // Initialize storage for this thread if not already done
    if (!threadResponses[threadResponse.id]) {
      threadResponses[threadResponse.id] = { events: [], clients: [] };
    }

    // Step 3: Stream the run using the newly created thread ID
    const stream = openai.beta.threads.runs
      .createAndStream(threadResponse.id, {
        assistant_id: assistantIdToUseSimone, // Ensure this variable is correctly defined
      })
      .on("textCreated", (text) => {
        console.log("textCreated event:", text);
        sendEventToAllClients(threadResponse.id, { event: "textCreated", data: text });
      })
      .on("textDelta", (textDelta) => {
        console.log("textDelta event:", textDelta);
        sendEventToAllClients(threadResponse.id, { event: "textDelta", data: textDelta });
      })
      .on("toolCallCreated", (toolCall) => {
        console.log("toolCallCreated event:", toolCall);
        sendEventToAllClients(threadResponse.id, { event: "toolCallCreated", data: toolCall });
      })
      .on("toolCallDelta", (toolCallDelta) => {
        console.log("toolCallDelta event:", toolCallDelta);
        sendEventToAllClients(threadResponse.id, { event: "toolCallDelta", data: toolCallDelta });
      })
      .on("end", () => {
        console.log("Stream ended");
        sendEventToAllClients(threadResponse.id, { event: "end", data: null });
      });

    res.status(200).json({ threadId: threadResponse.id });
  } catch (error) {
    console.error("Error:", error);
    res.status(500).send("Internal server error");
  }
});

// GET route to stream responses back to the client
router.get("/hero-completion-assistant-stream", apiLimiter, (req, res) => {
  const threadId = req.query.threadId;

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  if (threadResponses[threadId]) {
    // Stream existing events to the new client
    threadResponses[threadId].events.forEach((event) => {
      res.write(`event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`);
    });

    // Add this client to the list for future events
    threadResponses[threadId].clients.push(res);
  }

  // Handle client disconnection
  req.on("close", () => {
    threadResponses[threadId].clients = threadResponses[threadId].clients.filter((client) => client !== res);
  });
});

// Function to send an event to all clients connected to a thread
function sendEventToAllClients(threadId, event) {
  if (threadResponses[threadId]) {
    const eventData = `event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`;
    threadResponses[threadId].events.push(event); // Store event for new clients

    // Stream to all connected clients
    threadResponses[threadId].clients.forEach((client) => client.write(eventData));
  }
}

1 Like

Fantastic @yvoderooij !! Going to dig into these! Will be converting these to python since that’s my preferred lang. Thanks a ton!

1 Like

Good luck! Let me know if you need any more help :slight_smile:

1 Like

Hey @yvoderooij ! Thanks for bringing this up; it was quite helpful to me, as I was facing the same issue: streaming works in my terminal, but UI integration is difficult. I was wondering if you have any code snippets you would like to share so that I can fix the errors on my side? I would greatly appreciate it! Thanks!

Integration is difficult because of the yin-and-yang nature of WebSockets. For example, I needed to introduce an eventlet.sleep(0) into the loop so that it doesn't stream all the time.

I also think my choice of front-end framework helps, because I don't have to constantly context-switch between Python and JavaScript.

Hey @Ricky57 ,

Sure! Check out my code below:

// backend route for the assistant
router.post("/assistant-chat", async (req, res) => {
  const { userMessage, threadId } = req.body;

  try {
    let currentThreadId = threadId;
    // If a threadId is provided and exists, continue; otherwise, create a new thread
    if (!currentThreadId || !threadResponses[currentThreadId]) {
      // Create a new thread for the new case
      const threadResponse = await openai.beta.threads.create();
      currentThreadId = threadResponse.id;
      console.log("New thread created with ID:", currentThreadId);

      // Initialize storage for this new thread
      threadResponses[currentThreadId] = { events: [], clients: [] };
    } else {
      console.log("Continuing conversation on thread:", currentThreadId);
    }

    // Here, before sending the user's message to OpenAI, signal the start of a new message
    sendEventToAllClients(currentThreadId, { event: "messageStart", data: {} });

    // Add the user's message to the thread
    const messageResponse = await openai.beta.threads.messages.create(currentThreadId, {
      role: "user",
      content: userMessage,
    });
    console.log("User message added to the thread:", messageResponse);

    // Stream the run using the newly created or existing thread ID
    const stream = openai.beta.threads.runs
      .createAndStream(currentThreadId, {
        assistant_id: assistantIdToUseSimone, // Ensure this variable is correctly defined
      })
      .on("textCreated", (text) => {
        console.log("textCreated event:", text);
        sendEventToAllClients(currentThreadId, { event: "textCreated", data: text });
      })
      .on("textDelta", (textDelta) => {
        // Optionally log textDelta events
        console.log("textDelta event:", textDelta);
        sendEventToAllClients(currentThreadId, { event: "textDelta", data: textDelta });
      })
      .on("toolCallCreated", (toolCall) => {
        console.log("toolCallCreated event:", toolCall);
        sendEventToAllClients(currentThreadId, { event: "toolCallCreated", data: toolCall });
      })
      .on("toolCallDelta", (toolCallDelta) => {
        console.log("toolCallDelta event:", toolCallDelta);
        sendEventToAllClients(currentThreadId, { event: "toolCallDelta", data: toolCallDelta });
      })
      .on("end", () => {
        console.log("Stream ended for threadId:", currentThreadId);
        sendEventToAllClients(currentThreadId, { event: "end", data: null });
      });

    res.status(200).json({ threadId: currentThreadId });
  } catch (error) {
    console.error("Error handling /assistant-chat:", error);
    res.status(500).send("Internal server error");
  }
});

// SSE GET route for streaming responses to clients
router.get("/stream-responses", (req, res) => {
  const threadId = req.query.threadId;
  console.log("Received threadId from frontend:", threadId);

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  if (threadResponses[threadId]) {
    // Immediately inform the client about the start of a new message
    res.write("event: messageStart\ndata: {}\n\n");

    threadResponses[threadId].events.forEach((event) => {
      res.write(`event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`);
    });

    threadResponses[threadId].clients.push(res);
  } else {
    res.write("event: message\ndata: No active stream found for this thread.\n\n");
  }

  req.on("close", () => {
    console.log(`Client disconnected from thread ${threadId}`);
    threadResponses[threadId].clients = threadResponses[threadId].clients.filter((client) => client !== res);
  });
});
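
On the client side, the messageStart event is what you can use to reset the display so earlier replies aren't appended again, roughly like this (a sketch reusing the state setters from the earlier frontend snippet; the URL is a placeholder for wherever the GET route is mounted):

// Sketch: clear the buffers when the server signals a new assistant message
// (setRawContent / setDisplayContent come from the earlier frontend snippet)
const source = new EventSource(`${API_URL_NEW_BACKEND}/api/stream-responses?threadId=${threadId}`);

source.addEventListener("messageStart", () => {
  setRawContent("");
  setDisplayContent("");
});

The existing textDelta handler can then keep appending deltas to rawContent as before, since it only ever builds up the current message.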

@yvoderooij It's working now! Appreciate it!

1 Like