Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.segment.data.ReadableOffset;

import javax.annotation.Nullable;

import java.io.IOException;

/**
Expand All @@ -40,6 +39,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
{
public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class);

private final int length;
private final ColumnarInts scale;
private final ColumnarMultiInts magnitude;

Expand All @@ -49,8 +49,9 @@ public class CompressedBigDecimalColumn implements ComplexColumn
* @param scale scale of the rows
* @param magnitude LongColumn representing magnitudes
*/
public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude)
public CompressedBigDecimalColumn(int length, ColumnarInts scale, ColumnarMultiInts magnitude)
{
this.length = length;
this.scale = scale;
this.magnitude = magnitude;
}
Expand Down Expand Up @@ -87,7 +88,7 @@ public CompressedBigDecimal getRowValue(int rowNum)
@Override
public int getLength()
{
return scale.size();
return length;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,16 @@
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier;

import java.nio.ByteBuffer;

/**
* Complex column supplier that understands {@link CompressedBigDecimal} values.
*/
public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexColumn>
{

public static final int VERSION = 0x1;

private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;

/**
* Constructor.
*
* @param scaleSupplier scale supplier
* @param magnitudeSupplier supplier of results
*/
public CompressedBigDecimalColumnPartSupplier(
CompressedVSizeColumnarIntsSupplier scaleSupplier,
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
)
{
this.scaleSupplier = scaleSupplier;
this.magnitudeSupplier = magnitudeSupplier;
}

/**
* Compressed.
*
Expand All @@ -67,23 +49,50 @@ public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(
byte versionFromBuffer = buffer.get();

if (versionFromBuffer == VERSION) {
int positionStart = buffer.position();

CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
IndexIO.BYTE_ORDER);
IndexIO.BYTE_ORDER
);

V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);

return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier);
return new CompressedBigDecimalColumnPartSupplier(
buffer.position() - positionStart,
scaleSupplier,
magnitudeSupplier
);
} else {
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
}

private final int byteSize;
private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;

/**
* Constructor.
*
* @param scaleSupplier scale supplier
* @param magnitudeSupplier supplier of results
*/
public CompressedBigDecimalColumnPartSupplier(
int byteSize,
CompressedVSizeColumnarIntsSupplier scaleSupplier,
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
)
{
this.byteSize = byteSize;
this.scaleSupplier = scaleSupplier;
this.magnitudeSupplier = magnitudeSupplier;
}

@Override
public ComplexColumn get()
{
return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get());
return new CompressedBigDecimalColumn(byteSize, scaleSupplier.get(), magnitudeSupplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ public void testCompressedBigDecimalColumn()
ColumnarInts columnarInts = EasyMock.createMock(ColumnarInts.class);
ReadableOffset readableOffset = EasyMock.createMock(ReadableOffset.class);
CompressedBigDecimalColumn compressedBigDecimalColumn = new CompressedBigDecimalColumn(
12345,
columnarInts,
columnarMultiInts
);
Assert.assertEquals(
CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL,
compressedBigDecimalColumn.getTypeName()
);
Assert.assertEquals(0, compressedBigDecimalColumn.getLength());
Assert.assertEquals(12345, compressedBigDecimalColumn.getLength());
Assert.assertEquals(CompressedBigDecimalColumn.class, compressedBigDecimalColumn.getClazz());
Assert.assertNotNull(compressedBigDecimalColumn.makeColumnValueSelector(readableOffset));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Object getRowValue(int rowNum)
@Override
public int getLength()
{
return frame.numRows();
return (int) frame.numBytes();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

Expand All @@ -49,6 +51,20 @@
*/
public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde
{
/**
* This is a configuration parameter to allow for turning on compression. It is a hack, it would be significantly
* better if this could be delivered via properties. The number one reason this is a hack is because it reads
* the System.getProperty which doesn't actually have runtime.properties files put into it, so this setting
* could be set in runtime.properties and this code wouldn't see it, because that's not how it is wired up.
*
* The intent of this parameter is so that Druid 25 can be released using the legacy serialization format. This
* will allow us to get code released that can *read* both the legacy and the new format. Then, in Druid 26,
* we can completely eliminate this boolean and start to only *write* the new format, in which case this
* hack of a configuration property disappears.
*/
private static final boolean COMPRESSION_ENABLED =
Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed", "false"));

public static final int EXPECTED_VERSION = 3;
public static final String TYPE_NAME = "serializablePairLongString";
// Null SerializablePairLongString values are put first
Expand All @@ -60,6 +76,17 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS

private static final SerializablePairLongStringSimpleStagedSerde SERDE =
new SerializablePairLongStringSimpleStagedSerde();
private final boolean compressionEnabled;

public SerializablePairLongStringComplexMetricSerde()
{
this(COMPRESSION_ENABLED);
}

public SerializablePairLongStringComplexMetricSerde(boolean compressionEnabled)
{
this.compressionEnabled = compressionEnabled;
}

@Override
public String getTypeName()
Expand Down Expand Up @@ -92,7 +119,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
byte version = buffer.get(buffer.position());

if (version == 0 || version == 1 || version == 2) {
GenericIndexed<?> column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
GenericIndexed<?> column = GenericIndexed.read(buffer, LEGACY_STRATEGY, columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
} else {
SerializablePairLongStringComplexColumn.Builder builder =
Expand Down Expand Up @@ -141,9 +168,73 @@ public byte[] toBytes(SerializablePairLongString val)
@Override
public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return new SerializablePairLongStringColumnSerializer(
segmentWriteOutMedium,
NativeClearedByteBufferProvider.INSTANCE
);
if (compressionEnabled) {
return new SerializablePairLongStringColumnSerializer(
segmentWriteOutMedium,
NativeClearedByteBufferProvider.INSTANCE
);
} else {
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
LEGACY_STRATEGY
);
}
}

private static final ObjectStrategy<SerializablePairLongString> LEGACY_STRATEGY =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe worth leaving a comment about why this is different from getObjectStrategy() here too (sad all big endian all the time, etc)

new ObjectStrategy<SerializablePairLongString>()
{
@Override
public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
{
return COMPARATOR.compare(o1, o2);
}

@Override
public Class<? extends SerializablePairLongString> getClazz()
{
return SerializablePairLongString.class;
}

@Override
public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

long lhs = readOnlyBuffer.getLong();
int stringSize = readOnlyBuffer.getInt();

String lastString = null;
if (stringSize > 0) {
byte[] stringBytes = new byte[stringSize];
readOnlyBuffer.get(stringBytes, 0, stringSize);
lastString = StringUtils.fromUtf8(stringBytes);
}

return new SerializablePairLongString(lhs, lastString);
}

@Override
public byte[] toBytes(SerializablePairLongString val)
{
String rhsString = val.rhs;
ByteBuffer bbuf;

if (rhsString != null) {
byte[] rhsBytes = StringUtils.toUtf8(rhsString);
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
bbuf.putLong(val.lhs);
bbuf.putInt(Long.BYTES, rhsBytes.length);
bbuf.position(Long.BYTES + Integer.BYTES);
bbuf.put(rhsBytes);
} else {
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
bbuf.putLong(val.lhs);
bbuf.putInt(Long.BYTES, 0);
}

return bbuf.array();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Map<String, ColumnAnalysis> analyze(Segment segment)
final StorageAdapter storageAdapter = segment.asStorageAdapter();

// get length and column names from storageAdapter
final int length = storageAdapter.getNumRows();
final int numRows = storageAdapter.getNumRows();

// Use LinkedHashMap to preserve column order.
final Map<String, ColumnAnalysis> columns = new LinkedHashMap<>();
Expand All @@ -119,13 +119,13 @@ public Map<String, ColumnAnalysis> analyze(Segment segment)
final int bytesPerRow =
ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ? NUM_BYTES_IN_TIMESTAMP : Long.BYTES;

analysis = analyzeNumericColumn(capabilities, length, bytesPerRow);
analysis = analyzeNumericColumn(capabilities, numRows, bytesPerRow);
break;
case FLOAT:
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
analysis = analyzeNumericColumn(capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT);
break;
case DOUBLE:
analysis = analyzeNumericColumn(capabilities, length, Double.BYTES);
analysis = analyzeNumericColumn(capabilities, numRows, Double.BYTES);
break;
case STRING:
if (index != null) {
Expand All @@ -136,7 +136,7 @@ public Map<String, ColumnAnalysis> analyze(Segment segment)
break;
case COMPLEX:
final ColumnHolder columnHolder = index != null ? index.getColumnHolder(columnName) : null;
analysis = analyzeComplexColumn(capabilities, columnHolder);
analysis = analyzeComplexColumn(capabilities, numRows, columnHolder);
break;
default:
log.warn("Unknown column type[%s].", capabilities.asTypeString());
Expand Down Expand Up @@ -330,6 +330,7 @@ public Long accumulate(Long accumulated, Cursor cursor)

private ColumnAnalysis analyzeComplexColumn(
@Nullable final ColumnCapabilities capabilities,
final int numCells,
@Nullable final ColumnHolder columnHolder
)
{
Expand Down Expand Up @@ -362,8 +363,7 @@ private ColumnAnalysis analyzeComplexColumn(
);
}

final int length = complexColumn.getLength();
for (int i = 0; i < length; ++i) {
for (int i = 0; i < numCells; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public interface ComplexColumn extends BaseColumn
Object getRowValue(int rowNum);

/**
* @return serialized size (in bytes) of this column.
* @return serialized size (in bytes) of this column. -1 for unknown
*/
int getLength();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Object getRowValue(int rowNum)
@Override
public int getLength()
{
return index.size();
return -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Object getRowValue(int rowNum)
@Override
public int getLength()
{
return 0;
return -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public int getMaxVectorSize()
@Override
public int getLength()
{
return 0;
return -1;
}

@Override
Expand Down
Loading