Skip to content

AG-UI Streaming

AG-UI provides a Server-Sent Events (SSE) streaming surface for your agent, designed for frontend consumption. It translates internal AgentTraceEvent streams into typed AgentIoEvent SSE frames that frontends can render incrementally — text deltas, reasoning steps, tool calls, interaction prompts, and run lifecycle signals.

[dependencies]
agent_sdk = { workspace = true, features = ["event-stream", "llm-engine"] }
  1. Build an agent with a stream-capable LLM runtime — the agent must have configure_llm_runtime called so can_run_stream() returns true.

  2. Create an AgUiConfig — configure the SSE endpoint path (defaults to /agui/v1/runs).

  3. Build an AgUiApp from the agent and config.

  4. Mount the router — call .router() to get an axum Router, then merge it into your server.

use agent_sdk::agui::{AgUiApp, AgUiConfig};
use std::sync::Arc;
let agent = build_my_agent()?;
let agui = AgUiApp::from_agent(agent, AgUiConfig::default())?;
let router = agui.router();
// Serve with axum
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, router).await?;

If you need the agent in multiple places (e.g., A2A + AG-UI), wrap it in an Arc:

use std::sync::Arc;
let agent = Arc::new(build_my_agent()?);
let agui = AgUiApp::from_shared_agent(agent.clone(), AgUiConfig {
route_path: "/agui/v1/runs".to_string(),
})?;

Use AgentHostBuilder::with_agui to serve both A2A JSON-RPC and AG-UI on one router:

use agent_sdk::AgentHostBuilder;
let host = AgentHostBuilder::new(agent)
.with_agui(AgUiConfig::default())
.build()?;
// host.router() includes both /jsonrpc and /agui/v1/runs

The AgentIoEvent enum defines the SSE frame vocabulary:

EventFieldsPurpose
RunStartedthread_id, run_idStream opened
RunFinishedthread_id, run_id, resultStream complete
RunErrormessage, codeError during run
StepStartedstep_nameTool/step begins
StepFinishedstep_nameTool/step ends
TextMessageStartmessage_id, roleText message begins
TextMessageContentmessage_id, deltaText token delta
TextMessageEndmessage_idText message complete
ReasoningStartmessage_idReasoning block begins
ReasoningMessageStartmessage_id, roleReasoning message begins
ReasoningMessageContentmessage_id, deltaReasoning token delta
ToolCallStarttool_call_id, tool_nameTool invocation begins
ToolCallEndtool_call_id, resultTool invocation ends
RunInputRequiredinteraction_requestAgent needs user input

For advanced use, AgUiStreamDriver gives fine-grained control over the mapping pipeline:

use agent_sdk::agui::{AgUiStreamDriver, AgUiDriverConfig, IoEventContext};
let driver = AgUiStreamDriver::new(AgUiDriverConfig {
ctx: IoEventContext {
thread_id: "thread-123".into(),
run_id: "run-456".into(),
message_id: "msg-789".into(),
},
cancel_flag: None,
enrichers: vec![],
});

Implement the StreamEnricher trait to inject custom events into the SSE stream (e.g., citations, metadata):

use agent_sdk::agui::CitationEnricher;
let config = AgUiDriverConfig {
// ...
enrichers: vec![Box::new(CitationEnricher::new(sources))],
};

StreamBroadcast lets multiple subscribers receive the same SSE stream:

use agent_sdk::streaming::StreamBroadcast;
let broadcast = StreamBroadcast::new(source_stream);
let subscriber_a = broadcast.subscribe();
let subscriber_b = broadcast.subscribe();
TypeModuleDescription
AgUiAppagent_sdk::aguiApp builder — from_agent(), from_shared_agent(), router()
AgUiConfigagent_sdk::aguiConfig with route_path field (default: /agui/v1/runs)
AgUiStreamDriveragent_sdk::streamingLow-level driver for trace → SSE mapping
AgUiDriverConfigagent_sdk::streamingDriver config: context, cancel flag, enrichers
AgentIoEventagent_sdk::streamingSSE event enum for frontend consumption
IoEventContextagent_sdk::streamingthread_id, run_id, message_id
StreamBroadcastagent_sdk::streamingFan-out a single stream to multiple subscribers
StreamResponsea2a_protocol_core::streamingA2A streaming response type
StreamEnricheragent_sdk::streamingTrait for injecting custom events
SummaryHandleagent_sdk::streamingAccess to run summary after stream completes
RunSummaryagent_sdk::streamingSummary data: status, token usage, timing
RunStatusagent_sdk::streamingCompleted, Error, InputRequired, Cancelled