Agent SDK support for generator based tools/websocket based tools

Hey Team,

We have a use case where we need to implement an existing websocket service as a tool. That existing service receives and send multiple events. It does not return anything like we expect tool to return. It streams multiple events.

I have UI which calls this Agent and directs to this tool and in return expects analyst stream events.

How can I integrate this inside agents-sdk?

This is what I was trying but not working properly.

from agents import Agent, Runner
import asyncio
from dotenv import load_dotenv
import os

load_dotenv() 
api_key = os.getenv("OPENAI_API_KEY")



from agents import function_tool
import socketio
from urllib.parse import urlencode

@function_tool
def emit_to_analyst() -> str:
    PROJECT_ID = 1
    ANALYST_URL = "wss://wensocket_service.com"
    NAMESPACE = "/notebook/"
    AZURE_AD_TOKEN = "..."

    sio = socketio.AsyncClient(logger=True, engineio_logger=True)

    event_queue = asyncio.Queue()

    @sio.on("*", namespace=NAMESPACE)
    async def catch_all(event, data):
        # Push all events into async queue
        await event_queue.put((event, data))

    @sio.event(namespace=NAMESPACE)
    async def connect():
        print("Connected to server.")
        payload = {
            "code": "",
            "command": "show me top 5 rows.",
            "pseudocode": [],
            "codefix": False,
            "auto_execute": True,
            "data_context": [{"name": "titanic.csv", "context_type": "dataset"}],
            "clarifications": [],
            "instructions": [],
            "skill": "Default"
        }
        await sio.emit("process", payload, namespace=NAMESPACE)
    
    @sio.event
    async def disconnect(namespace=NAMESPACE):
        await event_queue.put(("disconnect", None))

    query_params = {
            'projectId': PROJECT_ID,
            'objectId': OBJECT_ID,
            'token': AZURE_AD_TOKEN
        }

    await sio.connect(
        f"{ANALYST_URL}?{urlencode(query_params)}",
        namespaces=[NAMESPACE],
        transports=["websocket"],
        auth={
            "authorization_token": AZURE_AD_TOKEN,
            "projectId": PROJECT_ID,
            "notebookId": OBJECT_ID
        }
    )
     
    try:
        # Yield events as they arrive
        while True:
            event, data = await event_queue.get()
            if event == "disconnect":
                break
            yield f"[{event.upper()}] {data}"
    except Exception as e:
        yield f"[ERROR] {e}"
    finally:
        await sio.disconnect()
        yield "[SYSTEM] Disconnected cleanly."


async def main():
    agent = Agent(
        name="AnalystCaller",
        instructions="Call the `emit_to_analyst` tool to trigger the analyst event.",
        tools=[emit_to_analyst],
    )

    result = Runner.run_streamed(
        agent,
        input="Please trigger the analyst event.",
    )
    print("=== Run starting ===")

    async for event in result.stream_events():
        # We'll ignore the raw responses event deltas
        if event.type == "raw_response_event":
            print(f'Here: {event}')
            continue
        # When the agent updates, print that
        elif event.type == "agent_updated_stream_event":
            print(f"Agent updated: {event.new_agent.name}")
            continue
        # When items are generated, print them
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("-- Tool was called")
            elif event.item.type == "tool_call_output_item":
                print(f"-- Tool output: {event.item.output}")
            elif event.item.type == "message_output_item":
                print(f"-- Message output:\n {event.item}")
            else:
                pass  # Ignore other event types

    print("=== Run complete ===")


if __name__ == "__main__":
    asyncio.run(main())

Thanks in advance for the help! :slight_smile:

If you had any luck with this, I would appreciate hearing how you did it.

Otherwise, I would like to second that I am looking for a solution that would allow tools to return generators.