Kafka Consumer : Required Properties
Properties props = new Properties();
// Initial brokers used to discover the cluster (membership, partition leaders, etc.)
props.put("bootstrap.servers", "Broker-1:9092,Broker-2:9093");
// A consumer reads messages, so it needs deserializers for keys and values, not serializers
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
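As a minimal sketch of the configuration step, the helper below collects the consumer settings in one place. It uses only java.util.Properties, so it compiles without the Kafka client library on the classpath. The class name ConsumerConfigSketch, the method buildConsumerProps, and the group name "my_group" are hypothetical and do not come from the original notes. The group.id entry is an assumption added here because subscribe() relies on consumer group management. The consumer side uses deserializers, since it turns the bytes it reads back into keys and values.

```java
import java.util.Properties;

// Sketch only: builds consumer settings with plain java.util.Properties.
class ConsumerConfigSketch {

    // Hypothetical helper (not part of the Kafka API or the original notes).
    static Properties buildConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Brokers contacted first to discover the rest of the cluster
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers deserialize the keys and values they read
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: a group id is set because subscribe() uses group management
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("Broker-1:9092,Broker-2:9093", "my_group");
        // Print every configured key/value pair (iteration order is not guaranteed)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as the hand-built props above.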
Kafka Consumer : Subscribing/Unsubscribing to Topics
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
// subscribe() takes a collection of topic names
myConsumer.subscribe(Arrays.asList("my_topic_A"));
// unsubscribe() drops all current topic subscriptions
myConsumer.unsubscribe();
Kafka Consumer : Assigning for partitions (vs subscribing)
With assign(), we take on the responsibility of assigning specific partitions to a single consumer instance ourselves, regardless of topic.
// Manual partition assignment; the calling pattern is similar to subscribe()
TopicPartition partition0 = new TopicPartition("my_topic_A", 0);
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
partitions.add(partition0);
// Invoke the assign() method. It is not incremental: each call replaces the previous assignment.
myConsumer.assign(partitions);
Kafka Consumer : Single Consumer Topic Subscriptions
Kafka Consumer : Single Consumer Partition Assignments
Kafka Consumer : Starting the Poll Loop
Primary function of the Kafka Consumer = poll()
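// Set the topic subscription or the partition assignment. Use one, not both:
// subscribe() and assign() are mutually exclusive on the same consumer.
myConsumer.subscribe(topics);
// myConsumer.assign(partitions);
try {
    while (true) {
        // Wait up to 100 ms for records; poll(Duration) replaces the deprecated poll(long)
        ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(100));
        // add business logic if needed
    }
} finally {
    myConsumer.close();
}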