Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Note

Code is coming from : https://www.c-sharpcorner.com/article/building-real-time-applications-using-net-core-and-kafka/

Producer

Code Block
languagec#
using System;  
using System.Threading.Tasks;  
using Confluent.Kafka;  
using Kafka.Interfaces;  

namespace Kafka.Producer  
{  
    /// <summary>  
    /// Base class for implementing Kafka Producer.  
    /// </summary>  
    /// <typeparam name="TKey">Indicates message's key in Kafka topic</typeparam>  
    /// <typeparam name="TValue">Indicates message's value in Kafka topic</typeparam>  
    public class KafkaProducer<TKey, TValue> : IDisposable, IKafkaProducer<TKey,TValue> where TValue : class  
    {  
        private readonly IProducer<TKey, TValue> _producer;  
  
        /// <summary>  
        /// Initializes the producer  
        /// </summary>  
        /// <param name="config"></param>  
        public KafkaProducer(ProducerConfig config)  
        {  
            _producer = new ProducerBuilder<TKey, TValue>(config).SetValueSerializer(new KafkaSerializer<TValue>()).Build();  
        }  

        /// <summary>  
        /// Triggered when the service creates Kafka topic.  
        /// </summary>  
        /// <param name="topic">Indicates topic name</param>  
        /// <param name="key">Indicates message's key in Kafka topic</param>  
        /// <param name="value">Indicates message's value in Kafka topic</param>  
        /// <returns></returns>  
        public async Task ProduceAsync(string topic,TKey key, TValue value)  
        {  
            await _producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });  
        }  
  
        /// <summary>  
        ///   
        /// </summary>  
        public void Dispose()  
        {  
            _producer.Flush();  
            _producer.Dispose();  
        }  
    }  
}  

Consumer

Code Block
languagec#
using System;  
using System.Threading;  
using System.Threading.Tasks;  
using Confluent.Kafka;  
using Kafka.Interfaces;  
using Microsoft.Extensions.DependencyInjection;  
  
namespace Kafka.Consumer  
{  
    /// <summary>  
    /// Base class for implementing Kafka Consumer.  
    /// </summary>  
    /// <typeparam name="TKey"></typeparam>  
    /// <typeparam name="TValue"></typeparam>  
    public class KafkaConsumer<TKey, TValue> : IKafkaConsumer<TKey, TValue> where TValue : class  
    {  
        private readonly ConsumerConfig _config;  
        private IKafkaHandler<TKey, TValue> _handler;  
        private IConsumer<TKey, TValue> _consumer;  
        private string _topic;  
  
        private readonly IServiceScopeFactory _serviceScopeFactory;  
  
        /// <summary>  
        /// Indicates constructor to initialize the serviceScopeFactory and ConsumerConfig  
        /// </summary>  
        /// <param name="config">Indicates the consumer configuration</param>  
        /// <param name="serviceScopeFactory">Indicates the instance for serviceScopeFactory</param>  
        public KafkaConsumer(ConsumerConfig config, IServiceScopeFactory serviceScopeFactory)  
        {  
            _serviceScopeFactory = serviceScopeFactory;  
            _config = config;  
        }  
  
        /// <summary>  
        /// Triggered when the service is ready to consume the Kafka topic.  
        /// </summary>  
        /// <param name="topic">Indicates Kafka Topic</param>  
        /// <param name="stoppingToken">Indicates stopping token</param>  
        /// <returns></returns>  
        public async Task Consume(string topic, CancellationToken stoppingToken)  
        {  
            using var scope = _serviceScopeFactory.CreateScope();  
  
            _handler = scope.ServiceProvider.GetRequiredService<IKafkaHandler<TKey, TValue>>();  
            _consumer = new ConsumerBuilder<TKey, TValue>(_config).SetValueDeserializer(new KafkaDeserializer<TValue>()).Build();  
            _topic = topic;  
  
            await Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken);  
        }  
  
        /// <summary>  
        /// This will close the consumer, commit offsets and leave the group cleanly.  
        /// </summary>  
        public void Close()  
        {  
            _consumer.Close();  
        }  
  
        /// <summary>  
        /// Releases all resources used by the current instance of the consumer  
        /// </summary>  
        public void Dispose()  
        {  
            _consumer.Dispose();  
        }  
  
        private async Task StartConsumerLoop(CancellationToken cancellationToken)  
        {  
            _consumer.Subscribe(_topic);  
  
            while (!cancellationToken.IsCancellationRequested)  
            {  
                try  
                {  
                    var result = _consumer.Consume(cancellationToken);  
  
                    if (result != null)  
                    {  
                        await _handler.HandleAsync(result.Message.Key, result.Message.Value);  
                    }  
                }  
                catch (OperationCanceledException)  
                {  
                    break;  
                }  
                catch (ConsumeException e)  
                {  
                    // Consumer errors should generally be ignored (or logged) unless fatal.  
                    Console.WriteLine($"Consume error: {e.Error.Reason}");  
  
                    if (e.Error.IsFatal)  
                    {  
                        break;  
                    }  
                }  
                catch (Exception e)  
                {  
                    Console.WriteLine($"Unexpected error: {e}");  
                    break;  
                }  
            }  
        }  
    }  
}