Retrieving file_id from the streaming API

I am using the streaming API to interface with an Assistant I created.

There seem to be three types of events coming back:

thread.run.created
thread.run.queued
thread.run.in_progress

I use the ‘created’ event to get the streaming text, but I can't work out how to get the file_id from the annotation, or even where the annotation is. Please see my code below:


        async for event in stream:
            logging.debug(f"Received event: {event.event}")
            if event.event in ['thread.run.step.created', 'thread.message.delta']:
                if event.event == 'thread.message.delta':
                    text = event.data.delta.content[0].text.value
                    logging.debug(f"Received text delta in thread.message.delta: {text}")
                    buffer += text
                    formatted_buffer = format_text(buffer)
                    chunks = re.split(r'\n(?=\d+\.|\S)', formatted_buffer)
                    if len(chunks) > 1:
                        for chunk in chunks[:-1]:
                            logging.debug(f"Yielding chunk: {chunk.strip()}")
                            yield {"type": "text", "content": chunk.strip()}
                        buffer = chunks[-1]
                elif event.event == 'thread.run.step.created':
                    logging.debug("Thread run step created")
                    logging.debug(f"Event data: {event.data}")

                    async for text in stream.text_deltas:
                        # logging.debug(f"on_text_done: {(dir(stream.on_text_done))}")
                        logging.debug(f"Received text delta in thread.run.step.created: {text}")
                        buffer += text
                        formatted_buffer = format_text(buffer)
                        chunks = re.split(r'\n(?=\d+\.|\S)', formatted_buffer)
                        if len(chunks) > 1:
                            for chunk in chunks[:-1]:
                                logging.debug(f"Yielding chunk: {chunk.strip()}")
                                yield {"type": "text", "content": chunk.strip()}
                            buffer = chunks[-1]
            elif event.event == 'annotation':
                annotation = event.data.annotation
                logging.debug(f"Received annotation: {annotation}")
                yield {"type": "annotation", "content": annotation}

Only the second elif seems to match (elif event.event == 'thread.run.step.created':), but I don't understand where the annotations are.

Please help.

I think I may have the perfect example for you! I use thread.message.completed to get the annotation.file_path.file_id.

You can check my example below!
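
A minimal sketch of that approach (not the original example from this reply), assuming the openai Python SDK's streaming events, where the `data` of a `thread.message.completed` event is the finished message and each text block carries an `annotations` list. The helper names `extract_file_ids` and `handle_event` are hypothetical, and the stand-in objects at the bottom only mimic the shape of a real event so the snippet runs without an API call.

```python
from types import SimpleNamespace


def extract_file_ids(message):
    """Collect file IDs from the annotations of a completed message.

    Assumes `message.content` is a list of content blocks and that text
    blocks expose `text.annotations`, as in the Assistants API message object.
    """
    file_ids = []
    for block in message.content:
        # Skip non-text blocks (for example image blocks), which have no annotations.
        if getattr(block, "type", None) != "text":
            continue
        for annotation in block.text.annotations:
            if annotation.type == "file_path":
                file_ids.append(annotation.file_path.file_id)
            elif annotation.type == "file_citation":
                file_ids.append(annotation.file_citation.file_id)
    return file_ids


def handle_event(event):
    """Return annotation items for a stream event (hypothetical helper)."""
    if event.event == "thread.message.completed":
        return [
            {"type": "annotation", "content": file_id}
            for file_id in extract_file_ids(event.data)
        ]
    return []


# Stand-in for a real stream event, used only to exercise the helpers.
fake_event = SimpleNamespace(
    event="thread.message.completed",
    data=SimpleNamespace(
        content=[
            SimpleNamespace(
                type="text",
                text=SimpleNamespace(
                    value="See the attached chart.",
                    annotations=[
                        SimpleNamespace(
                            type="file_path",
                            file_path=SimpleNamespace(file_id="file-abc123"),
                        )
                    ],
                ),
            )
        ]
    ),
)

print(handle_event(fake_event))
```

In the loop from the question, a check like this would take the place of the `elif event.event == 'annotation':` branch, which never matched; inside the async generator, each item returned by `handle_event(event)` can simply be yielded.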

Thank you, Marco! thread.message.completed was it. That works perfectly.
