...
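```java
// Set the topic subscription OR the partition assignment, not both:
// calling subscribe() and assign() on the same consumer throws IllegalStateException.
myConsumer.subscribe(topics);
// myConsumer.assign(partitions);

try {
    while (true) {
        // java.time.Duration; clients older than Kafka 2.0 use poll(100) instead
        ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // add business logic here if needed
        }
    }
} finally {
    myConsumer.close();
}
```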
Kafka Consumer : Polling
The response from the brokers is used to instantiate the consumer's internal metadata object, which is kept up to date while the poll method runs, receiving periodic updates from the cluster whenever cluster details change. With metadata available, other components become involved. The ConsumerCoordinator has two tasks.
First, the ConsumerCoordinator must be aware of partition reassignments and notify the subscription state object of assignment changes; second, the ConsumerCoordinator is responsible for committing offsets to the cluster.
Confirmation of either task updates the subscription state, so that it always reflects the current status of the topics and partitions.
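
The two coordinator tasks can be pictured with a small, self-contained model. This is only an illustrative sketch, not Kafka's implementation: `ToySubscriptionState` and `ToyCoordinator` are hypothetical names invented here, partitions are plain strings, `-1` stands for "no committed offset known", and the broker's confirmation is simulated with a boolean flag.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class CoordinatorSketch {

    // Hypothetical stand-in for the subscription state: it tracks the assigned
    // partitions and the last confirmed committed offset of each one.
    static class ToySubscriptionState {
        private final Map<String, Long> committed = new TreeMap<>();

        // Replace the assignment, keeping offsets of partitions that stay assigned.
        void assignFromCoordinator(Set<String> partitions) {
            committed.keySet().retainAll(partitions);
            for (String p : partitions) {
                committed.putIfAbsent(p, -1L); // -1: no committed offset known yet
            }
        }

        void markCommitted(String partition, long offset) {
            committed.put(partition, offset);
        }

        Set<String> assignedPartitions() {
            return new TreeSet<>(committed.keySet());
        }

        long committedOffset(String partition) {
            return committed.getOrDefault(partition, -1L);
        }
    }

    // Hypothetical stand-in for the coordinator and its two tasks.
    static class ToyCoordinator {
        private final ToySubscriptionState state;

        ToyCoordinator(ToySubscriptionState state) {
            this.state = state;
        }

        // Task 1: an assignment change arrived; notify the subscription state.
        void onAssignment(Set<String> partitions) {
            state.assignFromCoordinator(partitions);
        }

        // Task 2: commit an offset. The state is updated only if the (simulated)
        // broker confirmed the commit and the partition is still assigned.
        boolean commit(String partition, long offset, boolean brokerConfirmed) {
            if (!brokerConfirmed || !state.assignedPartitions().contains(partition)) {
                return false;
            }
            state.markCommitted(partition, offset);
            return true;
        }
    }

    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        ToyCoordinator coordinator = new ToyCoordinator(state);

        coordinator.onAssignment(Set.of("orders-0", "orders-1"));
        coordinator.commit("orders-0", 42L, true);

        // Simulated rebalance: orders-1 is revoked, orders-2 is added.
        coordinator.onAssignment(Set.of("orders-0", "orders-2"));

        System.out.println(state.assignedPartitions());        // [orders-0, orders-2]
        System.out.println(state.committedOffset("orders-0")); // 42
    }
}
```

In this model the subscription state never changes on its own: it is updated only when the coordinator reports a new assignment or a confirmed commit, which mirrors the flow described above.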