
Watermarks

When processing an unbounded data stream, Numaflow has to materialize the results of the processing done on the data. The materialization of the output depends on the notion of time, e.g., the total number of logins served per minute. Without a notion of time built into the platform, we cannot determine the passage of time, which is necessary for grouping elements together to materialize the result. Watermarks are that notion of time: they let us group unbounded data into discrete chunks. Numaflow supports watermarks out of the box. Source vertices generate watermarks based on event time and propagate them to downstream vertices.

A watermark is defined as “a monotonically increasing timestamp of the oldest work/event not yet completed”. In other words, if the watermark has advanced past some timestamp T, we are guaranteed by its monotonic property that no more processing will occur for on-time events at or before T.

Configuration

Disable Watermark

Watermarks can be disabled by setting disabled: true.

Idle Detection

The watermark is assigned at the source; this means that the watermark will only progress if there is data coming into the source. There are many cases where the source might not be getting data, causing the source to idle (e.g., data is periodic, say once an hour). When the source is idling, the reduce vertices won't emit results because the watermark is not moving. To detect source idling and propagate the watermark, we can use the idle detection feature. The idle source watermark progressor will make sure that the watermark cannot progress beyond time.now() - maxDelay (maxDelay is defined below). To enable this, we provide the following settings:

Threshold

Threshold is the duration after which a source is marked as Idle due to a lack of data flowing into the source.

StepInterval

StepInterval is the duration between subsequent increments of the watermark as long as the source remains idle. The default value is 0s, which means that once we detect an idle source, we will increment the watermark by IncrementBy every time we detect that the source is empty (in other words, this will be a very frequent update).

Default Value: 0s

IncrementBy

IncrementBy is the duration to be added to the current watermark to progress the watermark when the source is idling.

Example

The example below considers the source idle after there has been no data at the source for 5s. After that, an idle watermark is emitted every 2s, and each one increments the watermark by 3s.

  watermark:
    idleSource:
      threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
      incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value.
      stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed.

maxDelay

Watermark assignment happens at the source. Sources could be out of order, so sometimes we want to extend the window (default is 0s) to wait before we start marking data as late data. With maxDelay, you can give the system more time to wait for late data, so that late data arriving within the specified duration is treated as on-time data. This means the watermark propagation will be delayed by maxDelay.

Example

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
spec:
  watermark:
    disabled: false # Optional, defaults to false.
    maxDelay: 60s # Optional, defaults to "0s".

Watermark API

When processing data in user-defined functions, you can get the current watermark through an API. The Watermark API is supported in all our client SDKs.

Example Golang

// Go
func mapFn(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
    _ = d.EventTime() // Event time of the datum
    _ = d.Watermark() // Watermark at the time the datum is processed
    // ...
}