Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
Expand Down Expand Up @@ -83,6 +84,9 @@
public class BlockDataStreamOutput implements ByteBufferStreamOutput {
public static final Logger LOG =
LoggerFactory.getLogger(BlockDataStreamOutput.class);

public static final int PUT_BLOCK_REQUEST_LENGTH_MAX = 1 << 20; // 1MB

public static final String EXCEPTION_MSG =
"Unexpected Storage Container Exception: ";
private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {};
Expand Down Expand Up @@ -406,12 +410,26 @@ private void executePutBlock(boolean close,
byteBufferList = null;
}
waitFuturesComplete();
final BlockData blockData = containerBlockData.build();
if (close) {
dataStreamCloseReply = out.closeAsync();
final ContainerCommandRequestProto putBlockRequest
= ContainerProtocolCalls.getPutBlockRequest(
xceiverClient.getPipeline(), blockData, true, token);
dataStreamCloseReply = executePutBlockClose(putBlockRequest,
PUT_BLOCK_REQUEST_LENGTH_MAX, out);
dataStreamCloseReply.whenComplete((reply, e) -> {
if (e != null || reply == null || !reply.isSuccess()) {
LOG.warn("Failed executePutBlockClose, reply=" + reply, e);
try {
executePutBlock(true, false);
} catch (IOException ex) {
throw new CompletionException(ex);
}
}
});
}

try {
BlockData blockData = containerBlockData.build();
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, token);
final CompletableFuture<ContainerCommandResponseProto> flushFuture
Expand Down Expand Up @@ -459,6 +477,30 @@ private void executePutBlock(boolean close,
}
}

/**
 * Writes the serialized putBlock request to the stream, followed by a
 * 4-byte footer holding the proto's length, and then closes the stream.
 * The reader uses the trailing length to locate the putBlock proto.
 *
 * @param putBlockRequest the putBlock command to append to the stream.
 * @param max upper bound on the serialized proto size; violating it is a
 *            programming error (see {@link #getProtoLength(ByteBuffer, int)}).
 * @param out the stream to write to and close.
 * @return the future of the final (closing) write.
 */
public static CompletableFuture<DataStreamReply> executePutBlockClose(
    ContainerCommandRequestProto putBlockRequest, int max,
    DataStreamOutput out) {
  final ByteBuffer proto = ContainerCommandRequestMessage.toMessage(
      putBlockRequest, null).getContent().asReadOnlyByteBuffer();
  final ByteBuffer lengthFooter = getProtoLength(proto, max);
  RatisHelper.debug(proto, "putBlock", LOG);
  out.writeAsync(proto);
  RatisHelper.debug(lengthFooter, "protoLength", LOG);
  // CLOSE on the last write: the length footer is the final bytes of the stream.
  return out.writeAsync(lengthFooter, StandardWriteOption.CLOSE);
}

/**
 * Encodes the remaining length of the given putBlock buffer as a 4-byte
 * big-endian integer.  The result is written after the putBlock proto
 * itself (see {@code executePutBlockClose}) so a reader can find the
 * proto by first reading this fixed-size footer.
 *
 * @param putBlock the serialized putBlock request; its position is not changed.
 * @param max maximum allowed proto length.
 * @return a read-only buffer containing exactly 4 bytes.
 * @throws IllegalStateException if the proto length exceeds {@code max}.
 */
public static ByteBuffer getProtoLength(ByteBuffer putBlock, int max) {
  final int protoLength = putBlock.remaining();
  // Fixed message typo: was "protoLength== %s > max = %s".
  Preconditions.checkState(protoLength <= max,
      "protoLength = %s > max = %s", protoLength, max);
  final ByteBuffer buffer = ByteBuffer.allocate(4);
  buffer.putInt(protoLength);
  buffer.flip();
  LOG.debug("protoLength = {}", protoLength);
  // Sanity check: after flip() the footer must be exactly 4 bytes.
  Preconditions.checkState(buffer.remaining() == 4);
  return buffer.asReadOnlyBuffer();
}

@Override
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
Expand Down Expand Up @@ -547,7 +589,7 @@ private void validateResponse(
}


