Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ private static DataSchema makeDataSchemaForIngestion(
columnMappings,
isRollupQuery,
querySpec.getQuery(),
destination.getDimensionToSchemaMap()
destination.getDimensionSchemas()
);

return new DataSchema(
Expand Down Expand Up @@ -2127,11 +2127,11 @@ private static DimensionSchema getDimensionSchema(
final String outputColumnName,
@Nullable final ColumnType queryType,
QueryContext context,
@Nullable Map<String, DimensionSchema> dimensionToSchemaMap
@Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
if (dimensionToSchemaMap != null && dimensionToSchemaMap.containsKey(outputColumnName)) {
return dimensionToSchemaMap.get(outputColumnName);
if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) {
return dimensionSchemas.get(outputColumnName);
}
// In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup,
// we won't have an entry in the map. For those cases, use the default config.
Expand All @@ -2150,7 +2150,7 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final Query<?> query,
@Nullable final Map<String, DimensionSchema> dimensionToSchemaMap
@Nullable final Map<String, DimensionSchema> dimensionSchemas
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
Expand Down Expand Up @@ -2237,13 +2237,13 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
outputColumnName,
type,
query.context(),
dimensionToSchemaMap
dimensionSchemas
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(
getDimensionSchema(outputColumnName, type, query.context(), dimensionToSchemaMap)
getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
Expand All @@ -2255,7 +2255,7 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
outputColumnName,
type,
query.context(),
dimensionToSchemaMap
dimensionSchemas
);
}
}
Expand Down Expand Up @@ -2283,14 +2283,14 @@ private static void populateDimensionsAndAggregators(
String outputColumn,
ColumnType type,
QueryContext context,
Map<String, DimensionSchema> dimensionToSchemaMap
Map<String, DimensionSchema> dimensionSchemas
)
{
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap)
getDimensionSchema(outputColumn, type, context, dimensionSchemas)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ private Map<String, Object> createMSQTaskContext(CompactionTask compactionTask,
context.putIfAbsent(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
// Always override CTX_ARRAY_INGEST_MODE since it can otherwise lead to mixed ARRAY and MVD types for a column.
context.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ public class DataSourceMSQDestination implements MSQDestination
private final List<Interval> replaceTimeChunks;

@Nullable
private final Map<String, DimensionSchema> dimensionToSchemaMap;
private final Map<String, DimensionSchema> dimensionSchemas;

@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("dimensionToSchemaMap") @Nullable Map<String, DimensionSchema> dimensionToSchemaMap
@JsonProperty("dimensionSchemas") @Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.dimensionToSchemaMap = dimensionToSchemaMap;
this.dimensionSchemas = dimensionSchemas;

if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
Expand Down Expand Up @@ -138,9 +138,9 @@ public List<Interval> getReplaceTimeChunks()
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, DimensionSchema> getDimensionToSchemaMap()
public Map<String, DimensionSchema> getDimensionSchemas()
{
return dimensionToSchemaMap;
return dimensionSchemas;
}

/**
Expand Down Expand Up @@ -177,13 +177,13 @@ public boolean equals(Object o)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap);
&& Objects.equals(dimensionSchemas, that.dimensionSchemas);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionToSchemaMap);
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas);
}

@Override
Expand All @@ -194,7 +194,7 @@ public String toString()
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
", dimensionToSchemaMap=" + dimensionToSchemaMap +
", dimensionSchemas=" + dimensionSchemas +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DataSourceMSQDestinationTest
public void testEquals()
{
EqualsVerifier.forClass(DataSourceMSQDestination.class)
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionToSchemaMap")
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionSchemas")
.withPrefabValues(
Map.class,
ImmutableMap.of(
Expand Down