From 1231756103542b7184efa01d17df9643d6eb331d Mon Sep 17 00:00:00 2001 From: tianchen Date: Tue, 3 Dec 2019 17:05:35 +0800 Subject: [PATCH 1/2] ARROW-7284: [Java] ensure java implementation meets clarified dictionary spec --- .../apache/arrow/vector/ipc/ArrowReader.java | 27 ++ .../ipc/message/ArrowDictionaryBatch.java | 14 + .../vector/ipc/message/MessageSerializer.java | 4 +- .../vector/ipc/TestArrowReaderWriter.java | 252 +++++++++++++++--- 4 files changed, 264 insertions(+), 33 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index cd4b47f29ee..d394ca2874d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.DictionaryUtility; +import org.apache.arrow.vector.util.TransferPair; /** * Abstract class to read Schema and ArrowRecordBatches. @@ -219,6 +220,18 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { throw new IllegalArgumentException("Dictionary ID " + id + " not defined in schema"); } FieldVector vector = dictionary.getVector(); + // If this is a delta batch, append its values to the existing dictionary with the same ID. 
+ if (dictionaryBatch.isDelta()) { + FieldVector deltaVector = vector.getField().createVector(allocator); + load(dictionaryBatch, deltaVector); + concatDeltaDictionary(vector, deltaVector); + return; + } + + load(dictionaryBatch, vector); + } + + private void load(ArrowDictionaryBatch dictionaryBatch, FieldVector vector) { VectorSchemaRoot root = new VectorSchemaRoot( Collections.singletonList(vector.getField()), Collections.singletonList(vector), 0); @@ -229,4 +242,18 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { dictionaryBatch.close(); } } + + /** + * Appends the delta dictionary vector's values to the dictionary vector, then closes the delta vector. + */ + private void concatDeltaDictionary(FieldVector vector, FieldVector deltaVector) { + final int valueCount = vector.getValueCount(); + final int deltaValueCount = deltaVector.getValueCount(); + final TransferPair transferPair = deltaVector.makeTransferPair(vector); + for (int i = 0; i < deltaValueCount; i++) { + transferPair.copyValueSafe(i, valueCount + i); + } + deltaVector.close(); + vector.setValueCount(valueCount + deltaValueCount); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java index 93efb85d8f3..cbe91c989ec 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java @@ -30,10 +30,23 @@ public class ArrowDictionaryBatch implements ArrowMessage { private final long dictionaryId; private final ArrowRecordBatch dictionary; + private final boolean isDelta; public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary) { + this (dictionaryId, dictionary, false); + } + + /** + * Constructs a new instance, recording whether this batch is a delta dictionary. 
+ */ + public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary, boolean isDelta) { this.dictionaryId = dictionaryId; this.dictionary = dictionary; + this.isDelta = isDelta; + } + + public boolean isDelta() { + return isDelta; } public byte getMessageType() { @@ -54,6 +67,7 @@ public int writeTo(FlatBufferBuilder builder) { DictionaryBatch.startDictionaryBatch(builder); DictionaryBatch.addId(builder, dictionaryId); DictionaryBatch.addData(builder, dataOffset); + DictionaryBatch.addIsDelta(builder, isDelta); return DictionaryBatch.endDictionaryBatch(builder); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 9de5950596f..49eaba19b18 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -491,7 +491,7 @@ public static ArrowDictionaryBatch deserializeDictionaryBatch(Message message, A throws IOException { DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch()); ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), bodyBuffer); - return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch); + return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta()); } /** @@ -570,7 +570,7 @@ public static ArrowDictionaryBatch deserializeDictionaryBatch( final ArrowBuf body = buffer.slice(block.getMetadataLength(), (int) totalLen - block.getMetadataLength()); ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body); - return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch); + return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta()); } /** diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java index dda402deba4..9226f8a64c9 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java @@ -35,12 +35,14 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.stream.Collectors; import org.apache.arrow.flatbuf.FieldNode; import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; @@ -50,6 +52,7 @@ import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.compare.VectorEqualsVisitor; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryEncoder; import org.apache.arrow.vector.dictionary.DictionaryProvider; @@ -76,8 +79,13 @@ public class TestArrowReaderWriter { private BufferAllocator allocator; - private Dictionary dictionary; + private VarCharVector dictionaryVector1; + private VarCharVector dictionaryVector2; + private VarCharVector dictionaryVector3; + + private Dictionary dictionary1; private Dictionary dictionary2; + private Dictionary dictionary3; private Schema schema; private Schema encodedSchema; @@ -86,29 +94,43 @@ public class TestArrowReaderWriter { public void init() { allocator = new RootAllocator(Long.MAX_VALUE); - VarCharVector dictionary1Vector = newVarCharVector("D1", allocator); - dictionary1Vector.allocateNewSafe(); - dictionary1Vector.set(0, 
"foo".getBytes(StandardCharsets.UTF_8)); - dictionary1Vector.set(1, "bar".getBytes(StandardCharsets.UTF_8)); - dictionary1Vector.set(2, "baz".getBytes(StandardCharsets.UTF_8)); - dictionary1Vector.setValueCount(3); - - dictionary = new Dictionary(dictionary1Vector, new DictionaryEncoding(1L, false, null)); - - VarCharVector dictionary2Vector = newVarCharVector("D2", allocator); - dictionary2Vector.allocateNewSafe(); - dictionary2Vector.set(0, "aa".getBytes(StandardCharsets.UTF_8)); - dictionary2Vector.set(1, "bb".getBytes(StandardCharsets.UTF_8)); - dictionary2Vector.set(2, "cc".getBytes(StandardCharsets.UTF_8)); - dictionary2Vector.setValueCount(3); - - dictionary2 = new Dictionary(dictionary2Vector, new DictionaryEncoding(2L, false, null)); + dictionaryVector1 = newVarCharVector("D1", allocator); + dictionaryVector1.allocateNewSafe(); + dictionaryVector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + dictionaryVector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + dictionaryVector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + dictionaryVector1.setValueCount(3); + + dictionaryVector2 = newVarCharVector("D2", allocator); + dictionaryVector2.allocateNewSafe(); + dictionaryVector2.set(0, "aa".getBytes(StandardCharsets.UTF_8)); + dictionaryVector2.set(1, "bb".getBytes(StandardCharsets.UTF_8)); + dictionaryVector2.set(2, "cc".getBytes(StandardCharsets.UTF_8)); + dictionaryVector2.setValueCount(3); + + dictionaryVector3 = newVarCharVector("D3", allocator); + dictionaryVector3.allocateNewSafe(); + dictionaryVector3.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.set(3, "aa".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.set(4, "bb".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.set(5, "cc".getBytes(StandardCharsets.UTF_8)); + dictionaryVector3.setValueCount(6); + + dictionary1 = + new 
Dictionary(dictionaryVector1, new DictionaryEncoding(1L, false, null)); + dictionary2 = + new Dictionary(dictionaryVector2, new DictionaryEncoding(2L, false, null)); + dictionary3 = + new Dictionary(dictionaryVector3, new DictionaryEncoding(1L, false, null)); } @After public void terminate() throws Exception { - dictionary.getVector().close(); - dictionary2.getVector().close(); + dictionaryVector1.close(); + dictionaryVector2.close(); + dictionaryVector3.close(); allocator.close(); } @@ -232,7 +254,7 @@ public void testWriteReadNullVector() throws IOException { @Test public void testWriteReadWithDictionaries() throws IOException { DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); - provider.put(dictionary); + provider.put(dictionary1); VarCharVector vector1 = newVarCharVector("varchar1", allocator); vector1.allocateNewSafe(); @@ -242,7 +264,7 @@ public void testWriteReadWithDictionaries() throws IOException { vector1.set(4, "bar".getBytes(StandardCharsets.UTF_8)); vector1.set(5, "baz".getBytes(StandardCharsets.UTF_8)); vector1.setValueCount(6); - FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary); + FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary1); vector1.close(); VarCharVector vector2 = newVarCharVector("varchar2", allocator); @@ -254,7 +276,7 @@ public void testWriteReadWithDictionaries() throws IOException { vector2.set(4, "foo".getBytes(StandardCharsets.UTF_8)); vector2.set(5, "bar".getBytes(StandardCharsets.UTF_8)); vector2.setValueCount(6); - FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary); + FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary1); vector2.close(); List fields = Arrays.asList(encodedVector1.getField(), encodedVector2.getField()); @@ -285,7 +307,7 @@ public void testWriteReadWithDictionaries() throws IOException { public void 
testEmptyStreamInFileIPC() throws IOException { DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); - provider.put(dictionary); + provider.put(dictionary1); VarCharVector vector = newVarCharVector("varchar", allocator); vector.allocateNewSafe(); @@ -296,7 +318,7 @@ public void testEmptyStreamInFileIPC() throws IOException { vector.set(5, "baz".getBytes(StandardCharsets.UTF_8)); vector.setValueCount(6); - FieldVector encodedVector1A = (FieldVector) DictionaryEncoder.encode(vector, dictionary); + FieldVector encodedVector1A = (FieldVector) DictionaryEncoder.encode(vector, dictionary1); vector.close(); List fields = Arrays.asList(encodedVector1A.getField()); @@ -326,7 +348,7 @@ public void testEmptyStreamInFileIPC() throws IOException { public void testEmptyStreamInStreamingIPC() throws IOException { DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); - provider.put(dictionary); + provider.put(dictionary1); VarCharVector vector = newVarCharVector("varchar", allocator); vector.allocateNewSafe(); @@ -337,7 +359,7 @@ public void testEmptyStreamInStreamingIPC() throws IOException { vector.set(5, "baz".getBytes(StandardCharsets.UTF_8)); vector.setValueCount(6); - FieldVector encodedVector = (FieldVector) DictionaryEncoder.encode(vector, dictionary); + FieldVector encodedVector = (FieldVector) DictionaryEncoder.encode(vector, dictionary1); vector.close(); List fields = Arrays.asList(encodedVector.getField()); @@ -361,6 +383,174 @@ public void testEmptyStreamInStreamingIPC() throws IOException { } + @Test + public void testDictionaryReplacement() throws Exception { + VarCharVector vector1 = newVarCharVector("varchar1", allocator); + vector1.allocateNewSafe(); + vector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + vector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + vector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + vector1.set(3, 
"bar".getBytes(StandardCharsets.UTF_8)); + vector1.setValueCount(4); + + FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary1); + + VarCharVector vector2 = newVarCharVector("varchar2", allocator); + vector2.allocateNewSafe(); + vector2.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + vector2.set(1, "foo".getBytes(StandardCharsets.UTF_8)); + vector2.set(2, "foo".getBytes(StandardCharsets.UTF_8)); + vector2.set(3, "foo".getBytes(StandardCharsets.UTF_8)); + vector2.setValueCount(4); + + FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary1); + + DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); + provider.put(dictionary1); + List schemaFields = new ArrayList<>(); + schemaFields.add(DictionaryUtility.toMessageFormat(encodedVector1.getField(), provider, new HashSet<>())); + schemaFields.add(DictionaryUtility.toMessageFormat(encodedVector2.getField(), provider, new HashSet<>())); + Schema schema = new Schema(schemaFields); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + WriteChannel out = new WriteChannel(newChannel(outStream)); + + // write schema + MessageSerializer.serialize(out, schema); + + List closeableList = new ArrayList<>(); + + // write non-delta dictionary with id=1 + serializeDictionaryBatch(out, dictionary3, false, closeableList); + + // write non-delta dictionary with id=1 + serializeDictionaryBatch(out, dictionary1, false, closeableList); + + // write recordBatch2 + serializeRecordBatch(out, Arrays.asList(encodedVector1, encodedVector2), closeableList); + + // write eos + out.writeIntLittleEndian(0); + + try (ArrowStreamReader reader = new ArrowStreamReader( + new ByteArrayReadableSeekableByteChannel(outStream.toByteArray()), allocator)) { + assertEquals(1, reader.getDictionaryVectors().size()); + assertTrue(reader.loadNextBatch()); + FieldVector dictionaryVector = reader.getDictionaryVectors().get(1L).getVector(); 
+ // make sure the earlier dictionary was replaced by the later non-delta one. + assertTrue(VectorEqualsVisitor.vectorEquals(dictionaryVector, dictionaryVector1)); + assertFalse(reader.loadNextBatch()); + } + + vector1.close(); + vector2.close(); + AutoCloseables.close(closeableList); + } + + @Test + public void testDeltaDictionary() throws Exception { + VarCharVector vector1 = newVarCharVector("varchar1", allocator); + vector1.allocateNewSafe(); + vector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + vector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); + vector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); + vector1.set(3, "bar".getBytes(StandardCharsets.UTF_8)); + vector1.setValueCount(4); + + FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary1); + + VarCharVector vector2 = newVarCharVector("varchar2", allocator); + vector2.allocateNewSafe(); + vector2.set(0, "foo".getBytes(StandardCharsets.UTF_8)); + vector2.set(1, "aa".getBytes(StandardCharsets.UTF_8)); + vector2.set(2, "bb".getBytes(StandardCharsets.UTF_8)); + vector2.set(3, "cc".getBytes(StandardCharsets.UTF_8)); + vector2.setValueCount(4); + + FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary3); + + DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); + provider.put(dictionary1); + provider.put(dictionary3); + List schemaFields = new ArrayList<>(); + schemaFields.add(DictionaryUtility.toMessageFormat(encodedVector1.getField(), provider, new HashSet<>())); + schemaFields.add(DictionaryUtility.toMessageFormat(encodedVector2.getField(), provider, new HashSet<>())); + Schema schema = new Schema(schemaFields); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + WriteChannel out = new WriteChannel(newChannel(outStream)); + + // write schema + MessageSerializer.serialize(out, schema); + + List closeableList = new ArrayList<>(); + + // write non-delta dictionary with id=1 + 
serializeDictionaryBatch(out, dictionary1, false, closeableList); + + // write delta dictionary with id=1 + Dictionary deltaDictionary = + new Dictionary(dictionaryVector2, new DictionaryEncoding(1L, false, null)); + serializeDictionaryBatch(out, deltaDictionary, true, closeableList); + deltaDictionary.getVector().close(); + + // write recordBatch2 + serializeRecordBatch(out, Arrays.asList(encodedVector1, encodedVector2), closeableList); + + // write eos + out.writeIntLittleEndian(0); + + try (ArrowStreamReader reader = new ArrowStreamReader( + new ByteArrayReadableSeekableByteChannel(outStream.toByteArray()), allocator)) { + assertEquals(1, reader.getDictionaryVectors().size()); + assertTrue(reader.loadNextBatch()); + FieldVector dictionaryVector = reader.getDictionaryVectors().get(1L).getVector(); + // make sure the delta dictionary is concatenated. + assertTrue(VectorEqualsVisitor.vectorEquals(dictionaryVector, dictionaryVector3)); + assertFalse(reader.loadNextBatch()); + } + + vector1.close(); + vector2.close(); + AutoCloseables.close(closeableList); + + } + + private void serializeDictionaryBatch( + WriteChannel out, + Dictionary dictionary, + boolean isDelta, + List closeables) throws IOException { + + FieldVector dictVector = dictionary.getVector(); + VectorSchemaRoot root = new VectorSchemaRoot( + Collections.singletonList(dictVector.getField()), + Collections.singletonList(dictVector), + dictVector.getValueCount()); + ArrowDictionaryBatch batch = + new ArrowDictionaryBatch(dictionary.getEncoding().getId(), new VectorUnloader(root).getRecordBatch(), isDelta); + MessageSerializer.serialize(out, batch); + closeables.add(batch); + closeables.add(root); + } + + private void serializeRecordBatch( + WriteChannel out, + List vectors, + List closeables) throws IOException { + + List fields = vectors.stream().map(v -> v.getField()).collect(Collectors.toList()); + VectorSchemaRoot root = new VectorSchemaRoot( + fields, + vectors, + vectors.get(0).getValueCount()); + 
VectorUnloader unloader = new VectorUnloader(root); + ArrowRecordBatch batch = unloader.getRecordBatch(); + MessageSerializer.serialize(out, batch); + closeables.add(batch); + closeables.add(root); + } + @Test public void testReadInterleavedData() throws IOException { List batches = createRecordBatches(); @@ -372,7 +562,7 @@ public void testReadInterleavedData() throws IOException { MessageSerializer.serialize(out, schema); // write dictionary1 - FieldVector dictVector1 = dictionary.getVector(); + FieldVector dictVector1 = dictionary1.getVector(); VectorSchemaRoot dictRoot1 = new VectorSchemaRoot( Collections.singletonList(dictVector1.getField()), Collections.singletonList(dictVector1), @@ -421,7 +611,7 @@ private List createRecordBatches() { List batches = new ArrayList<>(); DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); - provider.put(dictionary); + provider.put(dictionary1); provider.put(dictionary2); VarCharVector vectorA1 = newVarCharVector("varcharA1", allocator); @@ -435,7 +625,7 @@ private List createRecordBatches() { VarCharVector vectorA2 = newVarCharVector("varcharA2", allocator); vectorA2.setValueCount(6); - FieldVector encodedVectorA1 = (FieldVector) DictionaryEncoder.encode(vectorA1, dictionary); + FieldVector encodedVectorA1 = (FieldVector) DictionaryEncoder.encode(vectorA1, dictionary1); vectorA1.close(); FieldVector encodedVectorA2 = (FieldVector) DictionaryEncoder.encode(vectorA1, dictionary2); vectorA2.close(); @@ -459,7 +649,7 @@ private List createRecordBatches() { vectorB2.set(4, "bb".getBytes(StandardCharsets.UTF_8)); vectorB2.set(5, "cc".getBytes(StandardCharsets.UTF_8)); vectorB2.setValueCount(6); - FieldVector encodedVectorB1 = (FieldVector) DictionaryEncoder.encode(vectorB1, dictionary); + FieldVector encodedVectorB1 = (FieldVector) DictionaryEncoder.encode(vectorB1, dictionary1); vectorB1.close(); FieldVector encodedVectorB2 = (FieldVector) DictionaryEncoder.encode(vectorB2, 
dictionary2); vectorB2.close(); From 16a609d8495766946661cc5de2c44d3e6930df5b Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 10 Jan 2020 18:11:40 +0800 Subject: [PATCH 2/2] resolve some comments --- .../ipc/message/ArrowDictionaryBatch.java | 1 + .../vector/ipc/TestArrowReaderWriter.java | 92 +++++++++---------- 2 files changed, 44 insertions(+), 49 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java index cbe91c989ec..b7298a6820d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java @@ -32,6 +32,7 @@ public class ArrowDictionaryBatch implements ArrowMessage { private final ArrowRecordBatch dictionary; private final boolean isDelta; + @Deprecated public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary) { this (dictionaryId, dictionary, false); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java index 9226f8a64c9..5284170b99b 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java @@ -20,6 +20,7 @@ import static java.nio.channels.Channels.newChannel; import static java.util.Arrays.asList; import static org.apache.arrow.vector.TestUtils.newVarCharVector; +import static org.apache.arrow.vector.testing.ValueVectorDataPopulator.setVector; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -95,35 +96,32 @@ public void init() { allocator = new RootAllocator(Long.MAX_VALUE); dictionaryVector1 = newVarCharVector("D1", 
allocator); - dictionaryVector1.allocateNewSafe(); - dictionaryVector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - dictionaryVector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); - dictionaryVector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); - dictionaryVector1.setValueCount(3); + setVector(dictionaryVector1, + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), + "baz".getBytes(StandardCharsets.UTF_8)); dictionaryVector2 = newVarCharVector("D2", allocator); - dictionaryVector2.allocateNewSafe(); - dictionaryVector2.set(0, "aa".getBytes(StandardCharsets.UTF_8)); - dictionaryVector2.set(1, "bb".getBytes(StandardCharsets.UTF_8)); - dictionaryVector2.set(2, "cc".getBytes(StandardCharsets.UTF_8)); - dictionaryVector2.setValueCount(3); + setVector(dictionaryVector2, + "aa".getBytes(StandardCharsets.UTF_8), + "bb".getBytes(StandardCharsets.UTF_8), + "cc".getBytes(StandardCharsets.UTF_8)); dictionaryVector3 = newVarCharVector("D3", allocator); - dictionaryVector3.allocateNewSafe(); - dictionaryVector3.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.set(1, "bar".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.set(2, "baz".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.set(3, "aa".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.set(4, "bb".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.set(5, "cc".getBytes(StandardCharsets.UTF_8)); - dictionaryVector3.setValueCount(6); - - dictionary1 = - new Dictionary(dictionaryVector1, new DictionaryEncoding(1L, false, null)); - dictionary2 = - new Dictionary(dictionaryVector2, new DictionaryEncoding(2L, false, null)); - dictionary3 = - new Dictionary(dictionaryVector3, new DictionaryEncoding(1L, false, null)); + setVector(dictionaryVector3, + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), + "baz".getBytes(StandardCharsets.UTF_8), + "aa".getBytes(StandardCharsets.UTF_8), + "bb".getBytes(StandardCharsets.UTF_8), 
+ "cc".getBytes(StandardCharsets.UTF_8)); + + dictionary1 = new Dictionary(dictionaryVector1, + new DictionaryEncoding(/*id=*/1L, /*ordered=*/false, /*indexType=*/null)); + dictionary2 = new Dictionary(dictionaryVector2, + new DictionaryEncoding(/*id=*/2L, /*ordered=*/false, /*indexType=*/null)); + dictionary3 = new Dictionary(dictionaryVector3, + new DictionaryEncoding(/*id=*/1L, /*ordered=*/false, /*indexType=*/null)); } @After @@ -386,22 +384,20 @@ public void testEmptyStreamInStreamingIPC() throws IOException { @Test public void testDictionaryReplacement() throws Exception { VarCharVector vector1 = newVarCharVector("varchar1", allocator); - vector1.allocateNewSafe(); - vector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - vector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); - vector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); - vector1.set(3, "bar".getBytes(StandardCharsets.UTF_8)); - vector1.setValueCount(4); + setVector(vector1, + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), + "baz".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8)); FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary1); VarCharVector vector2 = newVarCharVector("varchar2", allocator); - vector2.allocateNewSafe(); - vector2.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - vector2.set(1, "foo".getBytes(StandardCharsets.UTF_8)); - vector2.set(2, "foo".getBytes(StandardCharsets.UTF_8)); - vector2.set(3, "foo".getBytes(StandardCharsets.UTF_8)); - vector2.setValueCount(4); + setVector(vector2, + "foo".getBytes(StandardCharsets.UTF_8), + "foo".getBytes(StandardCharsets.UTF_8), + "foo".getBytes(StandardCharsets.UTF_8), + "foo".getBytes(StandardCharsets.UTF_8)); FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary1); @@ -450,22 +446,20 @@ public void testDictionaryReplacement() throws Exception { @Test public void testDeltaDictionary() throws Exception { 
VarCharVector vector1 = newVarCharVector("varchar1", allocator); - vector1.allocateNewSafe(); - vector1.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - vector1.set(1, "bar".getBytes(StandardCharsets.UTF_8)); - vector1.set(2, "baz".getBytes(StandardCharsets.UTF_8)); - vector1.set(3, "bar".getBytes(StandardCharsets.UTF_8)); - vector1.setValueCount(4); + setVector(vector1, + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8), + "baz".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8)); FieldVector encodedVector1 = (FieldVector) DictionaryEncoder.encode(vector1, dictionary1); VarCharVector vector2 = newVarCharVector("varchar2", allocator); - vector2.allocateNewSafe(); - vector2.set(0, "foo".getBytes(StandardCharsets.UTF_8)); - vector2.set(1, "aa".getBytes(StandardCharsets.UTF_8)); - vector2.set(2, "bb".getBytes(StandardCharsets.UTF_8)); - vector2.set(3, "cc".getBytes(StandardCharsets.UTF_8)); - vector2.setValueCount(4); + setVector(vector2, + "foo".getBytes(StandardCharsets.UTF_8), + "aa".getBytes(StandardCharsets.UTF_8), + "bb".getBytes(StandardCharsets.UTF_8), + "cc".getBytes(StandardCharsets.UTF_8)); FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary3);