From 70355c95227f81fa6d5ac9150bec57e921e82b01 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Wed, 14 Jun 2023 19:27:26 +0800 Subject: [PATCH 1/2] cleanup --- .../server/ratis/ContainerStateMachine.java | 33 +++++++++++++++--- .../transport/server/ratis/LocalStream.java | 16 +++------ .../impl/KeyValueStreamDataChannel.java | 20 ++++++++++- .../keyvalue/impl/StreamDataChannelBase.java | 34 ++++++++++++++++++- 4 files changed, 84 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 0af9657b69f4..cf5632f69d7a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -589,6 +589,9 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { if (stream == null) { return JavaUtils.completeExceptionally(new IllegalStateException( "DataStream is null")); + } else if (!(stream instanceof LocalStream)) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataStream " + stream.getClass())); } final DataChannel dataChannel = stream.getDataChannel(); if (dataChannel.isOpen()) { @@ -596,16 +599,36 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { "DataStream: " + stream + " is not closed properly")); } - final ContainerCommandRequestProto request; if (dataChannel instanceof KeyValueStreamDataChannel) { - request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest(); + final KeyValueStreamDataChannel kvStreamDataChannel = + (KeyValueStreamDataChannel) dataChannel; + + ContainerCommandRequestProto request = + kvStreamDataChannel.getPutBlockRequest(); + + return runCommandAsync(request, entry).whenComplete((response, e) -> { + if (e != null) { + LOG.warn("Failed to link logEntry {} for request {}", + TermIndex.valueOf(entry), request, e); + } + if (response != null) { + final ContainerProtos.Result result = response.getResult(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} to link logEntry {} for request {}, response: {}", + result, TermIndex.valueOf(entry), request, response); + } + if (result == ContainerProtos.Result.SUCCESS) { + kvStreamDataChannel.setLinked(); + return; + } + } + // failed to link, cleanup + kvStreamDataChannel.cleanUp(); + }); } else { return JavaUtils.completeExceptionally(new IllegalStateException( "Unexpected DataChannel " + dataChannel.getClass())); } - return runCommandAsync(request, entry).whenComplete( - (res, e) -> LOG.debug("link {}, entry: {}, request: {}", - res.getResult(), entry, request)); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java index 8daa7185b680..495a1913ee90 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java @@ -18,19 +18,18 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase; import org.apache.ratis.statemachine.StateMachine; -import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; class LocalStream implements StateMachine.DataStream { - private final StateMachine.DataChannel dataChannel; + private final StreamDataChannelBase dataChannel; private final Executor executor; LocalStream(StateMachine.DataChannel dataChannel, Executor executor) { - this.dataChannel = dataChannel; + this.dataChannel = (StreamDataChannelBase) dataChannel; this.executor = executor; } @@ -41,14 +40,7 @@ public StateMachine.DataChannel getDataChannel() { @Override public CompletableFuture cleanUp() { - return CompletableFuture.supplyAsync(() -> { - try { - dataChannel.close(); - return true; - } catch (IOException e) { - throw new CompletionException("Failed to close data channel", e); - } - }); + return CompletableFuture.supplyAsync(dataChannel::cleanUp, executor); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 99dc40f5d002..d4bd4530bdc3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -131,6 +131,13 @@ ReferenceCountedObject pollAll() { refs.forEach(ReferenceCountedObject::release); }); } + + void cleanUpAll() { + final int size = deque.size(); + for (int i = 0; i < size; i++) { + Optional.ofNullable(poll()).ifPresent(ReferenceCountedObject::release); + } + } } interface WriteMethod { @@ -198,7 +205,18 @@ void assertOpen() throws IOException { @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { - putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + try { + putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + } finally { + super.close(); + } + } + } + + @Override + protected void cleanupInternal() throws IOException { + buffers.cleanUpAll(); + if (!closed.get()) { super.close(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index 4b1a255e93ef..403c56bb805f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; @@ -30,16 +32,23 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure; /** * For write state machine data. */ -abstract class StreamDataChannelBase implements StateMachine.DataChannel { +public abstract class StreamDataChannelBase + implements StateMachine.DataChannel { + static final Logger LOG = LoggerFactory.getLogger( + StreamDataChannelBase.class); + private final RandomAccessFile randomAccessFile; private final File file; + private final AtomicBoolean linked = new AtomicBoolean(); + private final AtomicBoolean cleaned = new AtomicBoolean(); private final ContainerData containerData; private final ContainerMetrics metrics; @@ -85,6 +94,29 @@ public final boolean isOpen() { return getChannel().isOpen(); } + public void setLinked() { + linked.set(true); + } + + /** @return true iff {@link StateMachine.DataChannel} is already linked. */ + public boolean cleanUp() { + if (linked.get()) { + // already linked, nothing to do. + return true; + } + if (cleaned.compareAndSet(false, true)) { + // close and then delete the file. + try { + cleanupInternal(); + } catch (IOException e) { + LOG.warn("Failed to close " + this, e); + } + } + return false; + } + + protected abstract void cleanupInternal() throws IOException; + @Override public void close() throws IOException { try { From b60cd109cf6f96f7075644e9f95438b0be0d8e89 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Thu, 15 Jun 2023 12:24:29 +0800 Subject: [PATCH 2/2] cleanup code review --- .../server/ratis/ContainerStateMachine.java | 52 +++++++++---------- .../transport/server/ratis/LocalStream.java | 15 ++++-- .../impl/KeyValueStreamDataChannel.java | 7 ++- .../keyvalue/impl/StreamDataChannelBase.java | 2 +- 4 files changed, 41 insertions(+), 35 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index cf5632f69d7a..e8ab1b0af827 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -599,36 +599,36 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { "DataStream: " + stream + " is not closed properly")); } - if (dataChannel instanceof KeyValueStreamDataChannel) { - final KeyValueStreamDataChannel kvStreamDataChannel = - (KeyValueStreamDataChannel) dataChannel; + if (!(dataChannel instanceof KeyValueStreamDataChannel)) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataChannel " + dataChannel.getClass())); + } + + final KeyValueStreamDataChannel kvStreamDataChannel = + (KeyValueStreamDataChannel) dataChannel; - ContainerCommandRequestProto request = - kvStreamDataChannel.getPutBlockRequest(); + final ContainerCommandRequestProto request = + kvStreamDataChannel.getPutBlockRequest(); - return runCommandAsync(request, entry).whenComplete((response, e) -> { - if (e != null) { - LOG.warn("Failed to link logEntry {} for request {}", - TermIndex.valueOf(entry), request, e); + return runCommandAsync(request, entry).whenComplete((response, e) -> { + if (e != null) { + LOG.warn("Failed to link logEntry {} for request {}", + TermIndex.valueOf(entry), request, e); + } + if (response != null) { + final ContainerProtos.Result result = response.getResult(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} to link logEntry {} for request {}, response: {}", + result, TermIndex.valueOf(entry), request, response); } - if (response != null) { - final ContainerProtos.Result result = response.getResult(); - if (LOG.isDebugEnabled()) { - LOG.debug("{} to link logEntry {} for request {}, response: {}", - result, TermIndex.valueOf(entry), request, response); - } - if (result == ContainerProtos.Result.SUCCESS) { - kvStreamDataChannel.setLinked(); - return; - } + if (result == ContainerProtos.Result.SUCCESS) { + kvStreamDataChannel.setLinked(); + return; } - // failed to link, cleanup - kvStreamDataChannel.cleanUp(); - }); - } else { - return JavaUtils.completeExceptionally(new IllegalStateException( - "Unexpected DataChannel " + dataChannel.getClass())); - } + } + // failed to link, cleanup + kvStreamDataChannel.cleanUp(); + }); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java index 495a1913ee90..2473fdeb0ef6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java @@ -18,18 +18,19 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; -import org.apache.hadoop.ozone.container.keyvalue.impl.StreamDataChannelBase; +import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.JavaUtils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; class LocalStream implements StateMachine.DataStream { - private final StreamDataChannelBase dataChannel; + private final StateMachine.DataChannel dataChannel; private final Executor executor; LocalStream(StateMachine.DataChannel dataChannel, Executor executor) { - this.dataChannel = (StreamDataChannelBase) dataChannel; + this.dataChannel = dataChannel; this.executor = executor; } @@ -40,7 +41,13 @@ public StateMachine.DataChannel getDataChannel() { @Override public CompletableFuture cleanUp() { - return CompletableFuture.supplyAsync(dataChannel::cleanUp, executor); + if (!(dataChannel instanceof KeyValueStreamDataChannel)) { + return JavaUtils.completeExceptionally(new IllegalStateException( + "Unexpected DataChannel " + dataChannel.getClass())); + } + return CompletableFuture + .supplyAsync(((KeyValueStreamDataChannel) dataChannel)::cleanUp, + executor); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index d4bd4530bdc3..e34a1e273c2d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -133,9 +133,8 @@ ReferenceCountedObject pollAll() { } void cleanUpAll() { - final int size = deque.size(); - for (int i = 0; i < size; i++) { - Optional.ofNullable(poll()).ifPresent(ReferenceCountedObject::release); + while (!deque.isEmpty()) { + poll().release(); } } } @@ -216,7 +215,7 @@ public void close() throws IOException { @Override protected void cleanupInternal() throws IOException { buffers.cleanUpAll(); - if (!closed.get()) { + if (closed.compareAndSet(false, true)) { super.close(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index 403c56bb805f..58fc2c348bef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -39,7 +39,7 @@ /** * For write state machine data. */ -public abstract class StreamDataChannelBase +abstract class StreamDataChannelBase implements StateMachine.DataChannel { static final Logger LOG = LoggerFactory.getLogger( StreamDataChannelBase.class);