OnSuccess Sink¶
OnSuccess sink is used to write the messages to a sink only when the processing of the original message has completed successfully in the primary sink.
Use Case¶
OnSuccess sink is useful in following scenarios:
- When you want to write a confirmation message to a sink when the original message has been written to the primary sink successfully.
- When you want to write the original message to a secondary sink after it has been written to the primary sink successfully.
Caveats¶
- OnSuccess sink is more meaningful when the primary sink is a user-defined sink because builtins are not written to support on-success responses.
- For a given OnSuccess sink, we cannot configure another fallback or onSuccess sink. This means, if the OnSuccess sink is a user-defined sink, then we cannot wrap the message sent from this onSuccess UD sink as a onSuccess/fallback response.
How to use¶
To configure onSuccess sink, changes need to be made to the pipeline specification and the user-defined sink implementation.
Step 1 - update the specification¶
Add a onSuccess field to the sink configuration in the pipeline specification file.
The following example uses the builtin kafka as an onSuccess sink.
- name: out
sink:
udsink:
container:
image: my-sink:latest
onSuccess:
kafka:
brokers:
- my-broker1:19700
- my-broker2:19700
topic: my-topic
A onSuccess sink can also be a user-defined sink.
- name: out
sink:
udsink:
container:
image: my-sink:latest
onSuccess:
udsink:
container:
image: my-sink:latest
Step 2 - update the user-defined sink implementation¶
Code changes have to be made in the primary sink to generate an onSuccess response.
SDK methods to generate an onSuccess response in a primary user-defined sink:
package main
import (
sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)
// onSuccessLogSink is a sinker implementation that logs the input to stdout
type onSuccessLogSink struct {
}
func (l *onSuccessLogSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
result := sinksdk.ResponsesBuilder()
for d := range datumStreamCh {
result = result.Append(sinksdk.ResponseOnSuccess(d.id, sinksdk.NewMessage([]byte("primary sink write succeeded"))))
}
return result
}
@Override
public ResponseList processMessages(DatumIterator datumIterator) {
ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
while (datumIterator.next() != null) {
try {
responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
Message.builder()
.value(String.format("Successfully wrote message with ID: %s",
datum.getId()).getBytes())
.build()));
} catch (Exception e) {
log.warn("Error while writing to any sink: ", e);
responseListBuilder.addResponse(Response.responseFailure(
datum.getId(),
e.getMessage()));
}
}
return responseListBuilder.build();
}
#[tonic::async_trait]
impl sink::Sinker for SinkHandler {
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();
while let Some(datum) = input.recv().await {
responses.push(Response::on_success(
datum.id,
// To write the original message to the on success sink
None,
));
}
}
}