// How to control the flow of Kafka messages
/// <summary>
/// Polls <paramref name="consumer"/> and collects the results until either
/// <paramref name="maxRecords"/> results have been gathered or <paramref name="timeout"/>
/// has elapsed, whichever happens first.
/// </summary>
/// <typeparam name="K">Key type of the consumed messages.</typeparam>
/// <typeparam name="V">Value type of the consumed messages.</typeparam>
/// <param name="consumer">Consumer to poll.</param>
/// <param name="timeout">
/// Overall time budget for the whole call. A zero or negative value results in a single
/// non-blocking poll.
/// </param>
/// <param name="maxRecords">
/// Maximum number of results to return. When zero or negative nothing is consumed and an
/// empty sequence is returned.
/// </param>
/// <returns>The results collected within the budget, in consumption order; never null.</returns>
/// <exception cref="ArgumentNullException"><paramref name="consumer"/> is null.</exception>
internal static IEnumerable<ConsumeResult<K, V>> ConsumeRecords<K, V>(this IConsumer<K, V> consumer, TimeSpan timeout, long maxRecords)
{
    if (consumer == null)
    {
        throw new ArgumentNullException(nameof(consumer));
    }

    List<ConsumeResult<K, V>> records = new List<ConsumeResult<K, V>>();

    // Do not pull (and thereby advance past) a message the caller did not ask for.
    if (maxRecords <= 0)
    {
        return records;
    }

    // Stopwatch is monotonic, so the deadline is immune to DST and wall-clock adjustments.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();

    // The first poll never blocks; it only picks up what is already available.
    TimeSpan remaining = TimeSpan.Zero;
    do
    {
        var result = consumer.Consume(remaining);
        if (result != null)
        {
            records.Add(result);
            if (records.Count >= maxRecords)
            {
                break;
            }
        }

        // Recompute the budget after EVERY poll (not only after an empty one) so that a
        // blocking poll can never be handed a stale timeout that overshoots the deadline.
        // TODO: consider progressively increasing the per-poll timeout instead of
        // blocking for the whole remaining budget.
        remaining = timeout - elapsed.Elapsed;
    } while (remaining > TimeSpan.Zero);

    return records;
}
ย
, multiple selections available,