Streaming Chat Completions without the Edge Runtime in the Next.js Pages Directory

I’m having trouble getting my UI to stream the response without the Edge runtime on the Next.js pages router. It works fine with an Edge runtime setup, but when I deploy to Vercel the route size is too large (approx. 2.5 MB) and requires the Pro plan.

The API route logs the chunks fine, but I don’t know how to consume them successfully on the front end.

import { NextApiRequest, NextApiResponse } from 'next';
import { ChatCompletionMessageParam } from "openai/resources/index.mjs";
import { Stream } from "@/lib/stream";
import { StatusCodes } from "http-status-codes";
import { encodingForModel } from "js-tiktoken";

/** Arguments the handler in this file passes to `Stream`. */
interface Payload {
  /** Model identifier; the handler passes the module-level `model` constant. */
  model: string;
  /** Conversation messages; the handler passes the token-trimmed conversation. */
  messages: ChatCompletionMessageParam[];
  /** Streaming flag; the handler always sets this to `true`. */
  stream?: boolean;
}

// Model name used both to pick the tokenizer in trimConversation and in the request payload.
const model = "gpt-4o";
// Token budget that trimConversation enforces on the conversation.
const MAX_TOKENS = 14000;

// API route configuration (exported at the bottom of this file).
// Built-in body parsing is switched off; the handler reads the raw request stream itself.
const config = {
  api: {
    bodyParser: false,
  },
};

/**
 * Sums the encoded length of every message whose `content` is a plain string.
 *
 * Each counted message contributes the token count of its role, the token
 * count of its content, and a fixed overhead of 2. Messages whose content is
 * not a string contribute nothing.
 *
 * @param messages Messages to measure.
 * @param encoder Tokenizer used to encode the role and content strings.
 * @returns The accumulated token count (0 for an empty list).
 */
function calculateTotalTokens(messages: ChatCompletionMessageParam[], encoder: ReturnType<typeof encodingForModel>): number {
  const perMessageOverhead = 2;
  let tokenCount = 0;

  for (const message of messages) {
    const text = message.content;
    if (typeof text !== "string") {
      continue;
    }
    const roleTokens = encoder.encode(message.role).length;
    const contentTokens = encoder.encode(text).length;
    tokenCount = tokenCount + roleTokens + contentTokens + perMessageOverhead;
  }

  return tokenCount;
}

/**
 * Returns a copy of `conversation` trimmed to fit within MAX_TOKENS.
 *
 * The first message is always kept. Messages are dropped one at a time
 * starting at index 1 until the total fits the budget or only one message is
 * left. The caller's array is not modified.
 *
 * @param conversation Full conversation to trim.
 * @returns A new array with the retained messages.
 */
function trimConversation(conversation: ChatCompletionMessageParam[]) {
  const encoder = encodingForModel(model);

  // Encode every message exactly once instead of re-encoding the whole
  // conversation after each removal.
  const costs = conversation.map((message) => calculateTotalTokens([message], encoder));
  let totalTokens = costs.reduce((sum, cost) => sum + cost, 0);

  // Number of messages to drop, counted from index 1 onwards.
  let dropped = 0;
  while (totalTokens > MAX_TOKENS && conversation.length - dropped > 1) {
    totalTokens -= costs[1 + dropped] ?? 0;
    dropped += 1;
  }

  return [...conversation.slice(0, 1), ...conversation.slice(1 + dropped)];
}

/**
 * Reads the whole request body and decodes it as UTF-8 text.
 *
 * Chunks are collected as Buffers and decoded once at the end, so a
 * multi-byte character split across two chunks is not corrupted.
 */
async function readRequestBody(req: NextApiRequest): Promise<string> {
  const chunks: Buffer[] = [];
  for await (const chunk of req) {
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
  }
  return Buffer.concat(chunks).toString('utf8');
}

