
stream

Streaming event types, SSE parser, and event-stream construction.

This module defines the [StreamEvent] vocabulary emitted during an LLM streaming response and provides the machinery to parse raw SSE byte streams from OpenAI-compatible providers into typed events.

Text response: StreamStart -> ContentDelta* -> Done

Tool calls: StreamStart -> ToolCallStart -> ToolCallDelta* -> Done

Reasoning models (Qwen3, DeepSeek R1): StreamStart -> ReasoningDelta* -> ContentDelta* -> Done
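As an illustrative sketch of the text-response flow above, using a simplified stand-in for the real [StreamEvent] enum (the actual variants carry additional fields), the emitted deltas can be folded into the final content like this:

```rust
// Simplified stand-in for the module's StreamEvent enum; the real
// variants carry additional fields (tool-call ids, finish reasons, ...).
#[derive(Debug)]
enum StreamEvent {
    StreamStart { role: String },
    ContentDelta { text: String },
    ReasoningDelta { text: String },
    Done,
}

// Accumulate a simulated text-response flow:
// StreamStart -> ContentDelta* -> Done.
fn collect_content(events: Vec<StreamEvent>) -> String {
    let mut content = String::new();
    for event in events {
        match event {
            StreamEvent::StreamStart { .. } => {}
            StreamEvent::ContentDelta { text } => content.push_str(&text),
            StreamEvent::ReasoningDelta { .. } => {} // reasoning is kept separate
            StreamEvent::Done => break,
        }
    }
    content
}

fn main() {
    let events = vec![
        StreamEvent::StreamStart { role: "assistant".into() },
        StreamEvent::ContentDelta { text: "Hello".into() },
        StreamEvent::ContentDelta { text: ", world".into() },
        StreamEvent::Done,
    ];
    assert_eq!(collect_content(events), "Hello, world");
}
```

The reasoning-model flow differs only in that ReasoningDelta events arrive before the ContentDelta events and are accumulated into a separate buffer.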

type LlmEventStream = Pin<Box<dyn futures::Stream<Item = StreamEvent> + Send>>;

A boxed, pinned, Send stream of [StreamEvent] results.

This is the canonical return type for all streaming LLM methods.

struct SseParser

Stateful SSE line parser.

Feeds raw bytes from the HTTP response body and emits complete data: payloads as strings. Handles line buffering across chunk boundaries and supports both data-only and typed event consumption.

Use [next_event] for data-only payloads (OpenAI-compatible) or [next_typed_event] for (event_type, data) pairs (Anthropic-compatible).

Methods

fn new() -> Self
fn feed(&mut self, bytes: &[u8])

Feed a chunk of bytes from the response body.

fn next_event(&mut self) -> Option<String>

Get the next complete SSE data payload, if available.

Returns data-only strings, ignoring event: fields. Use this for OpenAI-compatible streams.

fn next_typed_event(&mut self) -> Option<(Option<String>, String)>

Get the next (event_type, data) pair, if available.

The event_type is Some when the data line was preceded by an event: SSE field, None otherwise. Use this for Anthropic-style streams that require the event type to dispatch parsing.
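The parsing contract above can be illustrated with a minimal std-only sketch. This is not the module's actual implementation (which handles more of the SSE grammar, e.g. fields without a space after the colon and multi-line data), but it shows the core behavior: buffer bytes until a newline, remember the last event: field, and emit on data: lines.

```rust
use std::collections::VecDeque;

// Minimal illustration of the SseParser contract; the real parser is
// more complete (fields without a trailing space, multi-line data, ...).
struct SseParser {
    buf: Vec<u8>,                  // bytes carried across feed() calls
    pending_event: Option<String>, // last `event:` field seen
    events: VecDeque<(Option<String>, String)>,
}

impl SseParser {
    fn new() -> Self {
        Self { buf: Vec::new(), pending_event: None, events: VecDeque::new() }
    }

    /// Feed a chunk of bytes; complete lines are consumed, the tail is buffered.
    fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            let line = String::from_utf8_lossy(&line[..pos])
                .trim_end_matches('\r')
                .to_string();
            if let Some(data) = line.strip_prefix("data: ") {
                self.events.push_back((self.pending_event.take(), data.to_string()));
            } else if let Some(ev) = line.strip_prefix("event: ") {
                self.pending_event = Some(ev.to_string());
            }
        }
    }

    /// Data-only payloads (OpenAI-compatible streams).
    fn next_event(&mut self) -> Option<String> {
        self.events.pop_front().map(|(_, data)| data)
    }

    /// (event_type, data) pairs (Anthropic-style streams).
    fn next_typed_event(&mut self) -> Option<(Option<String>, String)> {
        self.events.pop_front()
    }
}

fn main() {
    let mut p = SseParser::new();
    p.feed(b"data: {\"a\":");            // partial line: buffered, nothing emitted
    assert_eq!(p.next_event(), None);
    p.feed(b"1}\n");                      // completes the line across the chunk boundary
    assert_eq!(p.next_event(), Some("{\"a\":1}".to_string()));
}
```

The main function demonstrates the chunk-boundary guarantee: a data: line split across two feed calls is emitted exactly once, after the newline arrives.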

enum StreamEvent

A single event from an LLM streaming response.

Events are emitted in real-time as the provider generates tokens. Variants cover content deltas, native reasoning/CoT deltas (never fabricated), incremental tool-call fragments, and lifecycle signals.

Variants

StreamStart { ... }

Stream started. Emitted once from the first chunk that contains a role.

ContentDelta { ... }

A text content delta from the assistant.

ReasoningDelta { ... }

A reasoning/thinking delta from reasoning models.

ToolCallStart { ... }

A new tool call started in the stream.

ToolCallDelta { ... }

An arguments JSON fragment for an in-progress tool call.

Done { ... }

The stream completed.

Error { ... }

An error occurred during streaming.

fn sse_event_stream(response: reqwest::Response) -> LlmEventStream

Convert a raw reqwest::Response (whose body is an SSE byte stream) into a typed [LlmEventStream].

Spawns a background tokio task that reads chunks, feeds them through [SseParser], parses each data line via [parse_chat_chunk], and sends the resulting [StreamEvent]s through a bounded channel.

Repeated [StreamEvent::StreamStart] (e.g. when every chunk carries delta.role) is collapsed to a single start event per response.

The returned stream completes when the upstream sends data: [DONE], the connection closes, or an error occurs.
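The StreamStart collapsing described above amounts to a small stateful filter. A std-only sketch, with a simplified event type standing in for [StreamEvent]:

```rust
// Simplified stand-in: only the variant tag matters for deduplication.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    StreamStart,
    ContentDelta(String),
    Done,
}

/// Drop every StreamStart after the first, keeping all other events.
fn dedup_stream_start(events: impl IntoIterator<Item = Event>) -> Vec<Event> {
    let mut started = false;
    events
        .into_iter()
        .filter(|e| {
            if *e == Event::StreamStart {
                // Keep only the first StreamStart; mark the stream as started.
                !std::mem::replace(&mut started, true)
            } else {
                true
            }
        })
        .collect()
}

fn main() {
    let out = dedup_stream_start(vec![
        Event::StreamStart,
        Event::ContentDelta("hi".into()),
        Event::StreamStart, // duplicate from a chunk carrying delta.role: dropped
        Event::Done,
    ]);
    assert_eq!(
        out,
        vec![Event::StreamStart, Event::ContentDelta("hi".into()), Event::Done]
    );
}
```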

fn sse_event_stream_from_buffer(body: Vec<u8>) -> LlmEventStream

Convert a buffered SSE response body into a typed [LlmEventStream].

This is the WASM-compatible counterpart of [sse_event_stream]. Instead of reading chunks incrementally from a network stream, it parses the entire buffered body at once. All events are emitted immediately.

Limitations: no incremental token delivery — the caller receives all events after the full response completes. True incremental WASM streaming is deferred until WASI 0.3.

Like [sse_event_stream], repeated [StreamEvent::StreamStart] is deduped per response.
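In the same spirit, the buffered variant's all-at-once behavior can be sketched with std only. This is a simplification: the real function produces typed [StreamEvent]s via [parse_chat_chunk], whereas this just extracts the raw data: payloads from a complete body in a single pass.

```rust
/// Extract every complete `data:` payload from a fully buffered SSE body.
/// Unlike the streaming path, all payloads become available at once.
fn data_payloads(body: &[u8]) -> Vec<String> {
    String::from_utf8_lossy(body)
        .lines()
        .filter_map(|line| line.strip_prefix("data: ").map(str::to_string))
        .take_while(|data| data != "[DONE]") // upstream end-of-stream marker
        .collect()
}

fn main() {
    let body = b"data: {\"delta\":\"Hi\"}\n\ndata: [DONE]\n";
    assert_eq!(data_payloads(body), vec!["{\"delta\":\"Hi\"}".to_string()]);
}
```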