...
Kafka Consumer and Post-Processing as commitAsync
To handle the new record to process, we need to control when to consider the message as truly processed.
Code Block | ||
---|---|---|
| ||
try{
for(...){ //Processing batches of records... }
//Commit when we know we're done, after the batch is processed
myConsumer.commitAsync();//not recommended
//recommended
myConsumer.commitAsync(new OffsetCommitCallback(){
public void onComplete(...,...,...){ //do something...}
});
}catch(CommitFailedException){
log.error("...");
} |
commitAsync() is asynchronous = non-blocking but non-deterministic (we don’t know exactly when the commit succeeded or not). And, no retries (commitAsync does not automatically retry when a commit doesn’t happen).