How to produce/consume Kafka messages

The code below is taken from: https://www.c-sharpcorner.com/article/building-real-time-applications-using-net-core-and-kafka/

Producer

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.Interfaces;

namespace Kafka.Producer
{
    /// <summary>
    /// Thin wrapper around a Confluent.Kafka <see cref="IProducer{TKey, TValue}"/> that
    /// publishes keyed messages to a topic. Values are serialized with
    /// <c>KafkaSerializer&lt;TValue&gt;</c> (declared elsewhere in the project).
    /// </summary>
    /// <typeparam name="TKey">Type of the message key written to the Kafka topic.</typeparam>
    /// <typeparam name="TValue">Type of the message value written to the Kafka topic (reference type).</typeparam>
    public class KafkaProducer<TKey, TValue> : IDisposable, IKafkaProducer<TKey, TValue> where TValue : class
    {
        private readonly IProducer<TKey, TValue> _producer;

        // Guards against flushing/disposing the underlying producer more than once.
        private bool _disposed;

        /// <summary>
        /// Builds the underlying producer from the supplied configuration.
        /// </summary>
        /// <param name="config">Producer configuration passed to <see cref="ProducerBuilder{TKey, TValue}"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
        public KafkaProducer(ProducerConfig config)
        {
            if (config is null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            _producer = new ProducerBuilder<TKey, TValue>(config)
                .SetValueSerializer(new KafkaSerializer<TValue>())
                .Build();
        }

        /// <summary>
        /// Publishes a single message to the given topic and completes once the
        /// underlying producer's <c>ProduceAsync</c> call has completed.
        /// </summary>
        /// <param name="topic">Name of the topic to publish to.</param>
        /// <param name="key">Key of the message.</param>
        /// <param name="value">Value (payload) of the message.</param>
        /// <returns>A task that completes when the produce call has finished.</returns>
        public async Task ProduceAsync(string topic, TKey key, TValue value)
        {
            var message = new Message<TKey, TValue> { Key = key, Value = value };

            // Library-style code: no need to resume on the caller's synchronization context.
            await _producer.ProduceAsync(topic, message).ConfigureAwait(false);
        }

        /// <summary>
        /// Flushes any messages still queued in the producer and then releases it.
        /// Calling this method more than once is safe; later calls do nothing.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                // NOTE(review): Flush() is called without a timeout, so shutdown may wait
                // for outstanding deliveries; consider Flush(TimeSpan) if it must be bounded.
                _producer.Flush();
            }
            finally
            {
                // Release the producer even when the flush fails.
                _producer.Dispose();
            }
        }
    }
}

Consumer

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.Interfaces;
using Microsoft.Extensions.DependencyInjection;

namespace Kafka.Consumer
{
    /// <summary>
    /// Kafka consumer that subscribes to a single topic and forwards every received
    /// message to an <see cref="IKafkaHandler{TKey, TValue}"/> resolved from a DI scope.
    /// Values are deserialized with <c>KafkaDeserializer&lt;TValue&gt;</c> (declared elsewhere in the project).
    /// </summary>
    /// <typeparam name="TKey">Type of the message key read from the Kafka topic.</typeparam>
    /// <typeparam name="TValue">Type of the message value read from the Kafka topic (reference type).</typeparam>
    public class KafkaConsumer<TKey, TValue> : IKafkaConsumer<TKey, TValue> where TValue : class
    {
        private readonly ConsumerConfig _config;
        private readonly IServiceScopeFactory _serviceScopeFactory;

        // The fields below are only populated once Consume has been called.
        private IKafkaHandler<TKey, TValue> _handler;
        private IConsumer<TKey, TValue> _consumer;
        private string _topic;

        /// <summary>
        /// Stores the consumer configuration and the scope factory used later by <see cref="Consume"/>.
        /// </summary>
        /// <param name="config">Consumer configuration passed to <see cref="ConsumerBuilder{TKey, TValue}"/>.</param>
        /// <param name="serviceScopeFactory">Factory used to create the DI scope the message handler is resolved from.</param>
        /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
        public KafkaConsumer(ConsumerConfig config, IServiceScopeFactory serviceScopeFactory)
        {
            _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
            _config = config ?? throw new ArgumentNullException(nameof(config));
        }

        /// <summary>
        /// Resolves the handler, builds the consumer and runs the consume loop on a
        /// thread-pool thread until the loop ends (cancellation, a fatal consume error
        /// or an unexpected exception).
        /// </summary>
        /// <param name="topic">Name of the Kafka topic to subscribe to.</param>
        /// <param name="stoppingToken">Token that signals the loop to stop.</param>
        /// <returns>A task that completes when the consume loop has ended.</returns>
        /// <exception cref="ArgumentException"><paramref name="topic"/> is null, empty or whitespace.</exception>
        public async Task Consume(string topic, CancellationToken stoppingToken)
        {
            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentException("Topic name must be provided.", nameof(topic));
            }

            // The scope (and therefore the handler) stays alive for as long as the loop runs,
            // because it is only disposed after the awaited loop task has finished.
            using var scope = _serviceScopeFactory.CreateScope();

            _handler = scope.ServiceProvider.GetRequiredService<IKafkaHandler<TKey, TValue>>();
            _consumer = new ConsumerBuilder<TKey, TValue>(_config)
                .SetValueDeserializer(new KafkaDeserializer<TValue>())
                .Build();
            _topic = topic;

            // IConsumer.Consume blocks, so the loop is pushed off the calling thread.
            await Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Closes the consumer; per Confluent.Kafka this commits offsets and leaves the group cleanly.
        /// Does nothing when <see cref="Consume"/> has not been called yet.
        /// </summary>
        public void Close()
        {
            _consumer?.Close();
        }

        /// <summary>
        /// Releases all resources used by the underlying consumer.
        /// Does nothing when <see cref="Consume"/> has not been called yet.
        /// </summary>
        public void Dispose()
        {
            _consumer?.Dispose();
        }

        /// <summary>
        /// Subscribes to the topic and dispatches each consumed message to the handler.
        /// The loop ends on cancellation, on a fatal <see cref="ConsumeException"/>, or on
        /// any other exception (including one thrown by the handler).
        /// </summary>
        /// <param name="cancellationToken">Token that signals the loop to stop.</param>
        private async Task StartConsumerLoop(CancellationToken cancellationToken)
        {
            _consumer.Subscribe(_topic);

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var result = _consumer.Consume(cancellationToken);

                    // A result without a message (e.g. a partition-EOF event) has nothing
                    // to hand to the handler; dereferencing it would stop the loop.
                    if (result?.Message is null)
                    {
                        continue;
                    }

                    await _handler.HandleAsync(result.Message.Key, result.Message.Value).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (ConsumeException e)
                {
                    // Consumer errors should generally be ignored (or logged) unless fatal.
                    Console.WriteLine($"Consume error: {e.Error.Reason}");

                    if (e.Error.IsFatal)
                    {
                        break;
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Unexpected error: {e}");
                    break;
                }
            }
        }
    }
}