diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 0502541e9c5c..2439b2518140 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.NotImplementedException;
@@ -140,14 +141,22 @@ default CacheValue<VALUE> getCacheValue(CacheKey<KEY> cacheKey) {
 
   /**
    * Removes all the entries from the table cache which are having epoch value
-   * less
-   * than or equal to specified epoch value.
+   * less than or equal to specified epoch value.
    * @param epoch
    */
   default void cleanupCache(long epoch) {
     throw new NotImplementedException("cleanupCache is not implemented");
   }
 
+  /**
+   * Removes all the entries from the table cache which are matching with
+   * epoch provided in the epoch list.
+   * @param epochs
+   */
+  default void cleanupCache(List<Long> epochs) {
+    throw new NotImplementedException("cleanupCache is not implemented");
+  }
+
   /**
    * Return cache iterator maintained for this table.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index b3e43d17d955..1262e853296e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -238,6 +239,11 @@ public void cleanupCache(long epoch) {
     cache.cleanup(epoch);
   }
 
+  @Override
+  public void cleanupCache(List<Long> epochs) {
+    cache.cleanup(epochs);
+  }
+
   @VisibleForTesting
   TableCache<CacheKey<KEY>, CacheValue<VALUE>> getCache() {
     return cache;
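
A minimal caller-side sketch of the two overloads introduced above, mirroring how the OzoneManagerDoubleBuffer change later in this patch uses them. The helper class, the table instance, and the argument values are hypothetical; only the cleanupCache(long) and cleanupCache(List<Long>) signatures come from the diff:

    import java.util.List;

    import org.apache.hadoop.hdds.utils.db.Table;

    // Hypothetical caller; only Table#cleanupCache(long) and
    // Table#cleanupCache(List<Long>) are taken from the patch above.
    final class CleanupCacheSketch {

      static void afterFlush(Table<String, String> table, boolean isRatisEnabled,
          long lastFlushedIndex, List<Long> flushedEpochs) {
        if (isRatisEnabled) {
          // Contiguous epochs: evict everything up to the highest flushed index.
          table.cleanupCache(lastFlushedIndex);
        } else {
          // Possibly non-contiguous epochs: evict exactly the flushed ones.
          table.cleanupCache(flushedEpochs);
        }
      }
    }
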
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
index de5a07978f51..f9b988369f82 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCache.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -70,6 +71,17 @@ public interface TableCache<CACHEKEY extends CacheKey,
    */
   void cleanup(long epoch);
 
+  /**
+   * Removes all the entries from the table cache which are matching with
+   * epoch provided in the epoch list.
+   * @param epochs
+   */
+  void cleanup(List<Long> epochs);
+
   /**
    * Return the size of the cache.
    * @return size
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java
index 3e6999a49cfa..88a86067ceac 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/cache/TableCacheImpl.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.utils.db.cache;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -93,7 +94,11 @@ public void put(CACHEKEY cacheKey, CACHEVALUE value) {
 
   @Override
   public void cleanup(long epoch) {
-    executorService.submit(() -> evictCache(epoch, cleanupPolicy));
+    executorService.submit(() -> evictCache(epoch));
+  }
+
+  public void cleanup(List<Long> epochs) {
+    executorService.submit(() -> evictCache(epochs));
   }
 
   @Override
@@ -106,7 +111,38 @@ public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
     return cache.entrySet().iterator();
   }
 
-  private void evictCache(long epoch, CacheCleanupPolicy cacheCleanupPolicy) {
+  private void evictCache(List<Long> epochs) {
+    EpochEntry<CACHEKEY> currentEntry;
+    long lastEpoch = epochs.get(epochs.size() - 1);
+    for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
+        iterator.hasNext();) {
+      currentEntry = iterator.next();
+      CACHEKEY cachekey = currentEntry.getCachekey();
+      CacheValue cacheValue = cache.computeIfPresent(cachekey, ((k, v) -> {
+        if (cleanupPolicy == CacheCleanupPolicy.MANUAL) {
+          if (epochs.contains(v.getEpoch())) {
+            iterator.remove();
+            return null;
+          }
+        } else if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+          // Remove only entries which are marked for delete.
+          if (epochs.contains(v.getEpoch()) && v.getCacheValue() == null) {
+            iterator.remove();
+            return null;
+          }
+        }
+        return v;
+      }));
+
+      // If currentEntry epoch is greater than last epoch, we have deleted all
+      // entries less than specified epoch. So, we can break.
+      if (cacheValue != null && cacheValue.getEpoch() >= lastEpoch) {
+        break;
+      }
+    }
+  }
+
+  private void evictCache(long epoch) {
     EpochEntry<CACHEKEY> currentEntry = null;
     for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
         iterator.hasNext();) {
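
To make the list-based eviction above easier to follow, here is a self-contained sketch of the same policy logic using plain JDK types in place of the cache internals: a TreeMap stands in for the epoch-ordered epochEntries, and a null value stands in for a CacheValue marked for delete. The names Policy and evict are illustrative, not Ozone API:

    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    final class ListEvictionSketch {
      enum Policy { MANUAL, NEVER }

      // entriesByEpoch is ordered by epoch, like epochEntries in TableCacheImpl;
      // epochs is expected to be sorted, so its last element is the highest
      // flushed epoch and iteration can stop once we pass it.
      static void evict(TreeMap<Long, String> entriesByEpoch, List<Long> epochs,
          Policy policy) {
        long lastEpoch = epochs.get(epochs.size() - 1);
        Iterator<Map.Entry<Long, String>> it = entriesByEpoch.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<Long, String> entry = it.next();
          boolean flushed = epochs.contains(entry.getKey());
          boolean markedForDelete = entry.getValue() == null;
          // MANUAL: evict every flushed entry.
          // NEVER: evict only flushed entries that are marked for delete.
          if (policy == Policy.MANUAL ? flushed : (flushed && markedForDelete)) {
            it.remove();
          }
          if (entry.getKey() >= lastEpoch) {
            break;
          }
        }
      }
    }
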
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 867da0865472..34d6118b6ba4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -272,6 +272,8 @@ private OzoneConsts() {
   public static final String MAX_PARTS = "maxParts";
   public static final String S3_BUCKET = "s3Bucket";
   public static final String S3_GETSECRET_USER = "S3GetSecretUser";
+  public static final String MULTIPART_UPLOAD_PART_NUMBER = "mpuPartNumber";
+  public static final String MULTIPART_UPLOAD_PART_NAME = "mpuPartName";
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java
index 42391297a0a6..b6c9343eeb70 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/db/cache/TestTableCacheImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hdds.utils.db.cache;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -88,6 +89,55 @@ public void testPartialTableCache() {
   }
 
+  @Test
+  public void testPartialTableCacheWithNotContinousEntries() throws Exception {
+    int totalCount = 0;
+    int insertedCount = 3000;
+
+    int cleanupCount = 0;
+
+    ArrayList<Long> epochs = new ArrayList<>();
+    for (long i = 0; i < insertedCount; i += 2) {
+      if (cleanupCount++ < 1000) {
+        epochs.add(i);
+      }
+      tableCache.put(new CacheKey<>(Long.toString(i)),
+          new CacheValue<>(Optional.of(Long.toString(i)), i));
+      totalCount++;
+    }
+
+    Assert.assertEquals(totalCount, tableCache.size());
+
+    tableCache.cleanup(epochs);
+
+    final int count = totalCount;
+
+    // If cleanup policy is manual, entries should have been removed.
+    if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) {
+      GenericTestUtils.waitFor(() -> {
+        return count - epochs.size() == tableCache.size();
+      }, 100, 10000);
+
+      // Check that remaining entries exist and deleted entries do not
+      // exist.
+      for (long i = 0; i < insertedCount; i += 2) {
+        if (!epochs.contains(i)) {
+          Assert.assertEquals(Long.toString(i),
+              tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue());
+        } else {
+          Assert.assertEquals(null,
+              tableCache.get(new CacheKey<>(Long.toString(i))));
+        }
+      }
+    } else {
+      for (long i = 0; i < insertedCount; i += 2) {
+        Assert.assertEquals(Long.toString(i),
+            tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue());
+      }
+    }
+
+  }
+
   @Test
   public void testPartialTableCacheParallel() throws Exception {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 751624408c23..2e4f83a7de4d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.ozone.om.ratis;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -169,12 +171,20 @@ private void flushTransactions() {
 
           long lastRatisTransactionIndex =
               readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-              .max(Long::compareTo).get();
+                  .max(Long::compareTo).get();
 
-          readyBuffer.clear();
+          if (!isRatisEnabled) {
+            List<Long> flushedEpochs =
+                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
+                    .sorted().collect(Collectors.toList());
+
+            cleanupCache(flushedEpochs);
+          } else {
+            // cleanup cache.
+            cleanupCache(lastRatisTransactionIndex);
+          }
 
-          // cleanup cache.
-          cleanupCache(lastRatisTransactionIndex);
+          readyBuffer.clear();
 
           // TODO: Need to revisit this logic, once we have multiple
           // executors for volume/bucket request handling. As for now
@@ -236,6 +246,32 @@ private void cleanupCache(long lastRatisTransactionIndex) {
 
   }
 
+  private void cleanupCache(List<Long> lastRatisTransactionIndex) {
+    // As now only volume and bucket transactions are handled, only called
+    // cleanupCache on bucketTable.
+    // TODO: After supporting all write operations we need to call
+    // cleanupCache on the tables only when buffer has entries for that table.
+    omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
+
+    //TODO: Optimization we can do here is for key transactions we can only
+    // cleanup cache when it is key commit transaction. In this way all
+    // intermediate transactions for a key will be read from in-memory cache.
+    omMetadataManager.getOpenKeyTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getKeyTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getDeletedTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getS3Table().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getMultipartInfoTable().cleanupCache(
+        lastRatisTransactionIndex);
+    omMetadataManager.getS3SecretTable().cleanupCache(
+        lastRatisTransactionIndex);
+    omMetadataManager.getDelegationTokenTable().cleanupCache(
+        lastRatisTransactionIndex);
+    omMetadataManager.getPrefixTable().cleanupCache(lastRatisTransactionIndex);
+
+  }
+
   /**
    * Update OzoneManagerDoubleBuffer metrics values.
   * @param flushedTransactionsSize
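
The non-Ratis branch above relies on the flushed epochs being sorted before they are handed to cleanupCache, since evictCache(List) treats the last element as the highest flushed epoch. A standalone illustration of that computation; the Long values stand in for DoubleBufferEntry::getTrxLogIndex results and are made up for the example:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.stream.Collectors;

    final class FlushedEpochsSketch {
      public static void main(String[] args) {
        Queue<Long> readyBuffer =
            new ConcurrentLinkedQueue<>(Arrays.asList(7L, 3L, 5L));

        // Sorted ascending, as in flushTransactions(), so the consumer can use
        // the last element as the highest flushed epoch.
        List<Long> flushedEpochs =
            readyBuffer.stream().sorted().collect(Collectors.toList());

        System.out.println(flushedEpochs); // prints [3, 5, 7]
      }
    }
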
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
index cf7db655a020..70618775f12b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -32,6 +33,8 @@
 import org.apache.hadoop.ozone.om.response.s3.multipart
     .S3MultipartUploadCommitPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -47,6 +50,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -84,8 +88,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
     MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
         getOmRequest().getCommitMultiPartUploadRequest();
 
-    OzoneManagerProtocolProtos.KeyArgs keyArgs =
-        multipartCommitUploadPartRequest.getKeyArgs();
+    KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
@@ -210,8 +213,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 
     // audit log
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
-        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
-        exception, getOmRequest().getUserInfo()));
+        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY,
+        buildAuditMap(keyArgs, partName), exception,
+        getOmRequest().getUserInfo()));
 
     if (exception == null) {
       LOG.debug("MultipartUpload Commit is successfully for Key:{} in " +
@@ -224,5 +228,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
     }
     return omClientResponse;
   }
+
+  private Map<String, String> buildAuditMap(KeyArgs keyArgs, String partName) {
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    // Add MPU related information.
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
+        String.valueOf(keyArgs.getMultipartNumber()));
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);
+
+    return auditMap;
+  }
 }
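
For reference, a small standalone sketch of what buildAuditMap contributes on top of the key-args audit entries. The base entries and the part-name value are illustrative stand-ins for buildKeyArgsAuditMap(keyArgs) and the computed partName; the two map keys are the "mpuPartNumber" and "mpuPartName" constants added to OzoneConsts earlier in this patch:

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class MpuAuditMapSketch {
      public static void main(String[] args) {
        // Stand-in for buildKeyArgsAuditMap(keyArgs); keys and values illustrative.
        Map<String, String> auditMap = new LinkedHashMap<>();
        auditMap.put("volume", "vol1");
        auditMap.put("bucket", "bucket1");
        auditMap.put("key", "key1");

        // MPU related information added by buildAuditMap.
        auditMap.put("mpuPartNumber", String.valueOf(1));
        auditMap.put("mpuPartName", "key1-part-name"); // illustrative part name

        System.out.println(auditMap);
      }
    }
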
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index ed48d8585856..ce923f893802 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -184,8 +184,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
 
           if (partKeyInfo == null ||
               !partName.equals(partKeyInfo.getPartName())) {
+            String omPartName = partKeyInfo == null ? null :
+                partKeyInfo.getPartName();
             throw new OMException("Complete Multipart Upload Failed: volume: " +
-                volumeName + "bucket: " + bucketName + "key: " + keyName,
+                volumeName + "bucket: " + bucketName + "key: " + keyName +
+                ". Provided Part info is { " + partName + ", " + partNumber +
+                "}, where as OM has partName " + omPartName,
                 OMException.ResultCodes.INVALID_PART);
           }