Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Kafka Consumer : Required Properties

Properties props = new Properties();
props.put("bootstrap.servers", "Broker-1:9092, Broker-2:9093"); //Cluster membership: partition leader, etc.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//used for message serialization and deserialization
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

Kafka Consumer : Subscribing /Unsubscribing to Topics

KafkaConsumer myConsumer = new KafkaConsumer(props);
myConsumer.subscribe(Array.asList("my_topic_A"));//take a collection of strings
myConsumer.unsubscribe();

Kafka Consumer : Assigning for partitions (vs subscribing)

We take on the responsibility of assigning ourselves specific partitions to a single consumer instance, regardless of topic.

//Similar pattern as subcribe();
// Manual Partition Assignment
TopicPartition partition0 = new TopicPartition("my_topic_A", 0);
ArrayList<TopicPartition> partitions = new ArrayLis<TopicPartition>();
partitions.add(partition0);

//Invoke the assign() method
myConsumer.assign(partitions); //This is not incremental

Kafka Consumer : Single Consumer Topic Subscriptions

Kafka Consumer : Single Consumer Partition Assignments

Kafka Consumer : Starting the Poll Loop

Primary function of the Kafka Consumer = poll()

// Set the topic subscription or partition assignments
myConsumer.subscribe(topics);
myConsumer.assign(partitions);

try{
  while(true){
    ConsumerRecords<String, String> records = myConsumer.poll(100);
    //add business logic if needed
  }
}finally{
  myConsumer.close();
}
  • No labels