Kafka Consumer: Required Properties
...
Mind the offset gap, i.e. the uncommitted offsets: between offset 1 and offset 3
...
The coordinator is the object responsible for communicating with the cluster and for ensuring that committed offsets are produced into the topic.
Info |
---|
If enable.auto.commit = false, the application manages offset commits manually. |
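As a minimal sketch of the configuration side, the properties below disable auto-commit so that offsets are committed only when the application calls commitSync() or commitAsync(). The class name `ManualCommitConfig`, the `build` helper, and the String deserializers are assumptions made for illustration; the property keys are standard Kafka consumer settings.

```java
import java.util.Properties;

class ManualCommitConfig {
    // Hypothetical helper: builds consumer properties with auto-commit turned off.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual offset management: nothing is committed unless the code commits it.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```

The resulting `Properties` object would then be passed to the consumer's constructor, for example `ManualCommitConfig.build("localhost:9092", "demo-group")`, where the address and group name are placeholders.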
Kafka Consumer and Post-Processing with commitSync
Before handling new records, we need to make sure that the ones we have just processed are committed.
Code Block | ||
---|---|---|
| ||
try {
    for (...) {
        // Process the batch of records...
    }
    // Commit once we know the whole batch has been processed
    myConsumer.commitSync();
} catch (CommitFailedException e) {
    log.error("...");
} |
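commitSync() is synchronous: it blocks until it receives a response from the cluster. It also retries until the commit succeeds, an unrecoverable error occurs, or the timeout set by default.api.timeout.ms expires. The wait between retries is set by retry.backoff.ms (100 ms by default).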
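The blocking-and-retrying behaviour described above can be sketched in plain Java. This is an illustration of the semantics only, not Kafka's internal implementation: `BlockingCommitSketch`, `Outcome`, and `commitBlocking` are hypothetical names, and the `timeoutMs` parameter is an assumption that stands in for the client's overall time limit.

```java
import java.util.function.Supplier;

class BlockingCommitSketch {
    enum Outcome { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    // Hypothetical helper: blocks the caller until the attempt succeeds,
    // fails with an unrecoverable error, or the timeout expires.
    // Returns the number of attempts it took to succeed.
    static int commitBlocking(Supplier<Outcome> attempt, long backoffMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        int attempts = 0;
        while (true) {
            attempts++;
            Outcome outcome = attempt.get();
            if (outcome == Outcome.SUCCESS) {
                return attempts;
            }
            if (outcome == Outcome.FATAL_ERROR) {
                throw new IllegalStateException("unrecoverable commit error");
            }
            if (System.nanoTime() >= deadline) {
                throw new IllegalStateException("commit timed out after " + attempts + " attempts");
            }
            try {
                Thread.sleep(backoffMs); // wait before retrying, in the spirit of retry.backoff.ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting to retry", e);
            }
        }
    }
}
```

If the supplied attempt fails twice with a retriable error and then succeeds, `commitBlocking` returns only after the third attempt, which is the cost of the synchronous style: the caller cannot process further records in the meantime.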
Kafka Consumer and Post-Processing with commitAsync
Before handling new records, we need to control when a message is considered truly processed.
Code Block | ||
---|---|---|
| ||
try {
    for (...) {
        // Process the batch of records...
    }
    // Commit once we know the whole batch has been processed
    myConsumer.commitAsync(); // not recommended: the outcome is never observed
    // Recommended: pass a callback that is notified of the outcome
    myConsumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            // do something...
        }
    });
} catch (CommitFailedException e) {
    log.error("...");
} |
commitAsync() is asynchronous: it does not block, but it is non-deterministic, because we do not know exactly when the commit succeeds or fails. It also does not retry: a failed commit is not automatically sent again. Finally, the optional callback is useful because it is triggered when the commit response arrives from the cluster.
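To make these three properties concrete, here is a small self-contained simulation. It does not use the Kafka client: `AsyncCommitSketch`, its `commitAsync` signature, the `clusterAccepts` flag, and `deliverResponses` are all hypothetical stand-ins chosen for illustration. The call returns before the outcome is known, a rejected commit is not retried, and only the callback learns what happened.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiConsumer;

class AsyncCommitSketch {
    // Responses that the simulated cluster has not delivered yet.
    private final Queue<Runnable> pendingResponses = new ArrayDeque<>();

    // Hypothetical stand-in for commitAsync(callback): queues the request and returns at once.
    // 'clusterAccepts' fakes the cluster's answer; the callback receives (offset, error-or-null).
    void commitAsync(long offset, boolean clusterAccepts, BiConsumer<Long, Exception> callback) {
        Exception error = clusterAccepts ? null : new IllegalStateException("commit rejected");
        pendingResponses.add(() -> callback.accept(offset, error));
    }

    // Delivers every queued response; each callback runs once, with no retry on failure.
    int deliverResponses() {
        int delivered = 0;
        Runnable response;
        while ((response = pendingResponses.poll()) != null) {
            response.run();
            delivered++;
        }
        return delivered;
    }

    // Records the order of events to show that the calls return before any outcome is known.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        AsyncCommitSketch consumer = new AsyncCommitSketch();
        BiConsumer<Long, Exception> callback = (offset, error) ->
                events.add(error == null ? "committed " + offset : "failed " + offset);
        consumer.commitAsync(10L, true, callback);
        consumer.commitAsync(20L, false, callback);
        events.add("commitAsync returned");
        consumer.deliverResponses();
        return events;
    }
}
```

In `demo()`, the "commitAsync returned" event is recorded before either callback fires, and the rejected commit for offset 20 surfaces only as a "failed 20" event. Without a callback, that failure would go unnoticed.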
Committing Offsets
...