How to poll/consume for new Kafka Messages

Poll/Consume for new messages / events. Blocks until a consume result is available or the timeout period has elapsed.

static void Consume(ConsumerBuilder<string, string> builder, string topic) { bool run = true; var consumer = builder.Build(); consumer.Subscribe(topic); Console.CancelKeyPress += (p, e) => { run = false; consumer.Unsubscribe(); consumer.Close(); consumer.Dispose(); }; while (run){ consumer.Consume(TimeSpan.FromSeconds(1)); } }

Control the fetch behavior

var consumerConfig = new ConsumerConfig { FetchMinBytes = 1, //Default value FetchWaitMaxMs = 500 ,//Default value MaxPartitionFetchBytes = 1048576, //Default value, so 1 MB FetchMaxBytes = 52428800, //Default value, so 50 MB MaxPollIntervalMs = 300000 //Default value };
  • FetchMinBytes : specify the minimum amount of data to receive from the broker when fetching records.

  • FetchWaitMaxMs : broker blocks before answering the fetch request of there is no sufficient data to satisfy the requirement given by FetchMinBytes.

  • MaxPartitionFetchBytes : maximum amount data per partition the server returns.

  • FetchMaxBytes : maximum data returned for each request.

  • MaxPollIntervalMs : setting does not impact the behavior, but consumer will cache records from each fetch request and returns records incrementally from each poll.