Pipeline Tuning
In a data processing pipeline, certain parameters can be fine-tuned to suit the specific use case.
Vertex Tuning
Each vertex repeatedly runs a cycle of reading data from an Inter-Step Buffer (or data source), processing the data, and writing to the next Inter-Step Buffers (or sinks). Several parameters can be adjusted for this data processing cycle.
- readBatchSize - The number of messages to read in each cycle, with a default value of 500. It works together with readTimeout during a read operation, which concludes when either limit is reached first.
- readTimeout - Read timeout from the source or Inter-Step Buffer, defaults to 1s. It works in conjunction with readBatchSize.
- bufferMaxLength - How many unprocessed messages can exist in the Inter-Step Buffer, defaults to 30000.
- bufferUsageLimit - The buffer usage limit as a percentage; a valid value is less than 100. The default is 80, which means 80%.
These parameters can be customized under spec.limits as shown below. Once defined, they apply to all the vertices and Inter-Step Buffers of the pipeline.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  limits:
    readBatchSize: 100
    readTimeout: 200ms
    bufferMaxLength: 30000
    bufferUsageLimit: 85
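To make the numbers concrete, here is the same limits block annotated with what each value implies. The 25500 figure is plain arithmetic (85% of 30000) and assumes the usage limit is applied as a straight percentage of bufferMaxLength, which this page does not state explicitly.

```yaml
limits:
  readBatchSize: 100      # a read cycle ends once 100 messages are collected...
  readTimeout: 200ms      # ...or once 200ms has elapsed, whichever comes first
  bufferMaxLength: 30000  # at most 30000 unprocessed messages per Inter-Step Buffer
  bufferUsageLimit: 85    # assumed threshold: 30000 * 85 / 100 = 25500 messages
```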
They can also be defined at the vertex level, which overrides the pipeline-level settings.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  limits: # Default limits for all the vertices and edges (buffers) of this pipeline
    readBatchSize: 100
    readTimeout: 200ms
    bufferMaxLength: 30000
    bufferUsageLimit: 85
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
    - name: cat
      udf:
        builtin:
          name: cat
      limits:
        readBatchSize: 200 # It overrides the default limit "100"
        bufferMaxLength: 20000 # It overrides the default limit "30000" for the buffers owned by this vertex
        bufferUsageLimit: 70 # It overrides the default limit "85" for the buffers owned by this vertex
    - name: out
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: out
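Read field by field, the example above would resolve to roughly the following effective limits. This is an illustration rather than a Numaflow manifest, and it assumes that a field left unset at the vertex level (readTimeout on cat, for instance) falls back to the pipeline-level value.

```yaml
# Effective limits per vertex (illustration only, not something to apply)
in:
  readBatchSize: 100      # from spec.limits
  readTimeout: 200ms      # from spec.limits
  # buffer limits omitted: which buffers a source vertex owns is not covered here
cat:
  readBatchSize: 200      # vertex-level override
  readTimeout: 200ms      # not overridden, assumed inherited from spec.limits
  bufferMaxLength: 20000  # vertex-level override, for the buffers owned by cat
  bufferUsageLimit: 70    # vertex-level override, for the buffers owned by cat
out:
  readBatchSize: 100      # from spec.limits
  readTimeout: 200ms      # from spec.limits
  bufferMaxLength: 30000  # from spec.limits
  bufferUsageLimit: 85    # from spec.limits
```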
Edge Tuning
There is an edge-level setting to drop messages when the buffer is full (buffer.isFull == true). Even if the UDF or UDSink drops a message because of an internal error in the user-defined code, the processing latency will spike, causing natural back pressure. A kill switch to drop messages can help alleviate or avoid repercussions on the rest of the DAG.
The setting is configured per edge with onFull. The default is retryUntilSuccess; the other option is discardLatest.
This is a data-loss scenario, but it can be useful for user-introduced experiments on the pipeline, such as A/B testing. Data loss is acceptable on the experimentation side of the DAG as long as production is unaffected.
discardLatest
Setting onFull to discardLatest will drop the incoming message on the floor if the edge is full.
edges:
  - from: a
    to: b
    onFull: discardLatest
retryUntilSuccess
The default setting for onFull is retryUntilSuccess, which makes sure the incoming message is retried until successful.
edges:
  - from: a
    to: b
    onFull: retryUntilSuccess
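As a sketch of the A/B-testing scenario described earlier, the following pipeline fans out from one vertex to a production sink and an experiment sink. The vertex names prod-out and experiment-out are hypothetical, and the fan-out from cat to two sinks is an assumption made for illustration. Only the experiment edge opts in to dropping messages.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
    - name: cat
      udf:
        builtin:
          name: cat
    - name: prod-out          # hypothetical production sink
      sink:
        log: {}
    - name: experiment-out    # hypothetical experiment sink
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: prod-out            # onFull not set, so the default retryUntilSuccess applies
    - from: cat
      to: experiment-out
      onFull: discardLatest   # drop incoming messages when this edge is full instead of retrying
```

The intent of this layout is that a slow or failing experiment sink loses data on its own edge, while the production edge keeps the default retry behavior.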