Stuck on getting an error at the end of a streamed answer

I’ve set up streaming and it seems to work, except I keep getting an error at the end of the stream. The full content is still shown on the front end, so maybe it has something to do with how the stream is being closed? I can’t figure out what’s wrong even after printing and logging everything. Here is my code and a screenshot of the console:

script.js

document.querySelector('.search-button').addEventListener('click', function(event) {
    event.preventDefault();
    var query = document.getElementById('searchQuery').value;

    // Clear existing content
    var answerContainer = document.getElementById('answerContainer');
    answerContainer.innerHTML = '';

    // Establish connection to the server for streaming responses
    var stream = new EventSource('/search?query=' + encodeURIComponent(query));

    // Listen for when the stream is successfully opened
    stream.onopen = function(event) {
      console.log('Stream opened successfully:', event);
    };

    // Listen for messages from the server
    stream.onmessage = function(event) {
      answerContainer.textContent += event.data; // Append each chunk of data
    };

    stream.onerror = function(error) {
        console.error('Stream Error:', error);
        if (error.target && error.target.readyState === EventSource.CLOSED) {
            console.log('Stream Closed');
        }
        answerContainer.textContent += '\nError in stream.';
        stream.close(); // Close the stream on error
    };
});

search.py

# Stream response from OpenAI
    def generate():
        try:
            stream = openai_client.chat.completions.create(
                model="gpt-4-1106-preview",
                messages=[
                    {"role": "system", "content": "This is a test."},
                    {"role": "user", "content": f"{context} {query}"}
                ],
                stream=True
            )
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield f"data: {chunk.choices[0].delta.content}\n\n"
                    
        except Exception as e:
            print(f"Error occurred during streaming: {e}")
            print(traceback.format_exc())
            yield f"data: Error: {str(e)}\n\n"

    return Response(generate(), content_type='text/event-stream')

def extract_context(es_data):
    context = ''
    for hit in es_data['hits']['hits']:
        article_url = hit['_source'].get('articleURL', '')
        chunked_content = hit['_source'].get('chunkedContent', '')
        # Adding a separator between hits
        context += f"Article URL: {article_url}. Excerpt: {chunked_content}\n\n---\n\n"
    return context

if __name__ == '__main__':
    app.run(debug=True)
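A minimal sketch of the SSE framing used in `generate()`, with plain strings standing in for the OpenAI deltas. The trailing `[DONE]` sentinel is an assumption here (it mirrors what the OpenAI API itself sends); emitting it lets the browser close the `EventSource` on its own terms instead of waiting for the server to drop the connection:

```python
def sse_stream(deltas):
    """Wrap text deltas in SSE "data:" frames and append a [DONE] sentinel.

    `deltas` stands in for the content pieces pulled off the OpenAI stream;
    empty/None deltas (such as the final stop chunk) are skipped.
    """
    for piece in deltas:
        if piece:
            yield f"data: {piece}\n\n"
    yield "data: [DONE]\n\n"

frames = list(sse_stream(["Hello", None, " world"]))
# frames[-1] == "data: [DONE]\n\n"
```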

I remember scratching my head at something like this as well. Unfortunately I can’t remember what the issue was, except that it made me stare at my code and wonder why one approach gave the error and the other didn’t.

For my server (.NET), I had to add some handling for the "data: " prefix and the "[DONE]" token. I feel like there should be a built-in way of handling this that I’m not aware of, but it’s how I ended up doing it.

public static async IAsyncEnumerable<T> DeserializeChunkedAsync<T>(Stream stream, JsonSerializerOptions jsonSerializerOptions, [EnumeratorCancellation] CancellationToken cancellationToken) where T : class
{
    using var reader = new StreamReader(stream);
    while (await reader.ReadLineAsync(cancellationToken) is { } chunk)
    {
        var result = HandleSSE<T>(chunk, jsonSerializerOptions);
        switch (result.ResultKind)
        {
            case SseResultKind.Ok:
                yield return result.Value!;
                break;
            case SseResultKind.Done:
                yield break;
            case SseResultKind.Empty:
                continue;
        }
    }
}

private static SseResult<T> HandleSSE<T>(string chunk, JsonSerializerOptions jsonSerializerOptions) where T : class
{
    if (chunk.StartsWith("data: ", StringComparison.Ordinal))
        chunk = chunk[6..];

    if (string.IsNullOrWhiteSpace(chunk))
        return new SseResult<T>(SseResultKind.Empty);

    if (string.Equals(chunk, "[DONE]", StringComparison.Ordinal))
        return new SseResult<T>(SseResultKind.Done);

    var data = JsonSerializer.Deserialize<T>(chunk, jsonSerializerOptions);
    return new SseResult<T>(SseResultKind.Ok, data);
}
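For comparison, a rough Python analogue of that `HandleSSE` helper (the function name and tuple shape are made up for illustration, not from any library): strip the "data: " prefix, then classify the payload as empty, done, or a JSON chunk.

```python
import json

def handle_sse_line(line):
    """Strip the "data: " prefix from an SSE line, then classify it."""
    if line.startswith("data: "):
        line = line[6:]
    if not line.strip():
        return ("empty", None)   # blank keep-alive / separator line
    if line == "[DONE]":
        return ("done", None)    # end-of-stream sentinel
    return ("ok", json.loads(line))

handle_sse_line("data: [DONE]")    # → ("done", None)
handle_sse_line('data: {"a": 1}')  # → ("ok", {"a": 1})
```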

And here’s the web client. Since my server is transforming the response before returning it, this code might be less relevant:

export async function callChunkedAsync(
  request: object,
  callback: (segment: string) => void
): Promise<void> {
  const body: BodyInit = JSON.stringify(request);
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), config.timeoutMs);

  const options: RequestInit = {
    credentials: "omit",
    headers: {
      Accept: "text/event-stream",
      "Content-Type": "application/json",
    },
    method: "POST",
    body,
    signal: controller.signal,
  };

  const response = await fetch(chatCompletionUrl, options);
  if (response.body == null) throw new Error("Response has no body");

  const reader = response.body.getReader();
  const textDecoder = new TextDecoder();
  let result = await reader.read();
  while (!result.done) {
    // { stream: true } keeps multi-byte characters intact across chunk boundaries
    const segment = textDecoder.decode(result.value, { stream: true });

    callback(segment);

    result = await reader.read();
  }
  clearTimeout(timeoutId);
}

Thanks for sharing this! I had to use GPT-4 to work out how to implement what you did in my Flask app, but I’m still getting the error. Here are the relevant code snippets. Curious if you can spot the problem…

search.py

    def generate():
        try:
            stream = openai_client.chat.completions.create(
                model="gpt-4-1106-preview",
                messages=[
                    {"role": "system", "content": "You are an expert in Web3 and love teaching people all about it. When questions come in, give a helpful answer, but keep responses concise and short. You'll receive extra content with each question that you can use as context. Your answers should focus on the provided context, but you can also use your own knowledge when necessary to provide the user with a great answer."},
                    {"role": "user", "content": f"Using the following context, answer this question: '{query}'. Here is the extra context: {context}"}
                ],
                stream=True
            )
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield f"data: {chunk.choices[0].delta.content}\n\n"
                if 'done' in chunk:  # Check if 'done' token is present
                    yield "data: [DONE]\n\n"
                    break
                    
        except Exception as e:
            print(f"Error occurred during streaming: {e}")
            print(traceback.format_exc())
            yield f"data: Error: {str(e)}\n\n"

    return Response(generate(), content_type='text/event-stream')

script.js

stream.onmessage = function(event) {
      if (event.data === "[DONE]") {
        console.log("Stream complete");
        stream.close();
      } else { ...

Hey, when streaming, the last piece of data that arrives is different from the rest: it’s the one that signifies the stream has ended. See below the second-to-last item received, which has ‘content’ like the majority (here a ‘!’ at the end of the sentence), and the last item, which carries no content but has “finishReason”: “stop”. Perhaps this is what is causing your issue.

2nd last:

Raw event: {
  "id": "chatcmpl-95OXPdR9Jxpe79mfJpb5vXeO3hjex",
  "model": "gpt-4",
  "object": "chat.completion.chunk",
  "systemFingerprint": "fp_2f57f81c11",
  "created": "1970-01-20T19:17:52.991Z",
  "promptFilterResults": [],
  "choices": [
    {
      "index": 0,
      "finishReason": null,
      "delta": {
        "content": "!",
        "toolCalls": []
      },
      "contentFilterResults": {
        "hate": {
          "filtered": false,
          "severity": "safe"
        },
        "sexual": {
          "filtered": false,
          "severity": "safe"
        },
        "violence": {
          "filtered": false,
          "severity": "safe"
        },
        "selfHarm": {
          "filtered": false,
          "severity": "safe"
        }
      }
    }
  ]
}

And the last piece of data received, with “finishReason”: “stop” and no content:

Raw event: {
  "id": "chatcmpl-95OXPdR9Jxpe79mfJpb5vXeO3hjex",
  "model": "gpt-4",
  "object": "chat.completion.chunk",
  "systemFingerprint": "fp_2f57f81c11",
  "created": "1970-01-20T19:17:52.991Z",
  "promptFilterResults": [],
  "choices": [
    {
      "index": 0,
      "finishReason": "stop",
      "delta": {
        "toolCalls": []
      },
      "contentFilterResults": {}
    }
  ]
}
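Given that final chunk, one way to spot it on the Flask side would be to key off `finish_reason` on the choice (as exposed by the Python OpenAI client) rather than looking for a `'done'` key on the chunk. A sketch, with `SimpleNamespace` objects standing in for the two raw events above:

```python
from types import SimpleNamespace as NS

def frames_from(chunks):
    """Yield SSE frames from a chunk stream, treating the contentless
    finish_reason == "stop" chunk as the end-of-stream signal and
    emitting a [DONE] sentinel so the browser can close first."""
    for chunk in chunks:
        choice = chunk.choices[0]
        if choice.delta.content:
            yield f"data: {choice.delta.content}\n\n"
        if choice.finish_reason == "stop":
            yield "data: [DONE]\n\n"
            break

# stand-in chunks shaped like the two raw events above
chunks = [
    NS(choices=[NS(delta=NS(content="!"), finish_reason=None)]),
    NS(choices=[NS(delta=NS(content=None), finish_reason="stop")]),
]
frames = list(frames_from(chunks))
# frames == ["data: !\n\n", "data: [DONE]\n\n"]
```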