/
How to assign a consumer group to a partition / topic
How to assign a consumer group to a partition / topic
Assigning a consumer to a list of partitions is not incremental: each assignment replaces the previous one rather than adding to it.
// Consumer settings: GroupId selects the consumer group this consumer belongs to.
var config = new ConsumerConfig();
config.GroupId = "test-app";
config.BootstrapServers = "localhost:9092";
// Comma-separated list of client debug contexts to enable.
config.Debug = "generic, broker, topic, metadata, consumer";

// Builder for consumers with string keys and string values, based on the settings above.
var builder = new ConsumerBuilder<string, string>(config);
Assign with Commit Method
/// <summary>
/// Commits fixed offsets for partitions 0 and 1 of <paramref name="topic"/>, but only when the
/// consumer group <paramref name="groupId"/> currently has no members.
/// </summary>
/// <param name="builder">Builder used to create the consumer that performs the commit.</param>
/// <param name="bootstrap">Bootstrap servers used by the admin client that inspects the group.</param>
/// <param name="groupId">Consumer group whose membership is checked before committing.</param>
/// <param name="topic">Topic whose partitions 0 and 1 receive the committed offsets.</param>
/// <remarks>
/// NOTE(review): the commit is issued by a consumer created from <paramref name="builder"/>, so it
/// presumably applies to the group configured on that builder; confirm it matches
/// <paramref name="groupId"/>.
/// </remarks>
static void Assign(ConsumerBuilder<string, string> builder, string bootstrap, string groupId, string topic)
{
    // Offsets to commit: partition 0 -> offset 2, partition 1 -> offset 1.
    var offsets = new List<TopicPartitionOffset>
    {
        new TopicPartitionOffset(new TopicPartition(topic, 0), 2),
        new TopicPartitionOffset(new TopicPartition(topic, 1), 1),
    };

    // Check that no consumer is registered in this consumer group.
    var adminClientConfig = new AdminClientConfig
    {
        BootstrapServers = bootstrap
    };
    var adminClientBuilder = new AdminClientBuilder(adminClientConfig);
    using (var adminClient = adminClientBuilder.Build())
    {
        var groupInfo = adminClient.ListGroup(groupId, TimeSpan.FromSeconds(10));

        // ListGroup is documented to return null when the group does not exist. A group
        // that does not exist has no members, so fall through to the commit in that case
        // instead of dereferencing null.
        if (groupInfo != null && groupInfo.Members.Count > 0)
        {
            Console.WriteLine($"Error consumers already exist in this consumer group {groupId}");
            foreach (var member in groupInfo.Members)
            {
                Console.WriteLine(
                    $"Member {member.MemberId} (client.id:{member.ClientId}#client.host:{member.ClientHost}) =" +
                    $" Assigment {DecodeMemberAssignment(member.MemberAssignment)}");
            }

            // Do not commit while the group has active members.
            return;
        }
    }

    // The group has no members: commit the offsets through a short-lived consumer.
    using (var consumer = builder.Build())
    {
        consumer.Commit(offsets);
    }
}
, multiple selections available,