Skip to end of metadata
Go to start of metadata

You are viewing an old version of this content. View the current version.

Compare with Current View Version History

« Previous Version 4 Next »

using System;  
using System.Threading.Tasks;  
using Confluent.Kafka;  
using Kafka.Interfaces;  

namespace Kafka.Producer  
{  
    /// <summary>  
    /// Base class for implementing Kafka Producer.  
    /// </summary>  
    /// <typeparam name="TKey">Indicates message's key in Kafka topic</typeparam>  
    /// <typeparam name="TValue">Indicates message's value in Kafka topic</typeparam>  
    public class KafkaProducer<TKey, TValue> : IDisposable, IKafkaProducer<TKey,TValue> where TValue : class  
    {  
        private readonly IProducer<TKey, TValue> _producer;  
  
        /// <summary>  
        /// Initializes the producer  
        /// </summary>  
        /// <param name="config"></param>  
        public KafkaProducer(ProducerConfig config)  
        {  
            _producer = new ProducerBuilder<TKey, TValue>(config).SetValueSerializer(new KafkaSerializer<TValue>()).Build();  
        }  

        /// <summary>  
        /// Triggered when the service creates Kafka topic.  
        /// </summary>  
        /// <param name="topic">Indicates topic name</param>  
        /// <param name="key">Indicates message's key in Kafka topic</param>  
        /// <param name="value">Indicates message's value in Kafka topic</param>  
        /// <returns></returns>  
        public async Task ProduceAsync(string topic,TKey key, TValue value)  
        {  
            await _producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });  
        }  
  
        /// <summary>  
        ///   
        /// </summary>  
        public void Dispose()  
        {  
            _producer.Flush();  
            _producer.Dispose();  
        }  
    }  
}  

  • No labels