From 0029d78c8ad4579e28b1ce09fe80f5bec76cd0ca Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 13 Nov 2019 16:22:29 -0800 Subject: [PATCH 1/6] HDDS-2477. TableCache cleanup issue for OM non-HA.: --- .../apache/hadoop/hdds/utils/db/Table.java | 13 ++++- .../hadoop/hdds/utils/db/TypedTable.java | 6 +++ .../hdds/utils/db/cache/TableCache.java | 12 +++++ .../hdds/utils/db/cache/TableCacheImpl.java | 40 ++++++++++++++- .../utils/db/cache/TestTableCacheImpl.java | 50 +++++++++++++++++++ .../om/ratis/OzoneManagerDoubleBuffer.java | 46 +++++++++++++++-- 6 files changed, 158 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 0502541e9c5c..2439b2518140 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.commons.lang3.NotImplementedException; @@ -140,14 +141,22 @@ default CacheValue getCacheValue(CacheKey cacheKey) { /** * Removes all the entries from the table cache which are having epoch value - * less - * than or equal to specified epoch value. + * less than or equal to specified epoch value. * @param epoch */ default void cleanupCache(long epoch) { throw new NotImplementedException("cleanupCache is not implemented"); } + /** + * Removes all the entries from the table cache which are matching with + * epoch provided in the epoch list. + * @param epochs + */ + default void cleanupCache(List epochs) { + throw new NotImplementedException("cleanupCache is not implemented"); + } + /** * Return cache iterator maintained for this table. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index b3e43d17d955..1262e853296e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; import com.google.common.annotations.VisibleForTesting; @@ -238,6 +239,11 @@ public void cleanupCache(long epoch) { cache.cleanup(epoch); } + @Override + public void cleanupCache(List epochs) { + cache.cleanup(epochs); + } + @VisibleForTesting TableCache, CacheValue> getCache() { return cache; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java index de5a07978f51..f9b988369f82 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -70,6 +71,17 @@ public interface TableCache epochs); + /** * Return the size of the cache. 
* @return size diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java index 3e6999a49cfa..88a86067ceac 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.utils.db.cache; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; @@ -93,7 +94,11 @@ public void put(CACHEKEY cacheKey, CACHEVALUE value) { @Override public void cleanup(long epoch) { - executorService.submit(() -> evictCache(epoch, cleanupPolicy)); + executorService.submit(() -> evictCache(epoch)); + } + + public void cleanup(List epochs) { + executorService.submit(() -> evictCache(epochs)); } @Override @@ -106,7 +111,38 @@ public Iterator> iterator() { return cache.entrySet().iterator(); } - private void evictCache(long epoch, CacheCleanupPolicy cacheCleanupPolicy) { + private void evictCache(List epochs) { + EpochEntry currentEntry; + long lastEpoch = epochs.get(epochs.size() - 1); + for (Iterator> iterator = epochEntries.iterator(); + iterator.hasNext();) { + currentEntry = iterator.next(); + CACHEKEY cachekey = currentEntry.getCachekey(); + CacheValue cacheValue = cache.computeIfPresent(cachekey, ((k, v) -> { + if (cleanupPolicy == CacheCleanupPolicy.MANUAL) { + if (epochs.contains(v.getEpoch())) { + iterator.remove(); + return null; + } + } else if (cleanupPolicy == CacheCleanupPolicy.NEVER) { + // Remove only entries which are marked for delete. + if (epochs.contains(v.getEpoch()) && v.getCacheValue() == null) { + iterator.remove(); + return null; + } + } + return v; + })); + + // If currentEntry epoch is greater than last epoch, we have deleted all + // entries less than specified epoch. So, we can break. + if (cacheValue != null && cacheValue.getEpoch() >= lastEpoch) { + break; + } + } + } + + private void evictCache(long epoch) { EpochEntry currentEntry = null; for (Iterator> iterator = epochEntries.iterator(); iterator.hasNext();) { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java index 42391297a0a6..9ae88f4047f0 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.utils.db.cache; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -88,6 +89,55 @@ public void testPartialTableCache() { } + @Test + public void testPartialTableCacheWithNotContinousEntries() throws Exception { + int totalCount = 0; + int insertedCount = 3000; + for (long i=0; i(Long.toString(i)), + new CacheValue<>(Optional.of(Long.toString(i)), i)); + totalCount++; + } + + Assert.assertEquals(totalCount, tableCache.size()); + + ArrayList epochs = new ArrayList(); + epochs.add(0L); + epochs.add(10L); + epochs.add(300L); + epochs.add(500L); + epochs.add(1000L); + + tableCache.cleanup(epochs); + + final int count = totalCount; + + // If cleanup policy is manual entries should have been removed. 
+ if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) { + GenericTestUtils.waitFor(() -> { + return count - epochs.size() == tableCache.size(); + }, 100, 10000); + + // Check remaining entries exist or not and deleted entries does not + // exist. + for (long i = 0; i < insertedCount; i += 2) { + if (!epochs.contains(i)) { + Assert.assertEquals(Long.toString(i), + tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue()); + } else { + Assert.assertEquals(null, + tableCache.get(new CacheKey<>(Long.toString(i)))); + } + } + } else { + for (long i = 0; i < insertedCount; i += 2) { + Assert.assertEquals(Long.toString(i), + tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue()); + } + } + + } + @Test public void testPartialTableCacheParallel() throws Exception { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 751624408c23..1ec231fc326c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.om.ratis; import java.io.IOException; +import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -167,14 +169,22 @@ private void flushTransactions() { flushedTransactionsSize); } + readyBuffer.clear(); + long lastRatisTransactionIndex = readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex) - .max(Long::compareTo).get(); - - readyBuffer.clear(); + .max(Long::compareTo).get(); - // cleanup cache. - cleanupCache(lastRatisTransactionIndex); + if (!isRatisEnabled) { + List flushedEpochs = + readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex) + .sorted().collect(Collectors.toList()); + + cleanupCache(flushedEpochs); + } else { + // cleanup cache. + cleanupCache(lastRatisTransactionIndex); + } // TODO: Need to revisit this logic, once we have multiple // executors for volume/bucket request handling. As for now @@ -236,6 +246,32 @@ private void cleanupCache(long lastRatisTransactionIndex) { } + private void cleanupCache(List lastRatisTransactionIndex) { + // As now only volume and bucket transactions are handled only called + // cleanupCache on bucketTable. + // TODO: After supporting all write operations we need to call + // cleanupCache on the tables only when buffer has entries for that table. + omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex); + + //TODO: Optimization we can do here is for key transactions we can only + // cleanup cache when it is key commit transaction. In this way all + // intermediate transactions for a key will be read from in-memory cache. 
+ omMetadataManager.getOpenKeyTable().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getKeyTable().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getDeletedTable().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getS3Table().cleanupCache(lastRatisTransactionIndex); + omMetadataManager.getMultipartInfoTable().cleanupCache( + lastRatisTransactionIndex); + omMetadataManager.getS3SecretTable().cleanupCache( + lastRatisTransactionIndex); + omMetadataManager.getDelegationTokenTable().cleanupCache( + lastRatisTransactionIndex); + omMetadataManager.getPrefixTable().cleanupCache(lastRatisTransactionIndex); + + } + /** * Update OzoneManagerDoubleBuffer metrics values. * @param flushedTransactionsSize From f99afac18b045622eff255065526e4f40ad16708 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 13 Nov 2019 16:57:06 -0800 Subject: [PATCH 2/6] make use for loop to build epoch list for cleanup. --- .../hdds/utils/db/cache/TestTableCacheImpl.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java index 9ae88f4047f0..482a14c28611 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java @@ -93,7 +93,14 @@ public void testPartialTableCache() { public void testPartialTableCacheWithNotContinousEntries() throws Exception { int totalCount = 0; int insertedCount = 3000; + + int cleanupCount = 1000; + + ArrayList epochs = new ArrayList(); for (long i=0; i(Long.toString(i)), new CacheValue<>(Optional.of(Long.toString(i)), i)); totalCount++; @@ -101,13 +108,6 @@ public void testPartialTableCacheWithNotContinousEntries() throws Exception { Assert.assertEquals(totalCount, tableCache.size()); - ArrayList epochs = new ArrayList(); - epochs.add(0L); - epochs.add(10L); - epochs.add(300L); - epochs.add(500L); - epochs.add(1000L); - tableCache.cleanup(epochs); final int count = totalCount; From d50f7840bbcc6abfcd037db95854b779bf89f04c Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 13 Nov 2019 17:13:05 -0800 Subject: [PATCH 3/6] fix test bug. --- .../apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java index 482a14c28611..b6c9343eeb70 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java @@ -94,7 +94,7 @@ public void testPartialTableCacheWithNotContinousEntries() throws Exception { int totalCount = 0; int insertedCount = 3000; - int cleanupCount = 1000; + int cleanupCount = 0; ArrayList epochs = new ArrayList(); for (long i=0; i Date: Wed, 13 Nov 2019 11:11:01 -0800 Subject: [PATCH 4/6] HDDS-2470. Add partName, partNumber for CommitMultipartUpload. 
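Record the part number and part name of a multipart-upload commit in the
request's audit map, so a committed (or failed) part can be correlated with
the exact part in the OM audit log.

Rough sketch of the extra audit entries this patch produces (illustrative
only; buildAuditMap is the private helper added in the diff below, and the
example values in the comment are hypothetical):

    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
    // e.g. "partNumber" -> "1", "partName" -> OM-generated name of the part
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
        String.valueOf(keyArgs.getMultipartNumber()));
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);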
---
 .../org/apache/hadoop/ozone/OzoneConsts.java |  2 ++
 .../S3MultipartUploadCommitPartRequest.java  | 23 +++++++++++++++----
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 867da0865472..59533a17246f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -272,6 +272,8 @@ private OzoneConsts() {
   public static final String MAX_PARTS = "maxParts";
   public static final String S3_BUCKET = "s3Bucket";
   public static final String S3_GETSECRET_USER = "S3GetSecretUser";
+  public static final String MULTIPART_UPLOAD_PART_NUMBER = "partNumber";
+  public static final String MULTIPART_UPLOAD_PART_NAME = "partName";

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
index cf7db655a020..70618775f12b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -32,6 +33,8 @@
 import org.apache.hadoop.ozone.om.response.s3.multipart
     .S3MultipartUploadCommitPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -47,6 +50,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -84,8 +88,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
     MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
         getOmRequest().getCommitMultiPartUploadRequest();
 
-    OzoneManagerProtocolProtos.KeyArgs keyArgs =
-        multipartCommitUploadPartRequest.getKeyArgs();
+    KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
@@ -210,8 +213,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 
     // audit log
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
-        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
-        exception, getOmRequest().getUserInfo()));
+        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY,
+        buildAuditMap(keyArgs, partName), exception,
+        getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.debug("MultipartUpload Commit is successfully for Key:{} in " +
@@ -224,5 +228,16 @@ OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
     }
     return omClientResponse;
   }
+
+  private Map<String, String> buildAuditMap(KeyArgs keyArgs, String partName) {
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    // Add MPU related information.
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
+        String.valueOf(keyArgs.getMultipartNumber()));
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);
+
+    return auditMap;
+  }
 }

From e2428fbecab7ff4ba7e502bd8b8163bfe3361398 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Wed, 13 Nov 2019 12:53:07 -0800
Subject: [PATCH 5/6] HDDS-2471. Improve exception message for CompleteMultipartUpload.

---
 .../s3/multipart/S3MultipartUploadCompleteRequest.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index ed48d8585856..ce923f893802 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -184,8 +184,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 
         if (partKeyInfo == null ||
             !partName.equals(partKeyInfo.getPartName())) {
+          String omPartName = partKeyInfo == null ? null :
+              partKeyInfo.getPartName();
           throw new OMException("Complete Multipart Upload Failed: volume: " +
-              volumeName + "bucket: " + bucketName + "key: " + keyName,
+              volumeName + " bucket: " + bucketName + " key: " + keyName +
+              ". Provided Part info is { " + partName + ", " + partNumber +
+              "}, whereas OM has partName " + omPartName,
               OMException.ResultCodes.INVALID_PART);
         }
 

From 76c65cdd52a40a072587cadcab5ccb9c6a03d4b7 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Thu, 14 Nov 2019 13:52:40 -0800
Subject: [PATCH 6/6] Fix readyBuffer clearing order in OzoneManagerDoubleBuffer.
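The first patch in this series moved readyBuffer.clear() ahead of the stream
that reads the flushed transaction indexes back out of the buffer, so
readyBuffer.stream()...max(Long::compareTo).get() ran against an already
emptied queue. Compute lastRatisTransactionIndex (and, for the non-Ratis
path, the sorted epoch list) first, and clear the buffer only after the cache
cleanup has been submitted. The audit constants added earlier in the series
are also renamed to "mpuPartNumber"/"mpuPartName".

Condensed view of the intended flush ordering after this fix (simplified
sketch of flushTransactions(); batch write, buffer swap and metrics omitted):

    long lastRatisTransactionIndex =
        readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
            .max(Long::compareTo).get();

    if (!isRatisEnabled) {
      List<Long> flushedEpochs =
          readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
              .sorted().collect(Collectors.toList());
      cleanupCache(flushedEpochs);
    } else {
      cleanupCache(lastRatisTransactionIndex);
    }

    // Clear only after the indexes have been consumed.
    readyBuffer.clear();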
---
 .../src/main/java/org/apache/hadoop/ozone/OzoneConsts.java | 4 ++--
 .../hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java    | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 59533a17246f..34d6118b6ba4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -272,8 +272,8 @@ private OzoneConsts() {
   public static final String MAX_PARTS = "maxParts";
   public static final String S3_BUCKET = "s3Bucket";
   public static final String S3_GETSECRET_USER = "S3GetSecretUser";
-  public static final String MULTIPART_UPLOAD_PART_NUMBER = "partNumber";
-  public static final String MULTIPART_UPLOAD_PART_NAME = "partName";
+  public static final String MULTIPART_UPLOAD_PART_NUMBER = "mpuPartNumber";
+  public static final String MULTIPART_UPLOAD_PART_NAME = "mpuPartName";

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 1ec231fc326c..2e4f83a7de4d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -169,8 +169,6 @@ private void flushTransactions() {
             flushedTransactionsSize);
       }
 
-      readyBuffer.clear();
-
       long lastRatisTransactionIndex =
           readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
           .max(Long::compareTo).get();
@@ -186,6 +184,8 @@ private void flushTransactions() {
         cleanupCache(lastRatisTransactionIndex);
       }
 
+      readyBuffer.clear();
+
       // TODO: Need to revisit this logic, once we have multiple
       // executors for volume/bucket request handling. As for now
       // transactions are serialized this should be fine.
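Usage sketch (illustrative only, not part of the patch series): how the new
list-based cleanup is expected to behave for a MANUAL-cleanup table cache.
The TableCacheImpl constructor argument is assumed from the parameterized
test setup above; eviction is submitted to the cache's background executor,
so callers that need to observe the result must wait for it (the tests use
GenericTestUtils.waitFor).

    import java.util.Arrays;
    import com.google.common.base.Optional;

    TableCacheImpl<CacheKey<String>, CacheValue<String>> tableCache =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.MANUAL);

    // Each entry's epoch equals its key, as in TestTableCacheImpl.
    for (long i = 0; i < 5; i++) {
      tableCache.put(new CacheKey<>(Long.toString(i)),
          new CacheValue<>(Optional.of(Long.toString(i)), i));
    }

    // Only epochs that were actually flushed to RocksDB are evicted;
    // epochs 1 and 3 are still in flight and must stay in the cache.
    tableCache.cleanup(Arrays.asList(0L, 2L, 4L));

    // With the NEVER (full-table) policy the same call would evict only
    // entries whose CacheValue is null, i.e. entries marked as deleted.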