Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void setup() throws IOException
}
}

@Setup(Level.Iteration)
@Setup(Level.Invocation)
public void setup2() throws IOException
{
incIndex = makeIncIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ private List<String> readSegmentDim1(final SegmentDescriptor descriptor) throws
);
IndexIO indexIO = new TestUtils().getTestIndexIO();
QueryableIndex index = indexIO.loadIndex(outputLocation);
DictionaryEncodedColumn dim1 = index.getColumn("dim1").getDictionaryEncoding();
DictionaryEncodedColumn<String> dim1 = index.getColumn("dim1").getDictionaryEncoding();
List<String> values = Lists.newArrayList();
for (int i = 0; i < dim1.length(); i++) {
int id = dim1.getSingleValueRow(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
Expand Down Expand Up @@ -77,6 +79,7 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -216,7 +219,8 @@ private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder
Iterable<String> oldDimOrder,
Map<String, ColumnCapabilitiesImpl> oldCapabilities
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
Expand All @@ -236,7 +240,7 @@ private static IncrementalIndex makeIncrementalIndex(
);

if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder);
newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
}

return newIndex;
Expand Down Expand Up @@ -340,7 +344,7 @@ protected void reduce(
LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null);
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));

while (iter.hasNext()) {
Expand All @@ -351,7 +355,7 @@ protected void reduce(
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities());
}

index.add(value);
Expand Down Expand Up @@ -557,6 +561,7 @@ protected void reduce(
bucket,
combiningAggs,
config,
null,
null
);
try {
Expand Down Expand Up @@ -654,7 +659,8 @@ public void doRun()
bucket,
combiningAggs,
config,
allDimensionNames
allDimensionNames,
persistIndex.getColumnCapabilities()
);
startTime = System.currentTimeMillis();
++indexCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public InputRow get()
IncrementalIndex.makeColumnSelectorFactory(
aggFactory,
supplier,
true
true,
null
)
);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Indexed<String> getDimensionValues(String dimension)
if (columnDesc == null || !columnDesc.getCapabilities().isDictionaryEncoded()) {
return null;
}
final DictionaryEncodedColumn column = columnDesc.getDictionaryEncoding();
final DictionaryEncodedColumn<String> column = columnDesc.getDictionaryEncoding();
return new Indexed<String>()
{
@Override
Expand Down
211 changes: 211 additions & 0 deletions processing/src/main/java/io/druid/segment/DimensionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.metamx.common.io.smoosh.FileSmoosher;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;

import java.io.Closeable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/**
* Processing related interface
*
* A DimensionHandler is an object that encapsulates indexing, column merging/building, and querying operations
* for a given dimension type (e.g., dict-encoded String, Long).
*
* These operations are handled by sub-objects created through a DimensionHandler's methods:
* DimensionIndexer, DimensionMerger, and DimensionColumnReader, respectively.
*
* Each DimensionHandler object is associated with a single dimension.
*
 * This interface allows type-specific column logic, such as the choice of indexing structures and disk formats,
* to be contained within a type-specific set of handler objects, simplifying processing classes
* such as IncrementalIndex and IndexMerger and allowing for abstracted development of additional dimension types.
*
 * A dimension may have two representations, an encoded representation and an actual representation.
* For example, a value for a String dimension has an integer dictionary encoding, and an actual String representation.
*
* A DimensionHandler is a stateless object, and thus thread-safe; its methods should be pure functions.
*
* The EncodedType and ActualType are Comparable because columns used as dimensions must have sortable values.
*
* @param <EncodedType> class of the encoded values
* @param <ActualType> class of the actual values
*/
public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, EncodedTypeArray, ActualType extends Comparable<ActualType>>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe its later in the docs, but I don't see anything here about why the types need to be Comparable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on that requirement a bit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put the Comparable requirement there since a column used as a dimension needs to be sortable, I can elaborate on that in the docs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly I'm curious because you can either have items which are by their nature Comparable, or have some other item in the chain take a Comparator as an argument. And it is not clear that one is preferable over the other.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm in favor of having the values being Comparable inherently, since the basic types (strings and numbers) are Comparable, and for a potential complex type? an implementer would have to implement comparator logic anyway for the column type to be usable as a dimension, so might as well have it as part of the type itself.

Do you foresee a use case where requiring a Comparable type would be a hindrance?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comparison logic should all be contained within the type-specific Dimension* implementations though, so removing the Comparable requirement shouldn't be a problem either.

{
/**
* Get the name of the column associated with this handler.
*
* This string would be the output name of the column during ingestion, and the name of an input column when querying.
*
* @return Dimension name
*/
public String getDimensionName();


/**
* Creates a new DimensionIndexer, a per-dimension object responsible for processing ingested rows in-memory, used by the
* IncrementalIndex. See {@link DimensionIndexer} interface for more information.
*
* @return A new DimensionIndexer object.
*/
public DimensionIndexer<EncodedType, EncodedTypeArray, ActualType> makeIndexer();


/**
* Creates a new DimensionMergerV9, a per-dimension object responsible for merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with IndexMergerV9 only.
*
* See {@link DimensionMergerV9} interface for more information.
*
* @param indexSpec Specification object for the index merge
* @param outDir Location to store files generated by the merging process
* @param ioPeon ioPeon object passed in by IndexMerger, manages files created by the merging process
* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
 *
* @return A new DimensionMergerV9 object.
*/
public DimensionMergerV9<EncodedTypeArray> makeMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
);


/**
* Creates a new DimensionMergerLegacy, a per-dimension object responsible for merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with IndexMerger only.
*
* See {@link DimensionMergerLegacy} interface for more information.
*
* @param indexSpec Specification object for the index merge
* @param outDir Location to store files generated by the merging process
* @param ioPeon ioPeon object passed in by IndexMerger, manages files created by the merging process
* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
 *
* @return A new DimensionMergerLegacy object.
*/
public DimensionMergerLegacy<EncodedTypeArray> makeLegacyMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
);


