/
How to control the flow of Kafka messages

How to control the flow of Kafka messages

internal static IEnumerable<ConsumeResult<K, V>> ConsumeRecords<K, V>(this IConsumer<K, V> consumer, TimeSpan timeout, long maxRecords) { List<ConsumeResult<K, V>> records = new List<ConsumeResult<K, V>>(); DateTime dt = DateTime.Now; TimeSpan ts = TimeSpan.Zero; do{ var r = consumer.Consume(ts); if (r != null) { records.Add(r); } else{ // TODO : make log(N) to increse progressivly timeout consumption ts = (dt.Add(timeout) - DateTime.Now); } if (records.Count >= maxRecords) break; } while (dt.Add(timeout) > DateTime.Now); return records; }

ย