Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.apache.druid.segment.data.Indexed;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
* Helper class for NullHandling. This class is used to switch between SQL compatible Null Handling behavior
Expand Down Expand Up @@ -163,4 +165,45 @@ public static boolean isNullOrEquivalent(@Nullable String value)
{
return replaceWithDefault() ? Strings.isNullOrEmpty(value) : value == null;
}

/**
 * Checks whether a UTF-8 buffer counts as null under the current null-handling mode.
 *
 * A null reference always counts as null. A buffer with no remaining bytes counts as null only when
 * {@link #replaceWithDefault()} returns true.
 *
 * @param buffer buffer to check; may be null
 * @return true if the buffer is null, or if it has no remaining bytes while in default-value mode
 */
public static boolean isNullOrEquivalent(@Nullable ByteBuffer buffer)
{
  if (buffer == null) {
    return true;
  }
  // An empty buffer is only equivalent to null in default-value mode.
  return replaceWithDefault() && buffer.remaining() == 0;
}

/**
 * Reports whether the first two entries of a UTF-8 dictionary must be merged into a single null entry.
 * This is the case when default-value mode is active and the dictionary begins with a null entry
 * followed by an empty string.
 *
 * This method and {@link #mustReplaceFirstValueWithNullInDictionary(Indexed)} are never both true.
 *
 * Exists so that segments written under {@link #sqlCompatible()} mode can still be read under
 * {@link #replaceWithDefault()} mode.
 *
 * @param dictionaryUtf8 UTF-8 value dictionary to inspect
 * @return true if entries 0 and 1 must be coalesced into one null entry
 */
public static boolean mustCombineNullAndEmptyInDictionary(final Indexed<ByteBuffer> dictionaryUtf8)
{
  // Nothing to combine outside default-value mode, or when fewer than two entries exist.
  if (!NullHandling.replaceWithDefault() || dictionaryUtf8.size() < 2) {
    return false;
  }

  // Both leading entries must be null-equivalent; the second is only read if the first qualifies.
  if (!isNullOrEquivalent(dictionaryUtf8.get(0))) {
    return false;
  }
  return isNullOrEquivalent(dictionaryUtf8.get(1));
}

/**
 * Reports whether the first entry of a UTF-8 dictionary must be replaced with null. This is the case when
 * default-value mode is active and the first entry is an empty string, because default-value mode expects
 * that entry to be null.
 *
 * This method and {@link #mustCombineNullAndEmptyInDictionary(Indexed)} are never both true.
 *
 * Exists so that segments written under {@link #sqlCompatible()} mode can still be read under
 * {@link #replaceWithDefault()} mode.
 *
 * @param dictionaryUtf8 UTF-8 value dictionary to inspect
 * @return true if entry 0 is a non-null buffer with no remaining bytes while in default-value mode
 */
