diff --git a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java index da0d3148..7ef25e67 100644 --- a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java +++ b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java @@ -18,16 +18,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.file.DataFileReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -262,21 +267,30 @@ private static boolean generateData() throws Exception { "service-call-view-events", 27L, "span-event-view", 50L, "log-event-view", 0L); - int retry = 0; - while (!areMessagesConsumed(endOffSetMap) && retry++ < 5) { - Thread.sleep(2000); + int retry = 0, maxRetries = 50; + while (!areMessagesConsumed(endOffSetMap) && retry++ < maxRetries) { + Thread.sleep(6000); // max 5 min wait time } // stop this service viewGen.stop(); - return retry < 5; + return retry < maxRetries; } private static boolean areMessagesConsumed(Map endOffSetMap) throws Exception { - ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = - adminClient.listConsumerGroupOffsets(""); - Map offsetAndMetadataMap = - consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); + ListConsumerGroupsResult listConsumerGroups = adminClient.listConsumerGroups(); + List groupIds = listConsumerGroups.all().get().stream() + .filter(consumerGroupListing -> consumerGroupListing.isSimpleConsumerGroup()) + .map(consumerGroupListing -> consumerGroupListing.groupId()) + .collect(Collectors.toUnmodifiableList()); + + Map offsetAndMetadataMap = new HashMap<>(); + for(String groupId : groupIds) { + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId); + Map metadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); + metadataMap.forEach((k, v) -> offsetAndMetadataMap.putIfAbsent(k, v)); + } + if (offsetAndMetadataMap.size() < 6) { return false; }