From 60e3d6422148f4999e646814e90eebd1b2c3ab46 Mon Sep 17 00:00:00 2001 From: Aaron Steinfeld Date: Thu, 23 Jun 2022 16:34:54 -0400 Subject: [PATCH 1/4] feat: support configuring distinct count agg function --- .../pinot/PinotBasedRequestHandler.java | 16 ++---- .../converters/PinotFunctionConverter.java | 52 +++++++++---------- .../PinotFunctionConverterConfig.java | 38 ++++++++++++++ .../QueryRequestToPinotSQLConverterTest.java | 2 +- .../PinotFunctionConverterTest.java | 29 +++++++++-- 5 files changed, 94 insertions(+), 43 deletions(-) create mode 100644 query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterConfig.java diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index 249020b5..7c936860 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -1,6 +1,5 @@ package org.hypertrace.core.query.service.pinot; -import static org.hypertrace.core.query.service.ConfigUtils.optionallyGet; import static org.hypertrace.core.query.service.QueryRequestUtil.getLogicalColumnName; import com.google.common.base.Preconditions; @@ -43,6 +42,7 @@ import org.hypertrace.core.query.service.api.ValueType; import org.hypertrace.core.query.service.pinot.PinotClientFactory.PinotClient; import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter; +import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverterConfig; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,19 +132,9 @@ private void processConfig(Config config) { ? Optional.of(config.getString(START_TIME_ATTRIBUTE_NAME_CONFIG_KEY)) : Optional.empty(); - Optional customPercentileFunction = - optionallyGet(() -> config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG)); - - customPercentileFunction.ifPresent( - function -> - LOG.info( - "Using {} function for percentile aggregations of handler: {}", function, name)); - PinotFunctionConverter functionConverter = - customPercentileFunction - .map(PinotFunctionConverter::new) - .orElseGet(PinotFunctionConverter::new); this.request2PinotSqlConverter = - new QueryRequestToPinotSQLConverter(viewDefinition, functionConverter); + new QueryRequestToPinotSQLConverter( + viewDefinition, new PinotFunctionConverter(new PinotFunctionConverterConfig(config))); if (config.hasPath(SLOW_QUERY_THRESHOLD_MS_CONFIG)) { this.slowQueryThreshold = config.getInt(SLOW_QUERY_THRESHOLD_MS_CONFIG); diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java index 1f15dea3..cf4dec6c 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java @@ -3,6 +3,7 @@ import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_AVGRATE; import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_CONCAT; import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_COUNT; +import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_DISTINCTCOUNT; import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_PERCENTILE; import java.time.Duration; @@ -18,28 +19,17 @@ import org.hypertrace.core.query.service.api.ValueType; public class PinotFunctionConverter { - /** - * Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster and - * reasonably accurate, hence use that as the default. - * - *

AVGRATE not supported directly in Pinot. So AVG_RATE is computed by summing over all values - * and then dividing by a constant. - */ - private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST"; - private static final String PINOT_CONCAT_FUNCTION = "CONCATSKIPNULL"; - private static final String DEFAULT_AVG_RATE_SIZE = "PT1S"; - private final String percentileAggFunction; + private static final String DEFAULT_AVG_RATE_SIZE = "PT1S"; + private final PinotFunctionConverterConfig config; - public PinotFunctionConverter(String configuredPercentileFunction) { - this.percentileAggFunction = - Optional.ofNullable(configuredPercentileFunction) - .orElse(DEFAULT_PERCENTILE_AGGREGATION_FUNCTION); + public PinotFunctionConverter(PinotFunctionConverterConfig config) { + this.config = config; } public PinotFunctionConverter() { - this.percentileAggFunction = DEFAULT_PERCENTILE_AGGREGATION_FUNCTION; + this(new PinotFunctionConverterConfig()); } public String convert( @@ -50,10 +40,16 @@ public String convert( case QUERY_FUNCTION_COUNT: return this.convertCount(); case QUERY_FUNCTION_PERCENTILE: + // Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster + // and reasonably accurate, so support selecting the implementation to use return this.functionToString(this.toPinotPercentile(function), argumentConverter); + case QUERY_FUNCTION_DISTINCTCOUNT: + return this.functionToString(this.toPinotDistinctCount(function), argumentConverter); case QUERY_FUNCTION_CONCAT: return this.functionToString(this.toPinotConcat(function), argumentConverter); case QUERY_FUNCTION_AVGRATE: + // AVGRATE not supported directly in Pinot. So AVG_RATE is computed by summing over all + // values and then dividing by a constant. return this.functionToStringForAvgRate(function, argumentConverter, executionContext); default: // TODO remove once pinot-specific logic removed from gateway - this normalization reverts @@ -70,7 +66,7 @@ private String functionToString( Function function, java.util.function.Function argumentConverter) { String argumentString = function.getArgumentsList().stream() - .map(argumentConverter::apply) + .map(argumentConverter) .collect(Collectors.joining(",")); return function.getFunctionName() + "(" + argumentString + ")"; @@ -88,15 +84,14 @@ private String functionToStringForAvgRate( : DEFAULT_AVG_RATE_SIZE; long rateIntervalInSeconds = isoDurationToSeconds(rateIntervalInIso); long aggregateIntervalInSeconds = - (executionContext - .getTimeSeriesPeriod() - .or(executionContext::getTimeRangeDuration) - .orElseThrow()) + executionContext + .getTimeSeriesPeriod() + .or(executionContext::getTimeRangeDuration) + .orElseThrow() .getSeconds(); return String.format( - "SUM(DIV(%s, %s))", - columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds); + "(%s / %s)", columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds); } private String convertCount() { @@ -114,7 +109,7 @@ private Function toPinotPercentile(Function function) { QUERY_FUNCTION_PERCENTILE, function.getArguments(0)))); return Function.newBuilder(function) .removeArguments(0) - .setFunctionName(this.percentileAggFunction + percentileValue) + .setFunctionName(this.config.getPercentileAggregationFunction() + percentileValue) .build(); } @@ -124,6 +119,12 @@ private Function toPinotConcat(Function function) { return Function.newBuilder(function).setFunctionName(PINOT_CONCAT_FUNCTION).build(); } + private Function toPinotDistinctCount(Function function) { + return Function.newBuilder(function) + .setFunctionName(this.config.getDistinctCountAggregationFunction()) + .build(); + } + private boolean isHardcodedPercentile(Function function) { String functionName = function.getFunctionName().toUpperCase(); return functionName.startsWith(QUERY_FUNCTION_PERCENTILE) @@ -178,8 +179,7 @@ Optional intFromValue(Value value) { private static long isoDurationToSeconds(String duration) { try { - Duration d = java.time.Duration.parse(duration); - return d.get(ChronoUnit.SECONDS); + return Duration.parse(duration).get(ChronoUnit.SECONDS); } catch (DateTimeParseException ex) { throw new IllegalArgumentException( String.format( diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterConfig.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterConfig.java new file mode 100644 index 00000000..54f55467 --- /dev/null +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterConfig.java @@ -0,0 +1,38 @@ +package org.hypertrace.core.query.service.pinot.converters; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import lombok.AllArgsConstructor; +import lombok.Value; + +@Value +@AllArgsConstructor +public class PinotFunctionConverterConfig { + + private static final String PERCENTILE_AGGREGATION_FUNCTION_CONFIG = "percentileAggFunction"; + private static final String DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG = + "distinctCountAggFunction"; + private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST"; + private static final String DEFAULT_DISTINCT_COUNT_AGGREGATION_FUNCTION = "DISTINCTCOUNT"; + + String percentileAggregationFunction; + String distinctCountAggregationFunction; + + public PinotFunctionConverterConfig(Config config) { + if (config.hasPath(PERCENTILE_AGGREGATION_FUNCTION_CONFIG)) { + this.percentileAggregationFunction = config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG); + } else { + this.percentileAggregationFunction = DEFAULT_PERCENTILE_AGGREGATION_FUNCTION; + } + if (config.hasPath(DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG)) { + this.distinctCountAggregationFunction = + config.getString(DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG); + } else { + this.distinctCountAggregationFunction = DEFAULT_DISTINCT_COUNT_AGGREGATION_FUNCTION; + } + } + + public PinotFunctionConverterConfig() { + this(ConfigFactory.empty()); + } +} diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java index d971c2a3..64fcfe9c 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java @@ -914,7 +914,7 @@ public void testQueryWithAverageRateInOrderBy() { + "' " + "and ( start_time_millis >= 1637297304041 and start_time_millis < 1637300904041 and service_id != 'null' ) " + "group by service_id, service_name " - + "order by sum(div(error_count, 3600.0)) " + + "order by (error_count / 3600.0) " + "limit 10000", viewDefinition, executionContext); diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java index c3303c1f..571687d6 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java @@ -104,7 +104,7 @@ void acceptsCustomPercentileFunctions() { assertEquals( expected, - new PinotFunctionConverter("CUSTOMPERCENTILE") + new PinotFunctionConverter(new PinotFunctionConverterConfig("CUSTOMPERCENTILE", null)) .convert(mockingExecutionContext, percentileFunction, this.mockArgumentConverter)); } @@ -211,7 +211,7 @@ void convertAvgRateFunction() { .thenReturn(Optional.of(Duration.ofSeconds(10))); assertEquals( - "SUM(DIV(foo, 2.0))", + "(foo / 2.0)", new PinotFunctionConverter() .convert( mockingExecutionContext, @@ -223,7 +223,7 @@ void convertAvgRateFunction() { .thenReturn(Optional.of(Duration.ofSeconds(20))); assertEquals( - "SUM(DIV(foo, 4.0))", + "(foo / 4.0)", new PinotFunctionConverter() .convert( mockingExecutionContext, @@ -231,6 +231,29 @@ void convertAvgRateFunction() { this.mockArgumentConverter)); } + @Test + void convertsDistinctCountFunction() { + Expression column = createColumnExpression("foo").build(); + + when(this.mockArgumentConverter.apply(column)).thenReturn("foo"); + + assertEquals( + "DISTINCTCOUNT(foo)", + new PinotFunctionConverter() + .convert( + mockingExecutionContext, + buildFunction(QUERY_FUNCTION_DISTINCTCOUNT, column.toBuilder()), + this.mockArgumentConverter)); + + assertEquals( + "CUSTOM_DC(foo)", + new PinotFunctionConverter(new PinotFunctionConverterConfig(null, "CUSTOM_DC")) + .convert( + mockingExecutionContext, + buildFunction(QUERY_FUNCTION_DISTINCTCOUNT, column.toBuilder()), + this.mockArgumentConverter)); + } + @Test void testIllegalDurationFormat() { Expression column1 = createColumnExpression("foo").build(); From 271fe58f852de3482a348e0ffc57c99078536068 Mon Sep 17 00:00:00 2001 From: Aaron Steinfeld Date: Thu, 23 Jun 2022 17:04:26 -0400 Subject: [PATCH 2/4] fix: restore missing sum --- .snyk | 18 +++++++++--------- .../converters/PinotFunctionConverter.java | 2 +- .../QueryRequestToPinotSQLConverterTest.java | 2 +- .../converters/PinotFunctionConverterTest.java | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.snyk b/.snyk index 7392ed5f..5bc28ccd 100644 --- a/.snyk +++ b/.snyk @@ -5,42 +5,42 @@ ignore: SNYK-JAVA-LOG4J-572732: - '*': reason: no available replacement - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z SNYK-JAVA-IONETTY-473694: - '*': reason: no available replacement - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z SNYK-JAVA-IONETTY-1042268: - '*': reason: No replacement available - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z SNYK-JAVA-LOG4J-1300176: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z SNYK-JAVA-LOG4J-2316893: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z created: 2021-12-14T18:05:26.628Z SNYK-JAVA-LOG4J-2342645: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z created: 2022-03-16T03:39:38.957Z SNYK-JAVA-LOG4J-2342646: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z created: 2022-03-16T03:39:56.875Z SNYK-JAVA-LOG4J-2342647: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z created: 2022-03-16T03:40:04.704Z SNYK-JAVA-ORGJETBRAINSKOTLIN-2393744: - '*': reason: None Given - expires: 2022-05-31T00:00:00.000Z + expires: 2022-08-31T00:00:00.000Z created: 2022-03-16T03:40:31.778Z patch: {} diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java index cf4dec6c..36f0b59c 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverter.java @@ -91,7 +91,7 @@ private String functionToStringForAvgRate( .getSeconds(); return String.format( - "(%s / %s)", columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds); + "SUM(%s / %s)", columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds); } private String convertCount() { diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java index 64fcfe9c..b5836bb7 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/QueryRequestToPinotSQLConverterTest.java @@ -914,7 +914,7 @@ public void testQueryWithAverageRateInOrderBy() { + "' " + "and ( start_time_millis >= 1637297304041 and start_time_millis < 1637300904041 and service_id != 'null' ) " + "group by service_id, service_name " - + "order by (error_count / 3600.0) " + + "order by SUM(error_count / 3600.0) " + "limit 10000", viewDefinition, executionContext); diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java index 571687d6..63896ad6 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/converters/PinotFunctionConverterTest.java @@ -211,7 +211,7 @@ void convertAvgRateFunction() { .thenReturn(Optional.of(Duration.ofSeconds(10))); assertEquals( - "(foo / 2.0)", + "SUM(foo / 2.0)", new PinotFunctionConverter() .convert( mockingExecutionContext, @@ -223,7 +223,7 @@ void convertAvgRateFunction() { .thenReturn(Optional.of(Duration.ofSeconds(20))); assertEquals( - "(foo / 4.0)", + "SUM(foo / 4.0)", new PinotFunctionConverter() .convert( mockingExecutionContext, From 9aaa0782b6680f2034395df47232d7174c1b71de Mon Sep 17 00:00:00 2001 From: kotharironak <53209990+kotharironak@users.noreply.github.com> Date: Tue, 28 Jun 2022 17:38:00 +0530 Subject: [PATCH 3/4] fix: the integration test with latest pinot image for group.id change (#146) * fix: the integration test with latest pinot image for group.id change * making it 5 mins for worst case * updated comments * reverted the test image --- .../service/htqueries/HTPinotQueriesTest.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java index da0d3148..7ef25e67 100644 --- a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java +++ b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java @@ -18,16 +18,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.file.DataFileReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -262,21 +267,30 @@ private static boolean generateData() throws Exception { "service-call-view-events", 27L, "span-event-view", 50L, "log-event-view", 0L); - int retry = 0; - while (!areMessagesConsumed(endOffSetMap) && retry++ < 5) { - Thread.sleep(2000); + int retry = 0, maxRetries = 50; + while (!areMessagesConsumed(endOffSetMap) && retry++ < maxRetries) { + Thread.sleep(6000); // max 5 min wait time } // stop this service viewGen.stop(); - return retry < 5; + return retry < maxRetries; } private static boolean areMessagesConsumed(Map endOffSetMap) throws Exception { - ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = - adminClient.listConsumerGroupOffsets(""); - Map offsetAndMetadataMap = - consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); + ListConsumerGroupsResult listConsumerGroups = adminClient.listConsumerGroups(); + List groupIds = listConsumerGroups.all().get().stream() + .filter(consumerGroupListing -> consumerGroupListing.isSimpleConsumerGroup()) + .map(consumerGroupListing -> consumerGroupListing.groupId()) + .collect(Collectors.toUnmodifiableList()); + + Map offsetAndMetadataMap = new HashMap<>(); + for(String groupId : groupIds) { + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId); + Map metadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); + metadataMap.forEach((k, v) -> offsetAndMetadataMap.putIfAbsent(k, v)); + } + if (offsetAndMetadataMap.size() < 6) { return false; } From 729ec2f6bda17d167f6ddcde6e43dc99a7777f6d Mon Sep 17 00:00:00 2001 From: Aaron Steinfeld Date: Tue, 28 Jun 2022 08:17:27 -0400 Subject: [PATCH 4/4] refactor: update a test config, remove unused const --- .../pinot/PinotBasedRequestHandler.java | 1 - .../src/test/resources/application.conf | 1 + .../service/htqueries/HTPinotQueriesTest.java | 20 +++++++++---------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index 7c936860..e51b3f3c 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -56,7 +56,6 @@ public class PinotBasedRequestHandler implements RequestHandler { private static final String TENANT_COLUMN_NAME_CONFIG_KEY = "tenantColumnName"; private static final String START_TIME_ATTRIBUTE_NAME_CONFIG_KEY = "startTimeAttributeName"; private static final String SLOW_QUERY_THRESHOLD_MS_CONFIG = "slowQueryThresholdMs"; - private static final String PERCENTILE_AGGREGATION_FUNCTION_CONFIG = "percentileAggFunction"; private static final int DEFAULT_SLOW_QUERY_THRESHOLD_MS = 3000; private static final Set GTE_OPERATORS = Set.of(Operator.GE, Operator.GT, Operator.EQ); diff --git a/query-service-impl/src/test/resources/application.conf b/query-service-impl/src/test/resources/application.conf index cf2ee919..62f7fc97 100644 --- a/query-service-impl/src/test/resources/application.conf +++ b/query-service-impl/src/test/resources/application.conf @@ -62,6 +62,7 @@ service.config = { tenantColumnName = tenant_id slowQueryThresholdMs = 100 percentileAggFunction = "PERCENTILETDIGEST" + distinctCountAggFunction = "DISTINCTCOUNTHLL" viewDefinition = { viewName = spanEventView mapFields = ["tags"] diff --git a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java index 7ef25e67..45d51f99 100644 --- a/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java +++ b/query-service/src/integrationTest/java/org/hypertrace/core/query/service/htqueries/HTPinotQueriesTest.java @@ -23,14 +23,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.file.DataFileReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListConsumerGroupsResult; import org.apache.kafka.clients.admin.NewTopic; @@ -279,15 +276,18 @@ private static boolean generateData() throws Exception { private static boolean areMessagesConsumed(Map endOffSetMap) throws Exception { ListConsumerGroupsResult listConsumerGroups = adminClient.listConsumerGroups(); - List groupIds = listConsumerGroups.all().get().stream() - .filter(consumerGroupListing -> consumerGroupListing.isSimpleConsumerGroup()) - .map(consumerGroupListing -> consumerGroupListing.groupId()) - .collect(Collectors.toUnmodifiableList()); + List groupIds = + listConsumerGroups.all().get().stream() + .filter(consumerGroupListing -> consumerGroupListing.isSimpleConsumerGroup()) + .map(consumerGroupListing -> consumerGroupListing.groupId()) + .collect(Collectors.toUnmodifiableList()); Map offsetAndMetadataMap = new HashMap<>(); - for(String groupId : groupIds) { - ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId); - Map metadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); + for (String groupId : groupIds) { + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = + adminClient.listConsumerGroupOffsets(groupId); + Map metadataMap = + listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); metadataMap.forEach((k, v) -> offsetAndMetadataMap.putIfAbsent(k, v)); }