Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ public Long read(ByteBuffer buffer)
return buffer.getLong();
}

@Override
public Long read(ByteBuffer buffer, int offset)
{
return buffer.getLong(offset);
}

@Override
public boolean readRetainsBufferReference()
{
Expand Down Expand Up @@ -297,6 +303,12 @@ public Float read(ByteBuffer buffer)
return buffer.getFloat();
}

@Override
public Float read(ByteBuffer buffer, int offset)
{
return buffer.getFloat(offset);
}

@Override
public boolean readRetainsBufferReference()
{
Expand Down Expand Up @@ -344,6 +356,12 @@ public Double read(ByteBuffer buffer)
return buffer.getDouble();
}

@Override
public Double read(ByteBuffer buffer, int offset)
{
return buffer.getDouble(offset);
}

@Override
public boolean readRetainsBufferReference()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
import org.apache.druid.segment.nested.GlobalDimensionDictionary;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataProcessor;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -224,6 +226,23 @@ public void fillBitmapsFromUnsortedEncodedKeyComponent(
throw new UnsupportedOperationException("Not supported");
}


public void mergeFields(SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields)
{
for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : fieldIndexers.entrySet()) {
// skip adding the field if no types are in the set, meaning only null values have been processed
if (!entry.getValue().getTypes().isEmpty()) {
mergedFields.put(entry.getKey(), entry.getValue().getTypes());
}
}
}

public GlobalDictionarySortedCollector getSortedCollector()
{
return globalDictionary.getSortedCollector();
}


static class LiteralFieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.nio.IntBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

