Inconsistent buffering using openai-node

Hi people, I want to show you an interesting issue I’m having using openai-node on a Remix project.

High level, the server part makes the request to OpenAI api and returned stream is embedded in the response. The client side which made the request using fetch parse the streamed response.

Speaking of code, the server part is this:

const stream = openai.beta.threads.runs
    .stream(threadId, {
      assistant_id: assistantId,
      include: ['step_details.tool_calls[*].file_search.results[*].content'],
    })
    .on('messageDelta', (message) => {
      console.log('messageDelta', JSON.stringify(message));
    })
   .toReadableStream();

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      Thread: threadId,
    },
  });

The client part is

const response = await fetch(localizedChatAPIURL, {
    method: 'POST',
    body: thePayload,
  });
const reader = response.body
          .pipeThrough(new TextDecoderStream())
          .pipeThrough(new TextLineDecoderStream())
          .pipeThrough<AssistantStreamEvent>(new JSONDecoderStream())
          .getReader();

The strange issue is that messages appears to be different from server to client.

The output of that console.log on messageDelta, server side is:

messageDelta {"content":[{"index":0,"type":"text","text":{"value":"E","annotations":[]}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"x"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":","}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" come"}}]}

While the network inspector on the browser side logs these messages:

{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Eddy Merckx, noto","annotations":[]}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"x"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":","}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" come"}}]}}}

The issue here is that this inconsistency is causing malformations of the first sentences. The example is in italian, I’m sorry, but the beginning of that sentence should be “Eddy Merkx , noto come”. Instead, the client part prints “Eddy Merckx, notoddy Merckx, noto come”.

I noticed the first message on the client side is longer than the first message on the server side so I’m suspecting some buffering happening somewhere.

Do you people have already encounter this issue? Do you have feedbacks about how to handle this and avoid the malformations?

Thank you

Welcome to the dev forum @acavestro

Upon creating a run with streaming, a run object (sse) is returned:

event: thread.run.created
data: {"id":"run_123","object":"thread.run","created_at":1710330640,"assistant_id":"asst_123","thread_id":"thread_123","status":"queued","started_at":null,"expires_at":1710331240,"cancelled_at":null,"failed_at":null,"completed_at":null,"required_action":null,"last_error":null,"model":"gpt-4o","instructions":null,"tools":[],"metadata":{},"temperature":1.0,"top_p":1.0,"max_completion_tokens":null,"max_prompt_tokens":null,"truncation_strategy":{"type":"auto","last_messages":null},"incomplete_details":null,"usage":null,"response_format":"auto","tool_choice":"auto","parallel_tool_calls":true}}

event: thread.run.queued
data: {"id":"run_123","object":"thread.run","created_at":1710330640,"assistant_id":"asst_123","thread_id":"thread_123","status":"queued","started_at":null,"expires_at":1710331240,"cancelled_at":null,"failed_at":null,"completed_at":null,"required_action":null,"last_error":null,"model":"gpt-4o","instructions":null,"tools":[],"metadata":{},"temperature":1.0,"top_p":1.0,"max_completion_tokens":null,"max_prompt_tokens":null,"truncation_strategy":{"type":"auto","last_messages":null},"incomplete_details":null,"usage":null,"response_format":"auto","tool_choice":"auto","parallel_tool_calls":true}}

event: thread.run.in_progress
data: {"id":"run_123","object":"thread.run","created_at":1710330640,"assistant_id":"asst_123","thread_id":"thread_123","status":"in_progress","started_at":1710330641,"expires_at":1710331240,"cancelled_at":null,"failed_at":null,"completed_at":null,"required_action":null,"last_error":null,"model":"gpt-4o","instructions":null,"tools":[],"metadata":{},"temperature":1.0,"top_p":1.0,"max_completion_tokens":null,"max_prompt_tokens":null,"truncation_strategy":{"type":"auto","last_messages":null},"incomplete_details":null,"usage":null,"response_format":"auto","tool_choice":"auto","parallel_tool_calls":true}}

