diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index bfb136cd7a8..219926a40e7 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -127,7 +127,7 @@ private void testEchoServer(int serverPort, } Assert.assertFalse(reader.loadNextBatch()); assertEquals(0, reader.getVectorSchemaRoot().getRowCount()); - assertEquals(reader.bytesRead() + 4, writer.bytesWritten()); + assertEquals(reader.bytesRead(), writer.bytesWritten()); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java index 936ab6de2a9..673cc6c898a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java @@ -30,6 +30,7 @@ import org.apache.arrow.vector.ipc.message.ArrowFooter; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,11 +75,10 @@ protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException @Override protected void endInternal(WriteChannel out) throws IOException { - if (option.write_legacy_ipc_format) { - out.writeIntLittleEndian(0); - } else { - out.writeLongLittleEndian(0); + if (!option.write_legacy_ipc_format) { + out.writeIntLittleEndian(MessageSerializer.IPC_CONTINUATION_TOKEN); } + out.writeIntLittleEndian(0); long footerStart = out.getCurrentPosition(); out.write(new ArrowFooter(schema, dictionaryBlocks, recordBlocks), false); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java index e74323b7569..ff0dfd396c6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java @@ -25,6 +25,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.ipc.message.MessageSerializer; /** * Writer for the Arrow stream format to send ArrowRecordBatches over a WriteChannel. @@ -68,18 +69,18 @@ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, Wri * Write an EOS identifier to the WriteChannel. * * @param out Open WriteChannel with an active Arrow stream. + * @param option IPC write option * @throws IOException on error */ - public void writeEndOfStream(WriteChannel out) throws IOException { - if (option.write_legacy_ipc_format) { - out.writeIntLittleEndian(0); - } else { - out.writeLongLittleEndian(0); + public static void writeEndOfStream(WriteChannel out, IpcOption option) throws IOException { + if (!option.write_legacy_ipc_format) { + out.writeIntLittleEndian(MessageSerializer.IPC_CONTINUATION_TOKEN); } + out.writeIntLittleEndian(0); } @Override protected void endInternal(WriteChannel out) throws IOException { - writeEndOfStream(out); + writeEndOfStream(out, option); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java index 2d36c93d581..eef36d3809d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java @@ -101,15 +101,6 @@ public long writeIntLittleEndian(int v) throws IOException { return write(outBuffer); } - /** - * Writes v in little-endian format to the underlying channel. - */ - public long writeLongLittleEndian(long v) throws IOException { - byte[] outBuffer = new byte[8]; - MessageSerializer.longToBytes(v, outBuffer); - return write(outBuffer); - } - /** * Writes the buffer to the underlying channel. */ diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java index 07f40176da7..422a63f57f7 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java @@ -156,6 +156,6 @@ public void pipeTest() throws IOException, InterruptedException { writer.join(); assertEquals(NUM_BATCHES, reader.getBatchesRead()); - assertEquals(writer.bytesWritten(), reader.bytesRead() + 4); + assertEquals(writer.bytesWritten(), reader.bytesRead()); } }