Hey Team,
We have a use case where we need to implement an existing websocket service as a tool. That existing service receives and send multiple events. It does not return anything like we expect tool to return. It streams multiple events.
I have UI which calls this Agent and directs to this tool and in return expects analyst stream events.
How can I integrate this inside agents-sdk?
This is what I was trying but not working properly.
from agents import Agent, Runner
import asyncio
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
from agents import function_tool
import socketio
from urllib.parse import urlencode
@function_tool
def emit_to_analyst() -> str:
PROJECT_ID = 1
ANALYST_URL = "wss://wensocket_service.com"
NAMESPACE = "/notebook/"
AZURE_AD_TOKEN = "..."
sio = socketio.AsyncClient(logger=True, engineio_logger=True)
event_queue = asyncio.Queue()
@sio.on("*", namespace=NAMESPACE)
async def catch_all(event, data):
# Push all events into async queue
await event_queue.put((event, data))
@sio.event(namespace=NAMESPACE)
async def connect():
print("Connected to server.")
payload = {
"code": "",
"command": "show me top 5 rows.",
"pseudocode": [],
"codefix": False,
"auto_execute": True,
"data_context": [{"name": "titanic.csv", "context_type": "dataset"}],
"clarifications": [],
"instructions": [],
"skill": "Default"
}
await sio.emit("process", payload, namespace=NAMESPACE)
@sio.event
async def disconnect(namespace=NAMESPACE):
await event_queue.put(("disconnect", None))
query_params = {
'projectId': PROJECT_ID,
'objectId': OBJECT_ID,
'token': AZURE_AD_TOKEN
}
await sio.connect(
f"{ANALYST_URL}?{urlencode(query_params)}",
namespaces=[NAMESPACE],
transports=["websocket"],
auth={
"authorization_token": AZURE_AD_TOKEN,
"projectId": PROJECT_ID,
"notebookId": OBJECT_ID
}
)
try:
# Yield events as they arrive
while True:
event, data = await event_queue.get()
if event == "disconnect":
break
yield f"[{event.upper()}] {data}"
except Exception as e:
yield f"[ERROR] {e}"
finally:
await sio.disconnect()
yield "[SYSTEM] Disconnected cleanly."
async def main():
agent = Agent(
name="AnalystCaller",
instructions="Call the `emit_to_analyst` tool to trigger the analyst event.",
tools=[emit_to_analyst],
)
result = Runner.run_streamed(
agent,
input="Please trigger the analyst event.",
)
print("=== Run starting ===")
async for event in result.stream_events():
# We'll ignore the raw responses event deltas
if event.type == "raw_response_event":
print(f'Here: {event}')
continue
# When the agent updates, print that
elif event.type == "agent_updated_stream_event":
print(f"Agent updated: {event.new_agent.name}")
continue
# When items are generated, print them
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
print("-- Tool was called")
elif event.item.type == "tool_call_output_item":
print(f"-- Tool output: {event.item.output}")
elif event.item.type == "message_output_item":
print(f"-- Message output:\n {event.item}")
else:
pass # Ignore other event types
print("=== Run complete ===")
if __name__ == "__main__":
asyncio.run(main())
Thanks in advance for the help! ![]()