public static boolean mustReplaceFirstValueWithNullInDictionary(final Indexed<ByteBuffer> dictionaryUtf8)
{
  // Only relevant in default-value mode, and only when there is a first entry to look at.
  if (!NullHandling.replaceWithDefault() || dictionaryUtf8.size() < 1) {
    return false;
  }

  final ByteBuffer head = dictionaryUtf8.get(0);
  // A null first entry is already what default-value mode expects; only an empty one needs replacing.
  return head != null && head.remaining() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.segment.AbstractDimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.data.CachingIndexed;
import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarMultiInts;
import org.apache.druid.segment.data.Indexed;
Expand All @@ -45,6 +44,7 @@
import org.apache.druid.utils.CloseableUtils;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -60,19 +60,19 @@ public class StringDictionaryEncodedColumn implements DictionaryEncodedColumn<St
private final ColumnarInts column;
@Nullable
private final ColumnarMultiInts multiValueColumn;
private final CachingIndexed<String> cachedDictionary;
private final Indexed<String> dictionary;
private final Indexed<ByteBuffer> dictionaryUtf8;

public StringDictionaryEncodedColumn(
@Nullable ColumnarInts singleValueColumn,
@Nullable ColumnarMultiInts multiValueColumn,
CachingIndexed<String> dictionary,
Indexed<String> dictionary,
Indexed<ByteBuffer> dictionaryUtf8
)
{
this.column = singleValueColumn;
this.multiValueColumn = multiValueColumn;
this.cachedDictionary = dictionary;
this.dictionary = dictionary;
this.dictionaryUtf8 = dictionaryUtf8;
}

Expand Down Expand Up @@ -104,7 +104,7 @@ public IndexedInts getMultiValueRow(int rowNum)
@Nullable
public String lookupName(int id)
{
return cachedDictionary.get(id);
return dictionary.get(id);
}


Expand All @@ -130,13 +130,13 @@ public ByteBuffer lookupNameUtf8(int id)
@Override
public int lookupId(String name)
{
return cachedDictionary.indexOf(name);
return dictionary.indexOf(name);
}

@Override
public int getCardinality()
{
return cachedDictionary.size();
return dictionary.size();
}

@Override
Expand Down Expand Up @@ -495,7 +495,11 @@ public String lookupName(int id)
@Override
public void close() throws IOException
{
CloseableUtils.closeAll(cachedDictionary, column, multiValueColumn);
CloseableUtils.closeAll(
dictionary instanceof Closeable ? (Closeable) dictionary : null /* Dictionary may be CachingIndexed */,
column,
multiValueColumn
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarMultiInts;
import org.apache.druid.segment.data.FrontCodedIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.ReadableOffset;
Expand All @@ -51,29 +50,27 @@
import java.util.BitSet;

/**
* {@link DictionaryEncodedColumn<String>} for a column which uses a {@link FrontCodedIndexed} to store its value
* dictionary, which 'delta encodes' strings (instead of {@link org.apache.druid.segment.data.GenericIndexed} like
* {@link StringDictionaryEncodedColumn}).
* {@link DictionaryEncodedColumn<String>} for a column which has only a UTF-8 dictionary, no String dictionary.
* <p>
* This class is otherwise nearly identical to {@link StringDictionaryEncodedColumn} other than the dictionary
* difference.
* This class is otherwise nearly identical to {@link StringDictionaryEncodedColumn} other than lacking a
* String dictionary.
* <p>
* Implements {@link NestedCommonFormatColumn} so it can be used as a reader for single value string specializations
* of {@link org.apache.druid.segment.AutoTypeColumnIndexer}.
*/
public class StringFrontCodedDictionaryEncodedColumn implements DictionaryEncodedColumn<String>,
public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColumn<String>,
NestedCommonFormatColumn
{
@Nullable
private final ColumnarInts column;
@Nullable
private final ColumnarMultiInts multiValueColumn;
private final FrontCodedIndexed utf8Dictionary;
private final Indexed<ByteBuffer> utf8Dictionary;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this and ScalarStringDictionaryEncodedColumn can now be combined, but I can do that in a follow-up PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this in the latest patch.


public StringFrontCodedDictionaryEncodedColumn(
public StringUtf8DictionaryEncodedColumn(
@Nullable ColumnarInts singleValueColumn,
@Nullable ColumnarMultiInts multiValueColumn,
FrontCodedIndexed utf8Dictionary
Indexed<ByteBuffer> utf8Dictionary
)
{
this.column = singleValueColumn;
Expand Down Expand Up @@ -102,6 +99,9 @@ public int getSingleValueRow(int rowNum)
/**
 * Returns the values of the given row from the multi-value column.
 *
 * @param rowNum row to read
 * @return the row's values as returned by the multi-value column
 * @throws UnsupportedOperationException if {@code hasMultipleValues()} is false
 */
@Override
public IndexedInts getMultiValueRow(int rowNum)
{
  if (hasMultipleValues()) {
    return multiValueColumn.get(rowNum);
  }
  // Fail with a descriptive error rather than dereferencing the multi-value column.
  throw new UnsupportedOperationException("Column is not multi-valued");
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public int getValueCardinality()
@Override
public String lookupName(int id)
{
final String value = StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
final String value = StringUtf8DictionaryEncodedColumn.this.lookupName(id);
return extractionFn == null ? value : extractionFn.apply(value);
}

Expand Down Expand Up @@ -190,7 +190,7 @@ public int lookupId(String name)
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
}
return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
return StringUtf8DictionaryEncodedColumn.this.lookupId(name);
}
}

Expand Down Expand Up @@ -291,7 +291,7 @@ public boolean matches()
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", StringFrontCodedDictionaryEncodedColumn.this);
inspector.visit("column", StringUtf8DictionaryEncodedColumn.this);
}
};
} else {
Expand Down Expand Up @@ -332,7 +332,7 @@ public boolean matches()
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", StringFrontCodedDictionaryEncodedColumn.this);
inspector.visit("column", StringUtf8DictionaryEncodedColumn.this);
}
};
}
Expand Down Expand Up @@ -381,7 +381,7 @@ public int getValueCardinality()
@Override
public String lookupName(final int id)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
return StringUtf8DictionaryEncodedColumn.this.lookupName(id);
}

