Tutorial 6: Checkpointing — Resumable Workflows
Prerequisites: Rust 1.85+, completion of Tutorial 5, familiarity with StateGraph from Getting Started: Graph Agents.
In this tutorial you will:
- Understand what checkpointing does and when you need it
- Wire InMemoryCheckpointSaver into a StateGraph
- Resume a run from a checkpoint using thread_id
- Switch to SqliteSaver for persistence across process restarts
- Fork a run from a past checkpoint
1. Why checkpoint?
Without checkpointing, every graph.invoke(...) starts from scratch. Checkpointing enables:
- Resume — a long-running workflow interrupted mid-way (network error, process restart) picks up from the last completed superstep
- Fork — run alternative continuations from the same past state without re-executing earlier steps
- Replay / debug — rewind to an intermediate state and inspect what changed
- Human-in-the-loop — pause execution at a decision point, wait for human input, then resume
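The resume behaviour in the list above can be sketched with the standard library alone. This is a toy model, not the synwire implementation: `ToyState`, `ToyStore`, `run`, and the `fail_at` parameter are hypothetical names invented for illustration. It saves a snapshot after every step and, on the next call with the same thread id, skips the steps that were already recorded.

```rust
use std::collections::HashMap;

/// Toy state: the names of the steps that have already run.
#[derive(Clone, Debug, Default, PartialEq)]
struct ToyState {
    steps_completed: Vec<String>,
}

/// Hypothetical in-memory store: thread id -> one snapshot per completed step.
#[derive(Default)]
struct ToyStore {
    threads: HashMap<String, Vec<ToyState>>,
}

impl ToyStore {
    /// Latest snapshot for a thread, if any.
    fn latest(&self, thread_id: &str) -> Option<&ToyState> {
        self.threads.get(thread_id).and_then(|snapshots| snapshots.last())
    }

    /// Stores a copy of the state as the newest snapshot for the thread.
    fn save(&mut self, thread_id: &str, state: &ToyState) {
        self.threads
            .entry(thread_id.to_string())
            .or_default()
            .push(state.clone());
    }
}

/// Runs `steps` in order, skipping any step already recorded for the thread.
/// `fail_at` simulates an interruption just before the step with that index.
fn run(
    store: &mut ToyStore,
    thread_id: &str,
    steps: &[&str],
    fail_at: Option<usize>,
) -> Result<ToyState, String> {
    let mut state = store.latest(thread_id).cloned().unwrap_or_default();
    let start = state.steps_completed.len();
    for (i, step) in steps.iter().enumerate().skip(start) {
        if fail_at == Some(i) {
            return Err(format!("interrupted before {step}"));
        }
        state.steps_completed.push(step.to_string());
        store.save(thread_id, &state);
    }
    Ok(state)
}

fn main() {
    let mut store = ToyStore::default();
    let steps = ["step_one", "step_two", "step_three"];

    // The first attempt is interrupted before step_three (index 2).
    assert!(run(&mut store, "session-1", &steps, Some(2)).is_err());

    // The second attempt reuses the thread id and only runs step_three.
    let done = run(&mut store, "session-1", &steps, None).unwrap();
    println!("{:?}", done.steps_completed);
}
```

The key design point is that the thread id is the only thing the caller has to remember: everything else needed to continue lives in the store.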
📖 Rust note:
Arc<T> (Atomically Reference Counted) enables shared ownership of a value across threads. We use Arc<dyn BaseCheckpointSaver> because the compiled graph and the caller both need access to the saver.
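To make the shared-ownership point concrete, here is a minimal standard-library sketch. The `Saver` trait, `MemorySaver`, and `Runner` are hypothetical stand-ins, not the synwire types; the only real API shown is `Arc` and `Mutex` from `std::sync`. The runner and the caller each hold a handle to the same trait object, so checkpoints written by one are visible to the other.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a saver trait; not the synwire definition.
trait Saver: Send + Sync {
    fn put(&self, thread_id: &str, snapshot: String);
    fn count(&self, thread_id: &str) -> usize;
}

#[derive(Default)]
struct MemorySaver {
    // Mutex gives interior mutability so `put` can take `&self`.
    entries: Mutex<Vec<(String, String)>>,
}

impl Saver for MemorySaver {
    fn put(&self, thread_id: &str, snapshot: String) {
        self.entries
            .lock()
            .unwrap()
            .push((thread_id.to_string(), snapshot));
    }

    fn count(&self, thread_id: &str) -> usize {
        self.entries
            .lock()
            .unwrap()
            .iter()
            .filter(|(t, _)| t == thread_id)
            .count()
    }
}

/// Toy "compiled graph" that keeps its own handle to the saver.
struct Runner {
    saver: Arc<dyn Saver>,
}

impl Runner {
    fn run(&self, thread_id: &str) {
        self.saver.put(thread_id, "after step_one".to_string());
        self.saver.put(thread_id, "after step_two".to_string());
    }
}

fn main() {
    let saver: Arc<dyn Saver> = Arc::new(MemorySaver::default());
    // Arc::clone copies the pointer and bumps the count; the saver itself is not copied.
    let runner = Runner { saver: Arc::clone(&saver) };
    runner.run("session-1");

    // The caller's handle sees what the runner wrote.
    println!("checkpoints: {}", saver.count("session-1"));
    println!("owners: {}", Arc::strong_count(&saver));
}
```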
2. In-memory checkpointing
Add dependencies:
[dependencies]
synwire-orchestrator = "0.1"
synwire-checkpoint = "0.1"
synwire-derive = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
anyhow = "1"  # main returns anyhow::Result
Define state and a simple two-step graph:
use synwire_derive::State;
use synwire_orchestrator::graph::StateGraph;
use synwire_orchestrator::constants::END;
use synwire_orchestrator::func::sync_node;
use synwire_checkpoint::{InMemoryCheckpointSaver, CheckpointConfig};
use std::sync::Arc;
use serde::{Serialize, Deserialize};

#[derive(State, Clone, Debug, Default, Serialize, Deserialize)]
struct WorkflowState {
    #[reducer(topic)]
    steps_completed: Vec<String>,
    #[reducer(last_value)]
    result: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut graph = StateGraph::<WorkflowState>::new();

    graph.add_node("step_one", sync_node(|mut s: WorkflowState| {
        s.steps_completed.push("step_one".to_string());
        Ok(s)
    }))?;

    graph.add_node("step_two", sync_node(|mut s: WorkflowState| {
        s.steps_completed.push("step_two".to_string());
        s.result = format!("Completed: {:?}", s.steps_completed);
        Ok(s)
    }))?;

    graph.set_entry_point("step_one")
        .add_edge("step_one", "step_two")
        .add_edge("step_two", END);

    let compiled = graph.compile()?;

    // Attach the saver
    let saver = Arc::new(InMemoryCheckpointSaver::new());
    let checkpointed = compiled.with_checkpoint_saver(saver.clone());

    // Run with thread_id "session-1"
    let config = CheckpointConfig::new("session-1");
    let state = checkpointed.invoke(WorkflowState::default(), Some(config)).await?;

    println!("Result: {}", state.result);
    Ok(())
}
After invoke completes, saver holds the checkpoints for thread "session-1": the full state is recorded after each superstep.
3. Resuming from a checkpoint
Pass the same thread_id on a subsequent call. The graph loads the latest checkpoint and continues from there:
// ... (same graph and saver from above)

// First run
let config = CheckpointConfig::new("my-thread");
checkpointed.invoke(WorkflowState::default(), Some(config.clone())).await?;

// Simulate an interruption here. On resume:
let resumed = checkpointed.invoke(WorkflowState::default(), Some(config)).await?;
println!("Resumed result: {}", resumed.result);
// The graph finds the existing checkpoint and skips already-completed supersteps.
Note:
InMemoryCheckpointSaver loses all state when the process exits. For true resumability across restarts, use SqliteSaver (next section).
4. SQLite checkpointing for durable persistence
Add synwire-checkpoint-sqlite:
[dependencies]
synwire-checkpoint-sqlite = "0.1"
# ... rest unchanged
Replace InMemoryCheckpointSaver with SqliteSaver:
use synwire_checkpoint_sqlite::SqliteSaver;
use std::path::Path;
use std::sync::Arc;

// Opens or creates "checkpoints.db" in the current directory.
// File permissions are set to 0600 automatically.
let saver = Arc::new(SqliteSaver::new(Path::new("checkpoints.db"))?);
let checkpointed = compiled.with_checkpoint_saver(saver);

let config = CheckpointConfig::new("persistent-session");
checkpointed.invoke(WorkflowState::default(), Some(config.clone())).await?;

// Kill the process here. On restart:
// The same code opens "checkpoints.db" and resumes from the last superstep.
let resumed = checkpointed.invoke(WorkflowState::default(), Some(config)).await?;
No system SQLite library is required — synwire-checkpoint-sqlite bundles SQLite via the rusqlite bundled feature.
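Why a file on disk survives a restart while an in-memory saver does not can be shown with a small standard-library sketch. It deliberately swaps SQLite for a plain append-only text file, and `FileStore`, `record`, and `completed` are hypothetical names, not part of synwire-checkpoint-sqlite. Dropping the first handle and opening a second one on the same path stands in for a process restart.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical file-backed store: one line per completed step, "thread_id<TAB>step".
struct FileStore {
    path: PathBuf,
}

impl FileStore {
    fn open(path: &Path) -> Self {
        FileStore { path: path.to_path_buf() }
    }

    /// Appends one completed step for a thread, creating the file if needed.
    fn record(&self, thread_id: &str, step: &str) -> io::Result<()> {
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        writeln!(file, "{thread_id}\t{step}")
    }

    /// Steps already completed for a thread, in order.
    /// Returns an empty list if the file does not exist yet.
    fn completed(&self, thread_id: &str) -> io::Result<Vec<String>> {
        let text = match fs::read_to_string(&self.path) {
            Ok(text) => text,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };
        Ok(text
            .lines()
            .filter_map(|line| line.split_once('\t'))
            .filter(|(t, _)| *t == thread_id)
            .map(|(_, step)| step.to_string())
            .collect())
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("toy_checkpoints.log");
    let _ = fs::remove_file(&path); // start clean for the demo

    // "First process": records one step, then its handle goes away.
    {
        let store = FileStore::open(&path);
        store.record("persistent-session", "step_one")?;
    }

    // "Second process": a fresh handle on the same path sees the earlier progress.
    let store = FileStore::open(&path);
    println!("{:?}", store.completed("persistent-session")?);

    fs::remove_file(&path)
}
```

A real saver also needs atomic writes and concurrent access, which a database such as SQLite provides and this sketch does not attempt.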
5. Forking from a past checkpoint
To fork at a specific checkpoint, provide its checkpoint_id:
use synwire_checkpoint::{CheckpointConfig, BaseCheckpointSaver};

// List all checkpoints for a thread
let checkpoints = saver.list(&CheckpointConfig::new("my-thread"), None).await?;

// Fork from the first checkpoint (earliest in the run)
if let Some(first) = checkpoints.first() {
    let fork_config = CheckpointConfig::new("my-thread")
        .with_checkpoint_id(first.checkpoint.id.clone());

    let forked = checkpointed.invoke(WorkflowState::default(), Some(fork_config)).await?;
    println!("Fork result: {}", forked.result);
}
The forked run creates a new branch in the checkpoint tree, identified by a new checkpoint_id. The original thread remains unchanged.
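One way to picture the checkpoint tree is as a set of nodes that each point at a parent. The sketch below is an illustrative model only: `Node`, `CheckpointTree`, `append`, and `history` are hypothetical names, and ids are plain vector indices rather than real checkpoint_id values. Forking adds a new child under an existing node and never modifies the nodes already there.

```rust
/// Hypothetical checkpoint record; ids are indices into the tree's vector.
#[derive(Clone, Debug)]
struct Node {
    parent: Option<usize>,
    step: String,
}

#[derive(Default)]
struct CheckpointTree {
    nodes: Vec<Node>,
}

impl CheckpointTree {
    /// Adds a checkpoint under `parent` and returns its new id.
    fn append(&mut self, parent: Option<usize>, step: &str) -> usize {
        self.nodes.push(Node { parent, step: step.to_string() });
        self.nodes.len() - 1
    }

    /// Steps from the root down to the checkpoint `id`.
    fn history(&self, id: usize) -> Vec<String> {
        let mut out = Vec::new();
        let mut cur = Some(id);
        // Walk parent links up to the root, then reverse into execution order.
        while let Some(i) = cur {
            out.push(self.nodes[i].step.clone());
            cur = self.nodes[i].parent;
        }
        out.reverse();
        out
    }
}

fn main() {
    let mut tree = CheckpointTree::default();
    let first = tree.append(None, "step_one");
    let original = tree.append(Some(first), "step_two");

    // Fork from the first checkpoint: a new node with its own id, sharing the earlier history.
    let fork = tree.append(Some(first), "step_two_alternative");

    println!("original: {:?}", tree.history(original));
    println!("fork:     {:?}", tree.history(fork));
}
```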
Next steps
- How-To: Add Checkpointing: configuration options and advanced patterns
- Checkpointing Explanation: BaseStore and the serde protocol
- Pregel Execution Model: how supersteps relate to checkpoints