From c225d244d94fc4bc8d4cd5f20916f48e61b3f1e9 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Mon, 28 Sep 2020 11:45:20 +0200 Subject: [PATCH 1/7] Utility to add context to exception messages Utility to make it easier to add context to exception messages. Avoids having to do custom formatting. You just add keyvalues to a builder. Use like: ``` exMsg("something failed").kv("filename", fn).kv("errno", errno).toString() ``` (cherry picked from commit 1dd031d98497e5832da96385806262f99b7b4fb0) --- .../common/util/ExceptionMessageHelper.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java new file mode 100644 index 00000000000..9fa925d8038 --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.common.util; + +/** + * Utility to make it easier to add context to exception messages. + */ +public class ExceptionMessageHelper { + public StringBuilder sb = new StringBuilder(); + private boolean firstKV = true; + + public static ExceptionMessageHelper exMsg(String msg) { + return new ExceptionMessageHelper(msg); + } + + ExceptionMessageHelper(String msg) { + sb.append(msg).append("("); + } + + public ExceptionMessageHelper kv(String key, Object value) { + if (firstKV) { + firstKV = false; + } else { + sb.append(","); + } + sb.append(key).append("=").append(value.toString()); + return this; + } + + public String toString() { + return sb.append(")").toString(); + } +} From 159b1328e23ecf340400fd1a5df9587b57590d82 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Mon, 28 Sep 2020 20:51:08 +0200 Subject: [PATCH 2/7] Aligned native buffer wrapper A utility buffer class to be used with JNA calls. Buffers are page aligned (4k pages). The wrapper mostly handles writes between ByteBuffers and ByteBufs. It also provides a method for padding the buffer to the next alignment, so writes can have an aligned size also (as required by direct I/O). The padding is done with 0xF0, so that if it is read as an integer, or long, the value will be negative (assuming the read is a java read, and thus an signed int). (cherry picked from commit 9c0e02647ef10df1f81b8a563d5f82925894307d) --- .../storage/directentrylogger/Buffer.java | 270 ++++++++++++++++++ .../storage/directentrylogger/BufferPool.java | 69 +++++ .../directentrylogger/package-info.java | 22 ++ .../storage/directentrylogger/TestBuffer.java | 210 ++++++++++++++ 4 files changed, 571 insertions(+) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java new file mode 100644 index 00000000000..03a9342f085 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ReferenceCountUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import org.apache.bookkeeper.common.util.nativeio.NativeIO; + +/** + * A utility buffer class to be used with native calls. + *

+ * Buffers are page aligned (4k pages). + *

+ * The wrapper mostly handles writes between ByteBuffers and + * ByteBufs. It also provides a method for padding the buffer to the next + * alignment, so writes can have an aligned size also (as required by + * direct I/O). The padding is done with 0xF0, so that if it is read as + * an integer, or long, the value will be negative (assuming the read is + * a java read, and thus an signed int). + */ +class Buffer { + /* Padding byte must have MSB set, so if read at the start + * of an integer or long, the returned value is negative. */ + public static final byte PADDING_BYTE = (byte) 0xF0; + + /* Some machines can live with 512 alignment, but others + * appear to require 4096, so go with 4096, which is page + * alignment */ + public static final int ALIGNMENT = 4096; + private static final int MAX_ALIGNMENT = Integer.MAX_VALUE & ~(ALIGNMENT - 1); + static final byte[] PADDING = generatePadding(); + + final NativeIO nativeIO; + final int bufferSize; + ByteBuf buffer; + ByteBuffer byteBuffer; + long pointer = 0; + + Buffer(NativeIO nativeIO, int bufferSize) throws IOException { + checkArgument(isAligned(bufferSize), + "Buffer size not aligned %d", bufferSize); + + this.buffer = allocateAligned(ALIGNMENT, bufferSize); + + this.nativeIO = nativeIO; + this.bufferSize = bufferSize; + byteBuffer = buffer.nioBuffer(0, bufferSize); + byteBuffer.order(ByteOrder.BIG_ENDIAN); + } + + private ByteBuf allocateAligned(int alignment, int bufferSize) { + ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment); + long addr = buf.memoryAddress(); + if ((addr & (alignment - 1)) == 0) { + // The address is already aligned + pointer = addr; + return buf.slice(0, bufferSize); + } else { + int alignOffset = (int) (alignment - (addr & (alignment - 1))); + pointer = addr + alignOffset; + return buf.slice(alignOffset, bufferSize); + } + } + + /** + * @return whether there is space in the buffer for size bytes. + */ + boolean hasSpace(int size) throws IOException { + if (size > bufferSize) { + throw new IOException(exMsg("Write too large").kv("writeSize", size) + .kv("maxSize", bufferSize).toString()); + } + return byteBuffer.remaining() >= size; + } + + /** + * @return whether the buffer can honour a read of size at offset. + */ + boolean hasData(int offset, int size) { + return offset + size <= bufferSize; + } + + /** + * Write an integer to buffer. Progresses the position of the buffer by 4 bytes. + */ + void writeInt(int value) throws IOException { + byteBuffer.putInt(value); + } + + /** + * Write a bytebuf to this buffer. Progresses the position of the buffer by the + * number of readable bytes of the bytebuf. Progresses the readerIndex of the passed + * bytebuf by the number of bytes read (i.e. to the end). + */ + void writeByteBuf(ByteBuf bytebuf) throws IOException { + int bytesWritten = bytebuf.readableBytes(); + ByteBuffer bytesToPut = bytebuf.nioBuffer(); + byteBuffer.put(bytesToPut); + bytebuf.skipBytes(bytesWritten); + } + + /** + * Read an integer from the buffer at the given offset. The offset is in bytes. + */ + int readInt(int offset) throws IOException { + if (!hasData(offset, Integer.BYTES)) { + throw new IOException(exMsg("Buffer cannot satify int read") + .kv("offset", offset) + .kv("bufferSize", bufferSize).toString()); + } + try { + return byteBuffer.getInt(offset); + } catch (Exception e) { + throw new IOException(exMsg("Error reading int") + .kv("byteBuffer", byteBuffer.toString()) + .kv("offset", offset) + .kv("bufferSize", bufferSize).toString(), e); + } + } + + /** + * Read a long from the buffer at the given offset. The offset is in bytes. + */ + long readLong(int offset) throws IOException { + if (!hasData(offset, Long.BYTES)) { + throw new IOException(exMsg("Buffer cannot satify long read") + .kv("offset", offset) + .kv("bufferSize", bufferSize).toString()); + } + try { + return byteBuffer.getLong(offset); + } catch (Exception e) { + throw new IOException(exMsg("Error reading long") + .kv("byteBuffer", byteBuffer.toString()) + .kv("offset", offset) + .kv("bufferSize", bufferSize).toString(), e); + } + } + + /** + * Read a bytebuf of size from the buffer at the given offset. + * If there are not enough bytes in the buffer to satify the read, some of the bytes are read + * into the byte buffer and the number of bytes read is returned. + */ + int readByteBuf(ByteBuf buffer, int offset, int size) throws IOException { + int originalLimit = byteBuffer.limit(); + byteBuffer.position(offset); + int bytesToRead = Math.min(size, byteBuffer.capacity() - offset); + byteBuffer.limit(offset + bytesToRead); + try { + buffer.writeBytes(byteBuffer); + } catch (Exception e) { + throw new IOException(exMsg("Error reading buffer") + .kv("byteBuffer", byteBuffer.toString()) + .kv("offset", offset).kv("size", size) + .kv("bufferSize", bufferSize).toString(), e); + } finally { + byteBuffer.limit(originalLimit); + } + return bytesToRead; + } + + /** + * The data pointer object for the native buffer. This can be used + * by JNI method which take a char* or void*. + */ + long pointer() { + return pointer; + } + + long pointer(long offset, long expectedWrite) { + if (offset == 0) { + return pointer; + } else { + if (offset + expectedWrite > byteBuffer.capacity()) { + throw new IllegalArgumentException( + exMsg("Buffer overflow").kv("offset", offset).kv("expectedWrite", expectedWrite) + .kv("capacity", byteBuffer.capacity()).toString()); + } + + return pointer + offset; + } + } + /** + * @return the number of bytes which have been written to this buffer. + */ + int position() { + return byteBuffer.position(); + } + + /** + * @return the size of the buffer (i.e. the max number of bytes writable, or the max offset readable) + */ + int size() { + return bufferSize; + } + + /** + * Pad the buffer to the next alignment position. + * @return the position of the next alignment. This should be used as the size argument to make aligned writes. + */ + int padToAlignment() { + int bufferPos = byteBuffer.position(); + int nextAlignment = nextAlignment(bufferPos); + byteBuffer.put(PADDING, 0, nextAlignment - bufferPos); + return nextAlignment; + } + + /** + * Clear the bytes written. This doesn't actually destroy the data, but moves the position back to the start of + * the buffer. + */ + void reset() { + byteBuffer.clear(); + } + + /** + * Free the memory that backs this buffer. + */ + void free() { + ReferenceCountUtil.safeRelease(buffer); + buffer = null; + byteBuffer = null; + } + private static byte[] generatePadding() { + byte[] padding = new byte[ALIGNMENT]; + Arrays.fill(padding, (byte) PADDING_BYTE); + return padding; + } + + static boolean isAligned(long size) { + return size >= 0 && ((ALIGNMENT - 1) & size) == 0; + } + + static int nextAlignment(int pos) { + checkArgument(pos <= MAX_ALIGNMENT, + "position (0x%x) must be lower or equal to max alignment (0x%x)", + pos, MAX_ALIGNMENT); + checkArgument(pos >= 0, "position (0x%x) must be positive", pos); + return (pos + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java new file mode 100644 index 00000000000..901f9ee1e72 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.bookkeeper.common.util.nativeio.NativeIO; + +/** + * BufferPool. + */ +public class BufferPool implements AutoCloseable { + private final int maxPoolSize; + private final ArrayBlockingQueue pool; + BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException { + this.maxPoolSize = maxPoolSize; + pool = new ArrayBlockingQueue<>(maxPoolSize); + for (int i = 0; i < maxPoolSize; i++) { + pool.add(new Buffer(nativeIO, bufferSize)); + } + } + + Buffer acquire() throws IOException { + try { + return pool.take(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException(ie); + } + } + + void release(Buffer buffer) { + buffer.reset(); + if (!pool.add(buffer)) { + buffer.free(); + } + } + + @Override + public void close() { + while (true) { + Buffer b = pool.poll(); + if (b == null) { + break; + } + + b.free(); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java new file mode 100644 index 00000000000..a714867782b --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Support for bookie entry logs using Direct IO. + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java new file mode 100644 index 00000000000..e35de544d45 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java @@ -0,0 +1,210 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +// CHECKSTYLE.OFF: IllegalImport +import io.netty.util.internal.PlatformDependent; +// CHECKSTYLE.ON: IllegalImport + +import java.io.IOException; + +import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl; +import org.junit.Test; + +/** + * TestBuffer. + */ +public class TestBuffer { + + @Test + public void testIsAligned() throws Exception { + assertThat(Buffer.isAligned(1234), equalTo(false)); + assertThat(Buffer.isAligned(4096), equalTo(true)); + assertThat(Buffer.isAligned(40960), equalTo(true)); + assertThat(Buffer.isAligned(1 << 20), equalTo(true)); + assertThat(Buffer.isAligned(-1), equalTo(false)); + assertThat(Buffer.isAligned(Integer.MIN_VALUE), equalTo(false)); + assertThat(Buffer.isAligned(Integer.MAX_VALUE), equalTo(false)); + } + + @Test + public void testNextAlignment() throws Exception { + assertThat(Buffer.nextAlignment(0), equalTo(0)); + assertThat(Buffer.nextAlignment(1), equalTo(4096)); + assertThat(Buffer.nextAlignment(4096), equalTo(4096)); + assertThat(Buffer.nextAlignment(4097), equalTo(8192)); + assertThat(Buffer.nextAlignment(0x7FFFF000), equalTo(0x7FFFF000)); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativePosition() throws Exception { + Buffer.nextAlignment(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testMaxAlignment() throws Exception { + Buffer.nextAlignment(Integer.MAX_VALUE); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateUnaligned() throws Exception { + new Buffer(new NativeIOImpl(), 1234); + } + + @Test + public void testWriteInt() throws Exception { + int bufferSize = 1 << 20; + Buffer b = new Buffer(new NativeIOImpl(), bufferSize); + assertThat(b.hasSpace(bufferSize), equalTo(true)); + assertThat(b.position(), equalTo(0)); + b.writeInt(0xdeadbeef); + + + assertThat(PlatformDependent.getByte(b.pointer() + 0), equalTo((byte) 0xde)); + assertThat(PlatformDependent.getByte(b.pointer() + 1), equalTo((byte) 0xad)); + assertThat(PlatformDependent.getByte(b.pointer() + 2), equalTo((byte) 0xbe)); + assertThat(PlatformDependent.getByte(b.pointer() + 3), equalTo((byte) 0xef)); + + assertThat(b.hasSpace(bufferSize), equalTo(false)); + assertThat(b.position(), equalTo(Integer.BYTES)); + + for (int i = 0; i < 10000; i++) { + b.writeInt(i); + } + assertThat(b.position(), equalTo(Integer.BYTES * 10001)); + assertThat(b.hasSpace(bufferSize - (Integer.BYTES * 10001)), equalTo(true)); + assertThat(b.hasSpace(bufferSize - (Integer.BYTES * 10000)), equalTo(false)); + + assertThat(b.readInt(0), equalTo(0xdeadbeef)); + for (int i = 0; i < 10000; i++) { + assertThat(b.readInt((i + 1) * Integer.BYTES), equalTo(i)); + } + b.reset(); + assertThat(b.hasSpace(bufferSize), equalTo(true)); + assertThat(b.position(), equalTo(0)); + } + + @Test + public void testWriteBuffer() throws Exception { + ByteBuf bb = Unpooled.buffer(1021); + fillByteBuf(bb, 0xdeadbeef); + int bufferSize = 1 << 20; + Buffer b = new Buffer(new NativeIOImpl(), bufferSize); + assertThat(b.position(), equalTo(0)); + b.writeByteBuf(bb); + assertThat(b.position(), equalTo(1021)); + assertThat(bb.readableBytes(), equalTo(0)); + bb.clear(); + fillByteBuf(bb, 0xcafecafe); + b.writeByteBuf(bb); + assertThat(bb.readableBytes(), equalTo(0)); + assertThat(b.position(), equalTo(2042)); + + bb = Unpooled.buffer(2042); + int ret = b.readByteBuf(bb, 0, 2042); + assertThat(ret, equalTo(2042)); + for (int i = 0; i < 1020 / Integer.BYTES; i++) { + assertThat(bb.readInt(), equalTo(0xdeadbeef)); + } + assertThat(bb.readByte(), equalTo((byte) 0xde)); + for (int i = 0; i < 1020 / Integer.BYTES; i++) { + assertThat(bb.readInt(), equalTo(0xcafecafe)); + } + } + + @Test + public void testPartialRead() throws Exception { + ByteBuf bb = Unpooled.buffer(5000); + + Buffer b = new Buffer(new NativeIOImpl(), 4096); + for (int i = 0; i < 4096 / Integer.BYTES; i++) { + b.writeInt(0xdeadbeef); + } + + int ret = b.readByteBuf(bb, 0, 5000); + assertThat(ret, equalTo(4096)); + } + + @Test(expected = IOException.class) + public void testReadIntAtBoundary() throws Exception { + Buffer b = new Buffer(new NativeIOImpl(), 4096); + + for (int i = 0; i < 4096 / Integer.BYTES; i++) { + b.writeInt(0xdeadbeef); + } + assertThat(b.hasData(4092, Integer.BYTES), equalTo(true)); + assertThat(b.hasData(4093, Integer.BYTES), equalTo(false)); + assertThat(b.hasData(4096, Integer.BYTES), equalTo(false)); + b.readInt(4096 - 2); + } + + @Test(expected = IOException.class) + public void testReadLongAtBoundary() throws Exception { + Buffer b = new Buffer(new NativeIOImpl(), 4096); + + for (int i = 0; i < 4096 / Integer.BYTES; i++) { + b.writeInt(0xdeadbeef); + } + assertThat(b.hasData(4088, Long.BYTES), equalTo(true)); + assertThat(b.hasData(4089, Long.BYTES), equalTo(false)); + assertThat(b.hasData(4096, Long.BYTES), equalTo(false)); + b.readInt(4096 - 2); + } + + @Test + public void testPadToAlignment() throws Exception { + Buffer b = new Buffer(new NativeIOImpl(), 1 << 23); + + for (int i = 0; i < 1025; i++) { + b.writeInt(0xdededede); + } + int writtenLength = b.padToAlignment(); + + assertThat(writtenLength, equalTo(8192)); + assertThat(b.readInt(1024 * Integer.BYTES), equalTo(0xdededede)); + for (int i = 1025 * Integer.BYTES; i < writtenLength; i += Integer.BYTES) { + assertThat(b.readInt(i), equalTo(0xf0f0f0f0)); + } + assertThat(b.readInt(writtenLength), equalTo(0)); + } + + @Test + public void testFree() throws Exception { + Buffer b = new Buffer(new NativeIOImpl(), 1 << 23); + b.free(); // success if process doesn't explode + b.free(); + } + + static void fillByteBuf(ByteBuf bb, int value) { + while (bb.writableBytes() >= Integer.BYTES) { + bb.writeInt(value); + } + for (int i = 0; i < Integer.BYTES && bb.writableBytes() > 0; i++) { + byte b = (byte) (value >> (Integer.BYTES - i - 1) * 8); + bb.writeByte(b); + } + } +} From 52c938a93e8f9d1edd14e877913b09745e5b0164 Mon Sep 17 00:00:00 2001 From: chenhang Date: Sat, 30 Apr 2022 17:44:07 +0800 Subject: [PATCH 3/7] format code --- bookkeeper-server/pom.xml | 5 +++++ .../bookkeeper/bookie/storage/directentrylogger/Buffer.java | 5 ++--- .../bookie/storage/directentrylogger/BufferPool.java | 5 ++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index bc7886d7bfa..b74b9bb71b0 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -45,6 +45,11 @@ bookkeeper-tools-framework ${project.parent.version} + + org.apache.bookkeeper + native-io + ${project.parent.version} + org.rocksdb rocksdbjni diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java index 03a9342f085..0555ce007e2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java @@ -44,7 +44,7 @@ * alignment, so writes can have an aligned size also (as required by * direct I/O). The padding is done with 0xF0, so that if it is read as * an integer, or long, the value will be negative (assuming the read is - * a java read, and thus an signed int). + * a java read, and thus a signed int). */ class Buffer { /* Padding byte must have MSB set, so if read at the start @@ -69,7 +69,6 @@ class Buffer { "Buffer size not aligned %d", bufferSize); this.buffer = allocateAligned(ALIGNMENT, bufferSize); - this.nativeIO = nativeIO; this.bufferSize = bufferSize; byteBuffer = buffer.nioBuffer(0, bufferSize); @@ -116,7 +115,7 @@ void writeInt(int value) throws IOException { } /** - * Write a bytebuf to this buffer. Progresses the position of the buffer by the + * Write a btebuf to this buffer. Progresses the position of the buffer by the * number of readable bytes of the bytebuf. Progresses the readerIndex of the passed * bytebuf by the number of bytes read (i.e. to the end). */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java index 901f9ee1e72..95031b700c2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java @@ -26,13 +26,12 @@ import org.apache.bookkeeper.common.util.nativeio.NativeIO; /** - * BufferPool. + * BufferPool used to manage Buffers. */ public class BufferPool implements AutoCloseable { - private final int maxPoolSize; private final ArrayBlockingQueue pool; + BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException { - this.maxPoolSize = maxPoolSize; pool = new ArrayBlockingQueue<>(maxPoolSize); for (int i = 0; i < maxPoolSize; i++) { pool.add(new Buffer(nativeIO, bufferSize)); From 98ec43344bf792d556bcd6c671ac50c754dd7f10 Mon Sep 17 00:00:00 2001 From: chenhang Date: Sat, 30 Apr 2022 18:19:22 +0800 Subject: [PATCH 4/7] fix test --- .../{TestBuffer.java => BufferTest.java} | 103 +++++++++--------- 1 file changed, 53 insertions(+), 50 deletions(-) rename bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/{TestBuffer.java => BufferTest.java} (60%) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java similarity index 60% rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java rename to bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java index e35de544d45..2a48157523f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java @@ -20,8 +20,9 @@ */ package org.apache.bookkeeper.bookie.storage.directentrylogger; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -37,26 +38,26 @@ /** * TestBuffer. */ -public class TestBuffer { +public class BufferTest { @Test public void testIsAligned() throws Exception { - assertThat(Buffer.isAligned(1234), equalTo(false)); - assertThat(Buffer.isAligned(4096), equalTo(true)); - assertThat(Buffer.isAligned(40960), equalTo(true)); - assertThat(Buffer.isAligned(1 << 20), equalTo(true)); - assertThat(Buffer.isAligned(-1), equalTo(false)); - assertThat(Buffer.isAligned(Integer.MIN_VALUE), equalTo(false)); - assertThat(Buffer.isAligned(Integer.MAX_VALUE), equalTo(false)); + assertFalse(Buffer.isAligned(1234)); + assertTrue(Buffer.isAligned(4096)); + assertTrue(Buffer.isAligned(40960)); + assertTrue(Buffer.isAligned(1 << 20)); + assertFalse(Buffer.isAligned(-1)); + assertFalse(Buffer.isAligned(Integer.MAX_VALUE)); + assertFalse(Buffer.isAligned(Integer.MIN_VALUE)); } @Test public void testNextAlignment() throws Exception { - assertThat(Buffer.nextAlignment(0), equalTo(0)); - assertThat(Buffer.nextAlignment(1), equalTo(4096)); - assertThat(Buffer.nextAlignment(4096), equalTo(4096)); - assertThat(Buffer.nextAlignment(4097), equalTo(8192)); - assertThat(Buffer.nextAlignment(0x7FFFF000), equalTo(0x7FFFF000)); + assertEquals(0, Buffer.nextAlignment(0)); + assertEquals(4096, Buffer.nextAlignment(1)); + assertEquals(4096, Buffer.nextAlignment(4096)); + assertEquals(8192, Buffer.nextAlignment(4097)); + assertEquals(0x7FFFF000, Buffer.nextAlignment(0x7FFFF000)); } @Test(expected = IllegalArgumentException.class) @@ -78,33 +79,33 @@ public void testCreateUnaligned() throws Exception { public void testWriteInt() throws Exception { int bufferSize = 1 << 20; Buffer b = new Buffer(new NativeIOImpl(), bufferSize); - assertThat(b.hasSpace(bufferSize), equalTo(true)); - assertThat(b.position(), equalTo(0)); + assertTrue(b.hasSpace(bufferSize)); + assertEquals(0, b.position()); b.writeInt(0xdeadbeef); - assertThat(PlatformDependent.getByte(b.pointer() + 0), equalTo((byte) 0xde)); - assertThat(PlatformDependent.getByte(b.pointer() + 1), equalTo((byte) 0xad)); - assertThat(PlatformDependent.getByte(b.pointer() + 2), equalTo((byte) 0xbe)); - assertThat(PlatformDependent.getByte(b.pointer() + 3), equalTo((byte) 0xef)); + assertEquals((byte) 0xde, PlatformDependent.getByte(b.pointer() + 0)); + assertEquals((byte) 0xad, PlatformDependent.getByte(b.pointer() + 1)); + assertEquals((byte) 0xbe, PlatformDependent.getByte(b.pointer() + 2)); + assertEquals((byte) 0xef, PlatformDependent.getByte(b.pointer() + 3)); - assertThat(b.hasSpace(bufferSize), equalTo(false)); - assertThat(b.position(), equalTo(Integer.BYTES)); + assertFalse(b.hasSpace(bufferSize)); + assertEquals(Integer.BYTES, b.position()); for (int i = 0; i < 10000; i++) { b.writeInt(i); } - assertThat(b.position(), equalTo(Integer.BYTES * 10001)); - assertThat(b.hasSpace(bufferSize - (Integer.BYTES * 10001)), equalTo(true)); - assertThat(b.hasSpace(bufferSize - (Integer.BYTES * 10000)), equalTo(false)); + assertEquals(Integer.BYTES * 10001, b.position()); + assertTrue(b.hasSpace(bufferSize - (Integer.BYTES * 10001))); + assertFalse(b.hasSpace(bufferSize - (Integer.BYTES * 10000))); - assertThat(b.readInt(0), equalTo(0xdeadbeef)); + assertEquals(0xdeadbeef, b.readInt(0)); for (int i = 0; i < 10000; i++) { - assertThat(b.readInt((i + 1) * Integer.BYTES), equalTo(i)); + assertEquals(i, b.readInt((i + 1) * Integer.BYTES)); } b.reset(); - assertThat(b.hasSpace(bufferSize), equalTo(true)); - assertThat(b.position(), equalTo(0)); + assertTrue(b.hasSpace(bufferSize)); + assertEquals(0, b.position()); } @Test @@ -113,25 +114,25 @@ public void testWriteBuffer() throws Exception { fillByteBuf(bb, 0xdeadbeef); int bufferSize = 1 << 20; Buffer b = new Buffer(new NativeIOImpl(), bufferSize); - assertThat(b.position(), equalTo(0)); + assertEquals(0, b.position()); b.writeByteBuf(bb); - assertThat(b.position(), equalTo(1021)); - assertThat(bb.readableBytes(), equalTo(0)); + assertEquals(1021, b.position()); + assertEquals(0, bb.readableBytes()); bb.clear(); fillByteBuf(bb, 0xcafecafe); b.writeByteBuf(bb); - assertThat(bb.readableBytes(), equalTo(0)); - assertThat(b.position(), equalTo(2042)); + assertEquals(0, bb.readableBytes()); + assertEquals(2042, b.position()); bb = Unpooled.buffer(2042); int ret = b.readByteBuf(bb, 0, 2042); - assertThat(ret, equalTo(2042)); + assertEquals(2042, ret); for (int i = 0; i < 1020 / Integer.BYTES; i++) { - assertThat(bb.readInt(), equalTo(0xdeadbeef)); + assertEquals(0xdeadbeef, bb.readInt()); } - assertThat(bb.readByte(), equalTo((byte) 0xde)); + assertEquals((byte) 0xde, bb.readByte()); for (int i = 0; i < 1020 / Integer.BYTES; i++) { - assertThat(bb.readInt(), equalTo(0xcafecafe)); + assertEquals(0xcafecafe, bb.readInt()); } } @@ -145,7 +146,7 @@ public void testPartialRead() throws Exception { } int ret = b.readByteBuf(bb, 0, 5000); - assertThat(ret, equalTo(4096)); + assertEquals(4096, ret); } @Test(expected = IOException.class) @@ -155,9 +156,10 @@ public void testReadIntAtBoundary() throws Exception { for (int i = 0; i < 4096 / Integer.BYTES; i++) { b.writeInt(0xdeadbeef); } - assertThat(b.hasData(4092, Integer.BYTES), equalTo(true)); - assertThat(b.hasData(4093, Integer.BYTES), equalTo(false)); - assertThat(b.hasData(4096, Integer.BYTES), equalTo(false)); + assertTrue(b.hasData(4092, Integer.BYTES)); + assertFalse(b.hasData(4093, Integer.BYTES)); + assertFalse(b.hasData(4096, Integer.BYTES)); + b.readInt(4096 - 2); } @@ -168,9 +170,10 @@ public void testReadLongAtBoundary() throws Exception { for (int i = 0; i < 4096 / Integer.BYTES; i++) { b.writeInt(0xdeadbeef); } - assertThat(b.hasData(4088, Long.BYTES), equalTo(true)); - assertThat(b.hasData(4089, Long.BYTES), equalTo(false)); - assertThat(b.hasData(4096, Long.BYTES), equalTo(false)); + assertTrue(b.hasData(4088, Long.BYTES)); + assertFalse(b.hasData(4089, Long.BYTES)); + assertFalse(b.hasData(4096, Long.BYTES)); + b.readInt(4096 - 2); } @@ -183,12 +186,12 @@ public void testPadToAlignment() throws Exception { } int writtenLength = b.padToAlignment(); - assertThat(writtenLength, equalTo(8192)); - assertThat(b.readInt(1024 * Integer.BYTES), equalTo(0xdededede)); + assertEquals(8192, writtenLength); + assertEquals(0xdededede, b.readInt(1024 * Integer.BYTES)); for (int i = 1025 * Integer.BYTES; i < writtenLength; i += Integer.BYTES) { - assertThat(b.readInt(i), equalTo(0xf0f0f0f0)); + assertEquals(0xf0f0f0f0, b.readInt(i)); } - assertThat(b.readInt(writtenLength), equalTo(0)); + assertEquals(0, b.readInt(writtenLength)); } @Test From 00b71668a6d9dc98524ccd1dc412acc04b8e33ce Mon Sep 17 00:00:00 2001 From: chenhang Date: Sat, 30 Apr 2022 19:03:27 +0800 Subject: [PATCH 5/7] fix gradlew generateApiJavadoc failed --- bookkeeper-server/build.gradle | 1 + build.gradle | 1 + settings.gradle | 2 ++ 3 files changed, 4 insertions(+) diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle index 18a43d2d5fb..e759a495ca3 100644 --- a/bookkeeper-server/build.gradle +++ b/bookkeeper-server/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation project(':bookkeeper-http:http-server') implementation project(':bookkeeper-proto') implementation project(':bookkeeper-tools-framework') + implementation project(':native-io') implementation project(':circe-checksum') implementation project(':cpu-affinity') implementation project(':stats:bookkeeper-stats-api') diff --git a/build.gradle b/build.gradle index 4fb440c4473..f6ff2a47195 100644 --- a/build.gradle +++ b/build.gradle @@ -85,6 +85,7 @@ allprojects { apply from: "$rootDir/dependencies.gradle" if (it.path != ':circe-checksum:src:main:circe' && it.path != ':cpu-affinity:src:main:affinity' + && it.path != ':native-io:src:main:native-io-jni' && it.name != 'src' && it.name != 'main') { apply plugin: 'java' diff --git a/settings.gradle b/settings.gradle index a9114e1280d..7c40cc85ee7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -60,6 +60,8 @@ include(':bookkeeper-benchmark', 'dev:release', 'microbenchmarks', 'metadata-drivers:etcd', + 'native-io', + 'native-io:src:main:native-io-jni', 'shaded:distributedlog-core-shaded', 'stats:bookkeeper-stats-api', 'stats:bookkeeper-stats-providers:prometheus-metrics-provider', From 0741acdf407314569d6710b0d21659e14675e95d Mon Sep 17 00:00:00 2001 From: chenhang Date: Sun, 1 May 2022 20:38:28 +0800 Subject: [PATCH 6/7] fix test failed --- native-io/build.gradle | 1 - native-io/pom.xml | 4 ---- .../bookkeeper/common/util/nativeio/NativeUtils.java | 10 +++++++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/native-io/build.gradle b/native-io/build.gradle index b680940c807..3d16ec8b8d4 100644 --- a/native-io/build.gradle +++ b/native-io/build.gradle @@ -24,7 +24,6 @@ dependencies { compileOnly depLibs.lombok compileOnly depLibs.spotbugsAnnotations implementation depLibs.commonsLang3 - implementation depLibs.guava implementation depLibs.slf4j testImplementation depLibs.junit diff --git a/native-io/pom.xml b/native-io/pom.xml index 8ba378dd0b3..693bfd9af87 100644 --- a/native-io/pom.xml +++ b/native-io/pom.xml @@ -33,10 +33,6 @@ - - com.google.guava - guava - org.apache.commons commons-lang3 diff --git a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java index d3c83f118f2..af3378f4918 100644 --- a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java +++ b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java @@ -18,8 +18,6 @@ */ package org.apache.bookkeeper.common.util.nativeio; -import static com.google.common.base.Preconditions.checkArgument; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; @@ -29,6 +27,7 @@ import java.io.OutputStream; import java.nio.file.Files; +import javax.annotation.CheckForNull; import lombok.experimental.UtilityClass; /** @@ -80,4 +79,9 @@ static void loadLibraryFromJar(String path) throws Exception { System.load(temp.getAbsolutePath()); } -} + + public static void checkArgument(boolean expression, @CheckForNull Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + }} From 972c3b5b172c2bd89ead61b836f9780c7649f406 Mon Sep 17 00:00:00 2001 From: chenhang Date: Sun, 1 May 2022 21:10:10 +0800 Subject: [PATCH 7/7] fix test failed --- .../apache/bookkeeper/common/util/nativeio/NativeUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java index af3378f4918..339dfeda1e9 100644 --- a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java +++ b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java @@ -27,7 +27,7 @@ import java.io.OutputStream; import java.nio.file.Files; -import javax.annotation.CheckForNull; +import lombok.NonNull; import lombok.experimental.UtilityClass; /** @@ -80,7 +80,7 @@ static void loadLibraryFromJar(String path) throws Exception { System.load(temp.getAbsolutePath()); } - public static void checkArgument(boolean expression, @CheckForNull Object errorMessage) { + public static void checkArgument(boolean expression, @NonNull Object errorMessage) { if (!expression) { throw new IllegalArgumentException(String.valueOf(errorMessage)); }