I’m having trouble getting my UI to stream the response without the edge runtime on the Next.js Pages Router. It works fine with an edge runtime setup, but when I deploy to Vercel the route bundle is too large (approx. 2.5 MB) and requires the Pro plan.
The API route logs the chunks fine, but I don’t know how to consume them successfully on the front end.
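For reference, the bare-bones Node-runtime pattern I’m trying to reproduce is roughly this (a stripped-down sketch with a hypothetical pages/api/stream-test.ts path, not my actual route; the real code follows below):

// pages/api/stream-test.ts (hypothetical) - minimal route just to show the intended contract
import { NextApiRequest, NextApiResponse } from "next";

export default async function handler(_req: NextApiRequest, res: NextApiResponse) {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.flushHeaders();

  // Write a few chunks with a delay so incremental delivery is observable in the client.
  for (const word of ["one ", "two ", "three "]) {
    res.write(word);
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
  res.end();
}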
Here's the API route (/api/chat):

import { NextApiRequest, NextApiResponse } from 'next';
import { ChatCompletionMessageParam } from "openai/resources/index.mjs";
import { Stream } from "@/lib/stream";
import { StatusCodes } from "http-status-codes";
import { encodingForModel } from "js-tiktoken";

interface Payload {
  model: string;
  messages: ChatCompletionMessageParam[];
  stream?: boolean;
}

const model = "gpt-4o";
const MAX_TOKENS = 14000;

// Disable Next.js body parsing so the raw request body can be read manually below.
export const config = {
  api: {
    bodyParser: false,
  },
};

function calculateTotalTokens(
  messages: ChatCompletionMessageParam[],
  encoder: ReturnType<typeof encodingForModel>
): number {
  return messages.reduce((total, message) => {
    if (typeof message.content === "string") {
      return (
        total +
        encoder.encode(message.role).length +
        encoder.encode(message.content).length +
        2
      );
    }
    return total;
  }, 0);
}

// Drop the oldest non-system messages until the conversation fits within MAX_TOKENS.
function trimConversation(conversation: ChatCompletionMessageParam[]) {
  const encoder = encodingForModel(model);
  let totalTokens = calculateTotalTokens(conversation, encoder);
  while (totalTokens > MAX_TOKENS && conversation.length > 1) {
    conversation.splice(1, 1);
    totalTokens = calculateTotalTokens(conversation, encoder);
  }
  return conversation;
}

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    res.status(405).json({ error: 'Method not allowed' });
    return;
  }

  let conversation: ChatCompletionMessageParam[];
  try {
    // bodyParser is disabled, so collect the raw body from the request stream.
    const body = await new Promise<string>((resolve, reject) => {
      let data = '';
      req.on('data', chunk => {
        data += chunk;
      });
      req.on('end', () => {
        resolve(data);
      });
      req.on('error', err => {
        reject(err);
      });
    });
    const parsedBody = JSON.parse(body);
    conversation = parsedBody.conversation;
  } catch (error) {
    res.status(400).json({ error: 'Invalid request body' });
    return;
  }

  if (!conversation || conversation.length === 0) {
    res.status(400).json({ error: 'Conversation is required' });
    return;
  }

  const trimmedConversation = trimConversation(conversation);
  const payload: Payload = {
    model,
    messages: trimmedConversation,
    stream: true,
  };

  try {
    const { stream, usageData } = await Stream(payload);
    console.log("[API] - Streaming chat response initiated.");

    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // Log each chunk, then pipe the Node stream straight into the response.
    stream.on('data', (chunk) => {
      console.log("Chunk sent:", chunk.toString());
    });
    stream.pipe(res);

    usageData
      .then((data) => {
        console.log(`[AI] - Usage data: ${JSON.stringify(data)}`);
      })
      .catch((error) => {
        console.error(`[AI] - Error fetching usage data: ${error}`);
      });
  } catch (error) {
    const responseError = error as Error;
    console.error(`[AI] - Error streaming chat: ${responseError.message}`);
    res.status(500).json({ error: 'Internal Server Error' });
  }
}
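To take React out of the picture, a standalone consumer along these lines should show chunks arriving incrementally (a sketch; it assumes the dev server is on localhost:3000 and Node 18+, so global fetch is available):

// Hypothetical standalone check, e.g. scripts/stream-check.ts, not part of the app.
async function main() {
  const response = await fetch("http://localhost:3000/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      conversation: [{ role: "user", content: "Say hello in five words." }],
    }),
  });

  if (!response.ok || !response.body) {
    throw new Error(`Request failed: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each chunk should print as soon as the route writes it, not all at once at the end.
    process.stdout.write(decoder.decode(value, { stream: true }));
  }
}

main().catch(console.error);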
The client-side handler that sends a message and consumes the stream:

const sendMessage = async (message: string) => {
  console.log("Sending message:", message);
  setIsSending(true);

  const userMessage = { role: Roles.USER, content: message };
  setConversation((prev) => [...prev, userMessage]);
  if (user) {
    await addMessage(user.id, userMessage);
  }

  let context = null;
  try {
    const ragResponse = await useRAGService.queryRAG(message);
    if (ragResponse && typeof ragResponse === "string") {
      context = ragResponse;
    }
  } catch (error) {
    logger.error(`[RAG] - Error fetching context: ${error}`);
  }

  // Placeholder assistant message that gets filled in as chunks arrive.
  const assistantMessage = {
    role: Roles.ASSISTANT,
    content: "",
  };
  assistantMessageRef.current = assistantMessage;
  setConversation((prev) => [...prev, assistantMessage]);

  const response = await fetch("/api/chat", {
    method: "POST",
    body: JSON.stringify({ conversation: [...conversation, userMessage] }),
    headers: {
      "Content-Type": "application/json",
    },
  });

  // Get the streamed response and usage data
  const reader = response.body?.getReader();
  const textDecoder = new TextDecoder("utf-8");
  if (!reader) {
    throw new Error("No reader");
  }

  let accumulatedContent = "";
  reader.read().then(function processText({ done, value }) {
    if (done) {
      console.log("Stream done.");
      // Handle any remaining content in accumulatedContent
      if (accumulatedContent.length > 0) {
        const formattedContent = accumulatedContent
          .replace(/\*\*(.*?)\*\*/g, "<strong>$1</strong>")
          .replace(/###\s*(.*?)(\n|$)/g, '<h3 class="heading">$1</h3>');
        if (assistantMessageRef.current) {
          assistantMessageRef.current.content += formattedContent;
          setConversation((prevMessages) =>
            prevMessages.map((message) =>
              message === assistantMessageRef.current
                ? assistantMessageRef.current
                : message
            )
          );
        }
      }
      setIsSending(false);
      return;
    }

    const decodedValue = textDecoder.decode(value, { stream: true });
    accumulatedContent += decodedValue;
    console.log("Received chunk:", decodedValue);

    // Check if accumulated content contains a complete segment to apply formatting
    let index;
    while ((index = accumulatedContent.indexOf("\n")) !== -1) {
      const completeSegment = accumulatedContent.slice(0, index + 1);
      accumulatedContent = accumulatedContent.slice(index + 1);

      // Apply minimal formatting to the complete segment
      const formattedSegment = completeSegment
        .replace(/\*\*(.*?)\*\*/g, "<strong>$1</strong>")
        .replace(/###\s*(.*?)(\n|$)/g, '<h3 class="heading">$1</h3>');

      if (assistantMessageRef.current) {
        assistantMessageRef.current.content += formattedSegment;
        setConversation((prevMessages) =>
          prevMessages.map((message) =>
            message === assistantMessageRef.current
              ? assistantMessageRef.current
              : message
          )
        );
      }
    }

    reader.read().then(processText);
  });
};
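In case the issue is on the React state side rather than the stream itself, an immutable per-chunk update would look roughly like this (a hypothetical helper, not code I'm using; it assumes the assistant placeholder is the last element of the conversation):

type ChatMessage = { role: string; content: string };

// Hypothetical helper: append a chunk to the last (assistant) message immutably,
// so React receives a new object reference for the updated message.
function appendToLastMessage(prev: ChatMessage[], chunk: string): ChatMessage[] {
  if (prev.length === 0) return prev;
  const next = [...prev];
  const last = next[next.length - 1];
  next[next.length - 1] = { ...last, content: last.content + chunk };
  return next;
}

// Usage inside the read loop, instead of mutating assistantMessageRef.current:
// setConversation((prev) => appendToLastMessage(prev, formattedSegment));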
And the Stream helper (@/lib/stream):

import { ChatCompletionMessageParam } from "openai/resources/index.mjs";
import logger from "../../logger";
import { PassThrough } from 'stream';

interface Payload {
  model: string;
  messages: ChatCompletionMessageParam[];
  stream?: boolean;
}

export async function Stream(payload: Payload) {
  const response = await fetch("https://api.openai.com/v1/chat/completions", {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    method: "POST",
    body: JSON.stringify({
      ...payload,
      stream: true,
      stream_options: { include_usage: true },
    }),
  });

  if (response.ok && response.body) {
    const textDecoder = new TextDecoder();
    let accumulatedData = "";
    let usageData: any = null;
    const passThroughStream = new PassThrough();
    const reader = response.body.getReader();

    reader.read().then(function processText({ done, value }) {
      if (done) {
        passThroughStream.end();
        return;
      }

      const chunk = textDecoder.decode(value, { stream: true });
      accumulatedData += chunk;

      // Split the SSE payload on newlines and process each complete "data:" event.
      let index;
      while ((index = accumulatedData.indexOf("\n")) !== -1) {
        const completeEvent = accumulatedData.slice(0, index);
        accumulatedData = accumulatedData.slice(index + 1);

        if (completeEvent.trim().startsWith("data:")) {
          const eventData = completeEvent.trim().slice(5).trim();

          // OpenAI ends the stream with a literal "[DONE]" sentinel.
          if (eventData === "[DONE]") {
            passThroughStream.end();
            return;
          }

          try {
            const json = JSON.parse(eventData);
            if (
              json.choices &&
              json.choices.length > 0 &&
              json.choices[0].delta.content
            ) {
              // Forward only the text delta to the PassThrough stream.
              const content = json.choices[0].delta.content;
              passThroughStream.write(content);
            }
            if (json.usage && !usageData) {
              usageData = json.usage;
            }
          } catch (error) {
            logger.error("Error parsing JSON chunk", error);
          }
        }
      }

      reader.read().then(processText);
    });

    // Resolves once the usage chunk (requested via stream_options.include_usage) arrives.
    const usageDataPromise = new Promise((resolve) => {
      const interval = setInterval(() => {
        if (usageData) {
          clearInterval(interval);
          resolve(usageData);
        }
      }, 100);
    });

    return {
      stream: passThroughStream,
      usageData: usageDataPromise,
    };
  } else {
    const errorMessage = await response.text();
    throw new Error(`Failed to stream: ${errorMessage}`);
  }
}