/**
 * POST /api/chat — streams the assistant's reply for the posted conversation.
 *
 * Expects a JSON body of the form `{ conversation: ChatCompletionMessageParam[] }`.
 * Responds with:
 * - 405 for any method other than POST,
 * - 400 when the body is not valid JSON or `conversation` is not a non-empty array,
 * - 500 when `Stream` fails before any response bytes were sent,
 * - otherwise a chunked response into which the stream returned by `Stream` is piped.
 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    res.setHeader('Allow', 'POST');
    res.status(405).json({ error: 'Method not allowed' });
    return;
  }

  let conversation: unknown;
  try {
    const parsedBody: unknown = JSON.parse(await readRequestBody(req));
    conversation = (parsedBody as { conversation?: unknown } | null)?.conversation;
  } catch (error) {
    res.status(400).json({ error: 'Invalid request body' });
    return;
  }

  // Reject anything that is not a non-empty array (a string would otherwise
  // pass a bare `.length` check).
  if (!Array.isArray(conversation) || conversation.length === 0) {
    res.status(400).json({ error: 'Conversation is required' });
    return;
  }

  const trimmedConversation = trimConversation(conversation as ChatCompletionMessageParam[]);

  const payload: Payload = {
    model,
    messages: trimmedConversation,
    stream: true,
  };

  try {
    const { stream, usageData } = await Stream(payload);
    console.log("[API] - Streaming chat response initiated.");

    res.setHeader('Content-Type', 'text/event-stream');
    // `no-transform` asks compression middleware and intermediaries not to
    // re-encode the body, which would otherwise buffer the chunks.
    res.setHeader('Cache-Control', 'no-cache, no-transform');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    stream.on('data', (chunk) => {
      console.log("Chunk sent:", chunk.toString());
    });

    // pipe() does not forward source errors to the destination; without this
    // listener an 'error' event would be unhandled and the response would hang.
    stream.on('error', (streamError) => {
      console.error(`[AI] - Error streaming chat: ${streamError}`);
      res.end();
    });

    stream.pipe(res);

    usageData
      .then((data) => {
        console.log(`[AI] - Usage data: ${JSON.stringify(data)}`);
      })
      .catch((error) => {
        console.error(`[AI] - Error fetching usage data: ${error}`);
      });

  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`[AI] - Error streaming chat: ${message}`);

    // Once headers are flushed a status/JSON body can no longer be sent;
    // just terminate the response.
    if (res.headersSent) {
      res.end();
      return;
    }
    res.status(500).json({ error: 'Internal Server Error' });
  }
}

export { config };



  /**
   * Sends a user message to `/api/chat` and streams the assistant's reply
   * into the conversation state.
   *
   * The user message and an empty assistant placeholder are appended to the
   * conversation first. While the response body is read, the placeholder's
   * content is replaced after every received chunk, so partial lines are
   * visible immediately instead of only after a newline arrives. Each update
   * swaps in a new message object rather than mutating the existing one.
   *
   * `isSending` is reset when the stream finishes and also when anything
   * throws; errors still propagate to the caller.
   *
   * @param message Text entered by the user.
   * @throws Error when the request fails or the response has no readable body.
   */
  const sendMessage = async (message: string) => {
    console.log("Sending message:", message);
    setIsSending(true);

    // Minimal markdown-to-HTML formatting, applied to one line (or trailing
    // partial line) at a time.
    // NOTE(review): the result contains HTML built from model output; if it is
    // rendered as raw HTML it should be sanitized — confirm at the render site.
    const formatSegment = (segment: string) =>
      segment
        .replace(/\*\*(.*?)\*\*/g, "<strong>$1</strong>")
        .replace(/###\s*(.*?)(\n|$)/g, '<h3 class="heading">$1</h3>');

    // Replaces the assistant placeholder with a fresh object carrying `content`.
    const publish = (content: string) => {
      const previous = assistantMessageRef.current;
      if (!previous) {
        return;
      }
      const next = { ...previous, content };
      assistantMessageRef.current = next;
      setConversation((prevMessages) =>
        prevMessages.map((entry) => (entry === previous ? next : entry))
      );
    };

    try {
      const userMessage = { role: Roles.USER, content: message };
      setConversation((prev) => [...prev, userMessage]);

      if (user) {
        await addMessage(user.id, userMessage);
      }

      // NOTE(review): `context` is fetched but not sent with the request below.
      let context = null;
      try {
        const ragResponse = await useRAGService.queryRAG(message);
        if (ragResponse && typeof ragResponse === "string") {
          context = ragResponse;
        }
      } catch (error) {
        logger.error(`[RAG] - Error fetching context: ${error}`);
      }

      const assistantMessage = {
        role: Roles.ASSISTANT,
        content: "",
      };

      assistantMessageRef.current = assistantMessage;
      setConversation((prev) => [...prev, assistantMessage]);

      const response = await fetch("/api/chat", {
        method: "POST",
        body: JSON.stringify({ conversation: [...conversation, userMessage] }),
        headers: {
          "Content-Type": "application/json",
        },
      });

      // An error response carries a JSON error body, not assistant text.
      if (!response.ok) {
        throw new Error(`Chat request failed with status ${response.status}`);
      }

      const reader = response.body?.getReader();
      const textDecoder = new TextDecoder("utf-8");

      if (!reader) {
        throw new Error("No reader");
      }

      // `committed` holds fully formatted complete lines; `pending` holds the
      // raw text received after the last newline.
      let committed = "";
      let pending = "";

      for (;;) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        const decodedValue = textDecoder.decode(value, { stream: true });
        pending += decodedValue;
        console.log("Received chunk:", decodedValue);

        // Lines are formatted one at a time so a pattern never spans a newline.
        let index;
        while ((index = pending.indexOf("\n")) !== -1) {
          committed += formatSegment(pending.slice(0, index + 1));
          pending = pending.slice(index + 1);
        }

        // Show the partial line right away; it is re-formatted from the raw
        // text on every chunk until its newline arrives.
        publish(committed + formatSegment(pending));
      }

      // Flush bytes the decoder may still be holding, then commit the tail.
      pending += textDecoder.decode();
      if (pending.length > 0) {
        committed += formatSegment(pending);
      }
      publish(committed);
      console.log("Stream done.");
    } finally {
      setIsSending(false);
    }
  };
  
