Concatenated streaming messages in API response (recent change & simple fix) [svelte/javascript]

Hi all,

Disclaimer: I’m not the best JavaScript dev, so feel free to contribute improvements for anyone running into the same issue.

Recently a Svelte app I use started having issues parsing the JSON messages received from the API. After some light debugging I noticed that some of the streamed messages were arriving concatenated, which prevented them from being parsed by JSON.parse().

If you are running into a similar issue, I hope this saves you some time. The fix that worked for me was simply splitting any concatenated stream messages apart with a regular expression and then processing them as usual. If this doesn’t quite fix it for you, at least you now know what to ask ChatGPT about, or one possible thing to look out for.
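To make the failure mode concrete, here’s a tiny standalone snippet (the object shapes are invented for illustration): two complete JSON objects arrive glued together in one event, JSON.parse() throws on the pair, and a lookbehind/lookahead split recovers them without eating the braces. One caveat worth knowing: a naive split like this would also fire on a literal "}{" inside a string value, which is unlikely in these deltas but not impossible.

// Two complete JSON objects delivered in a single "message" event:
const raw = '{"delta":"Hel"}{"delta":"lo"}';

// JSON.parse(raw) would throw: SyntaxError (unexpected token after JSON)

// Split on the "}{" boundary; lookbehind/lookahead keep both braces intact:
const parts = raw.split(/(?<=})(?={)/);
parts.forEach((part) => console.log(JSON.parse(part).delta)); // "Hel", "lo"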

Code before

import { SSE } from "sse.js"; // assuming the sse.js client library, which has this constructor shape

let source = new SSE("https://api.openai.com/v1/chat/completions", {
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${DEFAULT_API_KEY}`,
  },
  method: "POST",
  payload: JSON.stringify({
    model: $gptModel.code,
    messages: msg,
    stream: true,
  }),
});
  
source.addEventListener("message", (e) => {
  if (e.data != "[DONE]") {
    let payload = JSON.parse(e.data);
    let typing = false;
    let text = payload.choices[0].delta.content;
    // whatever processing you use for generated text
  }
});

Code after

let source = new SSE("https://api.openai.com/v1/chat/completions", {
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${DEFAULT_API_KEY}`,
  },
  method: "POST",
  payload: JSON.stringify({
    model: $gptModel.code,
    messages: msg,
    stream: true,
  }),
});
  
source.addEventListener("message", (e) => {
  if (e.data != "[DONE]") {
    // console.log(e.data);
    // Split the data using a regex that detects boundaries between JSON objects
    let splitData = e.data.split(/(?<=})(?={)/);
    splitData.forEach((data) => {
      try {
        let payload = JSON.parse(data);
        let typing = false;
        let text = payload.choices[0].delta.content;
        if (text == undefined) {
          typing = !typing;
        } else {
          // whatever processing you use for generated text
        }
      } catch (error) {
        console.error("Error streaming response. Error:", error);
      }
    });
  }
});

I’d like to point out that there are built-in pipeline modifiers to handle these kinds of cases:

Here’s an example I use to strip the superfluous metadata and pass only the content through to the client as a text/event-stream:

  // Decoder/encoder can be reused across chunks
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  const transformer = new TransformStream({
    transform(chunk, controller) {
      // Modify the chunk
      const decoded = decoder.decode(chunk);
      const parsed = JSON.parse(decoded);
      const text = parsed.choices?.[0]?.delta?.content;
      if (text) {
        controller.enqueue(encoder.encode(text));
      }
      // Close the output stream once the model reports it is finished
      if (!text && parsed.choices?.[0]?.finish_reason) {
        controller.terminate();
      }
    },
  });
  // Pipe the original stream through the transformer
  // Explicitly typed for readability 
  const modifiedStream = (completion as Stream<ChatCompletionChunk>)
    .toReadableStream()
    .pipeThrough(transformer);
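
For context, here’s how that modified stream might be handed back to the browser from a server endpoint. This is a minimal sketch, and the Response headers are my assumption about the surrounding setup, not part of the original code:

  // Sketch only: serve the transformed stream as an event stream.
  return new Response(modifiedStream, {
    headers: {
      "Content-Type": "text/event-stream", // assumed content type for SSE clients
      "Cache-Control": "no-cache",
    },
  });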

For some reason I’ve never had any broken JSON objects, but it definitely makes sense to handle them in case it happens.

Relying on the Streams API provides a lot of benefits: you can detect when streams start or end, chain streams together, handle errors and cancel streams as required, and react to the speed at which the stream is being read. A sketch of a few of those points follows.
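
As an illustration of end-of-stream detection, error handling, and consumer-side cancellation (a sketch under assumptions: upstream and render are hypothetical names, not from the code above):

// Sketch only: consume a hypothetical ReadableStream of text chunks.
async function consume(upstream, render) {
  const reader = upstream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // stream ended cleanly
      render(value);   // hypothetical UI callback
    }
  } catch (err) {
    console.error("Stream failed:", err); // upstream errors surface here
  } finally {
    reader.releaseLock();
  }
}
// Cancelling from the consumer side (e.g. the user navigated away):
// await upstream.cancel("user left the page");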

Definitely useful to know, thanks for the input!
