diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index d5b9dd9d81c2..d19f2aea1300 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; @@ -83,6 +84,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { public static final Logger LOG = LoggerFactory.getLogger(BlockDataStreamOutput.class); + + public static final int PUT_BLOCK_REQUEST_LENGTH_MAX = 1 << 20; // 1MB + public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: "; private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {}; @@ -406,12 +410,26 @@ private void executePutBlock(boolean close, byteBufferList = null; } waitFuturesComplete(); + final BlockData blockData = containerBlockData.build(); if (close) { - dataStreamCloseReply = out.closeAsync(); + final ContainerCommandRequestProto putBlockRequest + = ContainerProtocolCalls.getPutBlockRequest( + xceiverClient.getPipeline(), blockData, true, token); + dataStreamCloseReply = executePutBlockClose(putBlockRequest, + PUT_BLOCK_REQUEST_LENGTH_MAX, out); + dataStreamCloseReply.whenComplete((reply, e) -> { + if (e != null || reply == null || !reply.isSuccess()) { + LOG.warn("Failed executePutBlockClose, reply=" + reply, e); + try { + executePutBlock(true, false); + } catch (IOException ex) { + throw new CompletionException(ex); + } + } + }); } try { - BlockData blockData = containerBlockData.build(); XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData, close, token); final CompletableFuture flushFuture @@ -459,6 +477,30 @@ private void executePutBlock(boolean close, } } + public static CompletableFuture executePutBlockClose( + ContainerCommandRequestProto putBlockRequest, int max, + DataStreamOutput out) { + final ByteBuffer putBlock = ContainerCommandRequestMessage.toMessage( + putBlockRequest, null).getContent().asReadOnlyByteBuffer(); + final ByteBuffer protoLength = getProtoLength(putBlock, max); + RatisHelper.debug(putBlock, "putBlock", LOG); + out.writeAsync(putBlock); + RatisHelper.debug(protoLength, "protoLength", LOG); + return out.writeAsync(protoLength, StandardWriteOption.CLOSE); + } + + public static ByteBuffer getProtoLength(ByteBuffer putBlock, int max) { + final int protoLength = putBlock.remaining(); + Preconditions.checkState(protoLength <= max, + "protoLength== %s > max = %s", protoLength, max); + final ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(protoLength); + buffer.flip(); + LOG.debug("protoLength = {}", protoLength); + Preconditions.checkState(buffer.remaining() == 4); + return buffer.asReadOnlyBuffer(); + } + @Override public void flush() throws IOException { if 
(xceiverClientFactory != null && xceiverClient != null @@ -547,7 +589,7 @@ private void validateResponse( } - private void setIoException(Exception e) { + private void setIoException(Throwable e) { IOException ioe = getIoException(); if (ioe == null) { IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index b381d7a3a11b..902b8e2c0f27 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.ratis; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; @@ -58,6 +59,7 @@ import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -436,4 +438,28 @@ private static Class getClass(String name, throw new RuntimeException(e); } } + + public static void debug(ByteBuffer buffer, String name, Logger log) { + if (!log.isDebugEnabled()) { + return; + } + buffer = buffer.duplicate(); + final StringBuilder builder = new StringBuilder(); + for (int i = 1; buffer.remaining() > 0; i++) { + builder.append(buffer.get()).append(i % 20 == 0 ? "\n " : ", "); + } + log.debug("{}: {}\n {}", name, buffer, builder); + } + + public static void debug(ByteBuf buf, String name, Logger log) { + if (!log.isDebugEnabled()) { + return; + } + buf = buf.duplicate(); + final StringBuilder builder = new StringBuilder(); + for (int i = 1; buf.readableBytes() > 0; i++) { + builder.append(buf.readByte()).append(i % 20 == 0 ? 
"\n " : ", "); + } + log.debug("{}: {}\n {}", name, buf, builder); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 7f2d2a8bec9e..373093e78500 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; @@ -186,11 +187,19 @@ public static XceiverClientReply putBlockAsync( XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, Token token) throws IOException, InterruptedException, ExecutionException { + final ContainerCommandRequestProto request = getPutBlockRequest( + xceiverClient.getPipeline(), containerBlockData, eof, token); + return xceiverClient.sendCommandAsync(request); + } + + public static ContainerCommandRequestProto getPutBlockRequest( + Pipeline pipeline, BlockData containerBlockData, boolean eof, + Token token) throws IOException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder() .setBlockData(containerBlockData) .setEof(eof); - String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + final String id = pipeline.getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) .setContainerID(containerBlockData.getBlockID().getContainerID()) @@ -199,8 +208,7 @@ public static XceiverClientReply putBlockAsync( if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } - ContainerCommandRequestProto request = builder.build(); - return xceiverClient.sendCommandAsync(request); + return builder.build(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 138cddd48671..3edb29b94a3c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -427,6 +427,20 @@ private ContainerCommandResponseProto runCommand( return dispatchCommand(requestProto, context); } + private CompletableFuture runCommandAsync( + ContainerCommandRequestProto requestProto, LogEntryProto entry) { + return CompletableFuture.supplyAsync(() -> { + final DispatcherContext context = new DispatcherContext.Builder() + .setTerm(entry.getTerm()) + .setLogIndex(entry.getIndex()) + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) + .setContainer2BCSIDMap(container2BCSIDMap) + .build(); + + return runCommand(requestProto, context); + }, executor); + } + private 
CompletableFuture handleWriteChunk( ContainerCommandRequestProto requestProto, long entryIndex, long term, long startTime) { @@ -560,19 +574,16 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { "DataStream: " + stream + " is not closed properly")); } - final CompletableFuture f; + final ContainerCommandRequestProto request; if (dataChannel instanceof KeyValueStreamDataChannel) { - f = CompletableFuture.completedFuture(null); + request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest(); } else { return JavaUtils.completeExceptionally(new IllegalStateException( "Unexpected DataChannel " + dataChannel.getClass())); } - return f.whenComplete((res, e) -> { - if (LOG.isDebugEnabled()) { - LOG.debug("PutBlock {} Term: {} Index: {}", - res.getResult(), entry.getTerm(), entry.getIndex()); - } - }); + return runCommandAsync(request, entry).whenComplete( + (res, e) -> LOG.debug("link {}, entry: {}, request: {}", + res.getResult(), entry, request)); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 66723031f069..99dc40f5d002 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -18,17 +18,131 @@ package org.apache.hadoop.ozone.container.keyvalue.impl; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import org.apache.ratis.util.ReferenceCountedObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * This class is used to get the DataChannel for streaming. */ public class KeyValueStreamDataChannel extends StreamDataChannelBase { + public static final Logger LOG = + LoggerFactory.getLogger(KeyValueStreamDataChannel.class); + + /** + * Keep the last {@link Buffers#max} bytes in the buffer + * in order to create putBlockRequest + * at {@link #closeBuffers(Buffers, WriteMethod)}}. 
+ */ + static class Buffers { + private final Deque> deque + = new LinkedList<>(); + private final int max; + private int length; + + Buffers(int max) { + this.max = max; + } + + private boolean isExtra(int n) { + return length - n >= max; + } + + private boolean hasExtraBuffer() { + return Optional.ofNullable(deque.peek()) + .map(ReferenceCountedObject::get) + .filter(b -> isExtra(b.remaining())) + .isPresent(); + } + + /** + * @return extra buffers which are safe to be written. + */ + Iterable> offer( + ReferenceCountedObject ref) { + final ByteBuffer buffer = ref.retain(); + LOG.debug("offer {}", buffer); + final boolean offered = deque.offer(ref); + Preconditions.checkState(offered, "Failed to offer"); + length += buffer.remaining(); + + return () -> new Iterator>() { + @Override + public boolean hasNext() { + return hasExtraBuffer(); + } + + @Override + public ReferenceCountedObject next() { + final ReferenceCountedObject polled = poll(); + length -= polled.get().remaining(); + Preconditions.checkState(length >= max); + return polled; + } + }; + } + + ReferenceCountedObject poll() { + final ReferenceCountedObject polled + = Objects.requireNonNull(deque.poll()); + RatisHelper.debug(polled.get(), "polled", LOG); + return polled; + } + + ReferenceCountedObject pollAll() { + Preconditions.checkState(!deque.isEmpty(), "The deque is empty"); + final ByteBuffer[] array = new ByteBuffer[deque.size()]; + final List> refs + = new ArrayList<>(deque.size()); + for (int i = 0; i < array.length; i++) { + final ReferenceCountedObject ref = poll(); + refs.add(ref); + array[i] = ref.get(); + } + final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly(); + return ReferenceCountedObject.wrap(buf, () -> { + }, () -> { + buf.release(); + refs.forEach(ReferenceCountedObject::release); + }); + } + } + + interface WriteMethod { + int applyAsInt(ByteBuffer src) throws IOException; + } + + private final Buffers buffers = new Buffers( + BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX); + private final AtomicReference putBlockRequest + = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + KeyValueStreamDataChannel(File file, ContainerData containerData, ContainerMetrics metrics) throws StorageContainerException { @@ -39,4 +153,125 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase { ContainerProtos.Type getType() { return ContainerProtos.Type.StreamWrite; } + + @Override + public int write(ReferenceCountedObject referenceCounted) + throws IOException { + assertOpen(); + return writeBuffers(referenceCounted, buffers, super::writeFileChannel); + } + + static int writeBuffers(ReferenceCountedObject src, + Buffers buffers, WriteMethod writeMethod) + throws IOException { + for (ReferenceCountedObject b : buffers.offer(src)) { + try { + writeFully(b.get(), writeMethod); + } finally { + b.release(); + } + } + return src.get().remaining(); + } + + private static void writeFully(ByteBuffer b, WriteMethod writeMethod) + throws IOException { + for (; b.remaining() > 0;) { + final int written = writeMethod.applyAsInt(b); + if (written <= 0) { + throw new IOException("Unable to write"); + } + } + } + + public ContainerCommandRequestProto getPutBlockRequest() { + return Objects.requireNonNull(putBlockRequest.get(), + () -> "putBlockRequest == null, " + this); + } + + void assertOpen() throws IOException { + if (closed.get()) { + throw new IOException("Already closed: " + this); + } + } + + @Override + public void close() throws IOException { + if 
(closed.compareAndSet(false, true)) { + putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + super.close(); + } + } + + static ContainerCommandRequestProto closeBuffers( + Buffers buffers, WriteMethod writeMethod) throws IOException { + final ReferenceCountedObject ref = buffers.pollAll(); + final ByteBuf buf = ref.retain(); + final ContainerCommandRequestProto putBlockRequest; + try { + putBlockRequest = readPutBlockRequest(buf); + // write the remaining data + writeFully(buf.nioBuffer(), writeMethod); + } finally { + ref.release(); + } + return putBlockRequest; + } + + private static int readProtoLength(ByteBuf b, int lengthIndex) { + final int readerIndex = b.readerIndex(); + LOG.debug("{}, lengthIndex = {}, readerIndex = {}", + b, lengthIndex, readerIndex); + if (lengthIndex > readerIndex) { + b.readerIndex(lengthIndex); + } else { + Preconditions.checkState(lengthIndex == readerIndex); + } + RatisHelper.debug(b, "readProtoLength", LOG); + return b.nioBuffer().getInt(); + } + + static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) + throws IOException { + // readerIndex protoIndex lengthIndex readerIndex+readableBytes + // V V V V + // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---| + final int readerIndex = b.readerIndex(); + final int lengthIndex = readerIndex + b.readableBytes() - 4; + final int protoLength = readProtoLength(b.duplicate(), lengthIndex); + final int protoIndex = lengthIndex - protoLength; + + final ContainerCommandRequestProto proto; + try { + proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer()); + } catch (Throwable t) { + RatisHelper.debug(b, "catch", LOG); + throw new IOException("Failed to readPutBlockRequest from " + b + + ": readerIndex=" + readerIndex + + ", protoIndex=" + protoIndex + + ", protoLength=" + protoLength + + ", lengthIndex=" + lengthIndex, t); + } + + // set index for reading data + b.writerIndex(protoIndex); + + return proto; + } + + private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b) + throws IOException { + RatisHelper.debug(b, "readPutBlockRequest", LOG); + final ByteString byteString = ByteString.copyFrom(b); + + final ContainerCommandRequestProto request = + ContainerCommandRequestMessage.toProto(byteString, null); + + if (!request.hasPutBlock()) { + throw new StorageContainerException( + "Malformed PutBlock request. 
trace ID: " + request.getTraceID(), + ContainerProtos.Result.MALFORMED_REQUEST); + } + return request; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index b31e2ccbf413..982903324848 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -79,8 +79,7 @@ public void close() throws IOException { randomAccessFile.close(); } - @Override - public int write(ByteBuffer src) throws IOException { + final int writeFileChannel(ByteBuffer src) throws IOException { final int writeBytes = getChannel().write(src); metrics.incContainerBytesStats(getType(), writeBytes); containerData.updateWriteStats(writeBytes, false); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java new file mode 100644 index 000000000000..d252b1cb1bef --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; +import org.apache.ratis.client.api.DataStreamOutput; +import org.apache.ratis.io.FilePositionCount; +import org.apache.ratis.io.StandardWriteOption; +import org.apache.ratis.io.WriteOption; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled; +import org.apache.ratis.util.ReferenceCountedObject; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX; +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose; +import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers; + +/** For testing {@link KeyValueStreamDataChannel}. 
*/ +public class TestKeyValueStreamDataChannel { + public static final Logger LOG = + LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class); + + static final ContainerCommandRequestProto PUT_BLOCK_PROTO + = ContainerCommandRequestProto.newBuilder() + .setCmdType(Type.PutBlock) + .setPutBlock(PutBlockRequestProto.newBuilder().setBlockData( + BlockData.newBuilder().setBlockID(DatanodeBlockID.newBuilder() + .setContainerID(222).setLocalID(333).build()).build())) + .setDatanodeUuid("datanodeId") + .setContainerID(111L) + .build(); + static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size(); + static { + LOG.info("PUT_BLOCK_PROTO_SIZE = {}", PUT_BLOCK_PROTO_SIZE); + } + + @Test + public void testSerialization() throws Exception { + final int max = PUT_BLOCK_REQUEST_LENGTH_MAX; + final ByteBuffer putBlockBuf = ContainerCommandRequestMessage.toMessage( + PUT_BLOCK_PROTO, null).getContent().asReadOnlyByteBuffer(); + final ByteBuffer protoLengthBuf = getProtoLength(putBlockBuf, max); + + // random data size + final int dataSize = ThreadLocalRandom.current().nextInt(1000) + 100; + final byte[] data = new byte[dataSize]; + + //serialize + final ByteBuf buf = Unpooled.buffer(max); + buf.writeBytes(data); + buf.writeBytes(putBlockBuf); + buf.writeBytes(protoLengthBuf); + + final ContainerCommandRequestProto proto = readPutBlockRequest(buf); + Assert.assertEquals(PUT_BLOCK_PROTO, proto); + } + + @Test + public void testBuffers() throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(32); + final List> futures = new ArrayList<>(); + + final int min = PUT_BLOCK_PROTO_SIZE + 4; + final int[] maxValues = {min, 2 * min, 10 * min}; + final int[] dataSizes = {0, 10, 100, 10_000}; + for (int max : maxValues) { + for (int dataSize : dataSizes) { + futures.add(CompletableFuture.supplyAsync( + () -> runTestBuffers(dataSize, max), executor)); + } + } + + for (CompletableFuture f : futures) { + f.get(); + } + } + + static String runTestBuffers(int dataSize, int max) { + final int seed = ThreadLocalRandom.current().nextInt(); + final String name = String.format("[dataSize=%d,max=%d,seed=%H]", + dataSize, max, seed); + LOG.info(name); + try { + runTestBuffers(dataSize, max, seed, name); + } catch (Throwable t) { + throw new CompletionException("Failed " + name, t); + } + return name; + } + + static void runTestBuffers(int dataSize, int max, int seed, String name) + throws Exception { + Assert.assertTrue(max >= PUT_BLOCK_PROTO_SIZE); + + // random data + final byte[] data = new byte[dataSize]; + final Random random = new Random(seed); + random.nextBytes(data); + + // write output + final Buffers buffers = new Buffers(max); + final Output out = new Output(buffers); + for (int offset = 0; offset < dataSize;) { + final int randomLength = random.nextInt(4 * max); + final int length = Math.min(randomLength, dataSize - offset); + LOG.info("{}: offset = {}, length = {}", name, offset, length); + final ByteBuffer b = ByteBuffer.wrap(data, offset, length); + final DataStreamReply writeReply = out.writeAsync(b).get(); + assertReply(writeReply, length, null); + offset += length; + } + + // close + final DataStreamReply closeReply = executePutBlockClose( + PUT_BLOCK_PROTO, max, out).get(); + assertReply(closeReply, 0, PUT_BLOCK_PROTO); + + // check output + final ByteBuf outBuf = out.getOutBuf(); + LOG.info("outBuf = {}", outBuf); + Assert.assertEquals(dataSize, outBuf.readableBytes()); + for (int i = 0; i < dataSize; i++) { + Assert.assertEquals(data[i], outBuf.readByte()); + 
} + outBuf.release(); + } + + static void assertReply(DataStreamReply reply, int byteWritten, + ContainerCommandRequestProto proto) { + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(byteWritten, reply.getBytesWritten()); + Assert.assertEquals(proto, ((Reply)reply).getPutBlockRequest()); + } + + static class Output implements DataStreamOutput { + private final Buffers buffers; + private final ByteBuf outBuf = Unpooled.buffer(); + private final WriteMethod writeMethod = src -> { + final int remaining = src.remaining(); + outBuf.writeBytes(src); + return remaining; + }; + + Output(Buffers buffers) { + this.buffers = buffers; + } + + ByteBuf getOutBuf() { + return outBuf; + } + + @Override + public CompletableFuture writeAsync( + ByteBuffer src, WriteOption... writeOptions) { + final int written; + try { + written = writeBuffers( + ReferenceCountedObject.wrap(src, () -> { }, () -> { }), + buffers, writeMethod); + } catch (IOException e) { + return completeExceptionally(e); + } + if (WriteOption.containsOption(writeOptions, StandardWriteOption.CLOSE)) { + return closeAsync(); + } + return CompletableFuture.completedFuture( + new Reply(true, written)); + } + + @Override + public CompletableFuture closeAsync() { + final ContainerCommandRequestProto putBlockRequest; + try { + putBlockRequest = closeBuffers(buffers, writeMethod); + } catch (IOException e) { + return completeExceptionally(e); + } + return CompletableFuture.completedFuture( + new Reply(true, 0, putBlockRequest)); + } + + @Override + public CompletableFuture writeAsync( + FilePositionCount filePositionCount, WriteOption... writeOptions) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture getRaftClientReplyFuture() { + throw new UnsupportedOperationException(); + } + + @Override + public WritableByteChannel getWritableByteChannel() { + throw new UnsupportedOperationException(); + } + } + + static class Reply implements DataStreamReply { + private final boolean success; + private final long bytesWritten; + private final ContainerCommandRequestProto putBlockRequest; + + Reply(boolean success, long bytesWritten) { + this(success, bytesWritten, null); + } + + Reply(boolean success, long bytesWritten, + ContainerCommandRequestProto putBlockRequest) { + this.success = success; + this.bytesWritten = bytesWritten; + this.putBlockRequest = putBlockRequest; + } + + ContainerCommandRequestProto getPutBlockRequest() { + return putBlockRequest; + } + + @Override + public boolean isSuccess() { + return success; + } + + @Override + public long getBytesWritten() { + return bytesWritten; + } + + @Override + public Collection getCommitInfos() { + throw new UnsupportedOperationException(); + } + + @Override + public ClientId getClientId() { + throw new UnsupportedOperationException(); + } + + @Override + public DataStreamPacketHeaderProto.Type getType() { + throw new UnsupportedOperationException(); + } + + @Override + public long getStreamId() { + throw new UnsupportedOperationException(); + } + + @Override + public long getStreamOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public long getDataLength() { + throw new UnsupportedOperationException(); + } + } + + static CompletableFuture completeExceptionally(Throwable t) { + final CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(t); + return f; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 65f734874065..c8a0115a808c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -120,7 +120,7 @@ public static void init() throws Exception { objectStore.getVolume(volumeName).createBucket(bucketName); } - private String getKeyName() { + static String getKeyName() { return UUID.randomUUID().toString(); } @@ -158,13 +158,11 @@ public void testMultiBlockWrite() throws Exception { testWriteWithFailure(blockSize + 50); } - private void testWrite(int dataLength) throws Exception { + static void testWrite(int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( keyName, ReplicationType.RATIS, dataLength); - byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); + final byte[] data = ContainerTestHelper.generateData(dataLength, false); key.write(ByteBuffer.wrap(data)); // now close the stream, It will update the key length. key.close(); @@ -221,14 +219,14 @@ public void testPutBlockAtBoundary() throws Exception { } - private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, + static OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey( keyName, type, size, objectStore, volumeName, bucketName); } - private void validateData(String keyName, byte[] data) throws Exception { - TestHelper - .validateData(keyName, data, objectStore, volumeName, bucketName); + static void validateData(String keyName, byte[] data) throws Exception { + TestHelper.validateData( + keyName, data, objectStore, volumeName, bucketName); } diff --git a/pom.xml b/pom.xml index 7522039caf86..a4f3a138184a 100644 --- a/pom.xml +++ b/pom.xml @@ -73,7 +73,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 2.3.0-da5d868-SNAPSHOT + 2.3.0-6742a4e-SNAPSHOT 0.7.0
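Note on the close-path framing introduced by this patch: BlockDataStreamOutput#executePutBlockClose appends the serialized PutBlock request and then a 4-byte length to the tail of the streamed data, and KeyValueStreamDataChannel#readPutBlockRequest parses that trailer back out on the datanode side, per the in-code comment: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|. The sketch below illustrates only that framing in isolation; it is not part of the patch. It substitutes a plain byte[] for the serialized ContainerCommandRequestProto, and the class and method names (PutBlockTrailerSketch, appendTrailer, stripTrailer) are hypothetical.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Standalone illustration of the trailer framing:
 *   |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
 * "proto" here is a stand-in byte[] for the serialized PutBlock request.
 */
public final class PutBlockTrailerSketch {

  /** Append the trailer: payload bytes followed by their 4-byte length. */
  static byte[] appendTrailer(byte[] data, byte[] proto, int max) {
    if (proto.length > max) {
      // mirrors the protoLength <= max check in getProtoLength
      throw new IllegalStateException(
          "proto length " + proto.length + " > max " + max);
    }
    final ByteBuffer out = ByteBuffer.allocate(data.length + proto.length + 4);
    out.put(data).put(proto).putInt(proto.length);
    return out.array();
  }

  /** Split a framed buffer back into {data, proto}, locating the trailer from the end. */
  static byte[][] stripTrailer(byte[] framed) {
    final ByteBuffer b = ByteBuffer.wrap(framed);
    final int lengthIndex = framed.length - 4;        // last 4 bytes hold the proto length
    final int protoLength = b.getInt(lengthIndex);
    final int protoIndex = lengthIndex - protoLength; // proto sits just before the length
    final byte[] data = Arrays.copyOfRange(framed, 0, protoIndex);
    final byte[] proto = Arrays.copyOfRange(framed, protoIndex, lengthIndex);
    return new byte[][] {data, proto};
  }

  public static void main(String[] args) {
    final byte[] data = "streamed chunk bytes".getBytes(StandardCharsets.UTF_8);
    final byte[] proto = "serialized PutBlock stand-in".getBytes(StandardCharsets.UTF_8);
    final byte[] framed = appendTrailer(data, proto, 1 << 20);
    final byte[][] parts = stripTrailer(framed);
    System.out.println(Arrays.equals(data, parts[0]));  // true
    System.out.println(Arrays.equals(proto, parts[1])); // true
  }
}

Putting the length last is what lets the datanode find the trailer by looking only at the final bytes of the stream, which in turn is why KeyValueStreamDataChannel.Buffers only needs to retain the last PUT_BLOCK_REQUEST_LENGTH_MAX bytes before flushing earlier data to the file channel.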