diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0af9657b69f4..e8ab1b0af827 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -589,6 +589,9 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { if (stream == null) { return JavaUtils.completeExceptionally(new IllegalStateException( "DataStream is null")); + } else if (!(stream instanceof LocalStream)) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataStream " + stream.getClass())); } final DataChannel dataChannel = stream.getDataChannel(); if (dataChannel.isOpen()) { @@ -596,16 +599,36 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { "DataStream: " + stream + " is not closed properly")); } - final ContainerCommandRequestProto request; - if (dataChannel instanceof KeyValueStreamDataChannel) { - request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest(); - } else { + if (!(dataChannel instanceof KeyValueStreamDataChannel)) { return JavaUtils.completeExceptionally(new IllegalStateException( "Unexpected DataChannel " + dataChannel.getClass())); } - return runCommandAsync(request, entry).whenComplete( - (res, e) -> LOG.debug("link {}, entry: {}, request: {}", - res.getResult(), entry, request)); + + final KeyValueStreamDataChannel kvStreamDataChannel = + (KeyValueStreamDataChannel) dataChannel; + + final ContainerCommandRequestProto request = + kvStreamDataChannel.getPutBlockRequest(); + + return runCommandAsync(request, entry).whenComplete((response, e) -> { + if (e != null) { + LOG.warn("Failed to link logEntry {} for request {}", + TermIndex.valueOf(entry), request, e); + } + if (response != null) { + final ContainerProtos.Result result = response.getResult(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} to link logEntry {} for request {}, response: {}", + result, TermIndex.valueOf(entry), request, response); + } + if (result == ContainerProtos.Result.SUCCESS) { + kvStreamDataChannel.setLinked(); + return; + } + } + // failed to link, cleanup + kvStreamDataChannel.cleanUp(); + }); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java index 8daa7185b680..2473fdeb0ef6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java @@ -18,11 +18,11 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.JavaUtils; -import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; class LocalStream implements StateMachine.DataStream { @@ -41,14 +41,13 @@ public StateMachine.DataChannel getDataChannel() { @Override public CompletableFuture cleanUp() { - return CompletableFuture.supplyAsync(() -> { - try { - dataChannel.close(); - return true; - } catch (IOException e) { - throw new CompletionException("Failed to close data channel", e); - } - }); + if (!(dataChannel instanceof KeyValueStreamDataChannel)) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataChannel " + dataChannel.getClass())); + } + return CompletableFuture + .supplyAsync(((KeyValueStreamDataChannel) dataChannel)::cleanUp, + executor); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 99dc40f5d002..e34a1e273c2d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -131,6 +131,12 @@ ReferenceCountedObject pollAll() { refs.forEach(ReferenceCountedObject::release); }); } + + void cleanUpAll() { + while (!deque.isEmpty()) { + poll().release(); + } + } } interface WriteMethod { @@ -198,7 +204,18 @@ void assertOpen() throws IOException { @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { - putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + try { + putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + } finally { + super.close(); + } + } + } + + @Override + protected void cleanupInternal() throws IOException { + buffers.cleanUpAll(); + if (closed.compareAndSet(false, true)) { super.close(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index 4b1a255e93ef..58fc2c348bef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; @@ -30,16 +32,23 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure; /** * For write state machine data. */ -abstract class StreamDataChannelBase implements StateMachine.DataChannel { +abstract class StreamDataChannelBase + implements StateMachine.DataChannel { + static final Logger LOG = LoggerFactory.getLogger( + StreamDataChannelBase.class); + private final RandomAccessFile randomAccessFile; private final File file; + private final AtomicBoolean linked = new AtomicBoolean(); + private final AtomicBoolean cleaned = new AtomicBoolean(); private final ContainerData containerData; private final ContainerMetrics metrics; @@ -85,6 +94,29 @@ public final boolean isOpen() { return getChannel().isOpen(); } + public void setLinked() { + linked.set(true); + } + + /** @return true iff {@link StateMachine.DataChannel} is already linked. */ + public boolean cleanUp() { + if (linked.get()) { + // already linked, nothing to do. + return true; + } + if (cleaned.compareAndSet(false, true)) { + // close and then delete the file. + try { + cleanupInternal(); + } catch (IOException e) { + LOG.warn("Failed to close " + this, e); + } + } + return false; + } + + protected abstract void cleanupInternal() throws IOException; + @Override public void close() throws IOException { try {