From fdab3dd7b9849d85ee65ca260b5039a0a3074868 Mon Sep 17 00:00:00 2001 From: Praveen Date: Wed, 29 May 2019 11:38:12 +0530 Subject: [PATCH 1/2] ARROW-3191: [Memory][Java] Introduce wrappers to keep backward compatibility. - Introduce some wrapper methods to reduce amount of client changes. - Changes were introduced as part of patch to support arrow buffers on random memory. --- .../main/java/io/netty/buffer/ArrowBuf.java | 41 +++++++++++++++++++ .../java/io/netty/buffer/NettyArrowBuf.java | 4 ++ 2 files changed, 45 insertions(+) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 2a1746fa60f..2fda890e732 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -1204,4 +1204,45 @@ public ArrowBuf reallocIfNeeded(final int size) { "Realloc is only available in the context of operator's UDFs"); } } + + /** + * Following are wrapper methods to keep this backward compatible. + */ + public void release() { + referenceManager.release(); + } + + public void release(int decrement) { + referenceManager.release(decrement); + } + + public void retain() { + referenceManager.retain(); + } + + public void retain(int increment) { + referenceManager.retain(increment); + } + + public ArrowBuf clear() { + this.readerIndex = this.writerIndex = 0; + return this; + } + + /** + * Initialize the reader and writer index. + * @param readerIndex index to read from + * @param writerIndex index to write to + * @return this + */ + public ArrowBuf setIndex(int readerIndex, int writerIndex) { + if (readerIndex >= 0 && readerIndex <= writerIndex && writerIndex <= this.capacity()) { + this.readerIndex = readerIndex; + this.writerIndex = writerIndex; + return this; + } else { + throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d " + + "(expected: 0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, this.capacity())); + } + } } diff --git a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java index 766b2b13e19..8d98ae7c4c0 100644 --- a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java @@ -76,6 +76,10 @@ public ByteBuf retain() { return this; } + public ArrowBuf arrowBuf() { + return arrowBuf; + } + @Override public ByteBuf retain(final int increment) { arrowBuf.getReferenceManager().retain(increment); From 389c1075db20f0010f4e0a339ee1480be6adb6e8 Mon Sep 17 00:00:00 2001 From: Praveen Date: Wed, 29 May 2019 14:16:31 +0530 Subject: [PATCH 2/2] Fix review comments and ci. --- .../memory/src/main/java/io/netty/buffer/ArrowBuf.java | 10 ++++++++-- .../src/main/java/io/netty/buffer/NettyArrowBuf.java | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 2fda890e732..c3b34b6fc5c 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -77,7 +77,7 @@ public final class ArrowBuf implements AutoCloseable { private int readerIndex; private int writerIndex; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? - new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; + new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; private volatile int length; /** @@ -1208,22 +1208,27 @@ public ArrowBuf reallocIfNeeded(final int size) { /** * Following are wrapper methods to keep this backward compatible. */ + @Deprecated public void release() { referenceManager.release(); } + @Deprecated public void release(int decrement) { referenceManager.release(decrement); } + @Deprecated public void retain() { referenceManager.retain(); } + @Deprecated public void retain(int increment) { referenceManager.retain(increment); } + @Deprecated public ArrowBuf clear() { this.readerIndex = this.writerIndex = 0; return this; @@ -1235,6 +1240,7 @@ public ArrowBuf clear() { * @param writerIndex index to write to * @return this */ + @Deprecated public ArrowBuf setIndex(int readerIndex, int writerIndex) { if (readerIndex >= 0 && readerIndex <= writerIndex && writerIndex <= this.capacity()) { this.readerIndex = readerIndex; @@ -1242,7 +1248,7 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) { return this; } else { throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d " + - "(expected: 0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, this.capacity())); + "(expected:0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, this.capacity())); } } } diff --git a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java index 8d98ae7c4c0..3c6bd5e4d08 100644 --- a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java @@ -355,12 +355,12 @@ public int getBytes(int index, FileChannel out, long position, int length) throw @Override public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { - throw new UnsupportedOperationException("Operation not supported"); + return (int) in.read(nioBuffers(index, length)); } @Override public int setBytes(int index, FileChannel in, long position, int length) throws IOException { - throw new UnsupportedOperationException("Operation not supported"); + return (int) in.read(nioBuffers(index, length)); } @Override