@Nullable
Expand All @@ -394,7 +394,7 @@ public ByteBuffer lookupNameUtf8(int id)
@Override
public int lookupId(@Nullable String name)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
return StringUtf8DictionaryEncodedColumn.this.lookupId(name);
}
}

Expand All @@ -421,7 +421,7 @@ public int getValueCardinality()
@Override
public String lookupName(final int id)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
return StringUtf8DictionaryEncodedColumn.this.lookupName(id);
}

@Nullable
Expand All @@ -435,7 +435,7 @@ public ByteBuffer lookupNameUtf8(int id)
@Override
public int lookupId(@Nullable String name)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
return StringUtf8DictionaryEncodedColumn.this.lookupId(name);
}
}

Expand All @@ -457,7 +457,7 @@ public StringVectorSelector()
@Override
public String lookupName(int id)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
return StringUtf8DictionaryEncodedColumn.this.lookupName(id);
}
}
return new StringVectorSelector();
Expand All @@ -473,7 +473,7 @@ public MultiStringVectorSelector()
@Override
public String lookupName(int id)
{
return StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
return StringUtf8DictionaryEncodedColumn.this.lookupName(id);
}
}
return new MultiStringVectorSelector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

public class CachingIndexed<T> implements CloseableIndexed<T>
{
private static final int INITIAL_CACHE_CAPACITY = 16384;

private static final Logger log = new Logger(CachingIndexed.class);

private final GenericIndexed<T>.BufferIndexed delegate;
private final Indexed<T> delegate;
private final ToIntFunction<T> sizeFn;
@Nullable
private final SizedLRUMap<Integer, T> cachedValues;

Expand All @@ -44,12 +46,14 @@ public class CachingIndexed<T> implements CloseableIndexed<T>
* CachingIndexed objects are not thread safe and should only be used by a single thread at a time.
* CachingIndexed objects must be closed to release any underlying cache resources.
*
* @param delegate the GenericIndexed to wrap with a lookup cache.
* @param delegate the Indexed to wrap with a lookup cache.
* @param sizeFn function that determines the size in bytes of an object
* @param lookupCacheSize maximum size in bytes of the lookup cache if greater than zero
*/
public CachingIndexed(GenericIndexed<T> delegate, final int lookupCacheSize)
public CachingIndexed(Indexed<T> delegate, final ToIntFunction<T> sizeFn, final int lookupCacheSize)
{
this.delegate = delegate.singleThreaded();
this.delegate = delegate;
this.sizeFn = sizeFn;

if (lookupCacheSize > 0) {
log.debug("Allocating column cache of max size[%d]", lookupCacheSize);
Expand All @@ -75,7 +79,7 @@ public T get(int index)
}

final T value = delegate.get(index);
cachedValues.put(index, value, delegate.getLastValueSize());
cachedValues.put(index, value, sizeFn.applyAsInt(value));
return value;
} else {
return delegate.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
*/
public abstract class BufferIndexed implements Indexed<T>
{
int lastReadSize;

@Override
public int size()
{
Expand Down Expand Up @@ -492,7 +490,6 @@ ByteBuffer bufferedIndexedGetByteBuffer(ByteBuffer copyValueBuffer, int startOff
|| copyValueBuffer.get(startOffset - Integer.BYTES) == NULL_VALUE_SIZE_MARKER)) {
return null;
}
lastReadSize = size;

// ObjectStrategy.fromByteBuffer() is allowed to reset the limit of the buffer. So if the limit is changed,
// position() call could throw an exception, if the position is set beyond the new limit. Calling limit()
Expand All @@ -511,16 +508,6 @@ ByteBuffer bufferedIndexedGetByteBuffer(ByteBuffer copyValueBuffer, int startOff
@Nullable
protected abstract ByteBuffer getByteBuffer(int index);

/**
* This method makes no guarantees with respect to thread safety
*
* @return the size in bytes of the last value read
*/
int getLastValueSize()
{
return lastReadSize;
}

@Override
public int indexOf(@Nullable T value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;

Expand All @@ -45,8 +46,7 @@
*
* @see ScalarDoubleColumn
* @see ScalarLongColumn
* @see ScalarStringDictionaryEncodedColumn
* @see org.apache.druid.segment.column.StringFrontCodedDictionaryEncodedColumn
* @see StringUtf8DictionaryEncodedColumn
* @see VariantColumn
* @see CompressedNestedDataComplexColumn
*/
Expand Down
Loading