Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -48,24 +49,24 @@ public class ExecutionContext {
private final LinkedHashSet<Expression> allSelections;
private final Optional<Duration> timeSeriesPeriod;
private final Filter queryRequestFilter;
private final Supplier<Optional<Duration>> timeRangeDurationSupplier;
private final Supplier<Optional<QueryTimeRange>> queryTimeRangeSupplier;

public ExecutionContext(String tenantId, QueryRequest request) {
this.tenantId = tenantId;
this.selectedColumns = new LinkedHashSet<>();
this.allSelections = new LinkedHashSet<>();
this.timeSeriesPeriod = calculateTimeSeriesPeriod(request);
this.queryRequestFilter = request.getFilter();
timeRangeDurationSupplier =
queryTimeRangeSupplier =
Suppliers.memoize(
() -> findTimeRangeDuration(this.queryRequestFilter, this.timeFilterColumn));
() -> buildQueryTimeRange(this.queryRequestFilter, this.timeFilterColumn));
analyze(request);
}

private Optional<Duration> calculateTimeSeriesPeriod(QueryRequest request) {
if (request.getGroupByCount() > 0) {
for (Expression expression : request.getGroupByList()) {
if (isDateTimeFunction(expression)) {
if (QueryRequestUtil.isDateTimeFunction(expression)) {
String timeSeriesPeriod =
expression
.getFunction()
Expand Down Expand Up @@ -192,7 +193,7 @@ private void extractColumns(List<String> columns, Expression expression) {
}
}

private Optional<Duration> findTimeRangeDuration(Filter filter, String timeFilterColumn) {
private Optional<QueryTimeRange> buildQueryTimeRange(Filter filter, String timeFilterColumn) {

// time filter will always be present with AND operator
if (filter.getOperator() != Operator.AND) {
Expand All @@ -218,11 +219,15 @@ private Optional<Duration> findTimeRangeDuration(Filter filter, String timeFilte
.findFirst();

if (timeRangeStart.isPresent() && timeRangeEnd.isPresent()) {
return Optional.of(Duration.ofMillis(timeRangeEnd.get() - timeRangeStart.get()));
return Optional.of(
new QueryTimeRange(
Instant.ofEpochMilli(timeRangeStart.get()),
Instant.ofEpochMilli(timeRangeEnd.get()),
Duration.ofMillis(timeRangeEnd.get() - timeRangeStart.get())));
}

return filter.getChildFilterList().stream()
.map(childFilter -> this.findTimeRangeDuration(childFilter, timeFilterColumn))
.map(childFilter -> this.buildQueryTimeRange(childFilter, timeFilterColumn))
.flatMap(Optional::stream)
.findFirst();
}
Expand All @@ -240,11 +245,6 @@ private Duration parseDuration(String timeSeriesPeriod) {
return Duration.of(amount, unit);
}

private boolean isDateTimeFunction(Expression expression) {
return expression.getValueCase() == ValueCase.FUNCTION
&& expression.getFunction().getFunctionName().equals("dateTimeConvert");
}

public void setTimeFilterColumn(String timeFilterColumn) {
this.timeFilterColumn = timeFilterColumn;
}
Expand Down Expand Up @@ -274,6 +274,14 @@ public Optional<Duration> getTimeSeriesPeriod() {
}

public Optional<Duration> getTimeRangeDuration() {
return timeRangeDurationSupplier.get();
return queryTimeRangeSupplier.get().map(QueryTimeRange::getDuration);
}

public Optional<QueryTimeRange> getQueryTimeRange() {
return queryTimeRangeSupplier.get();
}

public String getTimeFilterColumn() {
return timeFilterColumn;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.hypertrace.core.query.service;

import static org.hypertrace.core.query.service.api.Expression.ValueCase.ATTRIBUTE_EXPRESSION;

import org.hypertrace.core.query.service.api.ColumnIdentifier;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Expression.ValueCase;
import org.hypertrace.core.query.service.api.LiteralConstant;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;
Expand Down Expand Up @@ -68,4 +71,30 @@ public static Expression createNullNumberLiteralExpression() {
.setValue(Value.newBuilder().setValueType(ValueType.NULL_NUMBER)))
.build();
}

public static boolean isDateTimeFunction(Expression expression) {
return expression.getValueCase() == ValueCase.FUNCTION
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would have been niced to abstract this pinot detail (like we did with AVGRATE) before adding promql support

Copy link
Copy Markdown
Author

@findingrish findingrish Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please elaborate more (for AVGRATE, wasn't the work around adding support in qs)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the concept of AVGRATE, similar to grouping by time is an abstract one that we have exposed in our own query language. In the original implementation (for both), the translation from that abstract concept to a pinot specific implementation happened in gateway which was a leak of implementation details that should be part of query service's abstraction. AVGRATE since was pushed down, so now gateway just asks for avgrate and we translate that to it's pinot equivalent (and eventually, I assume, it's promQL equivalent). Similarly, dateTimeConvert should do the same.

This may be a fair bit of work, so it's up to you guys on the approach, but what we're forcing ourselves to do right now is translating it first from our own language to a pinot expression, then from that pinot expression back to an abstraction.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #112

&& expression.getFunction().getFunctionName().equals("dateTimeConvert");
}

public static boolean isComplexAttribute(Expression expression) {
return expression.getValueCase().equals(ATTRIBUTE_EXPRESSION)
&& expression.getAttributeExpression().hasSubpath();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this also ?

  private boolean isMapField(String columnName) {
    return viewDefinition.getColumnType(columnName) == ValueType.STRING_MAP;
  }

Copy link
Copy Markdown
Author

@findingrish findingrish Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't get your point, this method is not part of the change?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggesting checking the complex attribute by checking from viewDefinition. Or this can be done while pre-processing ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd strongly recommend landing the attribute expression stuff before this - two concurrent major changes is asking for trouble.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change isn't gonna be deployed until we add the prometheus view definition in the main config, so any changes related to attributeExpression can be taken up as followup?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggesting checking the complex attribute by checking from viewDefinition. Or this can be done while pre-processing ?

The viewDefinition for prometheus is different than pinot, there is no concept of map columns here, so probably any query on map columns will not be eligible for prometheus handler (mapping column in viewDef will be missing)

}

/**
 * Returns true for a "simple" column reference: either a plain column identifier,
 * or an attribute expression that does not use a subpath (not a complex attribute).
 */
public static boolean isSimpleColumnExpression(Expression expression) {
  ValueCase valueCase = expression.getValueCase();
  if (valueCase == ValueCase.COLUMNIDENTIFIER) {
    return true;
  }
  return valueCase == ATTRIBUTE_EXPRESSION && !isComplexAttribute(expression);
}

/**
 * Extracts the logical column name from a simple column expression.
 *
 * @param expression a column-identifier expression or a subpath-free attribute expression
 * @return the column name (for COLUMNIDENTIFIER) or the attribute id (for ATTRIBUTE_EXPRESSION)
 * @throws IllegalArgumentException if the expression is not a simple column expression
 */
public static String getLogicalColumnNameForSimpleColumnExpression(Expression expression) {
  if (!isSimpleColumnExpression(expression)) {
    // IllegalArgumentException (a RuntimeException subclass, so backward compatible
    // for existing catch blocks) is the idiomatic precondition-failure type.
    throw new IllegalArgumentException("Expecting expression of type COLUMN or ATTRIBUTE");
  }
  if (expression.getValueCase() == ValueCase.COLUMNIDENTIFIER) {
    return expression.getColumnIdentifier().getColumnName();
  } else {
    return expression.getAttributeExpression().getAttributeId();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.hypertrace.core.query.service.api.QueryServiceGrpc.QueryServiceImplBase;
import org.hypertrace.core.query.service.pinot.PinotModule;
import org.hypertrace.core.query.service.projection.ProjectionModule;
import org.hypertrace.core.query.service.prometheus.PrometheusModule;
import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle;

class QueryServiceModule extends AbstractModule {
Expand All @@ -31,5 +32,6 @@ protected void configure() {
.in(Singleton.class);
install(new PinotModule());
install(new ProjectionModule());
install(new PrometheusModule());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.hypertrace.core.query.service;

import java.time.Duration;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Value;

@Value
@AllArgsConstructor
public class QueryTimeRange {
Comment thread
findingrish marked this conversation as resolved.
Instant startTime;
Instant endTime;
Duration duration;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.hypertrace.core.query.service.pinot;

import static org.hypertrace.core.query.service.QueryRequestUtil.isComplexAttribute;
import static org.hypertrace.core.query.service.api.Expression.ValueCase.ATTRIBUTE_EXPRESSION;
import static org.hypertrace.core.query.service.api.Expression.ValueCase.COLUMNIDENTIFIER;
import static org.hypertrace.core.query.service.api.Expression.ValueCase.LITERAL;
Expand Down Expand Up @@ -365,11 +366,6 @@ private LiteralConstant[] convertExpressionToMapLiterals(Expression expression)
return literals;
}

private boolean isComplexAttribute(Expression expression) {
return expression.getValueCase().equals(ATTRIBUTE_EXPRESSION)
&& expression.getAttributeExpression().hasSubpath();
}

/** TODO:Handle all types */
private String convertLiteralToString(LiteralConstant literal, Params.Builder paramsBuilder) {
Value value = literal.getValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.hypertrace.core.query.service.prometheus;

import static org.hypertrace.core.query.service.QueryRequestUtil.getLogicalColumnNameForSimpleColumnExpression;

import com.google.protobuf.ByteString;
import java.util.List;
import java.util.function.Function;
import org.apache.commons.codec.binary.Hex;
import org.hypertrace.core.query.service.QueryRequestUtil;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Filter;
import org.hypertrace.core.query.service.api.LiteralConstant;
import org.hypertrace.core.query.service.api.Operator;
import org.hypertrace.core.query.service.api.Value;

class FilterToPromqlConverter {

/** only `AND` operator in filter is allowed rhs of leaf filter should be literal */
void convertFilterToString(
Filter filter,
String timeFilterColumn,
Function<Expression, String> expressionToColumnConverter,
List<String> filterList) {
if (filter.getChildFilterCount() > 0) {
for (Filter childFilter : filter.getChildFilterList()) {
convertFilterToString(
childFilter, timeFilterColumn, expressionToColumnConverter, filterList);
}
} else {
if (QueryRequestUtil.isSimpleColumnExpression(filter.getLhs())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isSimpleColumnExpression do we need to check again? Isn't this already check in selection process?

&& timeFilterColumn.equals(
getLogicalColumnNameForSimpleColumnExpression(filter.getLhs()))) {
return;
}
StringBuilder builder = new StringBuilder();
builder.append(expressionToColumnConverter.apply(filter.getLhs()));
builder.append(convertOperatorToString(filter.getOperator()));
builder.append(convertLiteralToString(filter.getRhs().getLiteral()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will the null value convertLiteralToString mean here? I haven't try such query? Instead of null, would it make to have "" filter or no filter at all?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing exception from convertLiteralToString

filterList.add(builder.toString());
}
}

private String convertOperatorToString(Operator operator) {
switch (operator) {
case IN:
case EQ:
return "=";
case NEQ:
return "!=";
case LIKE:
return "=~";
default:
throw new RuntimeException(
String.format("Equivalent %s operator not supported in promql", operator));
}
}

private String convertLiteralToString(LiteralConstant literal) {
Value value = literal.getValue();
switch (value.getValueType()) {
case STRING_ARRAY:
StringBuilder builder = new StringBuilder("\"");
for (String item : value.getStringArrayList()) {
if (builder.length() > 1) {
builder.append("|");
}
builder.append(item);
}
builder.append("\"");
return builder.toString();
case BYTES_ARRAY:
builder = new StringBuilder("\"");
for (ByteString item : value.getBytesArrayList()) {
if (builder.length() > 1) {
builder.append("|");
}
builder.append(Hex.encodeHexString(item.toByteArray()));
}
builder.append("\"");
return builder.toString();
case STRING:
return "\"" + value.getString() + "\"";
case LONG:
return "\"" + value.getLong() + "\"";
case INT:
return "\"" + value.getInt() + "\"";
case FLOAT:
return "\"" + value.getFloat() + "\"";
case DOUBLE:
return "\"" + value.getDouble() + "\"";
case BYTES:
return "\"" + Hex.encodeHexString(value.getBytes().toByteArray()) + "\"";
case BOOL:
return "\"" + value.getBoolean() + "\"";
case TIMESTAMP:
return "\"" + value.getTimestamp() + "\"";
case NULL_NUMBER:
return "0";
case NULL_STRING:
return "null";
case LONG_ARRAY:
case INT_ARRAY:
case FLOAT_ARRAY:
case DOUBLE_ARRAY:
case BOOLEAN_ARRAY:
case UNRECOGNIZED:
default:
throw new RuntimeException(
String.format("Literal type %s not supported", value.getValueType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.hypertrace.core.query.service.prometheus;

import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import io.reactivex.rxjava3.core.Observable;
import java.util.Optional;
import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.QueryCost;
import org.hypertrace.core.query.service.RequestHandler;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.core.query.service.api.Row;

/**
 * A {@link RequestHandler} that serves eligible query requests from a prometheus
 * backend. Configuration supplies the prometheus view definition, the tenant
 * attribute name, and (optionally) the start-time attribute used for time filters.
 */
public class PrometheusBasedRequestHandler implements RequestHandler {

  private static final String VIEW_DEFINITION_CONFIG_KEY = "prometheusViewDefinition";
  private static final String TENANT_ATTRIBUTE_NAME_CONFIG_KEY = "tenantAttributeName";
  private static final String START_TIME_ATTRIBUTE_NAME_CONFIG_KEY = "startTimeAttributeName";

  private final QueryRequestEligibilityValidator queryRequestEligibilityValidator;
  private final String name;
  private PrometheusViewDefinition prometheusViewDefinition;
  private Optional<String> startTimeAttributeName;
  private QueryRequestToPromqlConverter requestToPromqlConverter;

  PrometheusBasedRequestHandler(String name, Config config) {
    this.name = name;
    // processConfig must run before the validator is built: it initializes
    // prometheusViewDefinition, which the validator captures.
    this.processConfig(config);
    this.queryRequestEligibilityValidator =
        new QueryRequestEligibilityValidator(prometheusViewDefinition);
  }

  @Override
  public String getName() {
    return name;
  }

  @Override
  public Optional<String> getTimeFilterColumn() {
    return this.startTimeAttributeName;
  }

  /**
   * Reads handler configuration: validates required keys, parses the view
   * definition, and builds the promql converter.
   *
   * @throws RuntimeException if a required config key is missing
   */
  private void processConfig(Config config) {

    if (!config.hasPath(TENANT_ATTRIBUTE_NAME_CONFIG_KEY)) {
      throw new RuntimeException(
          TENANT_ATTRIBUTE_NAME_CONFIG_KEY
              + " is not defined in the "
              + name
              + " request handler.");
    }

    // Fail fast with a handler-specific message rather than letting
    // config.getConfig below throw a generic missing-path exception.
    if (!config.hasPath(VIEW_DEFINITION_CONFIG_KEY)) {
      throw new RuntimeException(
          VIEW_DEFINITION_CONFIG_KEY + " is not defined in the " + name + " request handler.");
    }

    String tenantAttributeName = config.getString(TENANT_ATTRIBUTE_NAME_CONFIG_KEY);
    this.prometheusViewDefinition =
        PrometheusViewDefinition.parse(
            config.getConfig(VIEW_DEFINITION_CONFIG_KEY), tenantAttributeName);

    this.startTimeAttributeName =
        config.hasPath(START_TIME_ATTRIBUTE_NAME_CONFIG_KEY)
            ? Optional.of(config.getString(START_TIME_ATTRIBUTE_NAME_CONFIG_KEY))
            : Optional.empty();

    this.requestToPromqlConverter = new QueryRequestToPromqlConverter(prometheusViewDefinition);
  }

  /**
   * Returns a QueryCost that is an indication of whether the given query can be handled by this
   * handler and if so, how costly is it to handle that query.
   */
  @Override
  public QueryCost canHandle(QueryRequest request, ExecutionContext executionContext) {
    return queryRequestEligibilityValidator.calculateCost(request, executionContext);
  }

  @Override
  public Observable<Row> handleRequest(
      QueryRequest originalRequest, ExecutionContext executionContext) {

    // Validate QueryContext and tenant id presence
    Preconditions.checkNotNull(executionContext);
    Preconditions.checkNotNull(executionContext.getTenantId());

    // todo call convert and execute request using client here
    // NOTE(review): stub — returns null until the prometheus client execution is
    // wired in; callers subscribing to the result will NPE until then.
    return null;
  }
}
Loading