How to poll/consume for new Kafka Messages
Consume polls for new messages or events. The call blocks until a consume result is available or the timeout period has elapsed; if the timeout elapses first, it returns null.
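static void Consume(ConsumerBuilder<string, string> builder, string topic)
{
    bool run = true;
    var consumer = builder.Build();
    consumer.Subscribe(topic);
    Console.CancelKeyPress += (p, e) =>
    {
        // Keep the process alive so the loop can exit and the consumer can close cleanly.
        e.Cancel = true;
        run = false;
    };
    while (run)
    {
        // Returns null if no message arrived within the timeout.
        var result = consumer.Consume(TimeSpan.FromSeconds(1));
        if (result != null)
        {
            Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        }
    }
    // Close on the polling thread, not in the handler while Consume may still be running.
    consumer.Close();
    consumer.Dispose();
}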
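As an alternative to polling with a timeout, Consume also has an overload that takes a CancellationToken and blocks until a message arrives or the token is cancelled. The following is a minimal sketch of that variant, not a definitive implementation. The broker address, group id, and topic name are placeholder assumptions that do not come from this page.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

static class TokenConsumeExample
{
    static void Consume(ConsumerBuilder<string, string> builder, string topic)
    {
        // Not disposed on purpose: the Ctrl+C handler below may outlive this method.
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so the finally block can run
            cts.Cancel();    // unblocks the pending Consume call
        };

        var consumer = builder.Build();
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message is available or the token is cancelled.
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
                catch (ConsumeException ex)
                {
                    // Log the error and keep polling.
                    Console.WriteLine($"Consume error: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C was pressed; fall through to the clean shutdown below.
        }
        finally
        {
            consumer.Close();   // leave the group and commit final offsets
            consumer.Dispose();
        }
    }

    static void Main()
    {
        // Assumed values for illustration only.
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "example-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        Consume(new ConsumerBuilder<string, string>(config), "example-topic");
    }
}
```

With this overload there is no null result to check, because the call only returns when a message has actually been received.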
Control the fetch behavior
var consumerConfig = new ConsumerConfig
{
    FetchMinBytes = 1,                // Default value
    FetchWaitMaxMs = 500,             // Default value
    MaxPartitionFetchBytes = 1048576, // Default value (1 MB)
    FetchMaxBytes = 52428800,         // Default value (50 MB)
    MaxPollIntervalMs = 300000        // Default value (5 minutes)
};
FetchMinBytes : the minimum amount of data the broker should return for a fetch request.
FetchWaitMaxMs : the maximum time the broker blocks before answering a fetch request if there is not enough data to satisfy FetchMinBytes.
MaxPartitionFetchBytes : the maximum amount of data per partition the broker returns.
FetchMaxBytes : the maximum amount of data returned for each fetch request.
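MaxPollIntervalMs : the maximum allowed time between calls to Consume. This setting does not affect the fetch behavior itself, but if the interval is exceeded the consumer is considered failed and the group rebalances to reassign its partitions to another member.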
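To see how these settings fit into a complete consumer, here is a small sketch that trades a little latency for larger fetches. The tuned values (64 KB minimum, 100 ms wait), the broker address, the group id, and the topic name are illustrative assumptions, not recommendations from this page.

```csharp
using System;
using Confluent.Kafka;

class FetchTuningExample
{
    static void Main()
    {
        var consumerConfig = new ConsumerConfig
        {
            // Assumed connection values for illustration only.
            BootstrapServers = "localhost:9092",
            GroupId = "example-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,

            // Ask the broker to wait for at least 64 KB per fetch,
            // but to answer after 100 ms even if less data is available.
            FetchMinBytes = 64 * 1024,
            FetchWaitMaxMs = 100,

            // Upper bounds per partition and per fetch request (defaults).
            MaxPartitionFetchBytes = 1048576,
            FetchMaxBytes = 52428800,

            // Maximum time allowed between two Consume calls (default).
            MaxPollIntervalMs = 300000
        };

        var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
        consumer.Subscribe("example-topic");
        try
        {
            // Poll a fixed number of times so the example terminates on its own.
            for (int i = 0; i < 10; i++)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null)
                {
                    continue; // timeout elapsed without a message
                }
                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
            }
        }
        finally
        {
            consumer.Close();
            consumer.Dispose();
        }
    }
}
```

Raising FetchMinBytes tends to reduce the number of fetch requests at the cost of latency, while FetchWaitMaxMs caps how long that extra latency can be.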