
Kafka Sink

There are two ways to publish messages from a Numaflow pipeline to Kafka topics: a user-defined Kafka Sink, or the built-in Kafka Sink provided by Numaflow.

Option 1: User-Defined Kafka Sink

Developed and maintained by the Numaflow contributor community, the user-defined Kafka Sink is a reliable, feature-complete option for publishing messages to Kafka topics.

Key Features:

  • Customization: Offers complete control over the Kafka Sink configuration, so it can be tailored to specific requirements.
  • Kafka Java Client Utilization: Uses the Kafka Java client to publish messages to Kafka topics reliably.
  • Schema Management: Integrates with the Confluent Schema Registry for schema validation and schema evolution.

More details on how to use the user-defined Kafka Sink can be found here.

Option 2: Built-in Kafka Sink

The built-in Kafka sink forwards messages to a Kafka topic. It supports client configuration overrides through the config field.

Kafka Headers

The sink writes message keys into the Kafka record headers. Because keys is an array, the keys are stored under the following header names:

  • __key_len holds the number of keys. If __key_len is 0, no keys are present.
  • __key_%d holds each key by index; for example, __key_0 is the first key, __key_1 the second, and so on.
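The header scheme above can be sketched in Python as an encoder and a matching decoder. This is a minimal illustration, not the sink's implementation: it assumes header values are UTF-8 bytes and that __key_len is written as a decimal string, neither of which is specified here. The function names are hypothetical.

```python
def encode_key_headers(keys: list[str]) -> list[tuple[str, bytes]]:
    """Encode message keys as (name, value) headers using __key_len / __key_%d.

    Assumption: values are UTF-8 bytes and __key_len is a decimal string.
    """
    headers = [("__key_len", str(len(keys)).encode("utf-8"))]
    for i, key in enumerate(keys):
        # __key_0 is the first key, __key_1 the second, and so on.
        headers.append((f"__key_{i}", key.encode("utf-8")))
    return headers


def decode_key_headers(headers: list[tuple[str, bytes]]) -> list[str]:
    """Recover the ordered key list from headers produced by encode_key_headers."""
    by_name = {name: value for name, value in headers}
    # A missing or zero __key_len means no keys are present.
    count = int(by_name.get("__key_len", b"0").decode("utf-8"))
    return [by_name[f"__key_{i}"].decode("utf-8") for i in range(count)]


if __name__ == "__main__":
    hdrs = encode_key_headers(["user-42", "region-eu"])
    print(hdrs)
    print(decode_key_headers(hdrs))
```

Keeping the count in its own header lets a consumer read keys back in order without scanning for the highest index.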

Example

spec:
  vertices:
    - name: kafka-output
      sink:
        kafka:
          brokers:
            - my-broker1:19700
            - my-broker2:19700
          topic: my-topic
          setKey: true # Optional, use message keys as the Kafka record key. Defaults to false.
          tls: # Optional.
            insecureSkipVerify: false # Optional, whether to skip TLS verification. Defaults to false.
            caCertSecret: # Optional, a secret reference which contains the CA certificate.
              name: my-ca-cert
              key: my-ca-cert-key
            certSecret: # Optional, a secret reference which contains the client certificate.
              name: my-cert
              key: my-cert-key
            keySecret: # Required when certSecret is set; contains the client private key.
              name: my-pk
              key: my-pk-key
          sasl: # Optional. This example uses PLAIN; other mechanisms are listed below.
            mechanism: PLAIN
            plain:
              userSecret:
                name: kafka-plain-auth
                key: username
              passwordSecret:
                name: kafka-plain-auth
                key: password
              handshake: true # Required by the API; ignored by the Rust built-in sink.
          # Optional, flat librdkafka client configuration. Use one `key: value`
          # pair per line.
          config: |
            message.timeout.ms: 5000
            compression.type: gzip

The config field accepts any supported librdkafka client configuration option as a flat key: value pair. By default, the built-in sink sets message.timeout.ms: 5000 and client.id: numaflow-kafka-sink.
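To illustrate how a flat config string relates to the documented defaults, here is a minimal Python sketch. It assumes user-supplied values take precedence over the defaults and that each line is split at its first colon; both are assumptions about the sink's behavior, and DEFAULTS, parse_flat_config, and effective_config are hypothetical names.

```python
# Defaults documented for the built-in sink; all librdkafka values are strings.
DEFAULTS = {
    "message.timeout.ms": "5000",
    "client.id": "numaflow-kafka-sink",
}


def parse_flat_config(text: str) -> dict[str, str]:
    """Parse one `key: value` pair per line into a dict of strings."""
    result: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split at the first colon only, so values such as host:9092 survive.
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"expected `key: value`, got {raw!r}")
        result[key.strip()] = value.strip()
    return result


def effective_config(text: str) -> dict[str, str]:
    """Start from the defaults, then apply user overrides (assumed precedence)."""
    merged = dict(DEFAULTS)
    merged.update(parse_flat_config(text))
    return merged


if __name__ == "__main__":
    print(effective_config("message.timeout.ms: 5000\ncompression.type: gzip\n"))
```

For the example above, the result keeps client.id at numaflow-kafka-sink and adds compression.type: gzip alongside message.timeout.ms: 5000.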

SASL Authentication

The built-in Kafka sink supports several SASL authentication mechanisms: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, and OAUTHBEARER. To enable SASL, set sasl.mechanism and include only the mechanism-specific block that matches it.
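The rule of setting sasl.mechanism and including only the matching block can be expressed as a small check. This hypothetical helper is not Numaflow's validator; the mechanism-to-block mapping is taken from the examples on this page, and the sasl section is assumed to be already parsed into a Python dict.

```python
# sasl.mechanism -> name of its mechanism-specific block, per the examples
# on this page. OAUTH is accepted as an alias for OAUTHBEARER.
BLOCK_FOR_MECHANISM = {
    "PLAIN": "plain",
    "SCRAM-SHA-256": "scramsha256",
    "SCRAM-SHA-512": "scramsha512",
    "GSSAPI": "gssapi",
    "OAUTHBEARER": "oauth",
    "OAUTH": "oauth",
}
ALL_BLOCKS = set(BLOCK_FOR_MECHANISM.values())


def check_sasl(sasl: dict) -> str:
    """Return the block to use, or raise ValueError if the sasl section is inconsistent."""
    mechanism = sasl.get("mechanism")
    if mechanism not in BLOCK_FOR_MECHANISM:
        raise ValueError(f"unsupported sasl.mechanism: {mechanism!r}")
    expected = BLOCK_FOR_MECHANISM[mechanism]
    # Exactly one mechanism block may be present, and it must be the matching one.
    present = {name for name in ALL_BLOCKS if name in sasl}
    if present != {expected}:
        raise ValueError(
            f"mechanism {mechanism} needs exactly the '{expected}' block, "
            f"found {sorted(present)}"
        )
    return expected
```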

SASL credentials are referenced with Kubernetes Secret selectors. The name field is the Kubernetes Secret name, and the key field is the entry inside that Secret's data or stringData. Because SecretKeySelector does not include a namespace, create the Secret in the same namespace as the Pipeline or MonoVertex. The pipeline spec stores only the Secret name and key, not the credential value. Numaflow mounts the Secret into the vertex pod, and the built-in Kafka sink reads and trims the selected value at startup.

For example, create PLAIN credentials with:

kubectl create secret generic kafka-plain-auth \
  --from-literal=username='<username>' \
  --from-literal=password='<password>'
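The sink's read-and-trim step for a SecretKeySelector can be sketched as follows. The mount directory /var/numaflow/secrets and the <root>/<name>/<key> file layout are hypothetical, since this page does not say where Numaflow mounts the Secret; trimming is assumed to mean stripping leading and trailing whitespace.

```python
from pathlib import Path

# Hypothetical mount root; the real path Numaflow uses inside the vertex
# pod is not specified on this page.
SECRET_MOUNT_ROOT = Path("/var/numaflow/secrets")


def read_secret_value(name: str, key: str, root: Path = SECRET_MOUNT_ROOT) -> str:
    """Read the value selected by a SecretKeySelector (name, key) and trim it."""
    path = root / name / key
    # Trimming drops a trailing newline, which commonly appears when a
    # Secret is created from a file rather than with --from-literal.
    return path.read_text(encoding="utf-8").strip()
```

With the kafka-plain-auth Secret above, read_secret_value("kafka-plain-auth", "username") would return the username without surrounding whitespace, under the assumed layout.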

PLAIN

Use PLAIN when your Kafka cluster expects a username and password.

sasl:
  mechanism: PLAIN
  plain:
    userSecret:
      name: kafka-plain-auth
      key: username
    passwordSecret:
      name: kafka-plain-auth
      key: password
    handshake: true # Required by the API; ignored by the Rust built-in sink.

SCRAM-SHA-256

Use SCRAM-SHA-256 when your Kafka cluster uses SCRAM credentials with SHA-256.

sasl:
  mechanism: SCRAM-SHA-256
  scramsha256:
    userSecret:
      name: scram-sha-256-user
      key: scram-sha-256-user-key
    passwordSecret:
      name: scram-sha-256-password
      key: scram-sha-256-password-key
    handshake: true # Required by the API; ignored by the Rust built-in sink.

SCRAM-SHA-512

Use SCRAM-SHA-512 when your Kafka cluster uses SCRAM credentials with SHA-512.

sasl:
  mechanism: SCRAM-SHA-512
  scramsha512:
    userSecret:
      name: scram-sha-512-user
      key: scram-sha-512-user-key
    passwordSecret:
      name: scram-sha-512-password
      key: scram-sha-512-password-key
    handshake: true # Required by the API; ignored by the Rust built-in sink.
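PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 all take a username and a password and differ only in the mechanism name. The sketch below shows one plausible translation into librdkafka properties; security.protocol, sasl.mechanism, sasl.username, and sasl.password are real librdkafka options, but choosing SASL_SSL versus SASL_PLAINTEXT from whether a tls block is set is an assumption, and the function name is hypothetical. handshake is not mapped because the built-in sink ignores it.

```python
USERPASS_MECHANISMS = ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512")


def userpass_sasl_props(
    mechanism: str, username: str, password: str, tls_enabled: bool
) -> dict[str, str]:
    """Sketch: librdkafka properties for PLAIN and SCRAM mechanisms.

    Assumes username and password were already read from their Secrets.
    """
    if mechanism not in USERPASS_MECHANISMS:
        raise ValueError(f"not a username/password mechanism: {mechanism}")
    return {
        # Assumption: SASL over TLS when a tls block is configured.
        "security.protocol": "SASL_SSL" if tls_enabled else "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.username": username,
        "sasl.password": password,
    }
```

The resulting dict has the same shape that librdkafka-based clients, such as the Producer in confluent-kafka-python, accept as configuration.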

GSSAPI

Use GSSAPI for Kerberos. Set authType to KRB5_KEYTAB_AUTH when using keytabSecret, or to KRB5_USER_AUTH when using passwordSecret. kerberosConfigSecret is optional; when set, its selected Secret value is passed to librdkafka as the Kerberos kinit command.

sasl:
  mechanism: GSSAPI
  gssapi:
    serviceName: my-service
    realm: my-realm
    authType: KRB5_KEYTAB_AUTH
    usernameSecret:
      name: gssapi-username
      key: gssapi-username-key
    keytabSecret:
      name: gssapi-keytab
      key: gssapi-keytab-key
    kerberosConfigSecret: # Optional, contains the Kerberos kinit command.
      name: my-kerberos-config
      key: my-kerberos-config-key
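For GSSAPI, the authType rule above and a keytab-based mapping can be sketched as follows. The librdkafka options sasl.kerberos.service.name, sasl.kerberos.principal, sasl.kerberos.keytab, and sasl.kerberos.kinit.cmd are real; building the principal as username@realm and passing the mounted keytab file path are assumptions about how the sink combines these fields. Both function names are hypothetical.

```python
from typing import Optional


def check_gssapi(gssapi: dict) -> None:
    """Raise ValueError if authType does not match the credential Secret provided."""
    auth_type = gssapi.get("authType")
    if auth_type == "KRB5_KEYTAB_AUTH":
        required = "keytabSecret"
    elif auth_type == "KRB5_USER_AUTH":
        required = "passwordSecret"
    else:
        raise ValueError(f"unsupported authType: {auth_type!r}")
    if required not in gssapi:
        raise ValueError(f"authType {auth_type} requires {required}")


def gssapi_keytab_props(
    service_name: str,
    username: str,
    realm: str,
    keytab_path: str,
    kinit_cmd: Optional[str] = None,
) -> dict[str, str]:
    """Sketch: librdkafka properties for KRB5_KEYTAB_AUTH (assumed mapping)."""
    props = {
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": service_name,
        # Assumption: the principal is built as <username>@<realm>.
        "sasl.kerberos.principal": f"{username}@{realm}",
        # librdkafka expects a file path; assumed to be the mounted keytab file.
        "sasl.kerberos.keytab": keytab_path,
    }
    if kinit_cmd is not None:
        # Value selected by kerberosConfigSecret, passed through as the kinit command.
        props["sasl.kerberos.kinit.cmd"] = kinit_cmd
    return props
```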

OAUTHBEARER

Use OAUTHBEARER for OAuth/OIDC client credentials. clientID and clientSecret are Kubernetes Secret selectors, while tokenEndpoint is the token endpoint URL.

sasl:
  mechanism: OAUTHBEARER # OAUTH is also accepted.
  oauth:
    clientID:
      name: kafka-oauth-client
      key: clientid
    clientSecret:
      name: kafka-oauth-client
      key: clientsecret
    tokenEndpoint: https://oauth-token.com/v1/token
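An OAUTHBEARER configuration maps naturally onto librdkafka's built-in OIDC client-credentials support: sasl.oauthbearer.method=oidc together with the sasl.oauthbearer.client.id, sasl.oauthbearer.client.secret, and sasl.oauthbearer.token.endpoint.url options, available in librdkafka 1.9 and later. The sketch assumes the sink takes that path and normalizes the OAUTH alias to OAUTHBEARER; the function name is hypothetical.

```python
from urllib.parse import urlparse


def oauth_props(
    mechanism: str, client_id: str, client_secret: str, token_endpoint: str
) -> dict[str, str]:
    """Sketch: librdkafka OIDC properties for an OAUTHBEARER sasl block (assumed mapping)."""
    if mechanism not in ("OAUTHBEARER", "OAUTH"):
        raise ValueError(f"not an OAuth mechanism: {mechanism}")
    if urlparse(token_endpoint).scheme not in ("http", "https"):
        raise ValueError(f"tokenEndpoint must be an http(s) URL: {token_endpoint!r}")
    return {
        # OAUTH is treated as an alias and normalized to librdkafka's name.
        "sasl.mechanism": "OAUTHBEARER",
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.client.id": client_id,
        "sasl.oauthbearer.client.secret": client_secret,
        "sasl.oauthbearer.token.endpoint.url": token_endpoint,
    }
```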