Streaming

This tutorial shows how to stream responses from chat models token by token.

Basic streaming

Every BaseChatModel provides a stream method that returns a BoxStream of Result-wrapped ChatChunk values:

use futures_util::StreamExt;
use synwire_core::language_models::{FakeChatModel, BaseChatModel};
use synwire_core::messages::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let model = FakeChatModel::new(vec!["Hello, world!".into()])
        .with_chunk_size(3); // Split into 3-character chunks

    let messages = vec![Message::human("Greet me")];
    let mut stream = model.stream(&messages, None).await?;

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        if let Some(content) = &chunk.delta_content {
            print!("{content}");
        }
    }
    println!();
    // Prints "Hello, world!" (delivered as the chunks "Hel", "lo,", " wo", "rld", "!")

    Ok(())
}

Streaming with OpenAI

The consumption loop is identical with a real provider; only the model construction changes:

use futures_util::StreamExt;
use synwire_core::language_models::BaseChatModel;
use synwire_core::messages::Message;
use synwire_llm_openai::ChatOpenAI;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let model = ChatOpenAI::builder()
        .model("gpt-4o-mini")
        .api_key_env("OPENAI_API_KEY")
        .build()?;

    let messages = vec![Message::human("Write a haiku about Rust")];
    let mut stream = model.stream(&messages, None).await?;

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        if let Some(content) = &chunk.delta_content {
            print!("{content}");
        }
    }
    println!();

    Ok(())
}

ChatChunk fields

Field              Type                   Description
delta_content      Option<String>         Incremental text content
delta_tool_calls   Vec<ToolCallChunk>     Incremental tool-call data
finish_reason      Option<String>         Some("stop") on the final chunk
usage              Option<UsageMetadata>  Token usage (final chunk only)
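
Since finish_reason and usage arrive only on the final chunk, a single loop can accumulate text and pick them up along the way. A minimal std-only sketch with stand-in types mirroring the table above (the real definitions live in synwire_core; UsageMetadata's output_tokens field here is an assumption for illustration):

```rust
// Stand-ins mirroring the documented ChatChunk fields (illustrative only).
struct UsageMetadata {
    output_tokens: u32, // assumed field name, not the crate's actual shape
}

#[derive(Default)]
struct ChatChunk {
    delta_content: Option<String>,
    finish_reason: Option<String>,
    usage: Option<UsageMetadata>,
}

// Fold a sequence of chunks into (full text, finish reason, usage).
fn fold_chunks(chunks: Vec<ChatChunk>) -> (String, Option<String>, Option<UsageMetadata>) {
    let mut text = String::new();
    let mut finish = None;
    let mut usage = None;
    for chunk in chunks {
        if let Some(content) = chunk.delta_content {
            text.push_str(&content);
        }
        if chunk.finish_reason.is_some() {
            finish = chunk.finish_reason;
        }
        if chunk.usage.is_some() {
            usage = chunk.usage;
        }
    }
    (text, finish, usage)
}

fn main() {
    let chunks = vec![
        ChatChunk { delta_content: Some("Hel".into()), ..Default::default() },
        ChatChunk { delta_content: Some("lo".into()), ..Default::default() },
        ChatChunk {
            finish_reason: Some("stop".into()),
            usage: Some(UsageMetadata { output_tokens: 2 }),
            ..Default::default()
        },
    ];
    let (text, finish, usage) = fold_chunks(chunks);
    assert_eq!(text, "Hello");
    assert_eq!(finish.as_deref(), Some("stop"));
    assert_eq!(usage.map(|u| u.output_tokens), Some(2));
}
```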

Collecting streamed output

To accumulate the full response:

let mut full_text = String::new();
while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    if let Some(content) = &chunk.delta_content {
        full_text.push_str(content);
    }
}
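
The same accumulation idea applies to delta_tool_calls, which arrive as fragments that must be merged by index. A std-only sketch with a stand-in ToolCallChunk (the field names index, name, and args_delta are assumptions for illustration, not the crate's actual shape):

```rust
use std::collections::BTreeMap;

// Stand-in for a streamed tool-call fragment (illustrative shape).
struct ToolCallChunk {
    index: usize,          // which tool call this fragment belongs to
    name: Option<String>,  // usually present only on the first fragment
    args_delta: String,    // incremental JSON argument text
}

// Merge fragments into complete (name, arguments) pairs, ordered by index.
fn merge_tool_calls(chunks: &[ToolCallChunk]) -> Vec<(String, String)> {
    let mut calls: BTreeMap<usize, (String, String)> = BTreeMap::new();
    for c in chunks {
        let entry = calls.entry(c.index).or_default();
        if let Some(name) = &c.name {
            entry.0 = name.clone();
        }
        entry.1.push_str(&c.args_delta);
    }
    calls.into_values().collect()
}

fn main() {
    let fragments = vec![
        ToolCallChunk { index: 0, name: Some("get_weather".into()), args_delta: "{\"city\":".into() },
        ToolCallChunk { index: 0, name: None, args_delta: "\"Oslo\"}".into() },
    ];
    let merged = merge_tool_calls(&fragments);
    assert_eq!(merged, vec![("get_weather".to_string(), "{\"city\":\"Oslo\"}".to_string())]);
}
```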

Error handling during streaming

Errors can occur mid-stream. Handle each Result explicitly instead of unwrapping:

while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => { /* process chunk */ }
        Err(e) => {
            eprintln!("Stream error: {e}");
            break;
        }
    }
}
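
A mid-stream failure still leaves you with whatever text arrived before it, which the caller can surface or use to decide on a retry. A std-only sketch (the stream is simulated here as a vector of Results rather than a live BoxStream):

```rust
// Consume a stream, keeping partial text and the first error, if any.
fn consume(stream: Vec<Result<String, String>>) -> (String, Option<String>) {
    let mut partial = String::new();
    for result in stream {
        match result {
            Ok(chunk) => partial.push_str(&chunk),
            Err(e) => return (partial, Some(e)), // stop at the first error
        }
    }
    (partial, None)
}

fn main() {
    let stream = vec![
        Ok("abcde".to_string()),
        Ok("fghij".to_string()),
        Err("connection reset".to_string()),
    ];
    let (partial, err) = consume(stream);
    assert_eq!(partial, "abcdefghij"); // two chunks survived before the error
    assert_eq!(err.as_deref(), Some("connection reset"));
}
```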

Testing streams

FakeChatModel supports configurable chunking and error injection for stream testing:

// Split into 5-char chunks, inject error after 2 chunks
let model = FakeChatModel::new(vec!["abcdefghij".into()])
    .with_chunk_size(5)
    .with_stream_error_after(2);
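
The semantics of that configuration can be sketched without the crate: fake_stream below is a std-only stand-in mimicking with_chunk_size plus with_stream_error_after (it is not synwire's API), showing what a test over the resulting chunks can assert:

```rust
// Stand-in: split `text` into `chunk_size`-byte chunks, keep `error_after`
// of them, then append an injected error (mimics FakeChatModel's behavior).
fn fake_stream(text: &str, chunk_size: usize, error_after: usize) -> Vec<Result<String, String>> {
    let mut out: Vec<Result<String, String>> = text
        .as_bytes()
        .chunks(chunk_size)
        .map(|c| Ok(String::from_utf8_lossy(c).into_owned()))
        .collect();
    out.truncate(error_after);
    out.push(Err("injected stream error".into()));
    out
}

fn main() {
    let stream = fake_stream("abcdefghij", 5, 2);
    // Two 5-char chunks arrive, then the injected error terminates the stream.
    assert_eq!(stream.len(), 3);
    assert_eq!(stream[0], Ok("abcde".to_string()));
    assert_eq!(stream[1], Ok("fghij".to_string()));
    assert!(stream[2].is_err());
}
```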

Next steps