Streaming Dataflow in Numaflow¶
This document explains how Numaflow's Rust data-plane moves data in a streaming fashion, ensuring at-least-once semantics and correct watermark propagation for reduce operations. You can read why we changed to streaming in our blog post.
Overview¶
Numaflow processes data as a continuous stream rather than discrete batches (a major change in 1.6). The key design principles are:
- Always maintain active work - Keep `batch_size` messages in-flight at all times for maximum throughput and reduced tail latency
- Track the smallest offset - Never advance watermarks past unprocessed data, ensuring reduce correctness (the oldest work yet to be completed)
- Backpressure via semaphores - Prevent memory exhaustion while maximizing parallelism (reads are throttled while processing or writes are pending)
flowchart LR
subgraph "Vertex N"
ISB_R[ISB Reader] --> Processor
Processor --> ISB_W[ISB Writer]
end
subgraph "Vertex N+1"
ISB_R2[ISB Reader] --> Processor2[Processor]
end
ISB_W -->|ISB Edge| ISB_R2
style ISB_R fill:#4a9eff,color:#fff
style ISB_W fill:#4a9eff,color:#fff
style Processor fill:#10b981,color:#fff
style ISB_R2 fill:#4a9eff,color:#fff
style Processor2 fill:#10b981,color:#fff
Core Components¶
ISB Reader¶
Reads messages from the Inter-Step Buffer (e.g., JetStream), enriches them with watermarks, and tracks them until they are fully processed and written/forwarded to the next vertex.
ISB Writer¶
Writes processed messages to downstream ISB streams, handles routing based on tags/conditions, and publishes watermarks.
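The routing step can be pictured as matching each message's tags against per-edge conditions. A minimal sketch, assuming simple "or" semantics and illustrative `Edge`/`targets` names (not Numaflow's actual types):

```rust
/// Illustrative edge with a tag-based forwarding condition.
/// Numaflow also supports other operators ("and", "not");
/// this sketch shows only "or" semantics.
struct Edge {
    to: &'static str,
    tags: Vec<&'static str>, // empty = unconditional edge (assumption)
}

/// Return the downstream streams a message should be written to.
fn targets(edges: &[Edge], msg_tags: &[&str]) -> Vec<&'static str> {
    edges
        .iter()
        .filter(|e| e.tags.is_empty() || e.tags.iter().any(|t| msg_tags.contains(t)))
        .map(|e| e.to)
        .collect()
}
```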
Tracker¶
Maintains a sorted map (BTreeMap) of all in-flight messages per partition. Used to:
- Track message completion for ACK/NACK
- Compute the lowest watermark among all in-flight messages
- Handle serving callbacks
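A rough sketch of the shape of that state; the field and type names here are illustrative, not Numaflow's actual definitions:

```rust
use std::collections::{BTreeMap, HashMap};

use chrono::{DateTime, Utc};
use tokio::sync::oneshot;

/// What the tracker remembers about one in-flight message
/// (illustrative fields, not Numaflow's actual struct).
struct TrackerEntry {
    /// Watermark assigned to the message at read time.
    watermark: Option<DateTime<Utc>>,
    /// Delivers the final ACK (true) / NACK (false) decision.
    ack_tx: oneshot::Sender<bool>,
}

/// Per-partition sorted map of in-flight offsets. Because the
/// BTreeMap is ordered by offset, the oldest in-flight message
/// is always `first_key_value()`.
type TrackerState = HashMap<u16, BTreeMap<u64, TrackerEntry>>;
```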
Watermark Computation¶
Ensures temporal correctness by tracking event-time progress and publishing watermarks to downstream vertices.
Maintaining Active Work with Semaphores¶
The reader maintains a bounded number of in-flight messages using a semaphore. This provides:
- Constant throughput - Always `batch_size` messages being processed
- Backpressure - Prevents unbounded memory growth
- Graceful shutdown - Waits for all in-flight messages to be processed before exiting
sequenceDiagram
participant Sem as Semaphore<br/>(max_ack_pending)
participant Reader as ISB Reader
participant Processor as Processor
participant Writer as ISB Writer
Note over Sem: Initial permits = max_ack_pending
Reader->>Sem: acquire_many(batch_size)
Sem-->>Reader: permits granted
Reader->>Reader: fetch messages from ISB
loop For each message
Reader->>Processor: send message (with 1 permit)
Processor->>Writer: processed message
Writer->>Writer: write to downstream ISB
Writer-->>Sem: release 1 permit (on ACK)
end
Note over Sem: Permits returned = ready for next batch
How It Works¶
- Reader acquires permits - before fetching messages (`acquire_many(batch_size)`)
- Each message carries a permit - split from the batch permit
- Permit released on ACK - when downstream confirms receipt
- Backpressure is automatic - if processing is slow, the reader blocks on permit acquisition
// Semaphore controls max in-flight messages
let semaphore = Arc::new(Semaphore::new(max_ack_pending));
loop {
    // Block until we can process more messages
    let mut permits = Arc::clone(&semaphore)
        .acquire_many_owned(batch_size)
        .await?;
    // Fetch and process batch
    let batch = self.fetch_messages(batch_size).await;
    for message in batch {
        // Each message gets 1 permit, released on ACK
        let permit = permits
            .split(1)
            .expect("batch permit holds at least one unsplit permit");
        self.process_message(message, permit).await;
    }
}
Message Lifecycle¶
A message flows through several stages from read to acknowledgment:
flowchart TB
subgraph "1. Read Phase"
A[Fetch from ISB/Source] --> B[Enrich with Watermark]
B --> C[Insert into Tracker]
C --> D[Spawn WIP Loop]
end
subgraph "2. Process Phase"
D --> E[Send to Processor Channel]
E --> F[Map/Reduce Processing]
end
subgraph "3. Write Phase"
F --> G[Route to Target Streams]
G --> H[Write to Downstream ISB/Sink]
H --> I[Await Write Confirmation]
end
subgraph "4. Completion Phase"
I --> J[Send ACK/NACK via AckHandle]
J --> K[ACK/NACK to ISB/Source]
K --> L[Delete from Tracker]
L --> M[Release Semaphore Permit]
end
style A fill:#4a9eff,color:#fff
style F fill:#10b981,color:#fff
style H fill:#f59e0b,color:#fff
style K fill:#ef4444,color:#fff
Key Points¶
- Watermark enriched at read time - Each message gets the watermark for its offset
- Tracker insertion before processing - Ensures we never lose track of a message
- WIP (Work-In-Progress) loop - Periodically marks the message as still being processed (see the sketch after this list)
- ACK/NACK triggers cleanup - Removes from tracker and releases permit
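The WIP loop can be pictured as a background task that keeps extending the message's ack deadline until the final decision arrives. A minimal sketch, assuming hypothetical `mark_in_progress`/`ack`/`nack` helpers in place of the real ISB client calls:

```rust
use std::time::Duration;
use tokio::sync::oneshot;

// Hypothetical stand-ins for the real ISB client calls.
async fn mark_in_progress(_offset: u64) { /* extend the ack deadline */ }
async fn ack(_offset: u64) { /* acknowledge the message */ }
async fn nack(_offset: u64) { /* negatively acknowledge for redelivery */ }

/// Keep a message alive on the ISB until its fate is decided.
async fn wip_loop(offset: u64, mut done: oneshot::Receiver<bool>) {
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tokio::select! {
            // Final decision arrived: ACK on success, NACK otherwise
            // (a dropped channel is treated as failure).
            result = &mut done => {
                match result {
                    Ok(true) => ack(offset).await,
                    _ => nack(offset).await,
                }
                return;
            }
            // No decision yet: tell the ISB we are still working on
            // the message so it is not redelivered to another reader.
            _ = tick.tick() => mark_in_progress(offset).await,
        }
    }
}
```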
Offset Tracking for Reduce Correctness¶
The Tracker maintains a BTreeMap of offsets per partition. This sorted structure is critical for watermark correctness:
flowchart TB
subgraph "Tracker State (per partition)"
direction LR
subgraph BTreeMap["BTreeMap<Offset, TrackerEntry>"]
O1["Offset: 100<br/>WM: 1000"]
O2["Offset: 105<br/>WM: 1050"]
O3["Offset: 110<br/>WM: 1100"]
O4["Offset: 115<br/>WM: 1150"]
end
end
O1 -->|"first_key_value()"| LW[Lowest Watermark: 1000]
style O1 fill:#ef4444,color:#fff
style LW fill:#ef4444,color:#fff
style O2 fill:#4a9eff,color:#fff
style O3 fill:#4a9eff,color:#fff
style O4 fill:#4a9eff,color:#fff
Why BTreeMap?¶
- Sorted by offset - The oldest message is always `first_key_value()`
- O(log n) operations - Efficient insert/delete
- Minimum watermark - The first entry's watermark is the oldest in-flight
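The guarantee is easy to see with a self-contained example mirroring the diagram above:

```rust
use std::collections::BTreeMap;

fn main() {
    // offset -> watermark (event-time millis), inserted out of order
    let mut in_flight: BTreeMap<u64, i64> = BTreeMap::new();
    in_flight.insert(110, 1100);
    in_flight.insert(100, 1000);
    in_flight.insert(115, 1150);
    in_flight.insert(105, 1050);

    // Regardless of insertion order, the first entry is the oldest
    // offset; since watermarks are non-decreasing per partition,
    // its watermark is the oldest in-flight.
    assert_eq!(in_flight.first_key_value(), Some((&100, &1000)));

    // Completing the oldest message advances the lowest watermark.
    in_flight.remove(&100);
    assert_eq!(in_flight.first_key_value(), Some((&105, &1050)));
}
```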
The Lowest Watermark Guarantee¶
/// Returns the lowest watermark among all tracked offsets,
/// or `None` if no messages are in flight.
pub async fn lowest_watermark(&self) -> Option<DateTime<Utc>> {
    let state = self.state.read().await;
    state
        .entries
        .values()
        .filter_map(|partition_entries| {
            partition_entries
                .first_key_value() // Oldest offset per partition
                .and_then(|(_, entry)| entry.watermark)
        })
        .min() // Minimum across all partitions
}
Why This Matters for Reduce¶
Reduce operations group data by time windows. If the watermark wrongly advances past unprocessed data, close-of-book (COB) is invoked too early for those windows, leading to incorrect results:
- Windows close prematurely - Data arrives after window is closed
- Late data is dropped - Correctness is violated
- Results are wrong - Aggregations miss data
By always publishing the lowest watermark among in-flight messages:
flowchart LR
subgraph "In-Flight Messages"
M1["Msg 1<br/>WM: 1000"]
M2["Msg 2<br/>WM: 1200"]
M3["Msg 3<br/>WM: 1100"]
end
M1 & M2 & M3 --> MIN["min() = 1000"]
MIN --> PUB["Published WM: 1000"]
subgraph "Downstream Reduce"
W1["Window [0-1000]<br/>Safe to close (COB)"]
W2["Window [1000-2000]<br/>Keep open"]
end
PUB --> W1 & W2
style M1 fill:#ef4444,color:#fff
style MIN fill:#ef4444,color:#fff
style PUB fill:#ef4444,color:#fff
style W1 fill:#10b981,color:#fff
style W2 fill:#f59e0b,color:#fff
The watermark is a promise: "No more data with event-time < watermark will arrive."
Watermark Publishing¶
Watermarks are published to downstream vertices via OT (Offset-Timeline) stores in JetStream KV.
Computing the Watermark¶
The watermark to publish depends on the vertex type:
flowchart TB
START[Compute Watermark] --> CHECK{Window Manager<br/>Configured?}
CHECK -->|Yes - Reduce Vertex| WM_WIN["oldest_window_end_time - 1ms"]
CHECK -->|No - Map/Sink| WM_TRACK["tracker.lowest_watermark()"]
WM_WIN --> COMPARE["min(window_wm, latest_fetched_wm)"]
WM_TRACK --> PUBLISH
COMPARE --> PUBLISH[Publish to OT Store]
style START fill:#4a9eff,color:#fff
style PUBLISH fill:#10b981,color:#fff
For Map/Sink Vertices¶
Use the tracker's lowest watermark - the minimum watermark among all in-flight messages.
For Reduce Vertices¶
Use the oldest open window's end time - 1ms. This ensures:
- Windows aren't closed until all their data is processed
- Late data within allowed lateness is handled correctly
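A sketch of that computation, mirroring the min() clamp in the diagram above (function and parameter names are illustrative, not Numaflow's actual API):

```rust
use chrono::{DateTime, Duration, Utc};

/// Watermark to publish from a reduce vertex: the oldest open
/// window's end time minus 1ms, clamped by the latest fetched
/// watermark so we never claim more progress than we observed.
fn reduce_watermark(
    oldest_window_end: DateTime<Utc>,
    latest_fetched_wm: DateTime<Utc>,
) -> DateTime<Utc> {
    let window_wm = oldest_window_end - Duration::milliseconds(1);
    window_wm.min(latest_fetched_wm)
}
```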
Idle Watermark Handling¶
When no data is flowing, we still need to advance watermarks:
- Detect idle state - No messages read for a period
- Fetch head WMB - Get the latest watermark marker from upstream
- Publish idle watermark - Allow downstream to make progress
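A sketch of this path, assuming placeholder `fetch_head_wmb` / `publish_idle_watermark` helpers in place of the real watermark API:

```rust
use std::time::{Duration, Instant};

// Placeholders for the real watermark fetcher/publisher.
async fn fetch_head_wmb() -> Option<i64> { None }
async fn publish_idle_watermark(_wm: i64) {}

/// Called periodically by the reader loop.
async fn maybe_publish_idle(last_read: Instant, idle_threshold: Duration) {
    // 1. Detect idle state: nothing read for `idle_threshold`.
    if last_read.elapsed() < idle_threshold {
        return;
    }
    // 2. Fetch the head watermark marker (WMB) from upstream.
    if let Some(head_wm) = fetch_head_wmb().await {
        // 3. Publish it as an idle watermark so downstream
        //    vertices can keep making progress (e.g. closing windows).
        publish_idle_watermark(head_wm).await;
    }
}
```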
Streaming Flow Patterns¶
Map Forwarder¶
flowchart LR
subgraph "Map Vertex"
R[ISB Reader] -->|ReceiverStream| M[Mapper]
M -->|ReceiverStream| W[ISB Writer]
end
R -.->|"streaming_read()"| STREAM1[Message Stream]
M -.->|"streaming_map()"| STREAM2[Mapped Stream]
W -.->|"streaming_write()"| DONE[Complete]
style R fill:#4a9eff,color:#fff
style M fill:#10b981,color:#fff
style W fill:#f59e0b,color:#fff
All components are connected via Tokio channels wrapped in `ReceiverStream`, enabling true streaming without batch boundaries.
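A minimal sketch of that wiring, with toy stages standing in for `streaming_read()`, `streaming_map()`, and `streaming_write()`:

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // Reader stage: produces messages into a bounded channel.
    let (read_tx, read_rx) = mpsc::channel::<String>(64);
    tokio::spawn(async move {
        for i in 0..3 {
            read_tx.send(format!("msg-{i}")).await.unwrap();
        }
    });

    // Map stage: consumes the read stream, emits a mapped stream.
    let (map_tx, map_rx) = mpsc::channel::<String>(64);
    tokio::spawn(async move {
        let mut input = ReceiverStream::new(read_rx);
        while let Some(msg) = input.next().await {
            map_tx.send(msg.to_uppercase()).await.unwrap();
        }
    });

    // Writer stage: drains the mapped stream.
    let mut output = ReceiverStream::new(map_rx);
    while let Some(msg) = output.next().await {
        println!("write {msg}");
    }
}
```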
Reduce Forwarder¶
flowchart LR
subgraph "Reduce Vertex"
R[ISB Reader] --> PBQ[Persistent<br/>Bounded Queue]
PBQ --> RED[Reducer]
PBQ --> WAL[WAL]
RED --> W[ISB Writer]
end
RED -.->|"Keyed Windowing"| WINDOWS["Window 1<br/>Window 2<br/>Window N"]
style R fill:#4a9eff,color:#fff
style PBQ fill:#8b5cf6,color:#fff
style WAL fill:#ef4444,color:#fff
style RED fill:#10b981,color:#fff
style W fill:#f59e0b,color:#fff
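A sketch of the PBQ hand-off, with illustrative `Wal`/`Message` types (not Numaflow's actual implementation), assuming messages are made durable in the WAL before the reducer sees them so a restarted pod can replay them:

```rust
use tokio::sync::mpsc;

#[derive(Clone)]
struct Message {
    payload: Vec<u8>,
}

struct Wal;
impl Wal {
    /// Append the message durably (placeholder implementation).
    async fn append(&mut self, _msg: &Message) -> std::io::Result<()> {
        Ok(())
    }
}

/// Persist each message before handing it to the reducer, so a
/// restarted pod can replay unprocessed input from the WAL.
async fn pbq_forward(
    mut input: mpsc::Receiver<Message>,
    mut wal: Wal,
    to_reducer: mpsc::Sender<Message>,
) -> std::io::Result<()> {
    while let Some(msg) = input.recv().await {
        wal.append(&msg).await?; // durable first...
        let _ = to_reducer.send(msg).await; // ...then feed the reducer
    }
    Ok(())
}
```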
Summary¶
The streaming architecture ensures that:
- Throughput is maximized - Always `batch_size` messages in flight
- Memory is bounded - The semaphore prevents unbounded growth
- Reduce is correct - Watermarks never advance past unprocessed data
- Recovery is possible - Tracker + WIP loop enables at-least-once or "almost" exactly-once
- WAL - The WAL enables Reduce vertices to recover after pod restarts