Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 15 Next »

Kafka Consumer : Required Properties

Properties props = new Properties();
props.put("bootstrap.servers", "Broker-1:9092, Broker-2:9093"); //Cluster membership: partition leader, etc.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//used for message serialization and deserialization
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

Kafka Consumer : Subscribing /Unsubscribing to Topics

KafkaConsumer myConsumer = new KafkaConsumer(props);
myConsumer.subscribe(Array.asList("my_topic_A"));//take a collection of strings
myConsumer.unsubscribe();

Kafka Consumer : Assigning for partitions (vs subscribing)

We take on the responsibility of assigning ourselves specific partitions to a single consumer instance, regardless of topic.

//Similar pattern as subcribe();
// Manual Partition Assignment
TopicPartition partition0 = new TopicPartition("my_topic_A", 0);
ArrayList<TopicPartition> partitions = new ArrayLis<TopicPartition>();
partitions.add(partition0);

//Invoke the assign() method
myConsumer.assign(partitions); //This is not incremental

Kafka Consumer : Single Consumer Topic Subscriptions

Kafka Consumer : Single Consumer Partition Assignments

Kafka Consumer : Starting the Poll Loop

Primary function of the Kafka Consumer = poll()

// Set the topic subscription or partition assignments
myConsumer.subscribe(topics);
myConsumer.assign(partitions);

try{
  while(true){
    ConsumerRecords<String, String> records = myConsumer.poll(100);
    //add business logic if needed in a loop for
  }
}finally{
  myConsumer.close();
}

Kafka Consumer : Polling

The response from the brokers is used to instantiate its internal metadata object, which will keep up to date, while the poll method runs, getting periodic updates from the cluster when cluster details change. With metadata available, other elements become more involved : ConsumerCoordinator has 2 tasks.

First, ConsumerCoordinator must be aware of partition reassignment and notification of assignment changes to the subscription state object ; Second, ConsumerCoordinator for committing offsets to the cluster.

The tasks will cause the confirmation of which will cause the update of the subscription state, so it can always be aware of the status of the topics and partitions.

The poll() process is a single-threaded operation.

Kafka Consumer and Offset

  • No labels