Skip to content

Commit 14cbc10

Browse files
committed
draft
1 parent 4a761ce commit 14cbc10

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.clients.admin.ListGroupsOptions;
2929
import org.apache.kafka.clients.admin.ListGroupsResult;
3030
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
31+
import org.apache.kafka.clients.admin.OffsetSpec;
3132
import org.apache.kafka.clients.admin.ShareGroupDescription;
3233
import org.apache.kafka.clients.admin.ShareMemberAssignment;
3334
import org.apache.kafka.clients.admin.ShareMemberDescription;
@@ -182,6 +183,10 @@ List<GroupListing> listShareGroupsInStates(Set<GroupState> states) throws Execut
182183
return new ArrayList<>(result.all().get());
183184
}
184185

186+
Map<TopicPartition, ConsumerGroupCommand.LogOffsetResult> getLogOffsets(Collection<TopicPartition> topicPartitions, OffsetSpec offsetSpec) {
187+
188+
}
189+
185190
private void printGroupInfo(List<GroupListing> groups) {
186191
// find proper columns width
187192
int maxGroupLen = 15;
@@ -370,8 +375,7 @@ Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(Stri
370375

371376
void resetOffsets() throws ExecutionException, InterruptedException {
372377
String groupId = opts.options.valueOf(opts.groupOpt);
373-
Collection<SharePartitionOffsetInformation> partitionsToReset = getPartitionsToReset(groupId);
374-
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = getOffsetsToReset(partitionsToReset);
378+
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
375379
boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
376380
if (!dryRun) {
377381
adminClient.alterConsumerGroupOffsets(groupId, offsetsToReset, new AlterConsumerGroupOffsetsOptions()
@@ -381,39 +385,41 @@ void resetOffsets() throws ExecutionException, InterruptedException {
381385
printOffsetFormat(groupId, partitionsToReset, opts.options.has(opts.verboseOpt));
382386
}
383387

384-
private Map<TopicPartition, OffsetAndMetadata> getOffsetsToReset(Collection<SharePartitionOffsetInformation> partitionsToReset) {
388+
private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId) throws ExecutionException, InterruptedException {
385389
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = new HashMap<>();
390+
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
391+
Map<TopicPartition, OffsetAndMetadata> startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
392+
393+
if (opts.options.has(opts.topicOpt)) {
394+
Set<String> topics = new HashSet<>(opts.options.valuesOf(opts.topicOpt));
395+
startOffsets = startOffsets.entrySet().stream()
396+
.filter(entry -> {
397+
TopicPartition topicPartition = entry.getKey();
398+
return topics.contains(topicPartition.topic());
399+
})
400+
.collect(Collectors.toMap(
401+
Map.Entry::getKey,
402+
Map.Entry::getValue
403+
));
404+
}
405+
if (startOffsets.isEmpty()) {
406+
return Map.of();
407+
}
408+
386409
for (SharePartitionOffsetInformation partition : partitionsToReset) {
387410
TopicPartition tp = new TopicPartition(partition.topic, partition.partition);
388411
if (opts.options.has(opts.resetToEarliestOpt)) {
389-
offsetsToReset.put(tp, new OffsetAndMetadata(0L));
412+
//TODO implement resetToEarliestOpt
390413
} else if (opts.options.has(opts.resetToLatestOpt)) {
391-
offsetsToReset.put(tp, new OffsetAndMetadata(Long.MAX_VALUE));
414+
//TODO implement resetToLatestOpt
392415
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
416+
//TODO implement resetToLatestOpt
393417
offsetsToReset.put(tp, new OffsetAndMetadata(partition.offset.get()));
394418
}
395419
}
396420
return offsetsToReset;
397421
}
398422

399-
private Collection<SharePartitionOffsetInformation> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
400-
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets
401-
= collectGroupsOffsets(Set.of(groupId));
402-
if (offsets.isEmpty()) {
403-
return Collections.emptySet();
404-
}
405-
Collection<SharePartitionOffsetInformation> partitionOffsetInformation = offsets.get(groupId).getValue();
406-
if (opts.options.has(opts.allTopicsOpt)) {
407-
return partitionOffsetInformation;
408-
} else if (opts.options.has(opts.topicOpt)) {
409-
Set<String> topics= new HashSet<>(opts.options.valuesOf(opts.topicOpt));
410-
return partitionOffsetInformation.stream()
411-
.filter(partition -> topics.contains(partition.topic))
412-
.collect(Collectors.toSet());
413-
}
414-
return Collections.emptySet();
415-
}
416-
417423
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
418424
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
419425
return options.timeoutMs(t);

0 commit comments

Comments
 (0)