From ca21a2f0a31109760b0ffd9fcbc507093dbf78ef Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 15 Sep 2024 17:03:18 -0700 Subject: [PATCH] Speed up FrameFileTest, SuperSorterTest. (#17068) * Speed up FrameFileTest, SuperSorterTest. These are two heavily parameterized tests that, together, account for about 60% of runtime in the test suite. FrameFileTest changes: 1) Cache frame files in a static, rather than building the frame file for each parameterization of the test. 2) Adjust TestArrayCursorFactory to cache the signature, rather than re-creating it on each call to getColumnCapabilities. SuperSorterTest changes: 1) Dramatically reduce the number of tests that run with "maxRowsPerFrame" = 1. These are particularly slow due to writing so many small files. Some still run, since it's useful to test edge cases, but much fewer than before. 2) Reduce the "maxActiveProcessors" axis of the test from [1, 2, 4] to [1, 3]. The aim is to reduce the number of cases while still getting good coverage of the feature. 3) Reduce the "maxChannelsPerProcessor" axis of the test from [2, 3, 8] to [2, 7]. The aim is to reduce the number of cases while still getting good coverage of the feature. 4) Use in-memory input channels rather than file channels. 5) Defer formatting of assertion failure messages until they are needed. 6) Cache the cursor factory and its signature in a static. 7) Cache sorted test rows (used for verification) in a static. * It helps to include the file. * Style. --- .../frame/key/RowKeyComparisonRunLengths.java | 7 +- .../druid/frame/TestArrayCursorFactory.java | 37 +-- .../druid/frame/file/FrameFileTest.java | 155 ++++++++++--- .../frame/processor/SuperSorterTest.java | 219 +++++++++++------- .../frame/testutil/FrameSequenceBuilder.java | 16 +- .../druid/frame/testutil/FrameTestUtil.java | 27 ++- 6 files changed, 316 insertions(+), 145 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java b/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java index cc05dea993e5..ab6797a7c003 100644 --- a/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java +++ b/processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java @@ -88,11 +88,12 @@ public static RowKeyComparisonRunLengths create(final List keyColumns ); } - ColumnType columnType = rowSignature.getColumnType(keyColumn.columnName()) - .orElseThrow(() -> DruidException.defensive("Need column types")); + ColumnType columnType = + rowSignature.getColumnType(keyColumn.columnName()) + .orElseThrow(() -> DruidException.defensive("No type for column[%s]", keyColumn.columnName())); // First key column to be processed - if (runLengthEntryBuilders.size() == 0) { + if (runLengthEntryBuilders.isEmpty()) { final boolean isByteComparable = isByteComparable(columnType); runLengthEntryBuilders.add( new RunLengthEntryBuilder(isByteComparable, keyColumn.order()) diff --git a/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java b/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java index 2a6116bfddff..4bde3d31fe0c 100644 --- a/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java +++ b/processing/src/test/java/org/apache/druid/frame/TestArrayCursorFactory.java @@ -48,9 +48,12 @@ */ public class TestArrayCursorFactory extends QueryableIndexCursorFactory { + private final RowSignature signature; + public TestArrayCursorFactory(QueryableIndex index) { super(index); + this.signature = computeRowSignature(index); } @Override @@ -81,15 +84,31 @@ public void close() }; } - @Override public RowSignature getRowSignature() + { + return signature; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column); + if (ourType != null) { + return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType()); + } else { + return super.getColumnCapabilities(column); + } + } + + private static RowSignature computeRowSignature(final QueryableIndex index) { final RowSignature.Builder builder = RowSignature.builder(); builder.addTimeColumn(); - for (final String column : super.getRowSignature().getColumnNames()) { - ColumnCapabilities columnCapabilities = super.getColumnCapabilities(column); + for (final String column : new QueryableIndexCursorFactory(index).getRowSignature().getColumnNames()) { + ColumnCapabilities columnCapabilities = index.getColumnCapabilities(column); ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType(); //change MV strings columns to Array if (columnType != null @@ -103,18 +122,6 @@ public RowSignature getRowSignature() return builder.build(); } - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String column) - { - final ColumnCapabilities ourType = getRowSignature().getColumnCapabilities(column); - if (ourType != null) { - return ColumnCapabilitiesImpl.copyOf(super.getColumnCapabilities(column)).setType(ourType.toColumnType()); - } else { - return super.getColumnCapabilities(column); - } - } - private class DecoratedCursor implements Cursor { private final Cursor cursor; diff --git a/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java b/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java index 9c92eb14e3e5..c916a458564c 100644 --- a/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java +++ b/processing/src/test/java/org/apache/druid/frame/file/FrameFileTest.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; import org.apache.druid.testing.InitializedNullHandlingTest; import org.hamcrest.Matchers; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -49,17 +50,28 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.math.RoundingMode; +import java.nio.file.Files; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.IntStream; @RunWith(Parameterized.class) public class FrameFileTest extends InitializedNullHandlingTest { + /** + * Static cache of generated frame files, to speed up tests. Cleared in {@link #afterClass()}. + */ + private static final Map FRAME_FILES = new HashMap<>(); + // Partition every 99 rows if "partitioned" is true. private static final int PARTITION_SIZE = 99; @@ -122,6 +134,7 @@ int getRowCount() }; abstract CursorFactory getCursorFactory(); + abstract int getRowCount(); } @@ -195,38 +208,21 @@ public void setUp() throws IOException { cursorFactory = adapterType.getCursorFactory(); rowCount = adapterType.getRowCount(); + file = temporaryFolder.newFile(); - if (partitioned) { - // Partition every PARTITION_SIZE rows. - file = FrameTestUtil.writeFrameFileWithPartitions( - FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames().map( - new Function>() - { - private int rows = 0; - - @Override - public IntObjectPair apply(final Frame frame) - { - final int partitionNum = rows / PARTITION_SIZE; - rows += frame.numRows(); - return IntObjectPair.of( - partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum, - frame - ); - } - } - ), - temporaryFolder.newFile() - ); - - } else { - file = FrameTestUtil.writeFrameFile( - FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames(), - temporaryFolder.newFile() - ); + try (final OutputStream out = Files.newOutputStream(file.toPath())) { + final FrameFileKey frameFileKey = new FrameFileKey(adapterType, frameType, maxRowsPerFrame, partitioned); + final byte[] frameFileBytes = FRAME_FILES.computeIfAbsent(frameFileKey, FrameFileTest::computeFrameFile); + out.write(frameFileBytes); } } + @AfterClass + public static void afterClass() + { + FRAME_FILES.clear(); + } + @Test public void test_numFrames() throws IOException { @@ -414,4 +410,107 @@ private static int countRows(final CursorFactory cursorFactory) return FrameTestUtil.readRowsFromCursorFactory(cursorFactory, RowSignature.empty(), false) .accumulate(0, (i, in) -> i + 1); } + + /** + * Returns bytes, in frame file format, corresponding to the given {@link FrameFileKey}. + */ + private static byte[] computeFrameFile(final FrameFileKey frameFileKey) + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try { + if (frameFileKey.partitioned) { + // Partition every PARTITION_SIZE rows. + FrameTestUtil.writeFrameFileWithPartitions( + FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory()) + .frameType(frameFileKey.frameType) + .maxRowsPerFrame(frameFileKey.maxRowsPerFrame) + .frames() + .map( + new Function>() + { + private int rows = 0; + + @Override + public IntObjectPair apply(final Frame frame) + { + final int partitionNum = rows / PARTITION_SIZE; + rows += frame.numRows(); + return IntObjectPair.of( + partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum, + frame + ); + } + } + ), + baos + ); + } else { + FrameTestUtil.writeFrameFile( + FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory()) + .frameType(frameFileKey.frameType) + .maxRowsPerFrame(frameFileKey.maxRowsPerFrame) + .frames(), + baos + ); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + + return baos.toByteArray(); + } + + /** + * Key for {@link #FRAME_FILES}, and input to {@link #computeFrameFile(FrameFileKey)}. + */ + private static class FrameFileKey + { + final AdapterType adapterType; + final FrameType frameType; + final int maxRowsPerFrame; + final boolean partitioned; + + public FrameFileKey(AdapterType adapterType, FrameType frameType, int maxRowsPerFrame, boolean partitioned) + { + this.adapterType = adapterType; + this.frameType = frameType; + this.maxRowsPerFrame = maxRowsPerFrame; + this.partitioned = partitioned; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FrameFileKey that = (FrameFileKey) o; + return maxRowsPerFrame == that.maxRowsPerFrame + && partitioned == that.partitioned + && adapterType == that.adapterType + && frameType == that.frameType; + } + + @Override + public int hashCode() + { + return Objects.hash(adapterType, frameType, maxRowsPerFrame, partitioned); + } + + @Override + public String toString() + { + return "FrameFileKey{" + + "adapterType=" + adapterType + + ", frameType=" + frameType + + ", maxRowsPerFrame=" + maxRowsPerFrame + + ", partitioned=" + partitioned + + '}'; + } + } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java index 2149d6cbf1c7..7a885af49c54 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java @@ -29,13 +29,7 @@ import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.BlockingQueueFrameChannel; -import org.apache.druid.frame.channel.ByteTracker; -import org.apache.druid.frame.channel.ReadableFileFrameChannel; import org.apache.druid.frame.channel.ReadableFrameChannel; -import org.apache.druid.frame.channel.WritableFrameChannel; -import org.apache.druid.frame.channel.WritableFrameFileChannel; -import org.apache.druid.frame.file.FrameFile; -import org.apache.druid.frame.file.FrameFileWriter; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; @@ -47,6 +41,7 @@ import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.testutil.FrameSequenceBuilder; import org.apache.druid.frame.testutil.FrameTestUtil; +import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -62,8 +57,10 @@ import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -73,12 +70,12 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -228,6 +225,15 @@ public void testLimitHint() throws Exception @RunWith(Parameterized.class) public static class ParameterizedCasesTest extends InitializedNullHandlingTest { + private static CursorFactory CURSOR_FACTORY; + private static RowSignature CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER; + + /** + * Static cache of sorted versions of the {@link #CURSOR_FACTORY} dataset, to speed up tests. + * Cleared in {@link #tearDownClass()}. + */ + private static final Map>> SORTED_TEST_ROWS = new HashMap<>(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -241,7 +247,6 @@ public static class ParameterizedCasesTest extends InitializedNullHandlingTest private final boolean partitionsDeferred; private final long limitHint; - private CursorFactory cursorFactory; private RowSignature signature; private FrameProcessorExecutor exec; private List inputChannels; @@ -285,11 +290,12 @@ public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); - for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50, 1}) { + // Add some constructors for testing maxRowsPerFrame > 1. Later on, we'll add some for maxRowsPerFrame = 1. + for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50}) { for (int maxBytesPerFrame : new int[]{20_000, 2_000_000}) { for (int numChannels : new int[]{1, 3}) { - for (int maxActiveProcessors : new int[]{1, 2, 4}) { - for (int maxChannelsPerProcessor : new int[]{2, 3, 8}) { + for (int maxActiveProcessors : new int[]{1, 3}) { + for (int maxChannelsPerProcessor : new int[]{2, 7}) { for (int numThreads : new int[]{1, 3}) { for (boolean isComposedStorage : new boolean[]{true, false}) { for (boolean partitionsDeferred : new boolean[]{true, false}) { @@ -317,16 +323,51 @@ public static Iterable constructorFeeder() } } + // Add some constructors for testing maxRowsPerFrame = 1. This isn't part of the full matrix since it's quite + // slow, but we still want to exercise it a bit. + for (boolean isComposedStorage : new boolean[]{true, false}) { + for (long limitHint : new long[]{SuperSorter.UNLIMITED, 3, 1_000}) { + constructors.add( + new Object[]{ + 1 /* maxRowsPerFrame */, + 20_000 /* maxBytesPerFrame */, + 3 /* numChannels */, + 2 /* maxActiveProcessors */, + 3 /* maxChannelsPerProcessor */, + 1 /* numThreads */, + isComposedStorage, + false /* partitionsDeferred */, + limitHint + } + ); + } + } + return constructors; } + @BeforeClass + public static void setUpClass() + { + CURSOR_FACTORY = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); + CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER = + FrameSequenceBuilder.signatureWithRowNumber(CURSOR_FACTORY.getRowSignature()); + } + + @AfterClass + public static void tearDownClass() + { + CURSOR_FACTORY = null; + CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER = null; + SORTED_TEST_ROWS.clear(); + } + @Before public void setUp() { exec = new FrameProcessorExecutor( MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, getClass().getSimpleName() + "[%d]")) ); - cursorFactory = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); } @After @@ -352,15 +393,15 @@ private void setUpInputChannels(final ClusterBy clusterBy) throws Exception } final FrameSequenceBuilder frameSequenceBuilder = - FrameSequenceBuilder.fromCursorFactory(cursorFactory) + FrameSequenceBuilder.fromCursorFactory(CURSOR_FACTORY) .maxRowsPerFrame(maxRowsPerFrame) .sortBy(clusterBy.getColumns()) .allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(maxBytesPerFrame))) .frameType(FrameType.ROW_BASED) .populateRowNumber(); - inputChannels = makeFileChannels(frameSequenceBuilder.frames(), temporaryFolder.newFolder(), numChannels); - signature = frameSequenceBuilder.signature(); + inputChannels = makeRoundRobinChannels(frameSequenceBuilder.frames(), numChannels); + signature = FrameWriters.sortableSignature(CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER, clusterBy.getColumns()); frameReader = FrameReader.create(signature); } @@ -411,7 +452,7 @@ private void verifySuperSorter( Assert.assertEquals(clusterByPartitions.size(), outputChannels.getAllChannels().size()); Assert.assertEquals(Double.valueOf(1.0), superSorterProgressTracker.snapshot().getProgressDigest()); - final int[] clusterByPartColumns = clusterBy.getColumns().stream().mapToInt( + final int[] clusterByColumns = clusterBy.getColumns().stream().mapToInt( part -> signature.indexOf(part.columnName()) ).toArray(); @@ -427,33 +468,36 @@ private void verifySuperSorter( frameReader ).forEach( row -> { - final Object[] array = new Object[clusterByPartColumns.length]; + final Object[] array = new Object[clusterByColumns.length]; for (int i = 0; i < array.length; i++) { - array[i] = row.get(clusterByPartColumns[i]); + array[i] = row.get(clusterByColumns[i]); } final RowKey key = createKey(clusterBy, array); - Assert.assertTrue( - StringUtils.format( - "Key %s >= partition %,d start %s", - keyReader.read(key), - partitionNumber, - partition.getStart() == null ? null : keyReader.read(partition.getStart()) - ), - partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0 - ); - - Assert.assertTrue( - StringUtils.format( - "Key %s < partition %,d end %s", - keyReader.read(key), - partitionNumber, - partition.getEnd() == null ? null : keyReader.read(partition.getEnd()) - ), - partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0 - ); + if (!(partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0)) { + // Defer formatting of error message until it's actually needed + Assert.fail( + StringUtils.format( + "Key %s >= partition %,d start %s", + keyReader.read(key), + partitionNumber, + partition.getStart() == null ? null : keyReader.read(partition.getStart()) + ) + ); + } + + if (!(partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0)) { + Assert.fail( + StringUtils.format( + "Key %s < partition %,d end %s", + keyReader.read(key), + partitionNumber, + partition.getEnd() == null ? null : keyReader.read(partition.getEnd()) + ) + ); + } readRows.add(row); } @@ -464,21 +508,9 @@ private void verifySuperSorter( MatcherAssert.assertThat(readRows.size(), Matchers.greaterThanOrEqualTo(Ints.checkedCast(limitHint))); } - final Sequence> expectedRows = Sequences.sort( - FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true), - Comparator.comparing( - row -> { - final Object[] array = new Object[clusterByPartColumns.length]; - - for (int i = 0; i < array.length; i++) { - array[i] = row.get(clusterByPartColumns[i]); - } - - return createKey(clusterBy, array); - }, - keyComparator - ) - ).limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size()); + final Sequence> expectedRows = + Sequences.simple(getOrComputeSortedTestRows(clusterBy)) + .limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size()); FrameTestUtil.assertRowsEqual(expectedRows, Sequences.simple(readRows)); } @@ -724,29 +756,63 @@ private RowKey createKey(final ClusterBy clusterBy, final Object... objects) final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature); return KeyTestUtils.createKey(keySignature, objects); } + + /** + * Retrieve sorted test rows from {@link #SORTED_TEST_ROWS}, or else compute using + * {@link #computeSortedTestRows(ClusterBy)}. + */ + private static List> getOrComputeSortedTestRows(final ClusterBy clusterBy) + { + return SORTED_TEST_ROWS.computeIfAbsent(clusterBy, SuperSorterTest.ParameterizedCasesTest::computeSortedTestRows); + } + + /** + * Sort test rows from {@link TestIndex#getNoRollupMMappedTestIndex()} by the given {@link ClusterBy}. + */ + private static List> computeSortedTestRows(final ClusterBy clusterBy) + { + final QueryableIndexCursorFactory cursorFactory = + new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex()); + final RowSignature signature = + FrameWriters.sortableSignature( + FrameSequenceBuilder.signatureWithRowNumber(cursorFactory.getRowSignature()), + clusterBy.getColumns() + ); + final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature); + final int[] clusterByColumns = + clusterBy.getColumns().stream().mapToInt(part -> signature.indexOf(part.columnName())).toArray(); + final Comparator keyComparator = clusterBy.keyComparator(keySignature); + + return Sequences.sort( + FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true), + Comparator.comparing( + row -> { + final Object[] array = new Object[clusterByColumns.length]; + + for (int i = 0; i < array.length; i++) { + array[i] = row.get(clusterByColumns[i]); + } + + return KeyTestUtils.createKey(keySignature, array); + }, + keyComparator + ) + ).toList(); + } } - private static List makeFileChannels( + /** + * Distribute frames round-robin to some number of channels. + */ + private static List makeRoundRobinChannels( final Sequence frames, - final File tmpDir, final int numChannels ) throws IOException { - final List files = new ArrayList<>(); - final List writableChannels = new ArrayList<>(); + final List channels = new ArrayList<>(numChannels); for (int i = 0; i < numChannels; i++) { - final File file = new File(tmpDir, StringUtils.format("channel-%d", i)); - files.add(file); - writableChannels.add( - new WritableFrameFileChannel( - FrameFileWriter.open( - Channels.newChannel(Files.newOutputStream(file.toPath())), - null, - ByteTracker.unboundedTracker() - ) - ) - ); + channels.add(new BlockingQueueFrameChannel(2000) /* enough even for 1 row per frame; dataset has < 2000 rows */); } frames.forEach( @@ -758,7 +824,7 @@ private static List makeFileChannels( public void accept(final Frame frame) { try { - writableChannels.get(i % writableChannels.size()).write(frame); + channels.get(i % channels.size()).writable().write(frame); } catch (IOException e) { throw new RuntimeException(e); @@ -771,20 +837,11 @@ public void accept(final Frame frame) final List retVal = new ArrayList<>(); - for (int i = 0; i < writableChannels.size(); i++) { - WritableFrameChannel writableChannel = writableChannels.get(i); - writableChannel.close(); - retVal.add(new ReadableFileFrameChannel(FrameFile.open(files.get(i), null))); + for (final BlockingQueueFrameChannel channel : channels) { + channel.writable().close(); + retVal.add(channel.readable()); } return retVal; } - - private static long countSequence(final Sequence sequence) - { - return sequence.accumulate( - 0L, - (accumulated, in) -> accumulated + 1 - ); - } } diff --git a/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java b/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java index d3fbffd7b4af..1cb6298b8b10 100644 --- a/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java +++ b/processing/src/test/java/org/apache/druid/frame/testutil/FrameSequenceBuilder.java @@ -67,6 +67,17 @@ public static FrameSequenceBuilder fromCursorFactory(final CursorFactory cursorF return new FrameSequenceBuilder(cursorFactory); } + /** + * Returns what {@link #signature()} would return if {@link #populateRowNumber()} is set. + */ + public static RowSignature signatureWithRowNumber(final RowSignature signature) + { + return RowSignature.builder() + .addAll(signature) + .add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG) + .build(); + } + public FrameSequenceBuilder frameType(final FrameType frameType) { this.frameType = frameType; @@ -108,10 +119,7 @@ public RowSignature signature() final RowSignature baseSignature; if (populateRowNumber) { - baseSignature = RowSignature.builder() - .addAll(cursorFactory.getRowSignature()) - .add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG) - .build(); + baseSignature = signatureWithRowNumber(cursorFactory.getRowSignature()); } else { baseSignature = cursorFactory.getRowSignature(); } diff --git a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java index c75a57a86990..2bb8789740c4 100644 --- a/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java +++ b/processing/src/test/java/org/apache/druid/frame/testutil/FrameTestUtil.java @@ -56,9 +56,10 @@ import javax.annotation.Nullable; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.channels.Channels; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -79,12 +80,14 @@ private FrameTestUtil() public static File writeFrameFile(final Sequence frames, final File file) throws IOException { - try ( - final FileOutputStream fos = new FileOutputStream(file); - final FrameFileWriter writer = FrameFileWriter.open( - Channels.newChannel(fos), null, ByteTracker.unboundedTracker() - ) - ) { + writeFrameFile(frames, Files.newOutputStream(file.toPath())); + return file; + } + + public static void writeFrameFile(final Sequence frames, final OutputStream out) throws IOException + { + try (final FrameFileWriter writer = + FrameFileWriter.open(Channels.newChannel(out), null, ByteTracker.unboundedTracker())) { frames.forEach( frame -> { try { @@ -96,17 +99,15 @@ public static File writeFrameFile(final Sequence frames, final File file) } ); } - - return file; } - public static File writeFrameFileWithPartitions( + public static void writeFrameFileWithPartitions( final Sequence> framesWithPartitions, - final File file + final OutputStream out ) throws IOException { try (final FrameFileWriter writer = FrameFileWriter.open( - Channels.newChannel(new FileOutputStream(file)), + Channels.newChannel(out), null, ByteTracker.unboundedTracker() )) { @@ -121,8 +122,6 @@ public static File writeFrameFileWithPartitions( } ); } - - return file; } public static void assertRowsEqual(final Sequence> expected, final Sequence> actual)