Skip to content

Sliding

Overview

Sliding windows are similar to Fixed windows, the size of the windows is measured in time and is fixed. The important difference from the Fixed window is the fact that it allows an element to be present in more than one window. The additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows will be overlapping and the slide should be smaller than the window length.

plot

vertices:
  - name: my-udf
    udf:
      groupBy:
        window:
          sliding:
            length: duration
            slide: duration

NOTE: A duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix, such as "300ms", "1.5h" or "2h45m". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".

Length

The length is the window size of the fixed window.

Slide

slide is the slide parameter that controls the frequency at which the sliding window is created.

Example

To create a sliding window of length 1 minute which slides every 10 seconds, we can use the following snippet.

vertices:
  - name: my-udf
    udf:
      groupBy:
        window:
          sliding:
            length: 60s
            slide: 10s

The yaml snippet above contains an example spec of a reduce vertex that uses sliding window aggregation. As we can see, the length of the window is 60s and sliding frequency is once every 10s. This means there will be multiple windows active at any point in time.

Let's say, time.now() in the pipeline is 2031-09-29T18:46:30Z the active window boundaries will be as follows (there are total of 6 windows 60s/10s)

[2031-09-29T18:45:40Z, 2031-09-29T18:46:40Z)
[2031-09-29T18:45:50Z, 2031-09-29T18:46:50Z) # notice the 10 sec shift from the above window
[2031-09-29T18:46:00Z, 2031-09-29T18:47:00Z)
[2031-09-29T18:46:10Z, 2031-09-29T18:47:10Z)
[2031-09-29T18:46:20Z, 2031-09-29T18:47:20Z)
[2031-09-29T18:46:30Z, 2031-09-29T18:47:30Z)

The window start time is always be left inclusive and right exclusive. That is why [2031-09-29T18:45:30Z, 2031-09-29T18:46:30Z) window is not considered active (it fell on the previous window, right exclusive) but [2031-09-29T18:46:30Z, 2031-09-29T18:47:30Z) is an active (left inclusive).

The first window always ends after the sliding seconds from the time.Now(), the start time of the window will be the nearest integer multiple of the slide which is less than the message's event time. So the first window starts in the past and ends _sliding_duration (based on time progression in the pipeline and not the wall time) from present. It is important to note that regardless of the window boundary (starting in the past or ending in the future) the target element set totally depends on the matching time (in case of event time, all the elements with the time that falls with in the boundaries of the window, and in case of system time, all the elements that arrive from the present until the end of window present + sliding)

From the point above, it follows then that immediately upon startup, for the first window, fewer elements may get aggregated depending on the current lateness of the data stream.

Check the links below to see the UDF examples for different languages.

Streaming Mode

Reduce can be enabled on streaming mode to stream messages or forward partial responses to the next vertex. This is useful for custom triggering, where we want to forward responses to the next vertex quickly, even before the fixed window closes. The close-of-book and a final triggering will still happen even if partial results have been emitted.

To enable reduce streaming, set the streaming flag to true in the sliding window configuration.

vertices:
  - name: my-udf
    udf:
      groupBy:
        window:
          sliding:
            length: duration
            slide: duration
            streaming: true # set streaming to true to enable reduce streamer

Note: UDFs should use the ReduceStreamer functionality in the SDKs to use this feature.

Check the links below to see the UDF examples in streaming mode for different languages.