Reduce Examples¶
Please read reduce to get the best out of these examples.
Prerequisites¶
Inter-Step Buffer Service (ISB Service)¶
What is ISB Service?¶
An Inter-Step Buffer Service is described by a Custom Resource, which is used to pass data between vertices of a numaflow pipeline. Please refer to the doc Inter-Step Buffer Service for more information on ISB.
How to install the ISB Service¶
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/stable/examples/0-isbsvc-jetstream.yaml
Once the manifest is applied, the ISB Service and its pods should look like this:
$ kubectl get isbsvc
NAME TYPE PHASE MESSAGE AGE
default jetstream Running 3d19h
# Wait for pods to be ready
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
isbsvc-default-js-0 3/3 Running 0 19s
isbsvc-default-js-1 3/3 Running 0 19s
isbsvc-default-js-2 3/3 Running 0 19s
NOTE
The Source used in the examples is an HTTP source producing messages with values 5 and 10, with event time starting from 60000. Please refer to the doc http source on how to use an HTTP source. For example:
curl -kq -X POST -H "x-numaflow-event-time: 60000" -d "5" ${http-source-url}
curl -kq -X POST -H "x-numaflow-event-time: 60000" -d "10" ${http-source-url}
Sum Pipeline Using Fixed Window¶
This is a simple reduce pipeline that sums numbers using a fixed window. The snippet for the reduce vertex is as follows.
- name: compute-sum
  udf:
    container:
      # compute the sum
      image: quay.io/numaio/numaflow-go/reduce-sum:v0.5.0
    groupBy:
      window:
        fixed:
          length: 60s
      keyed: true
6-reduce-fixed-window.yaml has the complete pipeline definition.
In this example we set partitions to 2. We use more than one partition because it is a keyed window.
- name: compute-sum
  partitions: 2
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/6-reduce-fixed-window.yaml
Output:
2023/01/05 11:54:41 (sink) Payload - 300 Key - odd Start - 60000 End - 120000
2023/01/05 11:54:41 (sink) Payload - 600 Key - even Start - 60000 End - 120000
2023/01/05 11:54:41 (sink) Payload - 300 Key - odd Start - 120000 End - 180000
2023/01/05 11:54:41 (sink) Payload - 600 Key - even Start - 120000 End - 180000
2023/01/05 11:54:42 (sink) Payload - 600 Key - even Start - 180000 End - 240000
2023/01/05 11:54:42 (sink) Payload - 300 Key - odd Start - 180000 End - 240000
In our example, the input is an HTTP source producing 2 messages each second with values 5 and 10, and the event time starts from 60000. Since we use a fixed window of length 60s and the two messages carry different keys, "even" and "odd", Numaflow will create two different windows, one per key, each with a start time of 60000 and an end time of 120000. So the output will be 300 (5 * 60) and 600 (10 * 60).
If we had used a non-keyed window (keyed: false), we would have seen a single output with a value of 900 (300 from odd + 600 from even) for each window.
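The arithmetic above can be checked with a small simulation. This is only a sketch of fixed-window assignment, not Numaflow's implementation: the helper names (source_messages, fixed_window_sums) are hypothetical, and it assumes exactly one "odd" message (5) and one "even" message (10) per second, with windows treated as [start, end).

```python
from collections import defaultdict

WINDOW_MS = 60_000  # fixed window length (60s), as in the reduce vertex spec


def source_messages(start_ms, seconds):
    """Yield (event_time_ms, key, value): one 5 ("odd") and one 10 ("even") per second.

    Assumption: this stands in for the HTTP source and the upstream keying step.
    """
    for i in range(seconds):
        t = start_ms + i * 1000
        yield t, "odd", 5
        yield t, "even", 10


def fixed_window_sums(messages, keyed=True):
    """Sum values per (window_start, window_end, key); windows are [start, end)."""
    sums = defaultdict(int)
    for t, key, value in messages:
        start = t - t % WINDOW_MS  # align the event time down to a window boundary
        group = key if keyed else "NON_KEYED_STREAM"
        sums[(start, start + WINDOW_MS, group)] += value
    return dict(sums)


# Two minutes of input, with event time starting at 60000.
keyed = fixed_window_sums(source_messages(60_000, 120))
non_keyed = fixed_window_sums(source_messages(60_000, 120), keyed=False)

for (start, end, key), total in sorted(keyed.items()):
    print(f"Payload - {total} Key - {key} Start - {start} End - {end}")
```

With keyed=False, every message falls into the same group, so each window holds the combined total of both values.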
Sum Pipeline Using Sliding Window¶
This is a simple reduce pipeline that sums numbers using a sliding window. The snippet for the reduce vertex is as follows.
- name: reduce-sliding
  udf:
    container:
      # compute the sum
      image: quay.io/numaio/numaflow-go/reduce-sum:v0.5.0
    groupBy:
      window:
        sliding:
          length: 60s
          slide: 10s
      keyed: true
7-reduce-sliding-window.yaml has the complete pipeline definition.
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/7-reduce-sliding-window.yaml
Output:
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 60000 End - 120000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 60000 End - 120000
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 70000 End - 130000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 70000 End - 130000
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 80000 End - 140000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 80000 End - 140000
In our example, the input is an HTTP source producing 2 messages each second with values 5 and 10, and the event time starts from 60000. Since we use a sliding window of length 60s with a slide of 10s, and the two messages carry different keys, "even" and "odd", Numaflow will create two different windows with a start time of 60000 and an end time of 120000. Because the slide duration is 10s, the next set of windows will be created with a start time of 70000 and an end time of 130000. Since it is a sum operation, the output for each full window will be 300 (5 * 60) and 600 (10 * 60).
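In an output line such as Payload - 50 Key - odd Start - 10000 End - 70000, we see 50 for odd because the first window has only 10 elements: the event time starts at 60000, so this window covers just the first 10 seconds of data.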
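To make the sliding-window numbers concrete, here is a minimal sketch of how one element lands in several overlapping windows. It is a simplified model rather than Numaflow's code: windows_for and sliding_window_sums are hypothetical helpers, windows are assumed to be [start, end) with starts aligned to the slide, and the input is assumed to be one "odd" (5) and one "even" (10) message per second.

```python
from collections import defaultdict

LENGTH_MS = 60_000  # sliding window length (60s)
SLIDE_MS = 10_000   # slide duration (10s)


def windows_for(event_time_ms):
    """Return every [start, end) sliding window that contains the event time."""
    # Latest slide-aligned window start that is <= the event time.
    start = event_time_ms - event_time_ms % SLIDE_MS
    windows = []
    # Walk back one slide at a time while the window still covers the event.
    while start > event_time_ms - LENGTH_MS:
        windows.append((start, start + LENGTH_MS))
        start -= SLIDE_MS
    return windows


def sliding_window_sums(messages):
    """Sum values per (window_start, window_end, key)."""
    sums = defaultdict(int)
    for t, key, value in messages:
        for start, end in windows_for(t):
            sums[(start, end, key)] += value
    return dict(sums)


# Three minutes of input, with event time starting at 60000.
messages = [
    (60_000 + i * 1000, key, value)
    for i in range(180)
    for key, value in (("odd", 5), ("even", 10))
]
sums = sliding_window_sums(messages)

print(sums[(10_000, 70_000, "odd")])   # partial first window: 10 elements
print(sums[(60_000, 120_000, "odd")])  # full window: 60 elements
```

Each element belongs to length / slide = 6 windows, which is why early windows such as the one starting at 10000 appear with partial sums before the full 300 and 600 totals show up.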
Complex Reduce Pipeline¶
In the complex reduce example, we will:
- chain reduce functions
- use both fixed and sliding windows
- use keyed and non-keyed windowing
8-reduce-complex-pipeline.yaml has the complete pipeline definition.
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/8-reduce-complex-pipeline.yaml
Output:
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 80000 End - 140000
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 90000 End - 150000
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 100000 End - 160000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 110000 End - 170000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 120000 End - 180000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 130000 End - 190000
In our example, the first reduce vertex uses a fixed window of duration 5s. Since the input is 5 and 10, the output from the first reduce vertex will be 25 (5 * 5) and 50 (5 * 10). This is passed to the next reduce vertex, which is non-keyed and uses a fixed window of duration 10s. Being non-keyed, it combines the inputs and produces an output of 150 (25 * 2 + 50 * 2), which is passed to the reduce vertex with a sliding window of duration 60s and a slide duration of 10s. Hence the final output will be 900 (150 * 6).
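The chain 5 * 5 = 25, 25 * 2 + 50 * 2 = 150, 150 * 6 = 900 can be traced with a rough three-stage model. This sketch makes several assumptions that are not taken from the pipeline definition: the helper functions fixed and sliding are hypothetical, each reduce result is given an event time of window end minus 1 ms (any timestamp inside the window would yield the same totals here), the last stage is treated as non-keyed, and the source emits one "odd" (5) and one "even" (10) message per second.

```python
from collections import defaultdict


def fixed(messages, length_ms, keyed):
    """Fixed-window sum over [start, end) windows.

    Returns (event_time, key, total) tuples, where event_time is assumed
    to be window end - 1 ms so the result stays inside its own window.
    """
    sums = defaultdict(int)
    for t, key, value in messages:
        start = t - t % length_ms
        sums[(start, key if keyed else "NON_KEYED_STREAM")] += value
    return [
        (start + length_ms - 1, key, total)
        for (start, key), total in sorted(sums.items())
    ]


def sliding(messages, length_ms, slide_ms):
    """Non-keyed sliding-window sum, returned as {(start, end): total}."""
    sums = defaultdict(int)
    for t, _key, value in messages:
        start = t - t % slide_ms
        while start > t - length_ms:
            sums[(start, start + length_ms)] += value
            start -= slide_ms
    return dict(sums)


# Five minutes of input, with event time starting at 60000.
source = [
    (60_000 + i * 1000, key, value)
    for i in range(300)
    for key, value in (("odd", 5), ("even", 10))
]

stage1 = fixed(source, 5_000, keyed=True)     # 25 per "odd" window, 50 per "even"
stage2 = fixed(stage1, 10_000, keyed=False)   # 150 per 10s window
final = sliding(stage2, 60_000, 10_000)       # 900 per full 60s window

print(final[(80_000, 140_000)])
```

Partial windows at the start and end of the simulated input produce smaller totals, so only windows fully covered by the input reach 900.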