Kafka Consumer : Required Properties
Code Block | ||
---|---|---|
| ||
Properties props = new Properties();
props.put("bootstrap.servers", "Broker-1:9092, Broker-2:9093"); // Brokers contacted first to discover the cluster: membership, partition leaders, etc.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // A consumer deserializes message keys and values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
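As a minimal sketch of the idea above, the plain-Java helper below builds a consumer configuration and reports which of the three required keys are missing. The class `ConsumerProps` and the method `missingRequired` are hypothetical names introduced for illustration only; they are not part of the Kafka client, and the broker addresses are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ConsumerProps {
    // The three settings this section treats as required for a consumer.
    static final String[] REQUIRED = {
        "bootstrap.servers", "key.deserializer", "value.deserializer"
    };

    // Builds a minimal consumer configuration for String keys and values.
    static Properties minimal(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Hypothetical helper: returns the required keys that are absent from props.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Running such a check before constructing the consumer is one way to surface a forgotten setting early, with a message that names the missing key.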
Kafka Consumer : Subscribing to / Unsubscribing from Topics
Code Block | ||
---|---|---|
| ||
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
myConsumer.subscribe(Arrays.asList("my_topic_A")); // takes a collection of topic names
myConsumer.unsubscribe(); |
Kafka Consumer : Assigning Partitions (vs Subscribing)
With assign(), we take on the responsibility of assigning specific partitions to a single consumer instance ourselves, regardless of topic.
Code Block | ||
---|---|---|
| ||
// Similar pattern to subscribe()
// Manual Partition Assignment
TopicPartition partition0 = new TopicPartition("my_topic_A", 0);
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
partitions.add(partition0);
// Invoke the assign() method
myConsumer.assign(partitions); // This is not incremental: each call replaces the previous assignment |
Kafka Consumer : Single Consumer Topic Subscriptions
...
Kafka Consumer : Single Consumer Partition Assignments
...
Kafka Consumer : Starting the Poll Loop
Info |
---|
Primary function of the Kafka Consumer = poll() |
Code Block | ||
---|---|---|
| ||
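// Set the topic subscription OR the partition assignment (the two are mutually exclusive)
myConsumer.subscribe(topics);
// myConsumer.assign(partitions);
try{
while(true){
ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(100)); // poll(long) is deprecated in recent clients
// process the records in a for loop (business logic goes here)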
}
}finally{
myConsumer.close();
} |
Kafka Consumer : Polling
The response from the brokers is used to instantiate the consumer's internal metadata object. While the poll method runs, this object is kept up to date through periodic updates from the cluster whenever cluster details change. With metadata available, other elements become involved, in particular the ConsumerCoordinator, which has 2 tasks.
First, the ConsumerCoordinator must be aware of partition reassignments and notify the subscription state object of assignment changes. Second, the ConsumerCoordinator is responsible for committing offsets to the cluster.
Confirmation of these commits in turn updates the subscription state, so that it is always aware of the status of the topics and partitions.
...
The poll() process is a single-threaded operation.
Kafka Consumer and Offset
The offset is the critical value that enables consumers to operate independently: it represents the last position the consumer has read from a partition within a topic.
3 categories of offsets:
last committed offset : the last record that the consumer has confirmed to have processed.
current position : the consumer tracks its current position, which advances as the consumer moves through the log towards the last record in the partition.
log-end offset : the position of the last record in the partition.
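The relationship between the three offsets can be sketched with a small plain-Java model. This is an illustration only, not Kafka client code: the class `PartitionOffsets` and its methods are hypothetical names, and the log end is modelled as a fixed number for simplicity.

```java
final class PartitionOffsets {
    private long lastCommitted;  // last offset the consumer confirmed as processed
    private long position;       // where the consumer currently is in the log
    private final long logEnd;   // end of the partition's log (fixed in this model)

    PartitionOffsets(long lastCommitted, long logEnd) {
        this.lastCommitted = lastCommitted;
        this.position = lastCommitted; // reading resumes from the last commit
        this.logEnd = logEnd;
    }

    // Advances the current position by up to maxRecords, never past the log end.
    // Returns how many records were actually read.
    long read(long maxRecords) {
        long n = Math.min(maxRecords, logEnd - position);
        position += n;
        return n;
    }

    // Confirms everything read so far as processed.
    void commit() {
        lastCommitted = position;
    }

    // Size of the gap between the last committed offset and the current position.
    long uncommitted() {
        return position - lastCommitted;
    }

    // Records left between the current position and the log end.
    long remaining() {
        return logEnd - position;
    }
}
```

In this model, records counted by `uncommitted()` are the ones that would be read again if the consumer restarted from the last committed offset before calling `commit()`.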
...
Mind the offset gap (un-committed offsets) : between offset 1 and offset 3
...
The coordinator is the object responsible for communicating with the cluster and for ensuring that the committed offsets are produced into the topic.
Info |
---|
If enable.auto.commit = false, the offsets are managed manually. |
Kafka Consumer and Post-Processing as commitSync
Before handling new records, we need to make sure that the ones we have already processed are committed.
Code Block | ||
---|---|---|
| ||
try{
for(...){ /* Processing batches of records... */ }
//Commit when we know we're done, after the batch is processed
myConsumer.commitSync();
}catch(CommitFailedException e){
log.error("...");
} |
commitSync() is synchronous: it blocks until it receives a response from the cluster. It also retries until it succeeds or hits an unrecoverable error (retry.backoff.ms = 100 by default).
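The "block and retry until success or unrecoverable error" behaviour described above can be sketched in plain Java. This is a model of the pattern, not the Kafka client's implementation: `BlockingCommit`, `commitBlocking` and `UnrecoverableException` are hypothetical names, and the commit attempt is passed in as a function that returns true on success and false on a retriable failure.

```java
import java.util.function.BooleanSupplier;

final class BlockingCommit {
    // Hypothetical stand-in for an error that no retry can fix.
    static final class UnrecoverableException extends RuntimeException {
        UnrecoverableException(String message) {
            super(message);
        }
    }

    // Calls attempt until it returns true, sleeping backoffMs between tries.
    // An UnrecoverableException thrown by attempt ends the loop and propagates.
    // Returns the number of attempts that were needed.
    static int commitBlocking(BooleanSupplier attempt, long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts; // success: the caller is unblocked only now
            }
            try {
                Thread.sleep(backoffMs); // wait before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while retrying", e);
            }
        }
    }
}
```

The trade-off this shows is the one stated above: the caller gets a definite outcome, at the cost of being blocked for as long as the retries last.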
Kafka Consumer and Post-Processing as commitAsync
Before handling new records, we need to control when a message is considered truly processed.
Code Block | ||
---|---|---|
| ||
try{
for(...){ /* Processing batches of records... */ }
//Commit when we know we're done, after the batch is processed
myConsumer.commitAsync(); // not recommended: the commit result is never observed
// recommended: pass a callback
myConsumer.commitAsync(new OffsetCommitCallback(){
public void onComplete(...,...,...){ /* do something... */ }
});
}catch(CommitFailedException e){
log.error("...");
} |
commitAsync() is asynchronous: it is non-blocking but non-deterministic (we do not know exactly when the commit succeeds or fails). It does not retry: commitAsync() does not automatically retry when a commit fails. Finally, the optional callback is useful because it is triggered upon the commit response from the cluster.
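A small plain-Java model can make the non-blocking, callback-driven behaviour concrete. It is a sketch under stated assumptions, not Kafka client code: `AsyncCommitModel`, its `Callback` interface and `deliverResponses` are hypothetical names, the cluster response is simulated by a boolean flag, and delivering responses in a separate explicit step is a modelling choice.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class AsyncCommitModel {
    // Hypothetical callback shape: error is null when the commit succeeded.
    interface Callback {
        void onComplete(long offset, Exception error);
    }

    // Commit requests that have been sent but whose response has not arrived yet.
    private final Deque<Runnable> pendingResponses = new ArrayDeque<>();

    // Returns immediately: the outcome is only known once the response is delivered.
    void commitAsync(long offset, boolean clusterAccepts, Callback callback) {
        pendingResponses.add(() -> callback.onComplete(
                offset,
                clusterAccepts ? null : new IllegalStateException("commit rejected")));
    }

    // Simulates the responses arriving: each callback fires exactly once.
    // A failed commit is reported to its callback and is not retried.
    int deliverResponses() {
        int delivered = 0;
        while (!pendingResponses.isEmpty()) {
            pendingResponses.poll().run();
            delivered++;
        }
        return delivered;
    }
}
```

Without a callback, the rejected commit in such a model would go unnoticed, which is why the callback variant is the recommended one.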
Committing Offsets
...