React, RTK Query, Mutations w/ Streaming Completions

Hi all. First time posting, so I would like to thank in advance all who take the time to help.

ChatGPT noob here. I have created an app with React and RTK Query on the frontend, and Node, Express, and OpenAI's gpt-3.5-turbo on the backend. Does anyone have a simple example of a POST request using the Chat Completions API with streaming on the backend? On the frontend, RTK Query's builder mutation should use onQueryStarted to dispatch the result as it streams in. It is definitely not my first rodeo with React, and I can put together basic CRUD with the latest RTK Query design patterns. But this streaming seems like a whole new concept, and there is not much out there in terms of examples. The couple of articles I have read so far seem to offer only parts of what I need.

Questions I have with OpenAI's Chat Completions API:

  1. What Content-Type header should I set for this? I am sending JSON, but the Completions API does not seem to return JSON, and I get an error in the payload complaining that it is not valid JSON.

Hi and welcome @jonnydungeons

Here's sample code for streaming in Node.js with the openai client:

import OpenAI from "openai";

const openai = new OpenAI();

async function main() {
  const completion = await openai.chat.completions.create({
    model: "gpt-3.5-turbo",
    messages: [
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": "Hello!"}
    ],
    stream: true,
  });

  for await (const chunk of completion) {
    // the final chunk carries an empty delta, so guard against undefined content
    console.log(chunk.choices[0]?.delta?.content || "");
  }
}

main();

The streaming response will look like:

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}]}

....

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":" today"},"logprobs":null,"finish_reason":null}]}

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}]}

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

If you are expecting JSON in the stream, you'll have to wait until the response stream ends and then validate the JSON.
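For example, a minimal sketch of that accumulate-then-parse approach (assuming you've instructed the model to respond with JSON):

let full = "";
for await (const chunk of completion) {
  full += chunk.choices[0]?.delta?.content || "";
}

// the accumulated text is only complete, and therefore parseable,
// once the stream has ended
const data = JSON.parse(full);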

Thank you for the warm welcome @sps and for the time you took. Based on your answer, my apologies for not being clear about what I am looking for and what I already know. That's my fault. Let me set the stage on the things I do know.

  1. I already have a POST request set up in my backend like this one.

const { content } = req.body;

try {
  const stream = await openai.chat.completions.create({
    messages: [
      {
        role: "system",
        content: "Introduce yourself as a helpful assistant.",
      },
      { role: "user", content },
    ],
    model: config.get("chatGptModel"),
    stream: true,
  });

  // forward each token to the client as soon as it arrives
  for await (const chunk of stream) {
    console.log(chunk.choices[0]?.delta?.content || "");
    res.write(chunk.choices[0]?.delta?.content || "");
  }

  res.end();
} catch (err) {
  // close the response with an error status if the stream fails
  res.status(500).end();
}

  2. The parts of this problem I need answers to on the backend: since I do not want JSON sent in the payload, do I need to do anything different when setting the response headers? Or will everything be fine as-is for the frontend to properly accept the stream? Is this multipart form data or something different?

At this point it is safe to assume that I definitely do not wish to use JSON data, though up until now in my career I have only dealt with JSON data. I want to preserve the streaming nature of the response payload returned from the Completions API, so that on the frontend I can set up the same type of non-blocking experience that ChatGPT has. This gets me to my next set of questions, which are all frontend related. So that you and the community can better answer them, I will list out some of the key frameworks for you:


"@reduxjs/toolkit": "^1.8.1",

"react": "^18.0.0",

"typescript": "^4.4.2",

I will also say that I am very familiar with setting up basic to complex CRUD functionality using the frameworks above, so the answer I am searching for has more to do with the following questions:

  1. Is there a working solution, using a POST request on the frontend, consuming the stream and outputting with RTK Query? NOTE: I have gone to Redux Toolkit's website and have already read through the following two key topics: “Streaming Updates” and “Pessimistic Updates”. These two topics seem to dance around the solution I want, while not actually addressing it in my context.

“Streaming Updates” - the working example on Redux Toolkit's website - uses “query” and NOT “mutation”. I am using a mutation since I am POSTing. Because they use “query”, their solution has “updateCachedData”, which is not available for “mutation”.


async onCacheEntryAdded(
        arg,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
      ) {
        // create a websocket connection when the cache subscription starts
        const ws = new WebSocket('ws://localhost:8080')
        try {
          // wait for the initial query to resolve before proceeding
          await cacheDataLoaded

          // when data is received from the socket connection to the server,
          // if it is a message and for the appropriate channel,
          // update our query result with the received message
          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCachedData((draft) => {
              draft.push(data)
            })
          }

          ws.addEventListener('message', listener)
        } catch {
          // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
          // in which case `cacheDataLoaded` will throw
        }
        // cacheEntryRemoved will resolve when the cache subscription is no longer active
        await cacheEntryRemoved
        // perform cleanup steps once the `cacheEntryRemoved` promise resolves
        ws.close()
      },

“Pessimistic Updates” - While the solution Redux Toolkit provides on their website is directly applicable since it uses “mutation”, it does not seem to address streaming: it does not include initializing a websocket connection and creating a listener to stream the messages, whereas the “Streaming Updates” example does.


updatePost: build.mutation<void, Pick<Post, 'id'> & Partial<Post>>({
      query: ({ id, ...patch }) => ({
        url: `post/${id}`,
        method: 'PATCH',
        body: patch,
      }),
      async onQueryStarted({ id, ...patch }, { dispatch, queryFulfilled }) {
        const patchResult = dispatch(
          api.util.updateQueryData('getPost', id, (draft) => {
            Object.assign(draft, patch)
          })
        )
        try {
          await queryFulfilled
        } catch {
          patchResult.undo()

          /**
           * Alternatively, on failure you can invalidate the corresponding cache tags
           * to trigger a re-fetch:
           * dispatch(api.util.invalidateTags(['Post']))
           */
        }
      },

My hunch is that I need a mix of the above solutions, but before I march forward with my own naive assumptions, let me present that as my second question to you and the community.

  2. Do I need some combination of the “Streaming Updates” and “Pessimistic Updates” approaches? A rough sketch of what I have in mind follows below.

(Again, I don't know the validity of my questions, since this is my first time ever consuming non-JSON data from a stream, so my apologies if I am still unclear.)
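To make my hunch concrete, here is the rough, untested sketch I have in mind: a mutation whose queryFn uses a raw fetch so it can read the response body as a stream, dispatching api.util.updateQueryData to patch a query cache entry as the text arrives, much like updateCachedData does in “Streaming Updates”. The sendMessage name, the /api/chat URL, and the getReply query it patches are all placeholders of mine:

sendMessage: build.mutation<string, { content: string }>({
  // a raw fetch keeps the response body available as a readable stream
  async queryFn({ content }, { dispatch }) {
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ content }),
    })
    if (!response.ok || !response.body) {
      return { error: { status: response.status, data: 'stream failed' } }
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let reply = ''
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      reply += decoder.decode(value, { stream: true })
      // patch a query cache entry with the partial text, playing the
      // role updateCachedData plays in the “Streaming Updates” example
      dispatch(
        api.util.updateQueryData('getReply', undefined, (draft) => {
          draft.text = reply
        })
      )
    }
    return { data: reply }
  },
}),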

If you're streaming plain text from your backend to the frontend, you should set the appropriate Content-Type, which for plain text is text/plain.

Since you’re not using JSON, you’re not dealing with multipart form data either.
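For instance, a minimal sketch on the Express side (the route path is just an illustration):

app.post("/api/chat", async (req, res) => {
  // declare the streamed body as plain text before the first res.write();
  // Node switches to chunked transfer encoding automatically when no
  // Content-Length header is set
  res.setHeader("Content-Type", "text/plain; charset=utf-8");

  // ...then create the OpenAI stream, res.write() each delta,
  // and res.end() when it finishes, as in your snippet above
});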

You’ll likely need to set up an event listener on the frontend to handle the incoming stream. This can be done using a regular fetch request in JavaScript, which can read the response stream and update the state accordingly.
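Something along these lines (a sketch; the URL and the onDelta callback are placeholders):

async function streamChat(content: string, onDelta: (text: string) => void) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ content }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  // keep reading until the server calls res.end()
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    onDelta(decoder.decode(value, { stream: true }));
  }
}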

Apologies, but my knowledge about Redux is limited, so that’s all I can provide.

However, you can use ChatGPT to move towards a working prototype.

I really appreciate your follow-up and the time you took. Thanks for clearing up the content-type issue: it should be text/plain. Perhaps when I make that change the rest will come for free… being overly optimistic of course… haha.

I see an example in the Redux docs where they use websockets, so perhaps I need to fiddle with that code. My “Streaming Updates” code sample above is lifted directly from their site and demos how to use a websocket.

In either case thank you again.

I will keep this open, in case someone with more Redux experience stumbles on this post and has more answers to my problem.

Hi @jonnydungeons, did you ever get a working streaming solution with RTK Query? I'm stuck on the same problem.