Kafka Sink¶
Two methods are available for integrating Kafka topics into your Numaflow pipeline: using a user-defined Kafka Sink or opting for the built-in Kafka Sink provided by Numaflow.
Option 1: User-Defined Kafka Sink¶
Developed and maintained by the Numaflow contributor community, the Kafka Sink provides a reliable and feature-complete solution for publishing messages to Kafka topics.
Key Features:
- Customization: Offers complete control over Kafka Sink configurations to tailor to specific requirements.
- Kafka Java Client Utilization: Leverages the Kafka Java client for reliable message publishing to Kafka topics.
- Schema Management: Integrates seamlessly with the Confluent Schema Registry to support schema validation and manage schema evolution effectively.
More details on how to use the Kafka Sink can be found here.
Option 2: Built-in Kafka Sink¶
The built-in Kafka sink forwards messages to a Kafka topic and supports configuration overrides.
Kafka Headers¶
The sink copies the message keys into the Kafka record headers. Because keys is an array, the keys are added to the
headers in the following format:
- __key_len holds the number of keys in the header. If __key_len == 0, no keys are present.
- __key_%d holds one key, e.g., __key_0 is the first key, __key_1 is the second, and so forth.
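The header layout can be sketched as a pair of helper functions. This is a minimal illustration, not the sink's implementation: the function names are hypothetical, and header values are modelled as strings even though real Kafka header values are bytes (the exact encoding of __key_len is an assumption here).

```python
from typing import Dict, List

KEY_LEN_HEADER = "__key_len"


def keys_to_headers(keys: List[str]) -> Dict[str, str]:
    """Encode message keys using the __key_len / __key_%d layout."""
    # Assumption: the count is stored as a decimal string.
    headers = {KEY_LEN_HEADER: str(len(keys))}
    for index, key in enumerate(keys):
        headers[f"__key_{index}"] = key
    return headers


def headers_to_keys(headers: Dict[str, str]) -> List[str]:
    """Rebuild the ordered key list; a missing or zero __key_len means no keys."""
    count = int(headers.get(KEY_LEN_HEADER, "0"))
    return [headers[f"__key_{index}"] for index in range(count)]


if __name__ == "__main__":
    encoded = keys_to_headers(["user-42", "eu-west"])
    print(encoded)
    print(headers_to_keys(encoded))
```

A consumer reading the topic could use the same approach to recover the original key order from the record headers.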
Example¶
spec:
  vertices:
    - name: kafka-output
      sink:
        kafka:
          brokers:
            - my-broker1:19700
            - my-broker2:19700
          topic: my-topic
          setKey: true # Optional, use message keys as the Kafka record key. Defaults to false.
          tls: # Optional.
            insecureSkipVerify: false # Optional, whether to skip TLS verification. Defaults to false.
            caCertSecret: # Optional, a secret reference which contains the CA certificate.
              name: my-ca-cert
              key: my-ca-cert-key
            certSecret: # Optional, a secret reference which contains the client certificate.
              name: my-cert
              key: my-cert-key
            keySecret: # Required when certSecret is set; contains the client private key.
              name: my-pk
              key: my-pk-key
          sasl: # Optional. This example uses PLAIN; other mechanisms are listed below.
            mechanism: PLAIN
            plain:
              userSecret:
                name: kafka-plain-auth
                key: username
              passwordSecret:
                name: kafka-plain-auth
                key: password
              handshake: true # Required by the API; ignored by the Rust built-in sink.
          # Optional, flat librdkafka client configuration. Use one `key: value`
          # pair per line.
          config: |
            message.timeout.ms: 5000
            compression.type: gzip
The config field accepts any supported librdkafka client configuration option as a flat key: value pair. By default,
the built-in sink sets message.timeout.ms: 5000 and client.id: numaflow-kafka-sink.
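To make the flat key: value format concrete, the following sketch parses such a block and overlays it on the two defaults named above. It is only an illustration under stated assumptions: the function names are hypothetical, and both the precedence (a user-supplied value replaces the default) and the handling of blank lines and # comments are assumptions rather than confirmed behavior of the sink.

```python
from typing import Dict

# Defaults named in the text above.
DEFAULTS = {
    "message.timeout.ms": "5000",
    "client.id": "numaflow-kafka-sink",
}


def parse_flat_config(text: str) -> Dict[str, str]:
    """Parse one `key: value` pair per line, skipping blank lines and # comments."""
    options: Dict[str, str] = {}
    for line_number, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only, so values may themselves contain colons.
        key, sep, value = line.partition(":")
        if not sep or not key.strip():
            raise ValueError(f"line {line_number}: expected 'key: value', got {raw!r}")
        options[key.strip()] = value.strip()
    return options


def effective_config(text: str) -> Dict[str, str]:
    """Overlay user options on the defaults (assumed precedence: user wins)."""
    return {**DEFAULTS, **parse_flat_config(text)}


if __name__ == "__main__":
    print(effective_config("message.timeout.ms: 10000\ncompression.type: gzip\n"))
```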
SASL Authentication¶
The built-in Kafka sink supports multiple SASL auth mechanisms. To enable SASL, set sasl.mechanism and include only the
matching mechanism-specific block.
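The pairing between sasl.mechanism values and block names, taken from the examples on this page, can be checked before applying a spec with a sketch like the one below. The helper name check_sasl_block is hypothetical, the check treats mechanism names as case-sensitive (an assumption), and it is not part of Numaflow.

```python
from typing import Any, Dict, List

# Mechanism name -> block name, as used in the examples on this page.
MECHANISM_BLOCKS = {
    "PLAIN": "plain",
    "SCRAM-SHA-256": "scramsha256",
    "SCRAM-SHA-512": "scramsha512",
    "GSSAPI": "gssapi",
    "OAUTHBEARER": "oauth",
    "OAUTH": "oauth",
}


def check_sasl_block(sasl: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the pairing is consistent."""
    mechanism = sasl.get("mechanism")
    if mechanism not in MECHANISM_BLOCKS:
        return [f"unknown mechanism: {mechanism!r}"]
    expected = MECHANISM_BLOCKS[mechanism]
    problems = []
    if expected not in sasl:
        problems.append(f"mechanism {mechanism} needs a '{expected}' block")
    # Any other mechanism-specific block is reported as a mismatch.
    for block in sorted(set(MECHANISM_BLOCKS.values()) - {expected}):
        if block in sasl:
            problems.append(f"'{block}' block does not match mechanism {mechanism}")
    return problems


if __name__ == "__main__":
    print(check_sasl_block({"mechanism": "PLAIN", "plain": {}}))
    print(check_sasl_block({"mechanism": "PLAIN", "oauth": {}}))
```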
SASL credentials are referenced with Kubernetes Secret selectors. The name field is the Kubernetes Secret name, and
the key field is the entry inside that Secret's data or stringData. Because SecretKeySelector does not include a
namespace, create the Secret in the same namespace as the Pipeline or MonoVertex. The pipeline spec stores only the
Secret name and key, not the credential value. Numaflow mounts the Secret into the vertex pod, and the built-in Kafka
sink reads and trims the selected value at startup.
For example, create PLAIN credentials with:
kubectl create secret generic kafka-plain-auth \
  --from-literal=username='<username>' \
  --from-literal=password='<password>'
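If you prefer to manage the Secret as a manifest, the sketch below builds an equivalent object. Values under a Secret's data field must be base64-encoded, which is what the helper does. The function name secret_manifest is hypothetical, and the placeholder credentials are the same ones used in the command above.

```python
import base64
import json
from typing import Dict


def secret_manifest(name: str, values: Dict[str, str]) -> dict:
    """Build a v1 Secret manifest; `data` values are base64-encoded."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in values.items()
        },
    }


if __name__ == "__main__":
    manifest = secret_manifest(
        "kafka-plain-auth",
        {"username": "<username>", "password": "<password>"},
    )
    # kubectl accepts JSON manifests, e.g. piped into `kubectl apply -f -`.
    print(json.dumps(manifest, indent=2))
```

The manifest carries no namespace, so it is created in whichever namespace kubectl targets; that should be the namespace of the Pipeline or MonoVertex.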
PLAIN¶
Use PLAIN when your Kafka cluster expects a username and password.
sasl:
  mechanism: PLAIN
  plain:
    userSecret:
      name: kafka-plain-auth
      key: username
    passwordSecret:
      name: kafka-plain-auth
      key: password
    handshake: true # Required by the API; ignored by the Rust built-in sink.
SCRAM-SHA-256¶
Use SCRAM-SHA-256 when your Kafka cluster uses SCRAM credentials with SHA-256.
sasl:
  mechanism: SCRAM-SHA-256
  scramsha256:
    userSecret:
      name: scram-sha-256-user
      key: scram-sha-256-user-key
    passwordSecret:
      name: scram-sha-256-password
      key: scram-sha-256-password-key
    handshake: true # Required by the API; ignored by the Rust built-in sink.
SCRAM-SHA-512¶
Use SCRAM-SHA-512 when your Kafka cluster uses SCRAM credentials with SHA-512.
sasl:
  mechanism: SCRAM-SHA-512
  scramsha512:
    userSecret:
      name: scram-sha-512-user
      key: scram-sha-512-user-key
    passwordSecret:
      name: scram-sha-512-password
      key: scram-sha-512-password-key
    handshake: true # Required by the API; ignored by the Rust built-in sink.
GSSAPI¶
Use GSSAPI for Kerberos. Set authType to KRB5_KEYTAB_AUTH when using keytabSecret, or to KRB5_USER_AUTH when
using passwordSecret. kerberosConfigSecret is optional; when set, its selected Secret value is passed to librdkafka
as the Kerberos kinit command.
sasl:
  mechanism: GSSAPI
  gssapi:
    serviceName: my-service
    realm: my-realm
    authType: KRB5_KEYTAB_AUTH
    usernameSecret:
      name: gssapi-username
      key: gssapi-username-key
    keytabSecret:
      name: gssapi-keytab
      key: gssapi-keytab-key
    kerberosConfigSecret: # Optional, contains the Kerberos kinit command.
      name: my-kerberos-config
      key: my-kerberos-config-key
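The authType rule described in this section (keytabSecret with KRB5_KEYTAB_AUTH, passwordSecret with KRB5_USER_AUTH) can be expressed as a small pre-flight check. This is a sketch with a hypothetical helper name, check_gssapi; it covers only the two authType values named here and does not reflect how Numaflow itself validates the spec.

```python
from typing import Any, Dict, List

# authType -> credential selector it requires, per the GSSAPI section.
REQUIRED_CREDENTIAL = {
    "KRB5_KEYTAB_AUTH": "keytabSecret",
    "KRB5_USER_AUTH": "passwordSecret",
}


def check_gssapi(gssapi: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a gssapi block; empty means none found."""
    auth_type = gssapi.get("authType")
    if auth_type not in REQUIRED_CREDENTIAL:
        return [f"unsupported authType: {auth_type!r}"]
    required = REQUIRED_CREDENTIAL[auth_type]
    problems = []
    if required not in gssapi:
        problems.append(f"{auth_type} requires {required}")
    return problems


if __name__ == "__main__":
    block = {
        "authType": "KRB5_KEYTAB_AUTH",
        "keytabSecret": {"name": "gssapi-keytab", "key": "gssapi-keytab-key"},
    }
    print(check_gssapi(block))
```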
OAUTHBEARER¶
Use OAUTHBEARER for OAuth/OIDC client credentials. clientID and clientSecret are Kubernetes Secret selectors, while
tokenEndpoint is the token endpoint URL.
sasl:
  mechanism: OAUTHBEARER # OAUTH is also accepted.
  oauth:
    clientID:
      name: kafka-oauth-client
      key: clientid
    clientSecret:
      name: kafka-oauth-client
      key: clientsecret
    tokenEndpoint: https://oauth-token.com/v1/token
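For background, the OAuth 2.0 client-credentials grant that these three fields feed into looks roughly like the request built below. This is a generic sketch of the grant, not the sink's code: the function name is hypothetical, and sending the client ID and secret in the form body (rather than in an HTTP Basic header) is an assumption that depends on the token provider. The request is only constructed, never sent.

```python
import urllib.parse
import urllib.request


def build_token_request(
    token_endpoint: str, client_id: str, client_secret: str
) -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    body = urllib.parse.urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
    ).encode("ascii")
    return urllib.request.Request(
        token_endpoint,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_token_request(
        "https://oauth-token.com/v1/token", "<client-id>", "<client-secret>"
    )
    print(request.get_method(), request.full_url)
```

Testing the same request against your identity provider with a throwaway client is a quick way to confirm that the endpoint URL and credentials are valid before wiring them into the pipeline.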