Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public AggregatorsModule()
{
super("AggregatorFactories");

registerComplexMetricsAndSerde();

setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);

addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
}

public static void registerComplexMetricsAndSerde()
{
ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde());
ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde());
ComplexMetrics.registerSerde(
Expand All @@ -102,11 +112,6 @@ public AggregatorsModule()
SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
new SerializablePairLongLongComplexMetricSerde()
);

setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);

addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,36 @@ public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType q
*/
public abstract TypeReference<ResultType> getResultTypeReference();

/**
 * Like {@link #getCacheStrategy(Query, ObjectMapper)} but the caller doesn't supply the object mapper for deserializing
 * and converting the cached data to the desired type. It's up to the individual implementations to decide the
 * appropriate action in that case. They can either throw an exception outright, or decide whether the query requires
 * the object mapper for proper downstream processing and work with the generic Java types if not.
 * <p>
 * @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead
 */
@Deprecated
@Nullable
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
{
  return null;
}

/**
* Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
* <p>
* This is optional. If it returns null, caching is effectively disabled for the query.
*
* @param query The query whose results might be cached
* @param mapper Object mapper to convert the deserialized generic java objects to desired types. It can be nullable
* to preserve backward compatibility.
* @param <T> The type of object that will be stored in the cache
* @return A CacheStrategy that can be used to populate and read from the Cache
*/
@Nullable
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper)
{
return null;
return getCacheStrategy(query);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [QueryToolChest.getCacheStrategy](1) should be avoided because it has been deprecated.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant, the deprecated method is used to maintain backward compatibility.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query.datasourcemetadata;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.inject.Inject;
Expand All @@ -38,6 +39,7 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.LogicalSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -119,4 +121,10 @@ public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query)
{
return null;
}

@Override
public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query, @Nullable ObjectMapper mapper)
{
return null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if this is returning null is the override really needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added for clarity, will remove it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah maybe is safer to leave since default is to not cache I guess? I don't feel super strongly either way

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.nested.StructuredData;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -472,7 +474,7 @@ public void serialize(
// Deserializer that can deserialize either array- or map-based rows.
final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
{
final Class<?>[] dimensionClasses = createDimensionClasses();
final Class<?>[] dimensionClasses = createDimensionClasses(query);
boolean containsComplexDimensions = query.getDimensions()
.stream()
.anyMatch(
Expand Down Expand Up @@ -525,30 +527,6 @@ public ResultRow deserialize(final JsonParser jp, final DeserializationContext c
return ResultRow.of(objectArray);
}
}

private Class<?>[] createDimensionClasses()
{
final List<DimensionSpec> queryDimensions = query.getDimensions();
final Class<?>[] classes = new Class[queryDimensions.size()];
for (int i = 0; i < queryDimensions.size(); ++i) {
final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
if (dimensionOutputType.is(ValueType.COMPLEX)) {
NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
if (!nullableTypeStrategy.groupable()) {
throw DruidException.defensive(
"Ungroupable dimension [%s] with type [%s] found in the query.",
queryDimensions.get(i).getDimension(),
dimensionOutputType
);
}
classes[i] = nullableTypeStrategy.getClazz();
} else {
classes[i] = Object.class;
}
}
return classes;
}

};

class GroupByResultRowModule extends SimpleModule
Expand Down Expand Up @@ -598,9 +576,32 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
);
}

/**
 * Legacy single-argument entry point; delegates to {@link #getCacheStrategy(GroupByQuery, ObjectMapper)} with a
 * null mapper. Note that the two-argument overload throws for non-nested complex dimensions when the mapper is
 * null, since they cannot be deserialized from the result cache without it.
 */
@Nullable
@Override
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
{
  return getCacheStrategy(query, null);
}

@Override
public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
final GroupByQuery query,
@Nullable final ObjectMapper mapper
)
{

for (DimensionSpec dimension : query.getDimensions()) {
if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
if (mapper == null) {
throw DruidException.defensive(
"Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided",
dimension.getOutputType().getComplexTypeName()
);
}
}
}
final Class<?>[] dimensionClasses = createDimensionClasses(query);

return new CacheStrategy<ResultRow, Object, GroupByQuery>()
{
private static final byte CACHE_STRATEGY_VERSION = 0x1;
Expand Down Expand Up @@ -727,13 +728,29 @@ public ResultRow apply(Object input)
int dimPos = 0;
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec dimensionSpec = dimsIter.next();

// Must convert generic Jackson-deserialized type into the proper type.
resultRow.set(
dimensionStart + dimPos,
DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
);

final Object dimensionObject = results.next();
final Object dimensionObjectCasted;

final ColumnType outputType = dimensionSpec.getOutputType();

// Must convert generic Jackson-deserialized type into the proper type. The downstream functions expect the
// dimensions to be of appropriate types for further processing like merging and comparing.
if (outputType.is(ValueType.COMPLEX)) {
// JSON columns can interpret generic data objects appropriately, hence they are wrapped as-is in StructuredData.
// They don't need to be converted from Object.class to StructuredData.class using the object mapper, as that is
// an expensive operation that would be wasteful here.
if (outputType.equals(ColumnType.NESTED_DATA)) {
dimensionObjectCasted = StructuredData.wrap(dimensionObject);
} else {
dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]);
}
} else {
dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(
dimensionObject,
dimensionSpec.getOutputType()
);
}
resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
dimPos++;
}

Expand Down Expand Up @@ -861,4 +878,27 @@ private static BitSet extractionsToRewrite(GroupByQuery query)

return retVal;
}

/**
 * Computes, for each query dimension, the Java class that cached or spilled values must be deserialized to.
 * Complex dimensions map to the class declared by their {@link NullableTypeStrategy}; all other dimensions are
 * left as {@code Object.class} because they are converted downstream via
 * {@code DimensionHandlerUtils.convertObjectToType}, which handles primitive and string coercion without needing
 * the concrete class up front.
 *
 * @param query the group-by query whose dimensions are inspected
 * @return one class per dimension, in dimension order
 */
private static Class<?>[] createDimensionClasses(final GroupByQuery query)
{
  final List<DimensionSpec> queryDimensions = query.getDimensions();
  final Class<?>[] classes = new Class[queryDimensions.size()];
  for (int i = 0; i < queryDimensions.size(); ++i) {
    final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
    if (dimensionOutputType.is(ValueType.COMPLEX)) {
      NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
      if (!nullableTypeStrategy.groupable()) {
        // Defensive: grouping on an ungroupable complex type should have been rejected earlier in planning.
        throw DruidException.defensive(
            "Ungroupable dimension [%s] with type [%s] found in the query.",
            queryDimensions.get(i).getDimension(),
            dimensionOutputType
        );
      }
      classes[i] = nullableTypeStrategy.getClazz();
    } else {
      // Non-complex dimensions don't need their concrete class here; generic Jackson output is coerced later.
      classes[i] = Object.class;
    }
  }
  return classes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.groupby.epinephelinae;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -232,6 +233,16 @@ interface KeySerde<T>
*/
BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets);

/**
 * Decorates the object mapper, enabling it to read and write the query results' grouping keys. It is used by the
 * {@link SpillingGrouper} to preserve the types of the dimensions after serializing them to, and deserializing
 * them from, the spilled files. The default implementation returns the supplied mapper unchanged.
 */
default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
{
  return spillMapper;
}

/**
* Reset the keySerde to its initial state. After this method is called, {@link #readFromByteBuffer}
* and {@link #bufferComparator()} may no longer work properly on previously-serialized keys.
Expand Down
Loading