Kafka Source
There are two ways to integrate Kafka topics into your Numaflow pipeline: use the user-defined Kafka Source, or use the built-in Kafka Source provided by Numaflow.
Option 1: User-Defined Kafka Source
Developed and maintained by the Numaflow contributor community, the user-defined Kafka Source is a robust, feature-complete option for using Kafka as a data source in your Numaflow pipeline.
Key Features:
- Flexibility: Allows full customization of Kafka Source configurations to suit specific needs.
- Kafka Java Client Utilization: Leverages the Kafka Java client for robust message consumption from Kafka topics.
- Schema Management: Integrates seamlessly with the Confluent Schema Registry to support schema validation and manage schema evolution effectively.
More details on how to use the Kafka Source can be found here.
Option 2: Built-in Kafka Source
Numaflow provides a built-in Kafka source that ingests messages from one or more Kafka topics. The source uses consumer groups to manage offsets.
```yaml
spec:
  vertices:
    - name: input
      source:
        kafka:
          brokers:
            - my-broker1:19700
            - my-broker2:19700
          topic: my-topic # Use comma-separated topic names to consume from multiple topics.
          consumerGroup: my-consumer-group
          # Optional, flat librdkafka client configuration. Use one `key: value`
          # pair per line.
          config: |
            auto.offset.reset: earliest
            session.timeout.ms: 45000
          tls: # Optional.
            insecureSkipVerify: false # Optional, whether to skip TLS verification. Defaults to false.
            caCertSecret: # Optional, a secret reference which contains the CA certificate.
              name: my-ca-cert
              key: my-ca-cert-key
            certSecret: # Optional, a secret reference which contains the client certificate.
              name: my-cert
              key: my-cert-key
            keySecret: # Required when certSecret is set; contains the client private key.
              name: my-pk
              key: my-pk-key
          sasl: # Optional. This example uses PLAIN; other mechanisms are listed below.
            mechanism: PLAIN
            plain:
              userSecret:
                name: kafka-plain-auth
                key: username
              passwordSecret:
                name: kafka-plain-auth
                key: password
              handshake: true # Required by the API; ignored by the Rust built-in source.
```
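The comments in the example above state two rules: the topic field may hold several comma-separated topic names, and keySecret is required whenever certSecret is set. The Python sketch below checks both on a source spec represented as a plain dict. It is a hypothetical illustration, not Numaflow's validation code; in particular, trimming whitespace around topic names and dropping empty entries is an assumption.

```python
from typing import Any, Dict, List, Optional


def split_topics(topic: str) -> List[str]:
    # Assumption: whitespace around names is ignored and empty entries are dropped.
    return [name.strip() for name in topic.split(",") if name.strip()]


def check_tls(tls: Optional[Dict[str, Any]]) -> List[str]:
    """Return the problems found in an optional tls block (empty list if none)."""
    if not tls:
        return []  # tls is optional; nothing to check
    problems = []
    if tls.get("certSecret") and not tls.get("keySecret"):
        problems.append("keySecret is required when certSecret is set")
    return problems


# Hypothetical source spec; the topic names are made up for illustration.
kafka_source = {
    "brokers": ["my-broker1:19700", "my-broker2:19700"],
    "topic": "orders, payments",
    "consumerGroup": "my-consumer-group",
    "tls": {"certSecret": {"name": "my-cert", "key": "my-cert-key"}},
}

print(split_topics(kafka_source["topic"]))  # ['orders', 'payments']
print(check_tls(kafka_source["tls"]))  # ['keySecret is required when certSecret is set']
```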
The config field accepts any supported librdkafka client configuration option as a flat key: value pair. By default,
the built-in source sets auto.offset.reset: earliest, session.timeout.ms: 45000, heartbeat.interval.ms: 15000,
enable.partition.eof: false, and enable.auto.commit: false.
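The config field is a YAML block scalar that holds one flat key: value pair per line. The hypothetical Python sketch below parses such a block and layers it over the defaults listed above, assuming that user-supplied keys take precedence over the defaults. Numaflow's built-in source is written in Rust and its actual merge logic may differ; fetch.min.bytes is used only as an example of another librdkafka property.

```python
# Defaults as listed in the paragraph above; values are kept as strings,
# matching how they appear in the flat config block.
DEFAULTS = {
    "auto.offset.reset": "earliest",
    "session.timeout.ms": "45000",
    "heartbeat.interval.ms": "15000",
    "enable.partition.eof": "false",
    "enable.auto.commit": "false",
}


def parse_flat_config(block: str) -> dict:
    """Parse one 'key: value' pair per line, skipping blank lines."""
    parsed = {}
    for raw in block.splitlines():
        line = raw.strip()
        if not line:
            continue
        # Split on the first colon only, so values may contain colons (e.g. URLs).
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"expected 'key: value', got {raw!r}")
        parsed[key.strip()] = value.strip()
    return parsed


def effective_config(block: str) -> dict:
    # Assumption: user-supplied keys override the built-in defaults.
    return {**DEFAULTS, **parse_flat_config(block)}


user_block = """
auto.offset.reset: latest
fetch.min.bytes: 1024
"""
cfg = effective_config(user_block)
print(cfg["auto.offset.reset"])  # latest
print(cfg["enable.auto.commit"])  # false
```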
SASL Authentication
The built-in Kafka source supports the following SASL mechanisms: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, and OAUTHBEARER. To enable SASL, set sasl.mechanism and include only
the matching mechanism-specific block.
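The rule "include only the matching mechanism-specific block" can be expressed as a lookup from mechanism to block name. The Python sketch below is a hypothetical check built from the YAML examples on this page (plain, scramsha256, scramsha512, gssapi, oauth); it is not Numaflow's own validation logic.

```python
# Mechanism -> name of the block it expects, taken from the examples on this page.
MECHANISM_BLOCKS = {
    "PLAIN": "plain",
    "SCRAM-SHA-256": "scramsha256",
    "SCRAM-SHA-512": "scramsha512",
    "GSSAPI": "gssapi",
    "OAUTHBEARER": "oauth",
    "OAUTH": "oauth",  # accepted alias, per the OAUTHBEARER section
}
ALL_BLOCKS = set(MECHANISM_BLOCKS.values())


def check_sasl(sasl: dict) -> list:
    """Return problems with a sasl block (empty list if it looks consistent)."""
    mechanism = sasl.get("mechanism")
    if mechanism not in MECHANISM_BLOCKS:
        return [f"unsupported or missing mechanism: {mechanism!r}"]
    expected = MECHANISM_BLOCKS[mechanism]
    present = ALL_BLOCKS & set(sasl)  # mechanism-specific blocks actually present
    problems = []
    if expected not in present:
        problems.append(f"missing '{expected}' block for {mechanism}")
    for extra in sorted(present - {expected}):
        problems.append(f"unexpected '{extra}' block for {mechanism}")
    return problems


print(check_sasl({"mechanism": "PLAIN", "plain": {}, "oauth": {}}))
# ["unexpected 'oauth' block for PLAIN"]
```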
SASL credentials are referenced with Kubernetes Secret selectors. The name field is the Kubernetes Secret name, and
the key field is the entry inside that Secret's data or stringData. Because SecretKeySelector does not include a
namespace, create the Secret in the same namespace as the Pipeline or MonoVertex. The pipeline spec stores only the
Secret name and key, not the credential value. Numaflow mounts the Secret into the vertex pod, and the built-in Kafka
source reads and trims the selected value at startup.
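When Kubernetes mounts a Secret as a volume, each key appears as a file named after that key. The Python sketch below simulates that layout in a temporary directory and reads a value the way the paragraph above describes: read the selected key, then trim it. The mount path Numaflow actually uses is not shown here, and trimming whitespace on both ends is an assumption; it does show why a stray trailing newline in a Secret value is harmless, while intentional leading or trailing spaces would be lost.

```python
import tempfile
from pathlib import Path


def read_secret_value(mount_dir: Path, key: str) -> str:
    """Read one Secret key from a mounted volume and strip surrounding whitespace."""
    return (mount_dir / key).read_text().strip()


# Simulated mount; the directory layout mirrors a Kubernetes Secret volume.
with tempfile.TemporaryDirectory() as tmp:
    mount = Path(tmp) / "kafka-plain-auth"
    mount.mkdir()
    (mount / "username").write_text("alice\n")  # trailing newline, e.g. from a file
    (mount / "password").write_text("  s3cret \n")
    user = read_secret_value(mount, "username")
    password = read_secret_value(mount, "password")

print(repr(user), repr(password))  # 'alice' 's3cret'
```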
For example, create PLAIN credentials with:
```shell
kubectl create secret generic kafka-plain-auth \
  --from-literal=username='<username>' \
  --from-literal=password='<password>'
```
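If you manage Secrets as manifests instead of running kubectl imperatively, the command above corresponds to an Opaque Secret whose data values are base64-encoded. The Python sketch below builds that manifest; the namespace my-pipeline-ns and the credential values are hypothetical, and the namespace must match the Pipeline or MonoVertex namespace, as noted above.

```python
import base64
import json


def secret_manifest(name: str, namespace: str, values: dict) -> dict:
    """Build a Secret like `kubectl create secret generic --from-literal=...` produces.

    Kubernetes requires every value under `data` to be base64-encoded.
    """
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in values.items()
        },
    }


# Hypothetical namespace and credentials.
manifest = secret_manifest(
    "kafka-plain-auth", "my-pipeline-ns", {"username": "alice", "password": "s3cret"}
)
print(json.dumps(manifest, indent=2))
```

kubectl apply accepts JSON as well as YAML, so the printed output can be applied directly; alternatively, Kubernetes also accepts plain-text values under stringData.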
PLAIN
Use PLAIN when your Kafka cluster expects a username and password.

```yaml
sasl:
  mechanism: PLAIN
  plain:
    userSecret:
      name: kafka-plain-auth
      key: username
    passwordSecret:
      name: kafka-plain-auth
      key: password
    handshake: true # Required by the API; ignored by the Rust built-in source.
```
SCRAM-SHA-256
Use SCRAM-SHA-256 when your Kafka cluster uses SCRAM credentials with SHA-256.

```yaml
sasl:
  mechanism: SCRAM-SHA-256
  scramsha256:
    userSecret:
      name: scram-sha-256-user
      key: scram-sha-256-user-key
    passwordSecret:
      name: scram-sha-256-password
      key: scram-sha-256-password-key
    handshake: true # Required by the API; ignored by the Rust built-in source.
```
SCRAM-SHA-512
Use SCRAM-SHA-512 when your Kafka cluster uses SCRAM credentials with SHA-512.

```yaml
sasl:
  mechanism: SCRAM-SHA-512
  scramsha512:
    userSecret:
      name: scram-sha-512-user
      key: scram-sha-512-user-key
    passwordSecret:
      name: scram-sha-512-password
      key: scram-sha-512-password-key
    handshake: true # Required by the API; ignored by the Rust built-in source.
```
GSSAPI
Use GSSAPI for Kerberos. Set authType to KRB5_KEYTAB_AUTH when using keytabSecret, or to KRB5_USER_AUTH when
using passwordSecret. kerberosConfigSecret is optional; when set, its selected Secret value is passed to librdkafka
as the Kerberos kinit command.

```yaml
sasl:
  mechanism: GSSAPI
  gssapi:
    serviceName: my-service
    realm: my-realm
    authType: KRB5_KEYTAB_AUTH
    usernameSecret:
      name: gssapi-username
      key: gssapi-username-key
    keytabSecret:
      name: gssapi-keytab
      key: gssapi-keytab-key
    kerberosConfigSecret: # Optional, contains the Kerberos kinit command.
      name: my-kerberos-config
      key: my-kerberos-config-key
```
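The pairing between authType and the credential Secret described above (KRB5_KEYTAB_AUTH with keytabSecret, KRB5_USER_AUTH with passwordSecret) can be checked mechanically. The Python sketch below is a hypothetical pre-flight check on a gssapi block represented as a dict; it is not part of Numaflow.

```python
# authType -> the Secret selector it needs, as described in the GSSAPI section.
AUTH_TYPE_SECRETS = {
    "KRB5_KEYTAB_AUTH": "keytabSecret",
    "KRB5_USER_AUTH": "passwordSecret",
}


def check_gssapi(gssapi: dict) -> list:
    """Return problems with a gssapi block (empty list if consistent)."""
    auth_type = gssapi.get("authType")
    if auth_type not in AUTH_TYPE_SECRETS:
        return [f"unknown authType: {auth_type!r}"]
    required = AUTH_TYPE_SECRETS[auth_type]
    if required not in gssapi:
        return [f"{auth_type} requires {required}"]
    return []


# Mismatch: password-based auth configured with only a keytab.
print(check_gssapi({
    "authType": "KRB5_USER_AUTH",
    "keytabSecret": {"name": "gssapi-keytab", "key": "gssapi-keytab-key"},
}))  # ['KRB5_USER_AUTH requires passwordSecret']
```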
OAUTHBEARER
Use OAUTHBEARER for OAuth/OIDC client credentials. clientID and clientSecret are Kubernetes Secret selectors, while
tokenEndpoint is the token endpoint URL.

```yaml
sasl:
  mechanism: OAUTHBEARER # OAUTH is also accepted.
  oauth:
    clientID:
      name: kafka-oauth-client
      key: clientid
    clientSecret:
      name: kafka-oauth-client
      key: clientsecret
    tokenEndpoint: https://oauth-token.com/v1/token
```
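For orientation, the OAuth 2.0 client-credentials grant (RFC 6749, section 4.4) is a form-encoded POST to the token endpoint with the client ID and secret sent via HTTP Basic authentication. The Python sketch below builds, but does not send, such a request using the values from the example above plus hypothetical credentials. It shows the generic RFC 6749 shape only; it does not reproduce the exact request librdkafka sends (for example, any scope parameter it may add).

```python
import base64
from urllib.parse import quote_plus, urlencode
from urllib.request import Request


def client_credentials_request(token_endpoint: str, client_id: str,
                               client_secret: str) -> Request:
    """Build (but do not send) an RFC 6749 client-credentials token request."""
    # RFC 6749 section 2.3.1: form-encode the id and secret, then use HTTP Basic auth.
    pair = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
    basic = base64.b64encode(pair.encode("utf-8")).decode("ascii")
    body = urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return Request(
        token_endpoint,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


# Hypothetical credentials; in Numaflow they come from the clientID/clientSecret Secrets.
req = client_credentials_request(
    "https://oauth-token.com/v1/token", "my-client", "my-secret"
)
print(req.get_method(), req.full_url)  # POST https://oauth-token.com/v1/token
print(req.data)  # b'grant_type=client_credentials'
```

A successful response is a JSON object containing an access_token field (RFC 6749, section 5.1), which the Kafka client then presents to the broker.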
FAQ
How do I start the Kafka Source from a specific offset based on a datetime?
To start the Kafka Source from the offset that corresponds to a specific datetime, reset the consumer group's offsets before starting the pipeline.
For example, suppose we have a topic quickstart-events with 3 partitions and a consumer group console-consumer-94457. This example uses Kafka 3.6.1 with a broker on localhost:9092.

```
➜ kafka_2.13-3.6.1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic quickstart-events
Topic: quickstart-events  TopicId: WqIN6j7hTQqGZUQWdF7AdA  PartitionCount: 3  ReplicationFactor: 1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: quickstart-events  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
    Topic: quickstart-events  Partition: 2  Leader: 0  Replicas: 0  Isr: 0
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --all-groups
console-consumer-94457
```
The group has already consumed all available messages in quickstart-events (the LAG is 0 for every partition), but we want to go back to a specific datetime and re-consume the data from that point.

```
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-94457
GROUP                   TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
console-consumer-94457  quickstart-events  0          56              56              0    -            -     -
console-consumer-94457  quickstart-events  1          38              38              0    -            -     -
console-consumer-94457  quickstart-events  2          4               4               0    -            -     -
```
First, stop all consumers in the group console-consumer-94457, because offsets can only be reset while the group is inactive. Then, reset the offsets to the desired date and time. The example command below uses UTC time.

```
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group console-consumer-94457 --topic quickstart-events --to-datetime 2024-01-19T19:26:00.000
GROUP                   TOPIC              PARTITION  NEW-OFFSET
console-consumer-94457  quickstart-events  0          54
console-consumer-94457  quickstart-events  1          26
console-consumer-94457  quickstart-events  2          0
```
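Now, when you start the pipeline with its Kafka source configured for topic quickstart-events and consumer group console-consumer-94457, it consumes each partition starting from the NEW-OFFSET.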
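Each NEW-OFFSET is, per the Kafka consumer's offsetsForTimes semantics, the earliest offset whose record timestamp is greater than or equal to the requested time. The Python sketch below models that lookup on an in-memory list of (offset, timestamp) pairs and then works through the numbers from the example: after the reset, each partition's lag is LOG-END-OFFSET minus NEW-OFFSET. The fallback to the log-end offset when no record is new enough is an assumption about the CLI's behavior, and the lag figures assume no new messages arrive before the pipeline starts.

```python
def offset_for_time(records, target_ms, log_end_offset):
    """Earliest offset whose timestamp >= target_ms.

    `records` is a list of (offset, timestamp_ms) pairs in offset order.
    Assumption: if no record is new enough, the reset lands on the log end.
    """
    for offset, ts in records:
        if ts >= target_ms:
            return offset
    return log_end_offset


# Numbers taken from the describe and reset outputs above.
log_end = {0: 56, 1: 38, 2: 4}
new_offset = {0: 54, 1: 26, 2: 0}
lag = {p: log_end[p] - new_offset[p] for p in log_end}
print(lag, sum(lag.values()))  # {0: 2, 1: 12, 2: 4} 18
```

In other words, the pipeline re-reads 2 + 12 + 4 = 18 messages from quickstart-events.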
If your Kafka cluster requires authentication or encryption, you may need to create a properties file that contains the connection details and pass it to the Kafka CLI tools. Below are two example config.properties files, one for SASL/PLAIN and one for TLS.
SASL/PLAIN:

```properties
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=<BOOTSTRAP_BROKER_LIST>
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<CLUSTER_API_KEY>" \
  password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL
```

TLS:

```properties
request.timeout.ms=20000
bootstrap.servers=<BOOTSTRAP_BROKER_LIST>
security.protocol=SSL
ssl.enabled.protocols=TLSv1.2
ssl.truststore.location=<JKS_FILE_PATH>
ssl.truststore.password=<PASSWORD>
```
Then pass the file to kafka-consumer-groups.sh with the --command-config option:

```shell
bin/kafka-consumer-groups.sh --bootstrap-server <BOOTSTRAP_BROKER_LIST> \
  --command-config config.properties \
  --execute --reset-offsets \
  --group <GROUP_NAME> --topic <TOPIC_NAME> \
  --to-datetime <DATETIME_STRING>
```
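The help text of kafka-consumer-groups.sh gives the --to-datetime format as 'YYYY-MM-DDTHH:mm:SS.sss'. The Python sketch below converts a timezone-aware datetime to UTC and formats it that way, following the UTC convention of the example above; to_kafka_datetime is a hypothetical helper name. How the tool interprets a string without an explicit offset can depend on your Kafka version and environment, so check its documentation if you are not working in UTC.

```python
from datetime import datetime, timedelta, timezone


def to_kafka_datetime(dt: datetime) -> str:
    """Format an aware datetime as UTC 'YYYY-MM-DDTHH:mm:SS.sss'."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    utc = dt.astimezone(timezone.utc)
    # strftime has no millisecond directive, so append milliseconds manually.
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}"


# 14:26 at UTC-05:00 is 19:26 UTC, the instant used in the reset example above.
local = datetime(2024, 1, 19, 14, 26, tzinfo=timezone(timedelta(hours=-5)))
print(to_kafka_datetime(local))  # 2024-01-19T19:26:00.000
# Kafka record timestamps are epoch milliseconds; this is the same instant.
print(int(local.timestamp() * 1000))
```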
References:
- How to Use Kafka Tools With Confluent Cloud
- Apache Kafka Security