import { ChatCompletionMessageParam } from "openai/resources/index.mjs";
import logger from "../../logger";
import { PassThrough } from 'stream';

/** Options that `Stream` spreads into the JSON body of the chat-completions request. */
interface Payload {
  /** Model identifier sent to the API. */
  model: string;
  /** Conversation messages sent to the API. */
  messages: ChatCompletionMessageParam[];
  /** Accepted for callers' convenience; `Stream` always sends `stream: true` regardless. */
  stream?: boolean;
}

/**
 * Requests a streamed chat completion and exposes the generated text as a
 * Node stream.
 *
 * The upstream response is read line by line. For each `data:` line whose JSON
 * carries `choices[0].delta.content`, that text is written to the returned
 * stream. The first `usage` object seen is captured. Reading stops at the
 * `[DONE]` sentinel or when the upstream body ends.
 *
 * @param payload Model and messages to send; `stream` and `stream_options`
 *   are always overridden to enable streaming with usage reporting.
 * @returns `stream`: a PassThrough emitting the text deltas, ended when the
 *   upstream finishes or fails. `usageData`: a promise that resolves with the
 *   usage object as soon as it is seen, or with `null` if the stream finishes
 *   without one.
 * @throws Error when the upstream response is not OK or has no body.
 */
export async function Stream(payload: Payload) {
  const response = await fetch("https://api.openai.com/v1/chat/completions", {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    method: "POST",
    body: JSON.stringify({
      ...payload,
      stream: true,
      stream_options: { include_usage: true },
    }),
  });

  if (!response.ok || !response.body) {
    const errorMessage = await response.text();
    throw new Error(`Failed to stream: ${errorMessage}`);
  }

  const passThroughStream = new PassThrough();
  const reader = response.body.getReader();
  const textDecoder = new TextDecoder();

  let usageData: unknown = null;
  let resolveUsage: (value: unknown) => void = () => undefined;
  // Settled directly by the reader loop; no polling timer is needed.
  const usageDataPromise = new Promise((resolve) => {
    resolveUsage = resolve;
  });

  // Handles one line of the event stream. Returns true when the line is the
  // `[DONE]` sentinel and reading should stop.
  const handleEventLine = (line: string): boolean => {
    const trimmedLine = line.trim();
    if (!trimmedLine.startsWith("data:")) {
      return false;
    }

    const eventData = trimmedLine.slice(5).trim();
    if (eventData === "[DONE]") {
      return true;
    }

    try {
      const json = JSON.parse(eventData);
      // Optional chaining: chunks may have an empty `choices` array or no delta.
      const content = json?.choices?.[0]?.delta?.content;
      if (typeof content === "string" && content.length > 0) {
        passThroughStream.write(content);
      }
      if (json?.usage && !usageData) {
        usageData = json.usage;
        resolveUsage(usageData);
      }
    } catch (error) {
      logger.error("Error parsing JSON chunk", error);
    }
    return false;
  };

  const pump = async (): Promise<void> => {
    let accumulatedData = "";
    try {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        accumulatedData += textDecoder.decode(value, { stream: true });

        let index;
        while ((index = accumulatedData.indexOf("\n")) !== -1) {
          const completeEvent = accumulatedData.slice(0, index);
          accumulatedData = accumulatedData.slice(index + 1);
          if (handleEventLine(completeEvent)) {
            return;
          }
        }
      }

      // Process a final line that was not terminated by a newline.
      accumulatedData += textDecoder.decode();
      if (accumulatedData.length > 0) {
        handleEventLine(accumulatedData);
      }
    } catch (error) {
      // End the output gracefully instead of leaving the consumer hanging.
      logger.error("Error reading completion stream", error);
    } finally {
      passThroughStream.end();
      // No-op if usage was already delivered; otherwise resolves with null.
      resolveUsage(usageData);
      // Best-effort release of the upstream body.
      void reader.cancel().catch(() => undefined);
    }
  };

  void pump();

  return {
    stream: passThroughStream,
    usageData: usageDataPromise,
  };
}