event: thread.run.step.created
data: {"id":"step_001","object":"thread.run.step","created_at":1710330641,"run_id":"run_123","assistant_id":"asst_123","thread_id":"thread_123","type":"message_creation","status":"in_progress","cancelled_at":null,"completed_at":null,"expires_at":1710331240,"failed_at":null,"last_error":null,"step_details":{"type":"message_creation","message_creation":{"message_id":"msg_001"}},"usage":null}

event: thread.run.step.in_progress
data: {"id":"step_001","object":"thread.run.step","created_at":1710330641,"run_id":"run_123","assistant_id":"asst_123","thread_id":"thread_123","type":"message_creation","status":"in_progress","cancelled_at":null,"completed_at":null,"expires_at":1710331240,"failed_at":null,"last_error":null,"step_details":{"type":"message_creation","message_creation":{"message_id":"msg_001"}},"usage":null}

event: thread.message.created
data: {"id":"msg_001","object":"thread.message","created_at":1710330641,"assistant_id":"asst_123","thread_id":"thread_123","run_id":"run_123","status":"in_progress","incomplete_details":null,"incomplete_at":null,"completed_at":null,"role":"assistant","content":[],"metadata":{}}

event: thread.message.in_progress
data: {"id":"msg_001","object":"thread.message","created_at":1710330641,"assistant_id":"asst_123","thread_id":"thread_123","run_id":"run_123","status":"in_progress","incomplete_details":null,"incomplete_at":null,"completed_at":null,"role":"assistant","content":[],"metadata":{}}

event: thread.message.delta
data: {"id":"msg_001","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Hello","annotations":[]}}]}}

...

event: thread.message.delta
data: {"id":"msg_001","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" today"}}]}}

event: thread.message.delta
data: {"id":"msg_001","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"?"}}]}}

event: thread.message.completed
data: {"id":"msg_001","object":"thread.message","created_at":1710330641,"assistant_id":"asst_123","thread_id":"thread_123","run_id":"run_123","status":"completed","incomplete_details":null,"incomplete_at":null,"completed_at":1710330642,"role":"assistant","content":[{"type":"text","text":{"value":"Hello! How can I assist you today?","annotations":[]}}],"metadata":{}}

event: thread.run.step.completed
data: {"id":"step_001","object":"thread.run.step","created_at":1710330641,"run_id":"run_123","assistant_id":"asst_123","thread_id":"thread_123","type":"message_creation","status":"completed","cancelled_at":null,"completed_at":1710330642,"expires_at":1710331240,"failed_at":null,"last_error":null,"step_details":{"type":"message_creation","message_creation":{"message_id":"msg_001"}},"usage":{"prompt_tokens":20,"completion_tokens":11,"total_tokens":31}}

event: thread.run.completed
data: {"id":"run_123","object":"thread.run","created_at":1710330640,"assistant_id":"asst_123","thread_id":"thread_123","status":"completed","started_at":1710330641,"expires_at":null,"cancelled_at":null,"failed_at":null,"completed_at":1710330642,"required_action":null,"last_error":null,"model":"gpt-4o","instructions":null,"tools":[],"metadata":{},"temperature":1.0,"top_p":1.0,"max_completion_tokens":null,"max_prompt_tokens":null,"truncation_strategy":{"type":"auto","last_messages":null},"incomplete_details":null,"usage":{"prompt_tokens":20,"completion_tokens":11,"total_tokens":31},"response_format":"auto","tool_choice":"auto","parallel_tool_calls":true}}

event: done
data: [DONE]

I’d recommend piping the stream directly to the client and using the streaming helpers from the SDK to process the message inside the client code.

Here’s an example from the OpenAI Assistants Quickstart GitHub repo.

Also make sure that you’ve implemented back-pressure handling.

Hi @sps thank you for the answer!

Well, more or less I’m already piping the stream directly to the client (that toReadableStream()).

However, what makes me a bit confused is how the first message contains more text that what it should. It’s like during the creation of the returned stream by toReadableStream() the first textDelta are merged together but some following events are still kept.