Kafka Source
A Kafka source ingests messages from a Kafka topic. Numaflow uses consumer groups to manage offsets.
spec:
  vertices:
    - name: input
      source:
        kafka:
          brokers:
            - my-broker1:19700
            - my-broker2:19700
          topic: my-topic
          consumerGroup: my-consumer-group
          config: | # Optional.
            consumer:
              offsets:
                initial: -2 # -2 for sarama.OffsetOldest, -1 for sarama.OffsetNewest. Defaults to sarama.OffsetNewest.
          tls: # Optional.
            insecureSkipVerify: # Optional, whether to skip TLS verification. Defaults to false.
            caCertSecret: # Optional, a secret reference which contains the CA Cert.
              name: my-ca-cert
              key: my-ca-cert-key
            certSecret: # Optional, a secret reference which contains the Cert.
              name: my-cert
              key: my-cert-key
            keySecret: # Optional, a secret reference which contains the Private Key.
              name: my-pk
              key: my-pk-key
          sasl: # Optional
            mechanism: GSSAPI # PLAIN, GSSAPI, SCRAM-SHA-256 or SCRAM-SHA-512, other mechanisms not supported
            gssapi: # Optional, for GSSAPI mechanism
              serviceName: my-service
              realm: my-realm
              # KRB5_USER_AUTH for auth using password
              # KRB5_KEYTAB_AUTH for auth using keytab
              authType: KRB5_KEYTAB_AUTH
              usernameSecret: # A secret reference which contains the username
                name: gssapi-username
                key: gssapi-username-key
              # A secret reference which contains the keytab (authType: KRB5_KEYTAB_AUTH)
              keytabSecret:
                name: gssapi-keytab
                key: gssapi-keytab-key
              # A secret reference which contains the password (authType: KRB5_USER_AUTH)
              passwordSecret:
                name: gssapi-password
                key: gssapi-password-key
              kerberosConfigSecret: # A secret reference which contains the kerberos config
                name: my-kerberos-config
                key: my-kerberos-config-key
            plain: # Optional, for PLAIN mechanism
              userSecret: # A secret reference which contains the user
                name: plain-user
                key: plain-user-key
              passwordSecret: # A secret reference which contains the password
                name: plain-password
                key: plain-password-key
              # Send the Kafka SASL handshake first if enabled (defaults to true)
              # Set this to false if using a non-Kafka SASL proxy
              handshake: true
            scramsha256: # Optional, for SCRAM-SHA-256 mechanism
              userSecret: # A secret reference which contains the user
                name: scram-sha-256-user
                key: scram-sha-256-user-key
              passwordSecret: # A secret reference which contains the password
                name: scram-sha-256-password
                key: scram-sha-256-password-key
              # Send the Kafka SASL handshake first if enabled (defaults to true)
              # Set this to false if using a non-Kafka SASL proxy
              handshake: true
            scramsha512: # Optional, for SCRAM-SHA-512 mechanism
              userSecret: # A secret reference which contains the user
                name: scram-sha-512-user
                key: scram-sha-512-user-key
              passwordSecret: # A secret reference which contains the password
                name: scram-sha-512-password
                key: scram-sha-512-password-key
              # Send the Kafka SASL handshake first if enabled (defaults to true)
              # Set this to false if using a non-Kafka SASL proxy
              handshake: true
            oauth: # Optional, for OAUTHBEARER mechanism
              clientID: # A secret reference which contains the client id
                name: kafka-oauth-client
                key: clientid
              clientSecret: # A secret reference which contains the client secret
                name: kafka-oauth-client
                key: clientsecret
              tokenEndpoint: https://oauth-token.com/v1/token
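Most of the fields above are marked optional. As a minimal sketch, a complete pipeline that reads my-topic from the oldest available offset and writes each message to a log sink might look like the following. The pipeline name, the output vertex, and the file name are assumptions made for illustration; the block only writes the manifest to a file so it can be reviewed before it is applied.

```shell
# Write a minimal pipeline manifest to a file.
# "kafka-source-example", the "output" vertex and the file name are
# hypothetical choices for this sketch, not names required by Numaflow.
cat > kafka-source-pipeline.yaml <<'EOF'
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: kafka-source-example
spec:
  vertices:
    - name: input
      source:
        kafka:
          brokers:
            - my-broker1:19700
          topic: my-topic
          consumerGroup: my-consumer-group
          config: |
            consumer:
              offsets:
                initial: -2 # sarama.OffsetOldest
    - name: output
      sink:
        log: {}
  edges:
    - from: input
      to: output
EOF
```

The file can then be applied with kubectl apply -f kafka-source-pipeline.yaml. Note that the initial offset is only used when the consumer group has no committed offset; once offsets have been committed, the source resumes from them.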
FAQ
How to start the Kafka Source from a specific offset based on datetime?
To start the Kafka Source from a specific offset based on datetime, we need to reset the offset before we start the pipeline.
For example, we have a topic quickstart-events with 3 partitions and a consumer group console-consumer-94457. This example uses Kafka 3.6.1 and localhost.
➜ kafka_2.13-3.6.1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic quickstart-events
Topic: quickstart-events TopicId: WqIN6j7hTQqGZUQWdF7AdA PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 2 Leader: 0 Replicas: 0 Isr: 0
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --all-groups
console-consumer-94457
We have already consumed all the available messages in the topic quickstart-events, but we want to go back to some datetime and re-consume the data from that datetime.
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-94457
GROUP                  TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
console-consumer-94457 quickstart-events 0          56              56              0    -            -     -
console-consumer-94457 quickstart-events 1          38              38              0    -            -     -
console-consumer-94457 quickstart-events 2          4               4               0    -            -     -
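A LAG of 0 on every partition is what shows that the group has consumed everything in the topic. As a quick check, the LAG column can be totalled with awk. This is a minimal sketch, assuming the column layout shown above (LAG is the sixth field); total_lag is a hypothetical helper name, not part of the Kafka tooling.

```shell
# total_lag: hypothetical helper that sums the LAG column (6th field) of
# kafka-consumer-groups.sh --describe output read from stdin.
# The header row and rows whose lag is "-" are skipped because only
# purely numeric values are added.
total_lag() {
  awk '$6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# Sample rows copied from the output above.
total_lag <<'EOF'
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-94457 quickstart-events 0 56 56 0 - - -
console-consumer-94457 quickstart-events 1 38 38 0 - - -
console-consumer-94457 quickstart-events 2 4 4 0 - - -
EOF
# prints 0
```

In practice the --describe command would be piped into total_lag instead of the sample rows.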
To do that, first stop every consumer in the consumer group console-consumer-94457, because offsets can only be reset while the group is inactive. Then reset the offsets using the desired date and time. The example command below uses UTC time.
➜ kafka_2.13-3.6.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group console-consumer-94457 --topic quickstart-events --to-datetime 2024-01-19T19:26:00.000
GROUP                  TOPIC             PARTITION  NEW-OFFSET
console-consumer-94457 quickstart-events 0          54
console-consumer-94457 quickstart-events 1          26
console-consumer-94457 quickstart-events 2          0
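Comparing each NEW-OFFSET with the CURRENT-OFFSET reported earlier (56, 38 and 4) gives the number of messages that will be read again. The arithmetic can be sketched as follows, assuming no new messages arrive in the meantime; replay_count is a hypothetical helper that reads one "partition current-offset new-offset" triple per line.

```shell
# replay_count: hypothetical helper. For each input line of the form
# "partition current-offset new-offset" it prints current - new, then a total.
replay_count() {
  awk '{ n = $2 - $3; printf "partition %s: %d\n", $1, n; total += n }
       END { printf "total: %d\n", total }'
}

# Offsets taken from the two command outputs above.
replay_count <<'EOF'
0 56 54
1 38 26
2 4 0
EOF
# prints:
# partition 0: 2
# partition 1: 12
# partition 2: 4
# total: 18
```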
Now we can start the pipeline, and the Kafka source will consume the topic quickstart-events with consumer group console-consumer-94457 starting from the NEW-OFFSET.
You may need to create a properties file that contains the connectivity details and use it to connect to the cluster. Below are two example config.properties files: one for SASL/PLAIN and one for TLS.
# SASL/PLAIN
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=<BOOTSTRAP_BROKER_LIST>
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="<CLUSTER_API_KEY>" \
   password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL

# TLS
request.timeout.ms=20000
bootstrap.servers=<BOOTSTRAP_BROKER_LIST>
security.protocol=SSL
ssl.enabled.protocols=TLSv1.2
ssl.truststore.location=<JKS_FILE_PATH>
ssl.truststore.password=<PASSWORD>
Then run the command with the --command-config option.
bin/kafka-consumer-groups.sh --bootstrap-server <BOOTSTRAP_BROKER_LIST> --command-config config.properties --execute --reset-offsets --group <GROUP_NAME> --topic <TOPIC_NAME> --to-datetime <DATETIME_STRING>
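The <DATETIME_STRING> value takes the same form as the earlier example, 2024-01-19T19:26:00.000. When the desired starting point is known as a Unix epoch (for instance from a log line or a metric), it can be converted to that form in UTC. The sketch below is one way to do it, assuming a date command that supports either -d @SECONDS (GNU coreutils, BusyBox) or -r SECONDS (BSD/macOS); epoch_to_kafka_datetime is a hypothetical helper name.

```shell
# epoch_to_kafka_datetime: hypothetical helper that prints a Unix epoch
# (seconds) as a UTC timestamp in the form YYYY-MM-DDTHH:mm:SS.000.
# It tries the GNU/BusyBox syntax first and falls back to the BSD syntax.
epoch_to_kafka_datetime() {
  date -u -d "@$1" +%Y-%m-%dT%H:%M:%S.000 2>/dev/null ||
    date -u -r "$1" +%Y-%m-%dT%H:%M:%S.000
}

epoch_to_kafka_datetime 1705692360
# prints 2024-01-19T19:26:00.000
```

The printed value can be passed directly to --to-datetime. Milliseconds are fixed at .000 because the input has second precision.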
Reference:
- How to Use Kafka Tools With Confluent Cloud
- Apache Kafka Security