private void setIoException(Exception e) {
private void setIoException(Throwable e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.ratis;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -436,4 +438,28 @@ private static <U> Class<? extends U> getClass(String name,
throw new RuntimeException(e);
}
}

/**
 * Logs the bytes of the given buffer at DEBUG level, 20 bytes per line.
 * No-op when DEBUG logging is disabled.  The caller's buffer position
 * and limit are left untouched.
 */
public static void debug(ByteBuffer buffer, String name, Logger log) {
  if (!log.isDebugEnabled()) {
    return;
  }
  // Read from a duplicate so the original buffer is not consumed.
  final ByteBuffer view = buffer.duplicate();
  final StringBuilder bytes = new StringBuilder();
  int count = 0;
  while (view.hasRemaining()) {
    count++;
    // Break the line after every 20th byte, otherwise comma-separate.
    bytes.append(view.get()).append(count % 20 == 0 ? "\n " : ", ");
  }
  log.debug("{}: {}\n {}", name, view, bytes);
}

/**
 * Logs the readable bytes of the given Netty buffer at DEBUG level,
 * 20 bytes per line.  No-op when DEBUG logging is disabled.  The
 * caller's reader index is left untouched.
 */
public static void debug(ByteBuf buf, String name, Logger log) {
  if (!log.isDebugEnabled()) {
    return;
  }
  // Read from a duplicate so the original buffer's indices are preserved.
  final ByteBuf view = buf.duplicate();
  final StringBuilder bytes = new StringBuilder();
  int count = 0;
  while (view.readableBytes() > 0) {
    count++;
    // Break the line after every 20th byte, otherwise comma-separate.
    bytes.append(view.readByte()).append(count % 20 == 0 ? "\n " : ", ");
  }
  log.debug("{}: {}\n {}", name, view, bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
Expand Down Expand Up @@ -186,11 +187,19 @@ public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof,
Token<? extends TokenIdentifier> token)
throws IOException, InterruptedException, ExecutionException {
final ContainerCommandRequestProto request = getPutBlockRequest(
xceiverClient.getPipeline(), containerBlockData, eof, token);
return xceiverClient.sendCommandAsync(request);
}

public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, BlockData containerBlockData, boolean eof,
Token<? extends TokenIdentifier> token) throws IOException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
.setBlockData(containerBlockData)
.setEof(eof);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
final String id = pipeline.getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
Expand All @@ -199,8 +208,7 @@ public static XceiverClientReply putBlockAsync(
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
ContainerCommandRequestProto request = builder.build();
return xceiverClient.sendCommandAsync(request);
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,20 @@ private ContainerCommandResponseProto runCommand(
return dispatchCommand(requestProto, context);
}

/**
 * Dispatches the given request on this state machine's executor,
 * building a COMMIT_DATA-stage dispatcher context from the log entry's
 * term and index.
 *
 * @param requestProto the container command to dispatch.
 * @param entry the Raft log entry supplying term and index.
 * @return future completing with the dispatcher's response.
 */
private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
    ContainerCommandRequestProto requestProto, LogEntryProto entry) {
  return CompletableFuture.supplyAsync(() -> {
    // Build the context inside the supplier so it is created on the
    // executor thread, exactly when the command is dispatched.
    final DispatcherContext.Builder contextBuilder =
        new DispatcherContext.Builder();
    contextBuilder.setTerm(entry.getTerm());
    contextBuilder.setLogIndex(entry.getIndex());
    contextBuilder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
    contextBuilder.setContainer2BCSIDMap(container2BCSIDMap);
    return runCommand(requestProto, contextBuilder.build());
  }, executor);
}

private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
Expand Down Expand Up @@ -560,19 +574,16 @@ public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
"DataStream: " + stream + " is not closed properly"));
}

final CompletableFuture<ContainerCommandResponseProto> f;
final ContainerCommandRequestProto request;
if (dataChannel instanceof KeyValueStreamDataChannel) {
f = CompletableFuture.completedFuture(null);
request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest();
} else {
return JavaUtils.completeExceptionally(new IllegalStateException(
"Unexpected DataChannel " + dataChannel.getClass()));
}
return f.whenComplete((res, e) -> {
if (LOG.isDebugEnabled()) {
LOG.debug("PutBlock {} Term: {} Index: {}",
res.getResult(), entry.getTerm(), entry.getIndex());
}
});
return runCommandAsync(request, entry).whenComplete(
(res, e) -> LOG.debug("link {}, entry: {}, request: {}",
res.getResult(), entry, request));
}

private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
Expand Down
Loading