SQS Sink¶
A SQS sink is used to forward messages to an AWS SQS queue.
Configuration¶
spec:
vertices:
- name: sqs-output
sink:
sqs:
queueName: "your-queue-name" # Required: Name of your SQS queue
awsRegion: "us-west-2" # Required: AWS region
queueOwnerAWSAccountID: "123456789012" # Required: AWS account ID
assumeRole: # Optional: For cross-account access
roleArn: "arn:aws:iam::123456789012:role/CrossAccount-Role"
Authentication¶
See SQS Source - Configuring Credentials for authentication options including AWS credentials secrets and IAM roles.
Message Headers¶
The SQS sink uses message headers to control SQS-specific behavior. Headers can originate from:
1. The source (e.g., SQS system attributes from an SQS source)
2. User metadata under the sqs namespace (merged into headers at the sink)
Supported Headers¶
| Header | Description |
|---|---|
MessageGroupId |
Required for FIFO queues. Messages with the same group ID are processed in order. |
MessageDeduplicationId |
Used for FIFO queues to prevent duplicate message delivery. |
DelaySeconds |
Delays message delivery by the specified number of seconds (0-900). |
FIFO Queue Support¶
For FIFO queues, the sink automatically extracts MessageGroupId and MessageDeduplicationId from message headers:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: sqs-to-sqs-fifo
spec:
vertices:
- name: in
source:
sqs:
queueName: "source-queue.fifo"
awsRegion: "us-west-2"
queueOwnerAWSAccountID: "123456789012"
attributeNames:
- MessageGroupId
- MessageDeduplicationId
- name: out
sink:
sqs:
queueName: "destination-queue.fifo"
awsRegion: "us-west-2"
queueOwnerAWSAccountID: "123456789012"
edges:
- from: in
to: out
In this example, MessageGroupId and MessageDeduplicationId from the source queue are automatically propagated to the destination FIFO queue.
Setting Headers via Metadata¶
User-defined functions can set SQS headers by writing to the sqs metadata namespace. These values are merged into the message headers at the sink, allowing UDFs to control FIFO ordering or set delay seconds.
from pynumaflow.mapper import Messages, Message, Datum, Mapper
def my_handler(keys: list[str], datum: Datum) -> Messages:
# Process the message
payload = datum.value
# Set SQS-specific metadata
messages = Messages()
messages.append(
Message(
value=payload,
keys=keys,
tags=[],
metadata={
"sqs": {
"MessageGroupId": "order-processing",
"MessageDeduplicationId": f"dedup-{datum.id}",
"DelaySeconds": "10"
}
}
)
)
return messages
if __name__ == "__main__":
grpc_server = Mapper(handler=my_handler)
grpc_server.start()
package main
import (
"context"
"log"
"github.com/numaproj/numaflow-go/pkg/mapper"
)
func handle(_ context.Context, keys []string, d mapper.Datum) mapper.Messages {
// Process the message
payload := d.Value()
// Set SQS-specific metadata
return mapper.MessagesBuilder().
Append(
mapper.NewMessage(payload).
WithKeys(keys).
WithMetadata(map[string]map[string]string{
"sqs": {
"MessageGroupId": "order-processing",
"MessageDeduplicationId": "dedup-" + string(d.ID()),
"DelaySeconds": "10",
},
}),
).
Build()
}
func main() {
if err := mapper.NewServer(mapper.MapperFunc(handle)).Start(context.Background()); err != nil {
log.Fatal(err)
}
}
package io.numaproj.examples;
import io.numaproj.numaflow.mapper.*;
import java.util.HashMap;
import java.util.Map;
public class SqsMetadataMapper extends Mapper {
@Override
public MessageList processMessage(String[] keys, Datum datum) {
// Process the message
byte[] payload = datum.getValue();
// Set SQS-specific metadata
Map<String, Map<String, String>> metadata = new HashMap<>();
Map<String, String> sqsMetadata = new HashMap<>();
sqsMetadata.put("MessageGroupId", "order-processing");
sqsMetadata.put("MessageDeduplicationId", "dedup-" + datum.getId());
sqsMetadata.put("DelaySeconds", "10");
metadata.put("sqs", sqsMetadata);
return MessageList.newBuilder()
.addMessage(
Message.newBuilder()
.value(payload)
.keys(keys)
.metadata(metadata)
.build())
.build();
}
public static void main(String[] args) throws Exception {
Server server = new Server(new SqsMetadataMapper());
server.start();
server.awaitTermination();
}
}
Example Pipeline¶
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: http-to-sqs
spec:
vertices:
- name: in
source:
http: {}
- name: process
udf:
container:
image: my-processor:latest
- name: out
sink:
sqs:
queueName: "output-queue"
awsRegion: "us-west-2"
queueOwnerAWSAccountID: "123456789012"
edges:
- from: in
to: process
- from: process
to: out