Numaflow exposes the following Prometheus metrics, which you can use to monitor your pipeline and set up alerts as needed.
Golden Signals
These metrics in combination can be used to determine the overall health of your pipeline.
These metrics can be used to determine the throughput of your pipeline.
| Metric name | Metric type | Labels | Description |
| --- | --- | --- | --- |
| `forwarder_read_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages read by a given Vertex from an Inter-Step Buffer Partition or Source |
| `forwarder_data_read_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of data messages read by a given Vertex from an Inter-Step Buffer Partition or Source |
| `forwarder_read_bytes_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of bytes read by a given Vertex from an Inter-Step Buffer Partition or Source |
| `forwarder_data_read_bytes_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of data bytes read by a given Vertex from an Inter-Step Buffer Partition or Source |
| `source_forwarder_transformer_read_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages read by source transformer |
| `forwarder_write_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages written to Inter-Step Buffer by a given Vertex |
| `forwarder_write_bytes_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of bytes written to Inter-Step Buffer by a given Vertex |
| `source_forwarder_transformer_write_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages written by source transformer |
| `forwarder_fbsink_write_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages written to a fallback sink |
| `forwarder_fbsink_write_bytes_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of bytes written to a fallback sink |
| `forwarder_ack_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages acknowledged by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_drop_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages dropped by a given Vertex due to a full Inter-Step Buffer Partition |
| `forwarder_drop_bytes_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Vertex due to a full Inter-Step Buffer Partition |
| `forwarder_udf_read_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages read by UDF |
| `forwarder_udf_write_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the total number of messages written by UDF |
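Because the throughput metrics are counters, they are usually read as per-second rates rather than raw totals. The following is a minimal sketch of Prometheus Operator recording rules for that purpose. The resource name, the recorded series names, and the 5-minute window are illustrative assumptions, and the sketch assumes the metrics are scraped under exactly the names listed in the table above.

```yaml
# Sketch only: resource name, recorded series names, and the 5m window are assumptions.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: numaflow-throughput-rules  # hypothetical name
spec:
  groups:
    - name: numaflow-throughput
      rules:
        # Messages read per second, summed over all replicas and partitions of a vertex.
        - record: numaflow:vertex_read_rate:5m
          expr: sum by (pipeline, vertex) (rate(forwarder_read_total[5m]))
        # Messages written per second to the Inter-Step Buffer, per vertex.
        - record: numaflow:vertex_write_rate:5m
          expr: sum by (pipeline, vertex) (rate(forwarder_write_total[5m]))
```

Comparing the read rate of a vertex with the write rate of the vertex upstream of it is one way to spot where a pipeline is falling behind.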
These metrics can be used to determine the latency of your pipeline.
| Metric name | Metric type | Labels | Description |
| --- | --- | --- | --- |
| `pipeline_processing_lag` | Gauge | `pipeline=<pipeline-name>` | Pipeline processing lag in milliseconds (max watermark - min watermark) |
| `pipeline_watermark_cmp_now` | Gauge | `pipeline=<pipeline-name>` | Max watermark of source compared with current time in milliseconds |
| `forwarder_read_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the histogram distribution of the processing times of read operations |
| `forwarder_write_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the histogram distribution of the processing times of write operations |
| `forwarder_ack_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the histogram distribution of the processing times of ack operations |
| `forwarder_fbsink_write_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides the histogram distribution of the processing times of write operations to a fallback sink |
| `source_forwarder_transformer_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` `partition_name=<partition-name>` | Provides a histogram distribution of the processing times of source transformer |
| `forwarder_udf_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` | Provides a histogram distribution of the processing times of User-Defined Functions (UDFs) |
| `forwarder_forward_chunk_processing_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` | Provides a histogram distribution of the processing times of the forwarder function as a whole |
| `reduce_pnf_process_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` | Provides a histogram distribution of the processing times of the reducer |
| `reduce_pnf_forward_time` | Histogram | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` | Provides a histogram distribution of the forwarding times of the reducer |
| `vertex_pending_messages` | Gauge | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `period=<duration>` `partition_name=<partition-name>` | Provides the average number of pending messages of a vertex over the duration given by the `period` label |
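The latency metrics above can be turned into percentiles and alerts. The sketch below records a 95th-percentile UDF processing time from the histogram buckets and alerts when the pipeline processing lag stays high. The resource, rule, and alert names, the 60000 ms threshold, and the time windows are illustrative assumptions. The `_bucket` suffix is the standard series suffix Prometheus uses for histogram metrics.

```yaml
# Sketch only: names, thresholds, and windows are assumptions to tune per pipeline.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: numaflow-latency-rules  # hypothetical name
spec:
  groups:
    - name: numaflow-latency
      rules:
        # 95th percentile of UDF processing time per vertex over the last 5 minutes.
        - record: numaflow:udf_processing_time:p95_5m
          expr: |
            histogram_quantile(
              0.95,
              sum by (le, pipeline, vertex) (rate(forwarder_udf_processing_time_bucket[5m]))
            )
        # pipeline_processing_lag is reported in milliseconds; 60000 ms is an assumed threshold.
        - alert: NumaflowPipelineLagHigh
          expr: pipeline_processing_lag > 60000
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Pipeline {{ $labels.pipeline }} processing lag has been above 60 seconds for 10 minutes"
```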
These metrics can be used to determine if there are any errors in the pipeline.
| Metric name | Metric type | Labels | Description |
| --- | --- | --- | --- |
| `pipeline_data_processing_health` | Gauge | `pipeline=<pipeline-name>` | Pipeline data processing health status. 1: Healthy, 0: Unknown, -1: Warning, -2: Critical |
| `controller_isbsvc_health` | Gauge | `ns=<namespace>` `isbsvc=<isbsvc-name>` | A metric to indicate whether the ISB Service is healthy. '1' means healthy, '0' means unhealthy |
| `controller_pipeline_health` | Gauge | `ns=<namespace>` `pipeline=<pipeline-name>` | A metric to indicate whether the Pipeline is healthy. '1' means healthy, '0' means unhealthy |
| `controller_monovtx_health` | Gauge | `ns=<namespace>` `mvtx_name=<mvtx-name>` | A metric to indicate whether the MonoVertex is healthy. '1' means healthy, '0' means unhealthy |
| `forwarder_platform_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` | Indicates any internal errors which could stop pipeline processing |
| `forwarder_read_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Indicates any errors while reading messages by the forwarder |
| `source_forwarder_transformer_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `replica=<replica-index>` `partition_name=<partition-name>` | Indicates source transformer errors |
| `forwarder_write_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Indicates any errors while writing messages by the forwarder |
| `forwarder_fbsink_write_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Indicates any errors while writing to a fallback sink |
| `forwarder_ack_error_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` `vertex_type=<vertex-type>` `replica=<replica-index>` `partition_name=<partition-name>` | Indicates any errors while acknowledging messages by the forwarder |
| `kafka_sink_write_timeout_total` | Counter | `pipeline=<pipeline-name>` `vertex=<vertex-name>` | Provides the write timeouts while writing to the Kafka sink |
| `isb_jetstream_read_error_total` | Counter | `partition_name=<partition-name>` | Indicates any read errors with NATS JetStream ISB |
| `isb_jetstream_write_error_total` | Counter | `partition_name=<partition-name>` | Indicates any write errors with NATS JetStream ISB |
| `isb_redis_read_error_total` | Counter | `partition_name=<partition-name>` | Indicates any read errors with Redis ISB |
| `isb_redis_write_error_total` | Counter | `partition_name=<partition-name>` | Indicates any write errors with Redis ISB |
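The health gauges and error counters above lend themselves to alerting. The following sketch, under assumed alert names, severities, and time windows, fires when the controller reports a pipeline as unhealthy, when data processing health reaches the Critical value of -2, and when a vertex keeps producing write errors.

```yaml
# Sketch only: alert names, severities, and durations are assumptions.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: numaflow-error-alerts  # hypothetical name
spec:
  groups:
    - name: numaflow-errors
      rules:
        # controller_pipeline_health is 1 when healthy and 0 when unhealthy.
        - alert: NumaflowPipelineUnhealthy
          expr: controller_pipeline_health == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Pipeline {{ $labels.pipeline }} in namespace {{ $labels.ns }} is unhealthy"
        # -2 is the Critical value of pipeline_data_processing_health.
        - alert: NumaflowDataProcessingCritical
          expr: pipeline_data_processing_health == -2
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Pipeline {{ $labels.pipeline }} data processing health is critical"
        # Any growth of the write error counter over the last 10 minutes.
        - alert: NumaflowForwarderWriteErrors
          expr: sum by (pipeline, vertex) (increase(forwarder_write_error_total[10m])) > 0
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Vertex {{ $labels.vertex }} of pipeline {{ $labels.pipeline }} is reporting write errors"
```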
NATS JetStream ISB
| Metric name | Metric type | Labels | Description |
| --- | --- | --- | --- |
| `isb_jetstream_isFull_total` | Counter | `buffer=<buffer-name>` | Indicates that the ISB is full. A continual increase of this counter indicates potential backpressure building up in the pipeline |
| `isb_jetstream_buffer_soft_usage` | Gauge | `buffer=<buffer-name>` | Indicates the usage/utilization of a NATS JetStream ISB |
| `isb_jetstream_buffer_solid_usage` | Gauge | `buffer=<buffer-name>` | Indicates the solid usage of a NATS JetStream ISB |
| `isb_jetstream_buffer_pending` | Gauge | `buffer=<buffer-name>` | Indicates the number of pending messages at a given point in time |
| `isb_jetstream_buffer_ack_pending` | Gauge | `buffer=<buffer-name>` | Indicates the number of messages pending acknowledgement at a given point in time |
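Since a continually increasing `isb_jetstream_isFull_total` points to backpressure, one option is to alert on its rate. The sketch below does that and adds a usage alert. The alert names, windows, and the 0.8 threshold are assumptions, and the usage rule additionally assumes that `isb_jetstream_buffer_soft_usage` is reported as a fraction between 0 and 1, which should be verified against your own metrics before use.

```yaml
# Sketch only: names, windows, and thresholds are assumptions.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: numaflow-jetstream-isb-alerts  # hypothetical name
spec:
  groups:
    - name: numaflow-jetstream-isb
      rules:
        # The counter keeps increasing while the buffer is found full.
        - alert: NumaflowJetStreamBufferFull
          expr: rate(isb_jetstream_isFull_total[5m]) > 0
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Buffer {{ $labels.buffer }} has been repeatedly full for 10 minutes"
        # Assumes the gauge is a 0..1 fraction; adjust if your deployment reports another scale.
        - alert: NumaflowJetStreamBufferUsageHigh
          expr: isb_jetstream_buffer_soft_usage > 0.8
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Buffer {{ $labels.buffer }} usage is above the assumed 0.8 threshold"
```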
Redis ISB
| Metric name | Metric type | Labels | Description |
| --- | --- | --- | --- |
| `isb_redis_isFull_total` | Counter | `buffer=<buffer-name>` | Indicates that the ISB is full. A continual increase of this counter indicates potential backpressure building up in the pipeline |
| `isb_redis_buffer_usage` | Gauge | `buffer=<buffer-name>` | Indicates the usage/utilization of a Redis ISB |
| `isb_redis_consumer_lag` | Gauge | `buffer=<buffer-name>` | Indicates the consumer lag of a Redis ISB |
Prometheus Operator for Scraping Metrics:
You can follow the Prometheus Operator setup guide if you would like to use a Prometheus Operator configured in your cluster.
You can also set up the Prometheus Operator via Helm.
Configure the below Service/Pod Monitors for scraping your pipeline metrics:
kind: ServiceMonitor
labels: numaflow
name: numaflow-pipeline-metrics
- scheme: https
port: metrics
targetPort: 2469
insecureSkipVerify: true
matchLabels: vertex vertex-controller numaflow
- key:
operator: Exists
- key:
operator: Exists
kind: ServiceMonitor
labels: numaflow
name: numaflow-pipeline-daemon-metrics
- scheme: https
port: tcp
targetPort: 4327
insecureSkipVerify: true
matchLabels: daemon pipeline-controller numaflow
- key:
operator: Exists
kind: PodMonitor
labels: numaflow
name: numaflow-controller-metrics
- scheme: http
port: metrics
targetPort: 9090
matchLabels: controller-manager controller-manager numaflow
Configure the below Service Monitor to scrape your NATS JetStream metrics if you use the NATS JetStream ISB:
kind: ServiceMonitor
labels: numaflow
name: numaflow-isbsvc-jetstream-metrics
- scheme: http
port: metrics
targetPort: 7777
matchLabels: isbsvc isbsvc-controller numaflow jetstream
- key:
operator: Exists