Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ public static RowKeyComparisonRunLengths create(final List<KeyColumn> keyColumns
);
}

ColumnType columnType = rowSignature.getColumnType(keyColumn.columnName())
.orElseThrow(() -> DruidException.defensive("Need column types"));
ColumnType columnType =
rowSignature.getColumnType(keyColumn.columnName())
.orElseThrow(() -> DruidException.defensive("No type for column[%s]", keyColumn.columnName()));

// First key column to be processed
if (runLengthEntryBuilders.size() == 0) {
if (runLengthEntryBuilders.isEmpty()) {
final boolean isByteComparable = isByteComparable(columnType);
runLengthEntryBuilders.add(
new RunLengthEntryBuilder(isByteComparable, keyColumn.order())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@
*/
public class TestArrayCursorFactory extends QueryableIndexCursorFactory
{
private final RowSignature signature;

public TestArrayCursorFactory(QueryableIndex index)
{
super(index);
this.signature = computeRowSignature(index);
}

@Override
Expand Down Expand Up @@ -81,15 +84,31 @@ public void close()
};
}


@Override
public RowSignature getRowSignature()
{
return signature;
}

@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column);
if (ourType != null) {
return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType());
} else {
return super.getColumnCapabilities(column);
}
}

private static RowSignature computeRowSignature(final QueryableIndex index)
{
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();

for (final String column : super.getRowSignature().getColumnNames()) {
ColumnCapabilities columnCapabilities = super.getColumnCapabilities(column);
for (final String column : new QueryableIndexCursorFactory(index).getRowSignature().getColumnNames()) {
ColumnCapabilities columnCapabilities = index.getColumnCapabilities(column);
ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType();
//change MV strings columns to Array<String>
if (columnType != null
Expand All @@ -103,18 +122,6 @@ public RowSignature getRowSignature()
return builder.build();
}

@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column);
if (ourType != null) {
return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType());
} else {
return super.getColumnCapabilities(column);
}
}

private class DecoratedCursor implements Cursor
{
private final Cursor cursor;
Expand Down
155 changes: 127 additions & 28 deletions processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
Expand All @@ -49,17 +50,28 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.IntStream;

@RunWith(Parameterized.class)
public class FrameFileTest extends InitializedNullHandlingTest
{
/**
* Static cache of generated frame files, to speed up tests. Cleared in {@link #afterClass()}.
*/
private static final Map<FrameFileKey, byte[]> FRAME_FILES = new HashMap<>();

// Partition every 99 rows if "partitioned" is true.
private static final int PARTITION_SIZE = 99;

Expand Down Expand Up @@ -122,6 +134,7 @@ int getRowCount()
};

abstract CursorFactory getCursorFactory();

abstract int getRowCount();
}

Expand Down Expand Up @@ -195,38 +208,21 @@ public void setUp() throws IOException
{
cursorFactory = adapterType.getCursorFactory();
rowCount = adapterType.getRowCount();
file = temporaryFolder.newFile();

if (partitioned) {
// Partition every PARTITION_SIZE rows.
file = FrameTestUtil.writeFrameFileWithPartitions(
FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames().map(
new Function<Frame, IntObjectPair<Frame>>()
{
private int rows = 0;

@Override
public IntObjectPair<Frame> apply(final Frame frame)
{
final int partitionNum = rows / PARTITION_SIZE;
rows += frame.numRows();
return IntObjectPair.of(
partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum,
frame
);
}
}
),
temporaryFolder.newFile()
);

} else {
file = FrameTestUtil.writeFrameFile(
FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames(),
temporaryFolder.newFile()
);
try (final OutputStream out = Files.newOutputStream(file.toPath())) {
final FrameFileKey frameFileKey = new FrameFileKey(adapterType, frameType, maxRowsPerFrame, partitioned);
final byte[] frameFileBytes = FRAME_FILES.computeIfAbsent(frameFileKey, FrameFileTest::computeFrameFile);
out.write(frameFileBytes);
}
}

@AfterClass
public static void afterClass()
{
FRAME_FILES.clear();
}

@Test
public void test_numFrames() throws IOException
{
Expand Down Expand Up @@ -414,4 +410,107 @@ private static int countRows(final CursorFactory cursorFactory)
return FrameTestUtil.readRowsFromCursorFactory(cursorFactory, RowSignature.empty(), false)
.accumulate(0, (i, in) -> i + 1);
}

/**
* Returns bytes, in frame file format, corresponding to the given {@link FrameFileKey}.
*/
private static byte[] computeFrameFile(final FrameFileKey frameFileKey)
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();

try {
if (frameFileKey.partitioned) {
// Partition every PARTITION_SIZE rows.
FrameTestUtil.writeFrameFileWithPartitions(
FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory())
.frameType(frameFileKey.frameType)
.maxRowsPerFrame(frameFileKey.maxRowsPerFrame)
.frames()
.map(
new Function<Frame, IntObjectPair<Frame>>()
{
private int rows = 0;

@Override
public IntObjectPair<Frame> apply(final Frame frame)
{
final int partitionNum = rows / PARTITION_SIZE;
rows += frame.numRows();
return IntObjectPair.of(
partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum,
frame
);
}
}
),
baos
);
} else {
FrameTestUtil.writeFrameFile(
FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory())
.frameType(frameFileKey.frameType)
.maxRowsPerFrame(frameFileKey.maxRowsPerFrame)
.frames(),
baos
);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}

return baos.toByteArray();
}

/**
* Key for {@link #FRAME_FILES}, and input to {@link #computeFrameFile(FrameFileKey)}.
*/
private static class FrameFileKey
{
final AdapterType adapterType;
final FrameType frameType;
final int maxRowsPerFrame;
final boolean partitioned;

public FrameFileKey(AdapterType adapterType, FrameType frameType, int maxRowsPerFrame, boolean partitioned)
{
this.adapterType = adapterType;
this.frameType = frameType;
this.maxRowsPerFrame = maxRowsPerFrame;
this.partitioned = partitioned;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FrameFileKey that = (FrameFileKey) o;
return maxRowsPerFrame == that.maxRowsPerFrame
&& partitioned == that.partitioned
&& adapterType == that.adapterType
&& frameType == that.frameType;
}

@Override
public int hashCode()
{
return Objects.hash(adapterType, frameType, maxRowsPerFrame, partitioned);
}

@Override
public String toString()
{
return "FrameFileKey{" +
"adapterType=" + adapterType +
", frameType=" + frameType +
", maxRowsPerFrame=" + maxRowsPerFrame +
", partitioned=" + partitioned +
'}';
}
}
}
Loading