Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagec#
// Set the consumer group
var config = new ConsumerConfig
{
  GroupId = "test-app",
  BootstrapServers = "localhost:9092",
  Debug = "generic, broker, topic, metadata, consumer"
};
var builder = new ConsumerBuilder<string, string>(config);

Assign with Commit Method

Code Block
languagec#
static void Assign(ConsumerBuilder<string, string> builder, string bootstrap, string groupId, string topic)
{
     var offsets = new List<TopicPartitionOffset>
     {
        new TopicPartitionOffset(new TopicPartition(topic, 0), 2),
        new TopicPartitionOffset(new TopicPartition(topic, 1), 1),
     };
     // Check if none consumer is register for this consumer group
     var adminClientConfig = new AdminClientConfig
     {
          BootstrapServers = bootstrap
     };
     var adminClientBuilder = new AdminClientBuilder(adminClientConfig);
     using (var adminClient = adminClientBuilder.Build()){
          var groupInfo = adminClient.ListGroup(groupId, TimeSpan.FromSeconds(10));
          if (groupInfo.Members.Count > 0){
              Console.WriteLine($"Error consumers already exist in this consumer group {groupId}");
              foreach (var member in groupInfo.Members)
                  Console.WriteLine(
                     $"Member {member.MemberId} (client.id:{member.ClientId}#client.host:{member.ClientHost}) =" +
                      $" Assigment {DecodeMemberAssignment(member.MemberAssignment)}");
                  return;
            }
      }
      using (var consumer = builder.Build()){
           consumer.Commit(offsets);
      }
}