Kafka Consumer : Required Properties

...

The poll() process is a single-threaded operation.

Kafka Consumer and Offset

The offset is the critical value that lets a consumer operate independently: it represents the last position the consumer has read from a partition within a topic.

Three categories of offsets:

  • last committed offset : the last record that the consumer has confirmed as processed.

  • current position : the position the consumer has reached in the log; the consumer tracks it itself, and it advances as the consumer reads towards the end of the partition.

  • log-end offset : the end of the partition's log, just past the last record written to it.
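To make the relationship between the three offsets concrete, here is a minimal sketch in plain Java. It is a toy model for a single partition, not part of the Kafka client API; the class name `PartitionOffsets` and its methods are hypothetical and exist only for illustration.

```java
/** Toy model of the three offsets for one partition; not part of the Kafka API. */
public class PartitionOffsets {
    private long committed;     // position up to which processing is confirmed
    private long position;      // position the consumer has reached in the log
    private final long logEnd;  // end of the partition's log

    public PartitionOffsets(long logEnd) {
        this.logEnd = logEnd;
    }

    /** Reads up to n records without moving past the log end; returns how many were read. */
    public long read(long n) {
        long readable = Math.min(n, logEnd - position);
        position += readable;
        return readable;
    }

    /** Confirms everything read so far as processed. */
    public void commit() {
        committed = position;
    }

    /** Records read but not yet committed (the gap of un-committed offsets). */
    public long uncommitted() {
        return position - committed;
    }

    /** Records still to read before the consumer reaches the log end. */
    public long remaining() {
        return logEnd - position;
    }

    public static void main(String[] args) {
        PartitionOffsets p = new PartitionOffsets(10);
        p.read(4);
        p.commit();
        p.read(3);
        // prints "uncommitted=3 remaining=3"
        System.out.println("uncommitted=" + p.uncommitted() + " remaining=" + p.remaining());
    }
}
```

In this model, a consumer that stopped right after the second read would leave three records read but not confirmed, which is the kind of gap a commit is meant to close.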

...

Mind the Offset Gap or un-committed offsets : between offset 1 and offset 3

...

The coordinator is the object responsible for communicating with the cluster and for ensuring that committed offsets are produced to the offsets topic (the internal __consumer_offsets topic).

Info

If enable.auto.commit = false, the application manages offset commits manually.
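As an illustration, the setting can be placed alongside the other consumer properties as follows. This is a sketch only: the broker address `localhost:9092`, the group name `example-group`, the choice of string deserializers, and the class name `ManualCommitConfig` are assumptions, not values from this page.

```java
import java.util.Properties;

public class ManualCommitConfig {
    /** Builds consumer properties with auto-commit disabled. */
    public static Properties build() {
        Properties props = new Properties();
        // Assumed values: replace the broker address and group name with your own.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The application, not the client, decides when offsets are committed.
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

The resulting object would typically be passed to the KafkaConsumer constructor; from then on, nothing is committed unless the application calls commitSync() or commitAsync().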

Kafka Consumer and Post-Processing as commitSync

Before handling new records, we need to make sure that the ones we have already processed are committed.

Code Block
languagejava
try {
  for (...) { // Process the batch of records...
  }
  // Commit once we know we are done, after the batch is processed
  myConsumer.commitSync();
} catch (CommitFailedException e) {
  log.error("...", e);
}

commitSync() is synchronous: it blocks until it receives a response from the cluster. It also retries until the commit succeeds or an unrecoverable error occurs (retry.backoff.ms defaults to 100 ms).
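The blocking and retrying behaviour described above can be modelled with a small retry loop. The sketch below is plain Java and does not reproduce the Kafka client's internals; `SyncCommitSketch`, `RetriableCommitException`, and `commitBlocking` are hypothetical names, and the fixed backoff stands in for retry.backoff.ms.

```java
public class SyncCommitSketch {
    /** Marks a failure worth retrying (hypothetical, for this sketch only). */
    public static class RetriableCommitException extends RuntimeException {}

    /**
     * Blocks until the commit attempt succeeds, retrying retriable failures
     * after a fixed backoff. Any other exception is treated as unrecoverable
     * and propagates to the caller. Returns the number of attempts made.
     */
    public static int commitBlocking(Runnable attempt, long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (RetriableCommitException e) {
                sleep(backoffMs); // wait before the next attempt
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }
}
```

The caller does not move on until the loop returns or throws, which is the trade-off of a synchronous commit: certainty about the outcome in exchange for time spent waiting.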

Kafka Consumer and Post-Processing as commitAsync

Before handling new records, we need to control when a message is considered truly processed.

Code Block
languagejava
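try {
  for (...) { // Process the batch of records...
  }
  // Commit once we know we are done, after the batch is processed
  myConsumer.commitAsync(); // not recommended: the outcome is never reported
  // Recommended: pass a callback to be told the outcome of the commit
  myConsumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
      // exception is null when the commit succeeded; do something...
    }
  });
} catch (CommitFailedException e) {
  log.error("...", e);
}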

commitAsync() is asynchronous: it does not block, but it is non-deterministic (we do not know exactly when the commit succeeds or fails). There are no retries either: commitAsync() does not automatically retry a commit that fails. Finally, the optional callback is useful because it is triggered when the commit response arrives from the cluster.
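The same properties (non-blocking call, no retry, outcome delivered through a callback) can be sketched in plain Java with a CompletableFuture. This is a model of the behaviour, not the Kafka client's implementation; `AsyncCommitSketch` and its `commitAsync` method are hypothetical, and the callback shape only loosely mirrors OffsetCommitCallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class AsyncCommitSketch {
    /**
     * Starts a single commit attempt on another thread and returns immediately.
     * The callback receives (offset, null) on success or (offset, error) on
     * failure. A failed attempt is reported to the callback and never retried.
     */
    public static CompletableFuture<Void> commitAsync(
            long offset, Runnable attempt, BiConsumer<Long, Exception> callback) {
        return CompletableFuture.runAsync(() -> {
            Exception error = null;
            try {
                attempt.run();
            } catch (RuntimeException e) {
                error = e;
            }
            callback.accept(offset, error);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = commitAsync(
                42L,
                () -> { /* pretend the cluster accepted the commit */ },
                (offset, error) -> System.out.println(
                        "offset " + offset + (error == null ? " committed" : " failed: " + error)));
        // The caller is free to keep processing here; join() is only used so
        // that this demo does not exit before the callback has run.
        pending.join();
    }
}
```

Because nothing retries on the caller's behalf, the callback is the only place where a failed commit becomes visible, which is why the variant without a callback is not recommended.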

Committing Offsets

...