From ed929c857efea30861eff2b64b9b904c2519ad58 Mon Sep 17 00:00:00 2001
From: micah zhao
Date: Wed, 23 Jun 2021 23:20:27 +0800
Subject: [PATCH 01/64] HDDS-5366. [Ozone-Streaming] Implement stream method
 to ContainerStateMachine. (#2358). Contributed by mingchao zhao

---
 .../server/ratis/ContainerStateMachine.java   | 25 ++++++++
 .../transport/server/ratis/LocalStream.java   | 50 ++++++++++++++++
 .../server/ratis/StreamDataChannel.java       | 57 +++++++++++++++++++
 3 files changed, 132 insertions(+)
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1184f42de036..1cfb771dc957 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,11 +23,13 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -494,6 +496,29 @@ private CompletableFuture<Message> handleWriteChunk(
     return raftFuture;
   }
 
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        ContainerCommandRequestProto requestProto =
+            getContainerCommandRequestProto(gid,
+                request.getMessage().getContent());
+        DispatcherContext context =
+            new DispatcherContext.Builder()
+                .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+                .setContainer2BCSIDMap(container2BCSIDMap)
+                .build();
+
+        ContainerCommandResponseProto response = runCommand(
+            requestProto, context);
+        String path = response.getMessage();
+        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create data stream", e);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int hash = Objects.hashCode(req.getBlockID());
     if (hash == Integer.MIN_VALUE) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
new file mode 100644
index 000000000000..baae0139667d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+class LocalStream implements StateMachine.DataStream {
+  private final StateMachine.DataChannel dataChannel;
+
+  LocalStream(StateMachine.DataChannel dataChannel) {
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public StateMachine.DataChannel getDataChannel() {
+    return dataChannel;
+  }
+
+  @Override
+  public CompletableFuture<Object> cleanUp() {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        dataChannel.close();
+        return true;
+      } catch (IOException e) {
+        throw new CompletionException("Failed to close data channel", e);
+      }
+    });
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
new file mode 100644
index 000000000000..3df66e26dcc9
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import org.apache.ratis.statemachine.StateMachine; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.Path; + +class StreamDataChannel implements StateMachine.DataChannel { + private final Path path; + private final RandomAccessFile randomAccessFile; + + StreamDataChannel(Path path) throws FileNotFoundException { + this.path = path; + this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw"); + } + + @Override + public void force(boolean metadata) throws IOException { + randomAccessFile.getChannel().force(metadata); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return randomAccessFile.getChannel().write(src); + } + + @Override + public boolean isOpen() { + return randomAccessFile.getChannel().isOpen(); + } + + @Override + public void close() throws IOException { + randomAccessFile.close(); + } +} From cfd44449ccda1d6cf03e4bd243e5883148aadc0a Mon Sep 17 00:00:00 2001 From: captainzmc Date: Tue, 8 Jun 2021 21:36:29 +0800 Subject: [PATCH 02/64] update ozone ratis version to 2.1.0-d65ca26-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2773c69951bd..b4fa0ca9ee0b 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 2.1.0-ff8aa66-SNAPSHOT + 2.1.0-d65ca26-SNAPSHOT 0.7.0-a398b19-SNAPSHOT From c30661e85e9cd60cabde9ff1df65bf6708fd86e1 Mon Sep 17 00:00:00 2001 From: captainzmc Date: Tue, 29 Jun 2021 20:59:55 +0800 Subject: [PATCH 03/64] use ratis streaming --- .../hadoop/hdds/scm/XceiverClientRatis.java | 2 +- .../hdds/scm/storage/BlockOutputStream.java | 222 ++++++++++++------ .../hadoop/hdds/protocol/DatanodeDetails.java | 3 +- .../apache/hadoop/hdds/ratis/RatisHelper.java | 64 ++++- .../apache/hadoop/ozone/OzoneConfigKeys.java | 16 ++ .../apache/hadoop/ozone/audit/DNAction.java | 3 +- .../ContainerCommandRequestPBHelper.java | 1 + .../src/main/resources/ozone-default.xml | 18 ++ .../container/common/impl/HddsDispatcher.java | 3 +- .../server/ratis/ContainerStateMachine.java | 19 ++ .../server/ratis/XceiverServerRatis.java | 27 ++- .../container/keyvalue/KeyValueHandler.java | 41 ++++ .../keyvalue/impl/ChunkManagerDispatcher.java | 6 + .../keyvalue/impl/FilePerBlockStrategy.java | 8 + .../keyvalue/interfaces/ChunkManager.java | 5 + .../main/proto/DatanodeClientProtocol.proto | 6 +- .../dev-support/intellij/ozone-site.xml | 110 ++++++++- .../intellij/runConfigurations/Datanode2.xml | 2 +- .../intellij/runConfigurations/Datanode3.xml | 2 +- pom.xml | 2 +- 20 files changed, 464 insertions(+), 96 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 6982d41fbce5..fb264bdaea1e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -195,7 +195,7 @@ private void closeRaftClient(RaftClient raftClient) { } } - private RaftClient getClient() { + public RaftClient getClient() { return Objects.requireNonNull(client.get(), "client is null"); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index e80c1a3da89b..37f416ae9654 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -53,6 +54,11 @@ import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
+
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,6 +100,7 @@ public class BlockOutputStream extends OutputStream {
   // request will fail upfront.
   private final AtomicReference<IOException> ioException;
   private final ExecutorService responseExecutor;
+  private final ExecutorService putBlockExecutor;
 
   // the effective length of data flushed so far
   private long totalDataFlushedLength;
@@ -124,6 +131,9 @@ public class BlockOutputStream extends OutputStream {
   //current buffer allocated to write
   private ChunkBuffer currentBuffer;
   private final Token<OzoneBlockTokenIdentifier> token;
+  private final DataStreamOutput out;
+  List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+  int writeSize = 0;
 
   /**
    * Creates a new BlockOutputStream.
@@ -149,7 +159,25 @@ public BlockOutputStream( this.containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); - this.xceiverClient = xceiverClientManager.acquireClient(pipeline); + try { + this.xceiverClient = xceiverClientManager.acquireClient(pipeline); + } catch (Exception e) { + LOG.warn("exception:", e); + } + ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest = + ContainerProtos.WriteChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()); + + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerProtos.ContainerCommandRequestProto.Builder builder = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.StreamInit) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); + + this.out = ((XceiverClientRatis)this.xceiverClient).getClient() + .getDataStreamApi() + .stream(builder.build().toByteString().asReadOnlyByteBuffer()); this.bufferPool = bufferPool; this.token = token; @@ -165,6 +193,7 @@ public BlockOutputStream( // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); + putBlockExecutor = Executors.newSingleThreadExecutor(); commitWatcher = new CommitWatcher(bufferPool, xceiverClient); bufferList = null; totalDataFlushedLength = 0; @@ -417,59 +446,67 @@ ContainerCommandResponseProto> executePutBlock(boolean close, byteBufferList = null; } - CompletableFuture flushFuture = null; - try { - BlockData blockData = containerBlockData.build(); - XceiverClientReply asyncReply = - putBlockAsync(xceiverClient, blockData, close, token); - CompletableFuture future = - asyncReply.getResponse(); - flushFuture = future.thenApplyAsync(e -> { + BlockData blockData = containerBlockData.build(); + + CompletableFuture[] EMPTY_COMPLETABLE_FUTURE_ARRAY = {}; + return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY)).thenApplyAsync(v -> { + try { + CompletableFuture flushFuture = null; try { - validateResponse(e); - } catch (IOException sce) { - throw new CompletionException(sce); - } - // if the ioException is not set, putBlock is successful - if (getIoException() == null && !force) { - BlockID responseBlockID = BlockID.getFromProtobuf( - e.getPutBlock().getCommittedBlockLength().getBlockID()); - Preconditions.checkState(blockID.get().getContainerBlockID() - .equals(responseBlockID.getContainerBlockID())); - // updates the bcsId of the block - blockID.set(responseBlockID); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Adding index " + asyncReply.getLogIndex() + " commitMap size " - + commitWatcher.getCommitInfoMapSize() + " flushLength " - + flushPos + " numBuffers " + byteBufferList.size() - + " blockID " + blockID + " bufferPool size" + bufferPool - .getSize() + " currentBufferIndex " + bufferPool - .getCurrentBufferIndex()); - } - // for standalone protocol, logIndex will always be 0. 
- commitWatcher - .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList); - } - return e; - }, responseExecutor).exceptionally(e -> { - if (LOG.isDebugEnabled()) { - LOG.debug("putBlock failed for blockID {} with exception {}", + XceiverClientReply asyncReply = + putBlockAsync(xceiverClient, blockData, close, token); + CompletableFuture future = + asyncReply.getResponse(); + flushFuture = future.thenApplyAsync(e -> { + try { + validateResponse(e); + } catch (IOException sce) { + throw new CompletionException(sce); + } + // if the ioException is not set, putBlock is successful + if (getIoException() == null && !force) { + BlockID responseBlockID = BlockID.getFromProtobuf( + e.getPutBlock().getCommittedBlockLength().getBlockID()); + Preconditions.checkState(blockID.get().getContainerBlockID() + .equals(responseBlockID.getContainerBlockID())); + // updates the bcsId of the block + blockID.set(responseBlockID); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Adding index " + asyncReply.getLogIndex() + " commitMap size " + + commitWatcher.getCommitInfoMapSize() + " flushLength " + + flushPos + " numBuffers " + byteBufferList.size() + + " blockID " + blockID + " bufferPool size" + bufferPool + .getSize() + " currentBufferIndex " + bufferPool + .getCurrentBufferIndex()); + } + // for standalone protocol, logIndex will always be 0. + commitWatcher + .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList); + } + return e; + }, responseExecutor).exceptionally(e -> { + if (LOG.isDebugEnabled()) { + LOG.debug("putBlock failed for blockID {} with exception {}", blockID, e.getLocalizedMessage()); + } + CompletionException ce = new CompletionException(e); + setIoException(ce); + throw ce; + }); + } catch (IOException | ExecutionException e) { + throw new IOException(EXCEPTION_MSG + e.toString(), e); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + handleInterruptedException(ex, false); } - CompletionException ce = new CompletionException(e); - setIoException(ce); - throw ce; - }); - } catch (IOException | ExecutionException e) { - throw new IOException(EXCEPTION_MSG + e.toString(), e); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - handleInterruptedException(ex, false); - } - commitWatcher.getFutureMap().put(flushPos, flushFuture); - return flushFuture; + commitWatcher.getFutureMap().put(flushPos, flushFuture); + return flushFuture.get(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, putBlockExecutor); } @Override @@ -545,11 +582,16 @@ public void close() throws IOException { && bufferPool != null && bufferPool.getSize() > 0) { try { handleFlush(true); + out.closeAsync().thenApplyAsync(r -> { + return null; + }, responseExecutor); } catch (ExecutionException e) { handleExecutionException(e); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); handleInterruptedException(ex, true); + } catch (Exception e) { + LOG.error(e.toString()); } finally { cleanup(false); } @@ -660,32 +702,60 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException { chunkInfo.getChunkName(), effectiveChunkSize, offset); } - try { - XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, - blockID.get(), data, token); - CompletableFuture future = - asyncReply.getResponse(); - future.thenApplyAsync(e -> { - try { - validateResponse(e); - } catch (IOException sce) { - future.completeExceptionally(sce); - } - return e; - }, responseExecutor).exceptionally(e -> { - String msg = "Failed 
to write chunk " + chunkInfo.getChunkName() + " " + - "into block " + blockID; - LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage()); - CompletionException ce = new CompletionException(msg, e); - setIoException(ce); - throw ce; - }); - } catch (IOException | ExecutionException e) { - throw new IOException(EXCEPTION_MSG + e.toString(), e); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - handleInterruptedException(ex, false); + writeSize += data.asReadOnlyByteBuffer().remaining(); +// try { +// XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, +// blockID.get(), data, token); +// CompletableFuture future = +// asyncReply.getResponse(); +// future.thenApplyAsync(e -> { +// try { +// validateResponse(e); +// } catch (IOException sce) { +// future.completeExceptionally(sce); +// } +// return e; +// }, responseExecutor).exceptionally(e -> { +// String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " + +// "into block " + blockID; +// LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage()); +// CompletionException ce = new CompletionException(msg, e); +// setIoException(ce); +// throw ce; +// }); +// } catch (IOException | ExecutionException e) { +// throw new IOException(EXCEPTION_MSG + e.toString(), e); +// } catch (InterruptedException ex) { +// Thread.currentThread().interrupt(); +// handleInterruptedException(ex, false); +// } + WriteOption[] options = new WriteOption[0]; + if (writeSize >= 16 * 1000 * 1000) { + writeSize = 0; + options = new WriteOption[1]; + options[0] = StandardWriteOption.SYNC; } + + CompletableFuture future = out.writeAsync(data.asReadOnlyByteBuffer(), options) + .whenCompleteAsync((r,e) -> { + if (e != null) { + LOG.error("writing chunk failed " + chunkInfo.getChunkName() + + " blockID " + blockID + " with exception " + + e.getLocalizedMessage()); + CompletionException ce = new CompletionException(e); + setIoException(ce); + throw ce; + } + if (!r.isSuccess()) { + LOG.error("writing chunk failed " + chunkInfo.getChunkName() + + " blockID " + blockID + " with exception "); + CompletionException ce = new CompletionException(new IOException("write chunk failed")); + setIoException(ce); + throw ce; + } + }, responseExecutor); + + futures.add(future); containerBlockData.addChunks(chunkInfo); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 7faa7415134b..cdd88239ac2d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -774,7 +774,8 @@ public static final class Port { * Ports that are supported in DataNode. 
 */
   public enum Name {
-    STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER;
+    STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER,
+    DATASTREAM;
 
     public static final Set<Name> ALL_PORTS = ImmutableSet.copyOf(
         Name.values());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index e310cc9f7ddd..234739c210a3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,7 +43,9 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftGroup;
@@ -79,6 +81,11 @@ public final class RatisHelper {
   private RatisHelper() {
   }
 
+  private static String toRaftPeerDataStreamAddressString(DatanodeDetails id) {
+    return id.getIpAddress() + ":" +
+        id.getPort(DatanodeDetails.Port.Name.DATASTREAM).getValue();
+  }
+
   private static String toRaftPeerIdString(DatanodeDetails id) {
     return id.getUuidString();
   }
@@ -110,6 +117,7 @@ public static RaftPeer toRaftPeer(DatanodeDetails id) {
   public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) {
     return raftPeerBuilderFor(id)
         .setPriority(priority)
+        .setDataStreamAddress(toRaftPeerDataStreamAddressString(id))
         .build();
   }
 
@@ -118,7 +126,8 @@ private static RaftPeer.Builder raftPeerBuilderFor(DatanodeDetails dn) {
         .setId(toRaftPeerId(dn))
         .setAddress(toRaftPeerAddress(dn, Port.Name.RATIS_SERVER))
         .setAdminAddress(toRaftPeerAddress(dn, Port.Name.RATIS_ADMIN))
-        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS));
+        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS))
+        .setDataStreamAddress(toRaftPeerDataStreamAddressString(dn));
   }
 
   private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
@@ -171,7 +180,7 @@ public static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
-        toRaftPeerId(pipeline.getLeaderNode()),
+        pipeline.getLeaderNode(),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
             pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
   }
@@ -191,7 +200,7 @@ public static RaftClient newRaftClient(RaftPeer leader,
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource configuration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         tlsConfig, configuration);
   }
@@ -199,13 +208,51 @@ public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy,
       ConfigurationSource ozoneConfiguration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         null, ozoneConfiguration);
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber")
-  private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId
leader, + private static RaftClient newRaftClient(RpcType rpcType, DatanodeDetails leaderDn, + RaftGroup group, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, + ConfigurationSource ozoneConfiguration) { + if (LOG.isTraceEnabled()) { + LOG.trace("newRaftClient: {}, leader={}, group={}", + rpcType, leaderDn, group); + } + final RaftProperties properties = new RaftProperties(); + + RaftConfigKeys.Rpc.setType(properties, rpcType); + + RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); + + // Set the ratis client headers which are matching with regex. + createRaftClientProperties(ozoneConfiguration, properties); + + RaftPeerId leaderId = RatisHelper.toRaftPeerId(leaderDn); + RaftPeer leader = RaftPeer.newBuilder() + .setId(leaderId.toString()) + .setAddress(leaderDn.getIpAddress() + ":" + leaderDn.getPort(DatanodeDetails.Port.Name.RATIS).getValue()) + .setDataStreamAddress(leaderDn.getIpAddress() + ":" + leaderDn.getPort(DatanodeDetails.Port.Name.DATASTREAM).getValue()) + .build(); + RaftClient.Builder builder = RaftClient.newBuilder() + .setRaftGroup(group) + .setLeaderId(leaderId) + .setProperties(properties) + .setRetryPolicy(retryPolicy) + .setPrimaryDataStreamServer(leader); + + // TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later. + if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) { + builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig)); + } + return builder.build(); + } + + + @SuppressWarnings("checkstyle:ParameterNumber") + private static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, RaftGroup group, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, ConfigurationSource ozoneConfiguration) { if (LOG.isTraceEnabled()) { @@ -214,6 +261,8 @@ private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, } final RaftProperties properties = new RaftProperties(); + RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); + RaftConfigKeys.Rpc.setType(properties, rpcType); // Set the ratis client headers which are matching with regex. @@ -221,9 +270,10 @@ private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, RaftClient.Builder builder = RaftClient.newBuilder() .setRaftGroup(group) - .setLeaderId(leader) + .setLeaderId(leader.getId()) .setProperties(properties) - .setRetryPolicy(retryPolicy); + .setRetryPolicy(retryPolicy) + .setPrimaryDataStreamServer(leader); // TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later. if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7492a3ad13ee..e77b04bb4acc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -66,6 +66,22 @@ public final class OzoneConfigKeys { public static final String DFS_CONTAINER_RATIS_IPC_PORT = "dfs.container.ratis.ipc"; public static final int DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT = 9858; + + /** + * Ratis Port where containers listen to. 
+   */
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_IPC_PORT =
+      "dfs.container.ratis.datastream.ipc";
+  public static final int DFS_CONTAINER_RATIS_DATASTREAM_IPC_PORT_DEFAULT = 9857;
+
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_REQUEST_THREADS =
+      "dfs.container.ratis.datastream.request.threads";
+  public static final int DFS_CONTAINER_RATIS_DATASTREAM_REQUEST_THREADS_DEFAULT = 200;
+
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_WRITE_THREADS =
+      "dfs.container.ratis.datastream.write.threads";
+  public static final int DFS_CONTAINER_RATIS_DATASTREAM_WRITE_THREADS_DEFAULT = 200;
+
   /**
    * Ratis Port where containers listen to admin requests.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 1c87f2bdebad..73aff9ac830c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -38,7 +38,8 @@ public enum DNAction implements AuditAction {
   PUT_SMALL_FILE,
   GET_SMALL_FILE,
   CLOSE_CONTAINER,
-  GET_COMMITTED_BLOCK_LENGTH;
+  GET_COMMITTED_BLOCK_LENGTH,
+  STREAM_INIT;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
index 7773828e2db3..b2f46748128b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
@@ -187,6 +187,7 @@ public static DNAction getAuditAction(Type cmdType) {
     case GetSmallFile     : return DNAction.GET_SMALL_FILE;
     case CloseContainer   : return DNAction.CLOSE_CONTAINER;
     case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
+    case StreamInit       : return DNAction.STREAM_INIT;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a66ee9368775..fd7c9313c666 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -53,6 +53,24 @@
     <tag>OZONE, CONTAINER, MANAGEMENT</tag>
     <description>The ipc port number of container.</description>
   </property>
+  <property>
+    <name>dfs.container.ratis.datastream.ipc</name>
+    <value>9890</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>The datastream ipc port number of container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.request.threads</name>
+    <value>200</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>Maximum number of threads in the thread pool for datastream request.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.write.threads</name>
+    <value>200</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>Maximum number of threads in the thread pool for datastream write.</description>
+  </property>
   <property>
     <name>dfs.container.ipc.random.port</name>
     <value>false</value>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 864fcb6d3a74..91102c0e2e35 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -199,7 +199,8 @@ private ContainerCommandResponseProto dispatchRequest(
     boolean isWriteStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
-            == DispatcherContext.WriteChunkStage.WRITE_DATA);
+            == DispatcherContext.WriteChunkStage.WRITE_DATA)
+            || (cmdType == Type.StreamInit);
     boolean isWriteCommitStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1cfb771dc957..c76641d04d7e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -79,6 +79,7 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -89,6 +90,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.TaskQueue;
 import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,6 +145,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   // keeps track of the containers created per pipeline
   private final Map<Long, Long> container2BCSIDMap;
+  private final ConcurrentMap<Long, TaskQueue> containerTaskQueues;
   private final ExecutorService executor;
   private final List<ExecutorService> chunkExecutors;
@@ -519,6 +522,22 @@ public CompletableFuture<DataStream> stream(RaftClientRequest request) {
     }, executor);
   }
 
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    return CompletableFuture.supplyAsync(() -> {
+      if (stream == null) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException("DataStream: " + stream + "is Null"));
+      }
+      if (stream.getDataChannel().isOpen()) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException(
+                "DataStream: " + stream + " is not closed properly"));
+      } else {
+        return CompletableFuture.completedFuture(null);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int hash = Objects.hashCode(req.getBlockID());
     if (hash == Integer.MIN_VALUE) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 867127ed3b75..5adf471e523b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -77,6 +77,7 @@ import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.datastream.SupportedDataStreamType; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; @@ -126,6 +127,7 @@ private static long nextCallId() { private int serverPort; private int adminPort; private int clientPort; + private int dataStreamPort; private final RaftServer server; private final List chunkExecutors; private final ContainerDispatcher dispatcher; @@ -146,11 +148,12 @@ private static long nextCallId() { private long requestTimeout; private boolean shouldDeleteRatisLogDirectory; - private XceiverServerRatis(DatanodeDetails dd, + private XceiverServerRatis(DatanodeDetails dd, int dataStreamPort, ContainerDispatcher dispatcher, ContainerController containerController, StateContext context, ConfigurationSource conf, Parameters parameters) throws IOException { this.conf = conf; + this.dataStreamPort = dataStreamPort; Objects.requireNonNull(dd, "id == null"); datanodeDetails = dd; assignPorts(); @@ -227,6 +230,19 @@ private RaftProperties newRaftProperties() { // set the configs enable and set the stateMachineData sync timeout RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); + + // set the datastream config + NettyConfigKeys.DataStream.setPort(properties, dataStreamPort); + RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); + int dataStreamAsyncRequestThreadPoolSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_REQUEST_THREADS, + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_REQUEST_THREADS_DEFAULT); + RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties, dataStreamAsyncRequestThreadPoolSize); + int dataStreamWriteRequestThreadPoolSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_WRITE_THREADS, + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_WRITE_THREADS_DEFAULT); + RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, dataStreamWriteRequestThreadPoolSize); + timeUnit = OzoneConfigKeys. 
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit(); duration = conf.getTimeDuration( @@ -439,8 +455,10 @@ public static XceiverServerRatis newXceiverServerRatis( CertificateClient caClient, StateContext context) throws IOException { Parameters parameters = createTlsParameters( new SecurityConfig(ozoneConf), caClient); - - return new XceiverServerRatis(datanodeDetails, dispatcher, + int dataStreamPort = ozoneConf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_IPC_PORT_DEFAULT); + return new XceiverServerRatis(datanodeDetails, dataStreamPort, dispatcher, containerController, context, ozoneConf, parameters); } @@ -496,6 +514,9 @@ public void start() throws IOException { private int getRealPort(InetSocketAddress address, Port.Name name) { int realPort = address.getPort(); datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort)); + datanodeDetails.setPort(DatanodeDetails + .newPort(DatanodeDetails.Port.Name.DATASTREAM, + dataStreamPort)); LOG.info("{} {} is started using port {} for {}", getClass().getSimpleName(), server.getId(), realPort, name); return realPort; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 163556fb9fb0..6e73d7d7a4cf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -93,6 +93,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; @@ -204,6 +205,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, return handler.handleDeleteChunk(request, kvContainer); case WriteChunk: return handler.handleWriteChunk(request, kvContainer, dispatcherContext); + case StreamInit: + return handler.handleStreamInit(request, kvContainer, dispatcherContext); case ListChunk: return handler.handleUnsupportedOp(request); case CompactChunk: @@ -230,6 +233,44 @@ public BlockManager getBlockManager() { return this.blockManager; } + ContainerCommandResponseProto handleStreamInit( + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { + if (!request.hasWriteChunk()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Write Chunk request. 
trace ID: {}", + request.getTraceID()); + } + return malformedRequest(request); + } + + String path = null; + try { + checkContainerOpen(kvContainer); + + WriteChunkRequestProto writeChunk = request.getWriteChunk(); + BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID()); + + if (dispatcherContext == null) { + dispatcherContext = new DispatcherContext.Builder().build(); + } + + path = chunkManager + .streamInit(kvContainer, blockID, dispatcherContext); + + } catch (StorageContainerException ex) { + return ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ex) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), + request); + } + + return getSuccessResponseBuilder(request) + .setMessage(path) + .build(); + } + /** * Handles Create Container Request. If successful, adds the container to * ContainerSet and sends an ICR to the SCM. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java index 27fe0d9cc0d2..ebf20ecd6285 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java @@ -74,6 +74,12 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, .writeChunk(container, blockID, info, data, dispatcherContext); } + public String streamInit(Container container, BlockID blockID, DispatcherContext dispatcherContext) + throws StorageContainerException { + return selectHandler(container) + .streamInit(container, blockID, dispatcherContext); + } + @Override public void finishWriteChunks(KeyValueContainer kvContainer, BlockData blockData) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 3afcdf32496c..13411986d9ad 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -89,6 +89,14 @@ private static void checkLayoutVersion(Container container) { container.getContainerData().getLayOutVersion() == FILE_PER_BLOCK); } + @Override + public String streamInit(Container container, BlockID blockID, DispatcherContext dispatcherContext) + throws StorageContainerException { + checkLayoutVersion(container); + File chunkFile = getChunkFile(container, blockID, null); + return chunkFile.getAbsolutePath(); + } + @Override public void writeChunk(Container container, BlockID blockID, ChunkInfo info, ChunkBuffer data, DispatcherContext dispatcherContext) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 15ff9d6b9d61..dbee93a026f3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -60,6 +60,11 @@ default void writeChunk(Container container, BlockID blockID, ChunkInfo info,
     writeChunk(container, blockID, info, wrapper, dispatcherContext);
   }
 
+  default String streamInit(Container container, BlockID blockID,
+      DispatcherContext dispatcherContext) throws StorageContainerException {
+    return null;
+  }
+
   /**
    * reads the data defined by a chunk.
    *
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 31947db18adc..8a4baaa773d2 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -100,6 +100,8 @@ enum Type {
   GetSmallFile = 16;
   CloseContainer = 17;
   GetCommittedBlockLength = 18;
+
+  StreamInit = 19;
 }
 
@@ -391,7 +393,7 @@ enum ChecksumType {
 message WriteChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
-  required ChunkInfo chunkData = 2;
+  optional ChunkInfo chunkData = 2;
   optional bytes data = 3;
 }
 
@@ -405,7 +407,7 @@ enum ReadChunkVersion {
 message  ReadChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
-  required ChunkInfo chunkData = 2;
+  optional ChunkInfo chunkData = 2;
   optional ReadChunkVersion readChunkVersion = 3;
 }
 
diff --git a/hadoop-ozone/dev-support/intellij/ozone-site.xml b/hadoop-ozone/dev-support/intellij/ozone-site.xml
index e691d911ebc6..4d1ac26fcd81 100644
--- a/hadoop-ozone/dev-support/intellij/ozone-site.xml
+++ b/hadoop-ozone/dev-support/intellij/ozone-site.xml
@@ -59,6 +59,114 @@
     <name>hdds.prometheus.endpoint.enabled</name>
    <value>true</value>
   </property>
+
+  <property>
+    <name>ozone.replication.type</name>
+    <value>RATIS</value>
+  </property>
+  <property>
+    <name>ozone.scm.pipeline.creation.auto.factor.one</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>ozone.enabled</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>ozone.metastore.rocksdb.statistics</name>
+    <value>OFF</value>
+  </property>
+  <property>
+    <name>ozone.scm.pipeline.owner.container.count</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dfs.container.ratis.num.write.chunk.threads</name>
+    <value>20</value>
+  </property>
+  <property>
+    <name>ozone.client.checksum.type</name>
+    <value>NONE</value>
+  </property>
+  <property>
+    <name>raft.server.log.corruption.policy</name>
+    <value>WARN_AND_RETURN</value>
+  </property>
+  <property>
+    <name>ozone.datanode.pipeline.limit</name>
+    <value>3</value>
+  </property>
+  <property>
+    <name>ozone.scm.pipeline.leader-choose.policy</name>
+    <value>org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy</value>
+  </property>
+  <property>
+    <name>hdds.tracing.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>rpc.metrics.quantile.enable</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>rpc.metrics.percentiles.intervals</name>
+    <value>60,300</value>
+  </property>
+  <property>
+    <name>ozone.scm.handler.count.key</name>
+    <value>200</value>
+  </property>
+  <property>
+    <name>dfs.datanode.use.datanode.hostname</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hdds.profiler.endpoint.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>ozone.client.checksum.type</name>
+    <value>NONE</value>
+  </property>
+  <property>
+    <name>raft.server.log.corruption.policy</name>
+    <value>WARN_AND_RETURN</value>
+  </property>
+  <property>
+    <name>hdds.ratis.client.retry.policy</name>
+    <value>org.apache.hadoop.hdds.ratis.retrypolicy.RetryLimitedPolicyCreator</value>
+  </property>
+  <property>
+    <name>hdds.ratis.client.retrylimited.max.retries</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>hdds.ratis.raft.client.rpc.request.timeout</name>
+    <value>10s</value>
+  </property>
+  <property>
+    <name>hdds.ratis.raft.client.rpc.watch.request.timeout</name>
+    <value>30s</value>
+  </property>
+  <property>
+    <name>ozone.scm.heartbeat.rpc-timeout</name>
+    <value>20s</value>
+  </property>
+  <property>
+    <name>ozone.scm.stale.node.interval</name>
+    <value>15m</value>
+  </property>
+  <property>
+    <name>ozone.scm.dead.node.interval</name>
+    <value>30m</value>
+  </property>
+  <property>
+    <name>dfs.container.ratis.num.write.chunk.threads.per.volume</name>
+    <value>20</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
index 3d3302030d18..e125880899c7 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
@@ -18,7 +18,7 @@