Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class CompressedBlockReader implements Closeable

public static Supplier<CompressedBlockReader> fromByteBuffer(
ByteBuffer buffer,
ByteOrder byteOrder,
ByteOrder compressionOrder,
ByteOrder valueOrder,
boolean copyValuesOnRead
)
{
Expand All @@ -75,26 +76,27 @@ public static Supplier<CompressedBlockReader> fromByteBuffer(
final int numBlocks = buffer.getInt();
final int offsetsSize = numBlocks * Integer.BYTES;
// buffer is at start of ending offsets
final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(byteOrder);
final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(compressionOrder);
offsets.limit(offsets.position() + offsetsSize);
final IntBuffer offsetView = offsets.slice().order(byteOrder).asIntBuffer();
final IntBuffer offsetView = offsets.slice().order(compressionOrder).asIntBuffer();
final int compressedSize = offsetView.get(numBlocks - 1);

// move to start of compressed data
buffer.position(buffer.position() + offsetsSize);
final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(byteOrder);
final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(compressionOrder);
compressedData.limit(compressedData.position() + compressedSize);
buffer.position(buffer.position() + compressedSize);

final ByteBuffer compressedDataView = compressedData.slice().order(byteOrder);
final ByteBuffer compressedDataView = compressedData.slice().order(compressionOrder);
return () -> new CompressedBlockReader(
compression,
numBlocks,
blockSize,
copyValuesOnRead,
offsetView.asReadOnlyBuffer(),
compressedDataView.asReadOnlyBuffer().order(byteOrder),
byteOrder
compressedDataView.asReadOnlyBuffer().order(compressionOrder),
compressionOrder,
valueOrder
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
Expand Down Expand Up @@ -123,7 +125,8 @@ public CompressedBlockReader(
boolean copyValuesOnRead,
IntBuffer endOffsetsBuffer,
ByteBuffer compressedDataBuffer,
ByteOrder byteOrder
ByteOrder compressionByteOrder,
ByteOrder valueByteOrder
)
{
this.decompressor = compressionStrategy.getDecompressor();
Expand All @@ -134,11 +137,11 @@ public CompressedBlockReader(
this.endOffsetsBuffer = endOffsetsBuffer;
this.compressedDataBuffer = compressedDataBuffer;
this.closer = Closer.create();
this.decompressedDataBufferHolder = CompressedPools.getByteBuf(byteOrder);
this.decompressedDataBufferHolder = CompressedPools.getByteBuf(compressionByteOrder);
closer.register(decompressedDataBufferHolder);
this.decompressedDataBuffer = decompressedDataBufferHolder.get();
this.decompressedDataBuffer.clear();
this.byteOrder = byteOrder;
this.byteOrder = valueByteOrder;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ public final class CompressedLongsReader implements ColumnarLongs
{
public static Supplier<CompressedLongsReader> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(buffer, order, false);
final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(
buffer,
order,
order, // long serializer uses native order, same as compression
false
);
return () -> new CompressedLongsReader(baseReader.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<Compr
public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
String filenameBase,
ByteBuffer buffer,
ByteOrder order,
ByteOrder compressionOrder,
ByteOrder valueOrder,
SmooshedFileMapper mapper
) throws IOException
{
return fromByteBuffer(filenameBase, buffer, order, false, mapper);
return fromByteBuffer(filenameBase, buffer, compressionOrder, valueOrder, false, mapper);
}

public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
String filenameBase,
ByteBuffer buffer,
ByteOrder order,
ByteOrder compressionOrder,
ByteOrder valueOrder,
boolean copyValuesOnRead,
SmooshedFileMapper mapper
) throws IOException
Expand All @@ -59,7 +61,14 @@ public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
final ByteBuffer dataBuffer = mapper.mapFile(
CompressedVariableSizedBlobColumnSerializer.getCompressedBlobsFileName(filenameBase)
);
return new CompressedVariableSizedBlobColumnSupplier(offsetsBuffer, dataBuffer, order, numElements, copyValuesOnRead);
return new CompressedVariableSizedBlobColumnSupplier(
offsetsBuffer,
dataBuffer,
compressionOrder,
valueOrder,
numElements,
copyValuesOnRead
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
Expand All @@ -72,14 +81,15 @@ public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
private CompressedVariableSizedBlobColumnSupplier(
ByteBuffer offsetsBuffer,
ByteBuffer dataBuffer,
ByteOrder order,
ByteOrder compressionOrder,
ByteOrder valueOrder,
int numElements,
boolean copyValuesOnRead
)
{
this.numElements = numElements;
this.offsetReaderSupplier = CompressedLongsReader.fromByteBuffer(offsetsBuffer, order);
this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, order, copyValuesOnRead);
this.offsetReaderSupplier = CompressedLongsReader.fromByteBuffer(offsetsBuffer, compressionOrder);
this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, compressionOrder, valueOrder, copyValuesOnRead);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public static NestedDataColumnSupplier read(
),
rawBuffer,
byteOrder,
byteOrder, // byte order doesn't matter since serde is byte blobs
mapper
);
if (hasNulls) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public static NestedDataColumnSupplierV4 read(
),
rawBuffer,
metadata.getByteOrder(),
metadata.getByteOrder(), // byte order doesn't matter since serde is byte blobs
mapper
);
if (metadata.hasNulls()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@
import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public final class CompressedComplexColumn implements ComplexColumn
public final class CompressedComplexColumn<T> implements ComplexColumn
{
private final String typeName;
private final CompressedVariableSizedBlobColumn compressedColumn;
private final ImmutableBitmap nullValues;
private final ObjectStrategy<?> objectStrategy;
private final ObjectStrategy<T> objectStrategy;

public CompressedComplexColumn(
String typeName,
CompressedVariableSizedBlobColumn compressedColumn,
ImmutableBitmap nullValues,
ObjectStrategy<?> objectStrategy
ObjectStrategy<T> objectStrategy
)
{
this.typeName = typeName;
Expand All @@ -62,7 +62,7 @@ public String getTypeName()

@Override
@Nullable
public Object getRowValue(int rowNum)
public T getRowValue(int rowNum)
{
if (nullValues.get(rowNum)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
public class CompressedComplexColumnSupplier<T> implements Supplier<ComplexColumn>
{
public static CompressedComplexColumnSupplier read(
public static <T> CompressedComplexColumnSupplier<T> read(
ByteBuffer bb,
ColumnBuilder columnBuilder,
String typeName,
ObjectStrategy objectStrategy
ObjectStrategy<T> objectStrategy
)
{
final byte version = bb.get();
Expand Down Expand Up @@ -67,6 +68,9 @@ public static CompressedComplexColumnSupplier read(
),
fileBuffer,
metadata.getByteOrder(),
// object strategies today assume that all buffers are big endian, so we hard-code the value buffer
// presented to the object strategy to always be big endian
ByteOrder.BIG_ENDIAN,
objectStrategy.readRetainsBufferReference(),
mapper
);
Expand All @@ -83,7 +87,7 @@ public static CompressedComplexColumnSupplier read(
nullValues = metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap();
}

return new CompressedComplexColumnSupplier(typeName, objectStrategy, compressedColumnSupplier, nullValues);
return new CompressedComplexColumnSupplier<>(typeName, objectStrategy, compressedColumnSupplier, nullValues);
}
catch (IOException ex) {
throw new RE(ex, "Failed to deserialize V%s column.", version);
Expand All @@ -93,13 +97,13 @@ public static CompressedComplexColumnSupplier read(
}

private final String typeName;
private final ObjectStrategy objectStrategy;
private final ObjectStrategy<T> objectStrategy;
private final CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier;
private final ImmutableBitmap nullValues;

private CompressedComplexColumnSupplier(
String typeName,
ObjectStrategy objectStrategy,
ObjectStrategy<T> objectStrategy,
CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier,
ImmutableBitmap nullValues
)
Expand All @@ -113,7 +117,7 @@ private CompressedComplexColumnSupplier(
@Override
public ComplexColumn get()
{
return new CompressedComplexColumn(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy);
return new CompressedComplexColumn<>(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy);
}

public ImmutableBitmap getNullValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void testSomeValues() throws IOException
fileNameBase,
base,
ByteOrder.nativeOrder(),
ByteOrder.nativeOrder(),
fileMapper
).get();
for (int row = 0; row < numWritten; row++) {
Expand Down Expand Up @@ -151,6 +152,7 @@ public void testSomeValuesByteBuffers() throws IOException
fileNameBase,
base,
ByteOrder.nativeOrder(),
ByteOrder.nativeOrder(),
fileMapper
).get();
for (int row = 0; row < numWritten; row++) {
Expand All @@ -170,6 +172,68 @@ public void testSomeValuesByteBuffers() throws IOException
fileMapper.close();
}

@Test
public void testSomeValuesByteBuffersBigEndian() throws IOException
{
final File tmpFile = tempFolder.newFolder();
final FileSmoosher smoosher = new FileSmoosher(tmpFile);

final File tmpFile2 = tempFolder.newFolder();
final SegmentWriteOutMedium writeOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(tmpFile2);

final String fileNameBase = "test";

final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
CompressedVariableSizedBlobColumnSerializer serializer = new CompressedVariableSizedBlobColumnSerializer(
fileNameBase,
writeOutMedium,
compressionStrategy
);
serializer.open();

int numWritten = 0;
final Random r = ThreadLocalRandom.current();
final List<Long> values = new ArrayList<>();
final ByteBuffer longValueConverter = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
for (int i = 0, offset = 0; offset < CompressedPools.BUFFER_SIZE * 4; i++, offset = 1 << i) {
final long l = r.nextLong();
values.add(l);
longValueConverter.clear();
longValueConverter.putLong(l);
longValueConverter.rewind();
serializer.addValue(longValueConverter.array());
numWritten++;
}

SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize());
serializer.writeTo(writer, smoosher);
writer.close();
smoosher.close();
SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile);

ByteBuffer base = fileMapper.mapFile(fileNameBase);

CompressedVariableSizedBlobColumn column = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
fileNameBase,
base,
ByteOrder.nativeOrder(),
ByteOrder.BIG_ENDIAN,
fileMapper
).get();
for (int row = 0; row < numWritten; row++) {
ByteBuffer value = column.get(row);
Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong());
}
for (int rando = 0; rando < numWritten; rando++) {
int row = ThreadLocalRandom.current().nextInt(0, numWritten - 1);
ByteBuffer value = column.get(row);
Assert.assertEquals("Row " + row, values.get(row).longValue(), value.getLong());
}
column.close();
fileMapper.close();
}

@Test
public void testLongs() throws IOException
{
Expand Down