/**
* Given an array representing a single set of row value(s) for this dimension as an Object,
* return the length of the array after appropriate type-casting.
*
* For example, a dictionary encoded String dimension would receive an int[] as an Object.
*
* @param dimVals Array of row values
* @return Size of dimVals
*/
public int getLengthFromEncodedArray(EncodedTypeArray dimVals);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so that EncodedTypeArray does not have to guarantee a length() or similar method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller would otherwise have to check and cast the array to the appropriate type, or it would have to use the slow performance reflection method (Array.getLength())



/**
* Given two arrays representing sorted encoded row value(s), return the result of their comparison.
*
* If the two arrays have different lengths, the shorter array should be ordered first in the comparison.
*
* Otherwise, this function should iterate through the array values and return the comparison of the first difference.
*
* @param lhs array of row values
* @param rhs array of row values
*
* @return integer indicating comparison result of arrays
*/
public int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);


/**
* Given two arrays representing sorted encoded row value(s), check that the two arrays have the same encoded values,
* or if the encoded values differ, that they translate into the same actual values, using the mappings
* provided by lhsEncodings and rhsEncodings (if applicable).
*
* If validation fails, this method should throw a SegmentValidationException.
*
* Used by IndexIO for validating segments.
*
* See StringDimensionHandler.validateSortedEncodedArrays() for a reference implementation.
*
* @param lhs array of row values
* @param rhs array of row values
* @param lhsEncodings encoding lookup from lhs's segment, null if not applicable for this dimension's type
* @param rhsEncodings encoding lookup from rhs's segment, null if not applicable for this dimension's type
*
 * @throws SegmentValidationException if validation fails
*/
public void validateSortedEncodedArrays(
EncodedTypeArray lhs,
EncodedTypeArray rhs,
Indexed<ActualType> lhsEncodings,
Indexed<ActualType> rhsEncodings
) throws SegmentValidationException;


/**
* Given a Column, return a type-specific object that can be used to retrieve row values.
*
* For example:
* - A String-typed implementation would return the result of column.getDictionaryEncoding()
 * - A long-typed implementation would return the result of column.getGenericColumn().
*
* @param column Column for this dimension from a QueryableIndex
* @return The type-specific column subobject for this dimension.
*/
public Closeable getSubColumn(Column column);


/**
* Given a subcolumn from getSubColumn, and the index of the current row, retrieve a row as an array of values.
*
* For example:
* - A String-typed implementation would read the current row from a DictionaryEncodedColumn as an int[].
 * - A long-typed implementation would read the current row from a GenericColumn and return it as a long[].
*
 * @param column Subcolumn for this dimension, as returned by getSubColumn()
* @param currRow The index of the row to retrieve
* @return The row from "column" specified by "currRow", as an array of values
*/
public Object getRowValueArrayFromColumn(Closeable column, int currRow);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i find this a little bit confusing, does this function only work with subColumns from the previous method? If so, shoudl we have a more specific type than just closeable?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the javadoc says it must be "given a subcolumn from getSubColumn"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless its closed in the method, which is not indicated, then it should accept Object

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OR have stricter typing.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment;

import com.google.common.base.Function;
import com.metamx.common.IAE;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;

public final class DimensionHandlerUtil
{
private DimensionHandlerUtil() {}

public static DimensionHandler getHandlerFromCapabilities(String dimensionName, ColumnCapabilities capabilities)
Copy link
Copy Markdown
Contributor

@drcrallen drcrallen Sep 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a major pinch point for extensibility. Meaning there is no discovery capabilities here for non-default dimension handling.

would it be possible to either use a guice multi-map binding, or a Jackson factory class with something like

objectMapper.convertValue(ImmutableMap.of("type", capabilities.getType()), DimensionHandlerFactory.class).buildHandler(dimensionName, capabilities)

?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I can look into that

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, it's not clear to me right now what the best approach is here for supporting dimension type extensions.

My goal with this PR was just to allow the new interface to work with Strings. let's handle this in a follow-up PR that adds a Long implementation?

{
DimensionHandler handler = null;
if (capabilities.getType() == ValueType.STRING) {
if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) {
throw new IAE("String column must have dictionary encoding and bitmap index.");
}
handler = new StringDimensionHandler(dimensionName);
}
if (handler == null) {
throw new IAE("Could not create handler from invalid column type: " + capabilities.getType());
}
return handler;
}
}
Loading