How to streaming the assistant api output and output citation at the same time?

When I was streaming the assistant output, I have a logic that will replace the citation tag with the file source.

async def chat_with_streaming(
    thread_id: str, assistant_id: str, additional_instructions: str
):
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        additional_instructions=additional_instructions,
    ) as stream:
        for event in stream:
            if event.event == "thread.run.step.created":
                for text in stream.text_deltas:
                    response_text = text
                    citation_pattern = re.compile(r'【\d+:\d+†source】')
                    if citation_pattern.search(text):
                        response_text = ""
                        annotation = stream.get_final_messages()[0].content[0].text.annotations[0]
                        if file_citation := getattr(annotation, "file_citation", None):
                            cited_file = client.files.retrieve(file_citation.file_id)
                            response_text = f"【Source: {cited_file.filename}】"
                    yield json.dumps(
                        {"response": response_text, "reference": [], "thread_id": thread_id}
                    )
            logger.info(event.event)

However, most of the time, the streaming abruptly stopped at the token when it took a while to retrieve the corresponding file source. I am not sure what is happening.

One thing I notice is the streaming will stop whenever it took some time to retrievel the correct file source.

1 Like