Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .snyk
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,42 @@ ignore:
SNYK-JAVA-LOG4J-572732:
- '*':
reason: no available replacement
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
SNYK-JAVA-IONETTY-473694:
- '*':
reason: no available replacement
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
SNYK-JAVA-IONETTY-1042268:
- '*':
reason: No replacement available
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
SNYK-JAVA-LOG4J-1300176:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
SNYK-JAVA-LOG4J-2316893:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
created: 2021-12-14T18:05:26.628Z
SNYK-JAVA-LOG4J-2342645:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
created: 2022-03-16T03:39:38.957Z
SNYK-JAVA-LOG4J-2342646:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
created: 2022-03-16T03:39:56.875Z
SNYK-JAVA-LOG4J-2342647:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
created: 2022-03-16T03:40:04.704Z
SNYK-JAVA-ORGJETBRAINSKOTLIN-2393744:
- '*':
reason: None Given
expires: 2022-05-31T00:00:00.000Z
expires: 2022-08-31T00:00:00.000Z
created: 2022-03-16T03:40:31.778Z
patch: {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.hypertrace.core.query.service.pinot;

import static org.hypertrace.core.query.service.ConfigUtils.optionallyGet;
import static org.hypertrace.core.query.service.QueryRequestUtil.getLogicalColumnName;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -43,6 +42,7 @@
import org.hypertrace.core.query.service.api.ValueType;
import org.hypertrace.core.query.service.pinot.PinotClientFactory.PinotClient;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverterConfig;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,7 +56,6 @@ public class PinotBasedRequestHandler implements RequestHandler {
private static final String TENANT_COLUMN_NAME_CONFIG_KEY = "tenantColumnName";
private static final String START_TIME_ATTRIBUTE_NAME_CONFIG_KEY = "startTimeAttributeName";
private static final String SLOW_QUERY_THRESHOLD_MS_CONFIG = "slowQueryThresholdMs";
private static final String PERCENTILE_AGGREGATION_FUNCTION_CONFIG = "percentileAggFunction";

private static final int DEFAULT_SLOW_QUERY_THRESHOLD_MS = 3000;
private static final Set<Operator> GTE_OPERATORS = Set.of(Operator.GE, Operator.GT, Operator.EQ);
Expand Down Expand Up @@ -132,19 +131,9 @@ private void processConfig(Config config) {
? Optional.of(config.getString(START_TIME_ATTRIBUTE_NAME_CONFIG_KEY))
: Optional.empty();

Optional<String> customPercentileFunction =
optionallyGet(() -> config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG));

customPercentileFunction.ifPresent(
function ->
LOG.info(
"Using {} function for percentile aggregations of handler: {}", function, name));
PinotFunctionConverter functionConverter =
customPercentileFunction
.map(PinotFunctionConverter::new)
.orElseGet(PinotFunctionConverter::new);
this.request2PinotSqlConverter =
new QueryRequestToPinotSQLConverter(viewDefinition, functionConverter);
new QueryRequestToPinotSQLConverter(
viewDefinition, new PinotFunctionConverter(new PinotFunctionConverterConfig(config)));

if (config.hasPath(SLOW_QUERY_THRESHOLD_MS_CONFIG)) {
this.slowQueryThreshold = config.getInt(SLOW_QUERY_THRESHOLD_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_AVGRATE;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_CONCAT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_COUNT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_DISTINCTCOUNT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_PERCENTILE;

import java.time.Duration;
Expand All @@ -18,28 +19,17 @@
import org.hypertrace.core.query.service.api.ValueType;

public class PinotFunctionConverter {
/**
* Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster and
* reasonably accurate, hence use that as the default.
*
* <p>AVGRATE not supported directly in Pinot. So AVG_RATE is computed by summing over all values
* and then dividing by a constant.
*/
private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST";

private static final String PINOT_CONCAT_FUNCTION = "CONCATSKIPNULL";
private static final String DEFAULT_AVG_RATE_SIZE = "PT1S";

private final String percentileAggFunction;
private static final String DEFAULT_AVG_RATE_SIZE = "PT1S";
private final PinotFunctionConverterConfig config;

public PinotFunctionConverter(String configuredPercentileFunction) {
this.percentileAggFunction =
Optional.ofNullable(configuredPercentileFunction)
.orElse(DEFAULT_PERCENTILE_AGGREGATION_FUNCTION);
public PinotFunctionConverter(PinotFunctionConverterConfig config) {
this.config = config;
}

public PinotFunctionConverter() {
this.percentileAggFunction = DEFAULT_PERCENTILE_AGGREGATION_FUNCTION;
this(new PinotFunctionConverterConfig());
}

public String convert(
Expand All @@ -50,10 +40,16 @@ public String convert(
case QUERY_FUNCTION_COUNT:
return this.convertCount();
case QUERY_FUNCTION_PERCENTILE:
// Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster
// and reasonably accurate, so support selecting the implementation to use
return this.functionToString(this.toPinotPercentile(function), argumentConverter);
case QUERY_FUNCTION_DISTINCTCOUNT:
return this.functionToString(this.toPinotDistinctCount(function), argumentConverter);
case QUERY_FUNCTION_CONCAT:
return this.functionToString(this.toPinotConcat(function), argumentConverter);
case QUERY_FUNCTION_AVGRATE:
// AVGRATE not supported directly in Pinot. So AVG_RATE is computed by summing over all
// values and then dividing by a constant.
return this.functionToStringForAvgRate(function, argumentConverter, executionContext);
default:
// TODO remove once pinot-specific logic removed from gateway - this normalization reverts
Expand All @@ -70,7 +66,7 @@ private String functionToString(
Function function, java.util.function.Function<Expression, String> argumentConverter) {
String argumentString =
function.getArgumentsList().stream()
.map(argumentConverter::apply)
.map(argumentConverter)
.collect(Collectors.joining(","));

return function.getFunctionName() + "(" + argumentString + ")";
Expand All @@ -88,15 +84,14 @@ private String functionToStringForAvgRate(
: DEFAULT_AVG_RATE_SIZE;
long rateIntervalInSeconds = isoDurationToSeconds(rateIntervalInIso);
long aggregateIntervalInSeconds =
(executionContext
.getTimeSeriesPeriod()
.or(executionContext::getTimeRangeDuration)
.orElseThrow())
executionContext
.getTimeSeriesPeriod()
.or(executionContext::getTimeRangeDuration)
.orElseThrow()
.getSeconds();

return String.format(
"SUM(DIV(%s, %s))",
columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds);
"SUM(%s / %s)", columnName, (double) aggregateIntervalInSeconds / rateIntervalInSeconds);
}

private String convertCount() {
Expand All @@ -114,7 +109,7 @@ private Function toPinotPercentile(Function function) {
QUERY_FUNCTION_PERCENTILE, function.getArguments(0))));
return Function.newBuilder(function)
.removeArguments(0)
.setFunctionName(this.percentileAggFunction + percentileValue)
.setFunctionName(this.config.getPercentileAggregationFunction() + percentileValue)
.build();
}

Expand All @@ -124,6 +119,12 @@ private Function toPinotConcat(Function function) {
return Function.newBuilder(function).setFunctionName(PINOT_CONCAT_FUNCTION).build();
}

/**
 * Produces a copy of the given distinct-count function whose name is replaced by the
 * distinct-count aggregation function name held in this converter's config. The arguments of the
 * function are carried over unchanged.
 *
 * @param function the distinct-count function from the incoming request
 * @return the same function renamed to the configured aggregation function
 */
private Function toPinotDistinctCount(Function function) {
  String configuredFunctionName = this.config.getDistinctCountAggregationFunction();
  return Function.newBuilder(function).setFunctionName(configuredFunctionName).build();
}

private boolean isHardcodedPercentile(Function function) {
String functionName = function.getFunctionName().toUpperCase();
return functionName.startsWith(QUERY_FUNCTION_PERCENTILE)
Expand Down Expand Up @@ -178,8 +179,7 @@ Optional<Integer> intFromValue(Value value) {

private static long isoDurationToSeconds(String duration) {
try {
Duration d = java.time.Duration.parse(duration);
return d.get(ChronoUnit.SECONDS);
return Duration.parse(duration).get(ChronoUnit.SECONDS);
} catch (DateTimeParseException ex) {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.hypertrace.core.query.service.pinot.converters;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.AllArgsConstructor;
import lombok.Value;

@Value
@AllArgsConstructor
public class PinotFunctionConverterConfig {

private static final String PERCENTILE_AGGREGATION_FUNCTION_CONFIG = "percentileAggFunction";
private static final String DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG =
"distinctCountAggFunction";
private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST";
private static final String DEFAULT_DISTINCT_COUNT_AGGREGATION_FUNCTION = "DISTINCTCOUNT";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make DISTINCT_COUNT_HLL the default.

Copy link
Copy Markdown
Contributor Author

@aaron-steinfeld aaron-steinfeld Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it this way for backwards compatibility. Without any custom config it'll retain identical behavior as before, but it's easily changed.


String percentileAggregationFunction;
String distinctCountAggregationFunction;

/**
 * Reads the aggregation function names from the supplied config. For each of the two settings,
 * the configured string is used when its key is present; otherwise the built-in default
 * constant is used.
 *
 * @param config config that may define {@code percentileAggFunction} and/or {@code
 *     distinctCountAggFunction}
 */
public PinotFunctionConverterConfig(Config config) {
  this.percentileAggregationFunction =
      config.hasPath(PERCENTILE_AGGREGATION_FUNCTION_CONFIG)
          ? config.getString(PERCENTILE_AGGREGATION_FUNCTION_CONFIG)
          : DEFAULT_PERCENTILE_AGGREGATION_FUNCTION;
  this.distinctCountAggregationFunction =
      config.hasPath(DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG)
          ? config.getString(DISTINCT_COUNT_AGGREGATION_FUNCTION_CONFIG)
          : DEFAULT_DISTINCT_COUNT_AGGREGATION_FUNCTION;
}

/**
 * Creates a config by delegating to the {@link Config}-based constructor with {@code
 * ConfigFactory.empty()}. With no keys present, both the percentile and the distinct-count
 * aggregation function take their default values.
 */
public PinotFunctionConverterConfig() {
this(ConfigFactory.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ public void testQueryWithAverageRateInOrderBy() {
+ "' "
+ "and ( start_time_millis >= 1637297304041 and start_time_millis < 1637300904041 and service_id != 'null' ) "
+ "group by service_id, service_name "
+ "order by sum(div(error_count, 3600.0)) "
+ "order by SUM(error_count / 3600.0) "
+ "limit 10000",
viewDefinition,
executionContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void acceptsCustomPercentileFunctions() {

assertEquals(
expected,
new PinotFunctionConverter("CUSTOMPERCENTILE")
new PinotFunctionConverter(new PinotFunctionConverterConfig("CUSTOMPERCENTILE", null))
.convert(mockingExecutionContext, percentileFunction, this.mockArgumentConverter));
}

Expand Down Expand Up @@ -211,7 +211,7 @@ void convertAvgRateFunction() {
.thenReturn(Optional.of(Duration.ofSeconds(10)));

assertEquals(
"SUM(DIV(foo, 2.0))",
"SUM(foo / 2.0)",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: do we have any value-level test that compares the prior SUM(DIV(foo, 2.0)) against the new SUM(foo / 2.0) to confirm both produce the same values?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be testing Pinot's functionality - that DIV and / both actually... divide. I'm not sure it's worth writing a test, but we can verify it by hand real quick.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select sum(num_calls), sum(num_calls / 60.0), sum(div(num_calls, 60.0)) ...

image

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, tried the same thing too.
org-query:
org-query

modified query:
modified-query

new PinotFunctionConverter()
.convert(
mockingExecutionContext,
Expand All @@ -223,14 +223,37 @@ void convertAvgRateFunction() {
.thenReturn(Optional.of(Duration.ofSeconds(20)));

assertEquals(
"SUM(DIV(foo, 4.0))",
"SUM(foo / 4.0)",
new PinotFunctionConverter()
.convert(
mockingExecutionContext,
buildFunction(QUERY_FUNCTION_AVGRATE, column1.toBuilder(), column2.toBuilder()),
this.mockArgumentConverter));
}

@Test
void convertsDistinctCountFunction() {
  Expression fooColumn = createColumnExpression("foo").build();
  when(this.mockArgumentConverter.apply(fooColumn)).thenReturn("foo");

  // A converter built without any config emits the DISTINCTCOUNT function name.
  PinotFunctionConverter defaultConverter = new PinotFunctionConverter();
  String defaultResult =
      defaultConverter.convert(
          mockingExecutionContext,
          buildFunction(QUERY_FUNCTION_DISTINCTCOUNT, fooColumn.toBuilder()),
          this.mockArgumentConverter);
  assertEquals("DISTINCTCOUNT(foo)", defaultResult);

  // A converter given an explicit distinct-count function name emits that name instead.
  PinotFunctionConverter customConverter =
      new PinotFunctionConverter(new PinotFunctionConverterConfig(null, "CUSTOM_DC"));
  String customResult =
      customConverter.convert(
          mockingExecutionContext,
          buildFunction(QUERY_FUNCTION_DISTINCTCOUNT, fooColumn.toBuilder()),
          this.mockArgumentConverter);
  assertEquals("CUSTOM_DC(foo)", customResult);
}

@Test
void testIllegalDurationFormat() {
Expression column1 = createColumnExpression("foo").build();
Expand Down
1 change: 1 addition & 0 deletions query-service-impl/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ service.config = {
tenantColumnName = tenant_id
slowQueryThresholdMs = 100
percentileAggFunction = "PERCENTILETDIGEST"
distinctCountAggFunction = "DISTINCTCOUNTHLL"
viewDefinition = {
viewName = spanEventView
mapFields = ["tags"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,7 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -262,21 +264,33 @@ private static boolean generateData() throws Exception {
"service-call-view-events", 27L,
"span-event-view", 50L,
"log-event-view", 0L);
int retry = 0;
while (!areMessagesConsumed(endOffSetMap) && retry++ < 5) {
Thread.sleep(2000);
int retry = 0, maxRetries = 50;
while (!areMessagesConsumed(endOffSetMap) && retry++ < maxRetries) {
Thread.sleep(6000); // max 5 min wait time
}
// stop this service
viewGen.stop();

return retry < 5;
return retry < maxRetries;
}

private static boolean areMessagesConsumed(Map<String, Long> endOffSetMap) throws Exception {
ListConsumerGroupOffsetsResult consumerGroupOffsetsResult =
adminClient.listConsumerGroupOffsets("");
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap =
consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
ListConsumerGroupsResult listConsumerGroups = adminClient.listConsumerGroups();
List<String> groupIds =
listConsumerGroups.all().get().stream()
.filter(consumerGroupListing -> consumerGroupListing.isSimpleConsumerGroup())
.map(consumerGroupListing -> consumerGroupListing.groupId())
.collect(Collectors.toUnmodifiableList());

Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
for (String groupId : groupIds) {
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult =
adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> metadataMap =
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
metadataMap.forEach((k, v) -> offsetAndMetadataMap.putIfAbsent(k, v));
}

if (offsetAndMetadataMap.size() < 6) {
return false;
}
Expand Down