Expand Down Expand Up @@ -164,13 +163,8 @@ private GlobalDictionarySortedCollector getSortedIndexFromIncrementalAdapter(
return null;
}
final NestedDataColumnIndexer indexer = (NestedDataColumnIndexer) dim.getIndexer();
for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : indexer.fieldIndexers.entrySet()) {
// skip adding the field if no types are in the set, meaning only null values have been processed
if (!entry.getValue().getTypes().isEmpty()) {
mergedFields.put(entry.getKey(), entry.getValue().getTypes());
}
}
return indexer.globalDictionary.getSortedCollector();
indexer.mergeFields(mergedFields);
return indexer.getSortedCollector();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
SmooshedFileMapper mapper
) throws IOException
{

byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
final int numElements = buffer.getInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.data;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.TypeStrategy;
Expand All @@ -43,12 +44,14 @@
* If {@link #hasNull} is set, id 0 is ALWAYS null, so the comparator should be 'nulls first' or else behavior will
* be unexpected. {@link #hasNull} can only be set if also {@link #isSorted} is set, since the null value is not
* actually stored in the values section.
*
* This class is thread-safe if and only if {@link TypeStrategy#read(ByteBuffer, int)} is thread-safe.
*/
public class FixedIndexed<T> implements Indexed<T>
{
public static final byte IS_SORTED_MASK = 0x02;

public static <T> FixedIndexed<T> read(ByteBuffer bb, TypeStrategy<T> strategy, ByteOrder byteOrder, int width)
public static <T> Supplier<FixedIndexed<T>> read(ByteBuffer bb, TypeStrategy<T> strategy, ByteOrder byteOrder, int width)
{
final ByteBuffer buffer = bb.asReadOnlyBuffer().order(byteOrder);
final byte version = buffer.get();
Expand All @@ -59,15 +62,17 @@ public static <T> FixedIndexed<T> read(ByteBuffer bb, TypeStrategy<T> strategy,
Preconditions.checkState(!(hasNull && !isSorted), "cannot have null values if not sorted");
final int size = buffer.getInt() + (hasNull ? 1 : 0);
final int valuesOffset = buffer.position();
final FixedIndexed<T> fixedIndexed = new FixedIndexed<>(
buffer,
final Supplier<FixedIndexed<T>> fixedIndexed = () -> new FixedIndexed<>(
bb,
byteOrder,
strategy,
hasNull,
isSorted,
width,
size,
valuesOffset
);

bb.position(buffer.position() + (width * size));
return fixedIndexed;
}
Expand All @@ -83,6 +88,7 @@ public static <T> FixedIndexed<T> read(ByteBuffer bb, TypeStrategy<T> strategy,

private FixedIndexed(
ByteBuffer buffer,
ByteOrder byteOrder,
TypeStrategy<T> typeStrategy,
boolean hasNull,
boolean isSorted,
Expand All @@ -91,7 +97,7 @@ private FixedIndexed(
int valuesOffset
)
{
this.buffer = buffer;
this.buffer = buffer.asReadOnlyBuffer().order(byteOrder);
this.typeStrategy = typeStrategy;
Preconditions.checkArgument(width > 0, "FixedIndexed requires a fixed width value type");
this.width = width;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
* bucket before moving onto the next bucket as the iterator is consumed.
*
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
public final class FrontCodedIndexed implements Indexed<ByteBuffer>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ public String toString()
/**
* Single-threaded view.
*/
abstract class BufferIndexed implements Indexed<T>
public abstract class BufferIndexed implements Indexed<T>
{
int lastReadSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
private final GenericIndexed<String> fields;
private final NestedLiteralTypeInfo fieldInfo;

private final TStringDictionary stringDictionary;
private final FixedIndexed<Long> longDictionary;
private final FixedIndexed<Double> doubleDictionary;
private final Supplier<TStringDictionary> stringDictionarySupplier;
private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
private final SmooshedFileMapper fileMapper;

private final ConcurrentHashMap<String, ColumnHolder> columns = new ConcurrentHashMap<>();
Expand All @@ -103,19 +103,19 @@ public CompressedNestedDataComplexColumn(
ImmutableBitmap nullValues,
GenericIndexed<String> fields,
NestedLiteralTypeInfo fieldInfo,
TStringDictionary stringDictionary,
FixedIndexed<Long> longDictionary,
FixedIndexed<Double> doubleDictionary,
Supplier<TStringDictionary> stringDictionary,
Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
SmooshedFileMapper fileMapper
)
{
this.metadata = metadata;
this.nullValues = nullValues;
this.fields = fields;
this.fieldInfo = fieldInfo;
this.stringDictionary = stringDictionary;
this.longDictionary = longDictionary;
this.doubleDictionary = doubleDictionary;
this.stringDictionarySupplier = stringDictionary;
this.longDictionarySupplier = longDictionarySupplier;
this.doubleDictionarySupplier = doubleDictionarySupplier;
this.fileMapper = fileMapper;
this.closer = Closer.create();
this.compressedRawColumnSupplier = compressedRawColumnSupplier;
Expand All @@ -133,17 +133,17 @@ public NestedLiteralTypeInfo getFieldInfo()

public TStringDictionary getStringDictionary()
{
return stringDictionary;
return stringDictionarySupplier.get();
}

public FixedIndexed<Long> getLongDictionary()
{
return longDictionary;
return longDictionarySupplier.get();
}

public FixedIndexed<Double> getDoubleDictionary()
{
return doubleDictionary;
return doubleDictionarySupplier.get();
}

@Nullable
Expand Down Expand Up @@ -406,7 +406,7 @@ private ColumnHolder readNestedFieldColumn(String field)
)
);

final FixedIndexed<Integer> localDictionary = FixedIndexed.read(
final Supplier<FixedIndexed<Integer>> localDictionarySupplier = FixedIndexed.read(
dataBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
metadata.getByteOrder(),
Expand Down Expand Up @@ -436,20 +436,22 @@ private ColumnHolder readNestedFieldColumn(String field)
metadata.getBitmapSerdeFactory().getObjectStrategy(),
columnBuilder.getFileMapper()
);
Supplier<DictionaryEncodedColumn<?>> columnSupplier = () ->
closer.register(new NestedFieldLiteralDictionaryEncodedColumn(
types,
longs.get(),
doubles.get(),
ints.get(),
stringDictionary,
longDictionary,
doubleDictionary,
localDictionary,
localDictionary.get(0) == 0
? rBitmaps.get(0)
: metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap()
));
Supplier<DictionaryEncodedColumn<?>> columnSupplier = () -> {
FixedIndexed<Integer> localDict = localDictionarySupplier.get();
return closer.register(new NestedFieldLiteralDictionaryEncodedColumn(
types,
longs.get(),
doubles.get(),
ints.get(),
stringDictionarySupplier.get(),
longDictionarySupplier.get(),
doubleDictionarySupplier.get(),
localDict,
localDict.get(0) == 0
? rBitmaps.get(0)
: metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap()
));
};
columnBuilder.setHasMultipleValues(false)
.setHasNulls(true)
.setDictionaryEncodedColumnSupplier(columnSupplier);
Expand All @@ -458,10 +460,10 @@ private ColumnHolder readNestedFieldColumn(String field)
types,
metadata.getBitmapSerdeFactory().getBitmapFactory(),
rBitmaps,
localDictionary,
stringDictionary,
longDictionary,
doubleDictionary
localDictionarySupplier,
stringDictionarySupplier,
longDictionarySupplier,
doubleDictionarySupplier
),
true,
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ public Integer read(ByteBuffer buffer)
return buffer.getInt();
}

@Override
public Integer read(ByteBuffer buffer, int offset)
{
return buffer.getInt(offset);
}

@Override
public boolean readRetainsBufferReference()
{
Expand Down
Loading