Pipeline Tuning¶
In a data processing pipeline, certain parameters can be fine-tuned according to the specific use case of the data processing.
Vertex Tuning¶
Each vertex keeps running the cycle of reading data from an Inter-Step Buffer (or data source), processing the data, and writing to the next Inter-Step Buffers (or sinks). There are some parameters can be adjusted for this data processing cycle.
readBatchSize- The number of messages to read in each cycle, with a default value of500. It works together withreadTimeoutduring a read operation, concluding when either limit is reached first.readBatchSizecontrols only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (useconcurrencyfor that).readTimeout- Read timeout from the source or Inter-Step Buffer, defaults to1s. It works in conjunction withreadBatchSize.bufferMaxLength- How many unprocessed messages can be existing in the Inter-Step Buffer, defaults to30000.bufferUsageLimit- The percentage of the buffer usage limit, a valid number should be less than 100. Default value is80, which means80%.concurrency- Maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. Defaults to the value ofreadBatchSize. See Concurrency and read-ahead below.
These parameters can be customized under spec.limits as below, once defined, they apply to all the vertices and Inter-Step Buffers of the pipeline.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: my-pipeline
spec:
limits:
readBatchSize: 100
readTimeout: 200ms
bufferMaxLength: 30000
bufferUsageLimit: 85
concurrency: 500
They also can be defined in a vertex level, which will override the pipeline level settings.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: my-pipeline
spec:
limits: # Default limits for all the vertices and edges (buffers) of this pipeline
readBatchSize: 100
readTimeout: 200ms
bufferMaxLength: 30000
bufferUsageLimit: 85
vertices:
- name: in
source:
generator:
rpu: 5
duration: 1s
- name: cat
udf:
container:
image: quay.io/numaio/numaflow-go/map-cat:stable # A UDF which simply cats the message
imagePullPolicy: Always
limits:
readBatchSize: 200 # It overrides the default limit "100"
bufferMaxLength: 20000 # It overrides the default limit "30000" for the buffers owned by this vertex
bufferUsageLimit: 70 # It overrides the default limit "85" for the buffers owned by this vertex
- name: out
sink:
log: {}
edges:
- from: in
to: cat
- from: cat
to: out
Concurrency and read-ahead¶
Throughput on a vertex is governed by two independent knobs:
readBatchSize— the size of an individual read from the source or Inter-Step Buffer. The data plane keeps using this batch size for every read; it does not cap how many messages may be in-flight at the same time.concurrency— the maximum number of messages that may be in-flight (read but not yet acknowledged). Withconcurrencyslots free, the data plane is willing to start processing that many messages in parallel.
The data plane runs in one of two modes:
| Mode | Default for | Behavior |
|---|---|---|
| Read-ahead enabled | Map / Sink / Reduce vertices | The data plane keeps reading new batches until the in-flight count hits concurrency. Once full, it may pre-fetch one more batch and hold it ready, so a completed message is replaced immediately. The hard upper bound on in-flight is therefore concurrency + readBatchSize. |
| Read-ahead disabled | Source vertices (and MonoVertex, which is always source-driven) | The data plane drains the current batch fully before issuing the next read. This keeps source ordering / re-read cost low. The upper bound on in-flight is min(concurrency, readBatchSize). |
Read-ahead is controlled by the NUMAFLOW_READ_AHEAD environment variable. The controller injects a sensible default per vertex type, but you can override it on a per-vertex basis via the container template:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: my-pipeline
spec:
vertices:
- name: cat
udf:
container:
image: quay.io/numaio/numaflow-go/map-cat:stable
limits:
readBatchSize: 100
concurrency: 500 # up to 500 messages may be processing in parallel
containerTemplate:
env:
- name: NUMAFLOW_READ_AHEAD
value: "false" # turn read-ahead off for this Map vertex
Strictly sequential processing¶
To force a vertex to process one message at a time in the order it was read, set:
concurrency: 1, andNUMAFLOW_READ_AHEAD=false(already the default for source vertices and MonoVertex).
For pipeline-wide ordered processing, see Ordered Processing.
Edge Tuning¶
There is an edge level setting to drop the messages if the buffer.isFull == true. Even if the UDF or UDSink drops
a message due to some internal error in the user-defined code, the processing latency will spike up causing a natural
back pressure. A kill switch to drop messages can help alleviate/avoid any repercussions on the rest of the DAG.
This setting is an edge-level setting and can be enabled by onFull and the default is retryUntilSuccess (other option
is discardLatest).
This is a data loss scenario but can be useful in cases where we are doing user-introduced experimentations, like A/B testing, on the pipeline. It is totally okay for the experimentation side of the DAG to have data loss while the production is unaffected.
discardLatest¶
Setting onFull to discardLatest will drop the incoming message on the floor if the edge is full.
edges:
- from: a
to: b
onFull: discardLatest
retryUntilSuccess¶
The default setting for onFull in retryUntilSuccess which will make sure the incoming message is retried until successful.
edges:
- from: a
to: b
onFull: retryUntilSuccess