diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java index 4ce714ab3dc3..35a8d12a0dab 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java @@ -21,9 +21,13 @@ import com.google.protobuf.ServiceException; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ipc.ProcessingDetails; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -36,10 +40,12 @@ public class OMExecutionFlow { private final OzoneManager ozoneManager; private final OMPerformanceMetrics perfMetrics; + private final OmLockOpr omLockOpr; public OMExecutionFlow(OzoneManager om) { this.ozoneManager = om; this.perfMetrics = ozoneManager.getPerfMetrics(); + this.omLockOpr = new OmLockOpr(); } /** @@ -72,11 +78,36 @@ private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceExcep return OzoneManagerRatisUtils.createErrorResponse(request, ex); } - // 2. submit request to ratis - OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit); - if (!response.getSuccess()) { - omClientRequest.handleRequestFailure(ozoneManager); + // 2. lock and submit request to ratis + OmLockOpr.OmLockInfo lockInfo = null; + try { + lockInfo = omClientRequest.lock(ozoneManager, omLockOpr); + OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit); + if (!response.getSuccess()) { + omClientRequest.handleRequestFailure(ozoneManager); + } + return response; + } catch (IOException e) { + throw new ServiceException(e.getMessage(), e); + } finally { + performUnlock(omClientRequest, omLockOpr, lockInfo); + } + } + + private static void performUnlock( + OMClientRequest omClientRequest, OmLockOpr omLockOpr, OmLockOpr.OmLockInfo lockInfo) { + if (null == lockInfo) { + return; + } + omClientRequest.unlock(omLockOpr, lockInfo); + Server.Call call = Server.getCurCall().get(); + if (null != call) { + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKWAIT, + lockInfo.getWaitLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKSHARED, + lockInfo.getReadLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(ProcessingDetails.Timing.LOCKEXCLUSIVE, + lockInfo.getWriteLockNanos(), TimeUnit.NANOSECONDS); } - return response; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java new file mode 100644 index 000000000000..329148f25235 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.util.Time; +import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.ratis.util.function.CheckedFunction; + +/** + * Manage locking of volume, bucket and keys. + */ +public class OmLockOpr { + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + + private final WrappedStripedLock keyLocking; + private final WrappedStripedLock bucketLocking; + private final WrappedStripedLock volumeLocking; + + public OmLockOpr() { + keyLocking = new WrappedStripedLock(102400, LOCK_TIMEOUT_DEFAULT, false); + bucketLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + volumeLocking = new WrappedStripedLock(1024, LOCK_TIMEOUT_DEFAULT, false); + } + + public OmLockInfo volumeReadLock(String volumeName) throws IOException { + return lockOneKey(volumeLocking::readLock, volumeName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo volumeWriteLock(String volumeName) throws IOException { + return lockOneKey(volumeLocking::writeLock, volumeName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo volBucketRWLock(String volumeName, String bucketName) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + long startTime = Time.monotonicNowNanos(); + try { + locks.add(getLock(volumeLocking::readLock, volumeName)); + locks.add(getLock(bucketLocking::writeLock, bucketName)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public OmLockInfo bucketWriteLock(String bucketName) throws IOException { + return lockOneKey(bucketLocking::writeLock, bucketName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo bucketReadLock(String bucketName) throws IOException { + return lockOneKey(bucketLocking::readLock, bucketName, OmLockInfo.LockOpType.WAIT); + } + + private OmLockInfo lockOneKey( + CheckedFunction lockFunction, String name, OmLockInfo.LockOpType type) + throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + long startTime = Time.monotonicNowNanos(); + locks.add(getLock(lockFunction, name)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, type); + omLockInfo.setLockTakenTime(endTime); + return omLockInfo; + } + + private static Lock getLock( + CheckedFunction lockFunction, String name) throws OMException { + try { + Lock lockObj = lockFunction.apply(name); + if (lockObj == null) { + throw new OMException("Unable to get lock for " + name + ", timeout occurred", OMException.ResultCodes.TIMEOUT); + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for lock is interrupted for " + name, OMException.ResultCodes.INTERNAL_ERROR); + } + } + + private static void getLock( + CheckedBiFunction, List, Boolean, InterruptedException> lockFunction, + List lockKeys, List lockList) throws OMException { + try { + if (!lockFunction.apply(lockKeys, lockList)) { + throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } + } + + public OmLockInfo obsLock(String bucketName, String keyName) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + // bucket read lock + long startTime = Time.monotonicNowNanos(); + try { + locks.add(getLock(bucketLocking::readLock, bucketName)); + // key lock with bucket uniqueness as same key can be present across bucket + locks.add(getLock(keyLocking::writeLock, bucketName + "/" + keyName)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public OmLockInfo obsLock(String bucketName, List keyList) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + // bucket read lock + long startTime = Time.monotonicNowNanos(); + try { + locks.add(getLock(bucketLocking::readLock, bucketName)); + // key lock with bucket uniqueness as same key can be present across bucket + List prefixedBucketKeyList = new ArrayList<>(); + keyList.forEach(e -> prefixedBucketKeyList.add(bucketName + "/" + e)); + getLock(keyLocking::writeLock, prefixedBucketKeyList, locks); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public void writeUnlock(OmLockInfo lockInfo) { + unlock(lockInfo, OmLockInfo.LockOpType.WRITE); + } + + public void readUnlock(OmLockInfo lockInfo) { + unlock(lockInfo, OmLockInfo.LockOpType.READ); + } + + private void unlock(OmLockInfo lockInfo, OmLockInfo.LockOpType type) { + Collections.reverse(lockInfo.getLocks()); + lockInfo.getLocks().forEach(Lock::unlock); + if (lockInfo.getLockTakenTime() > 0) { + lockInfo.add(Time.monotonicNowNanos() - lockInfo.getLockTakenTime(), type); + } + lockInfo.getLocks().clear(); + } + + /** + * Lock information. + */ + public static class OmLockInfo { + private String key; + private long lockTakenTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + private List locks = new ArrayList<>(); + + public void setKey(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + public List getLocks() { + return locks; + } + + public long getLockTakenTime() { + return lockTakenTime; + } + + public void setLockTakenTime(long lockTakenTime) { + this.lockTakenTime = lockTakenTime; + } + + void add(long timeNanos, LockOpType lockOpType) { + switch (lockOpType) { + case WAIT: + waitLockNanos += timeNanos; + break; + case READ: + readLockNanos += timeNanos; + break; + case WRITE: + writeLockNanos += timeNanos; + break; + default: + } + } + + @Override + public String toString() { + return "OMLockDetails{" + + "key=" + key + + ", waitLockNanos=" + waitLockNanos + + ", readLockNanos=" + readLockNanos + + ", writeLockNanos=" + writeLockNanos + + '}'; + } + + enum LockOpType { + WAIT, + READ, + WRITE + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java new file mode 100644 index 000000000000..b60e60c92d2e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/WrappedStripedLock.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.hdds.utils.SimpleStriped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Locking class that wraps Striped locking with timeout and logging. It also provides ordering while multiple locks. + */ +public class WrappedStripedLock { + private static final Logger LOG = LoggerFactory.getLogger(WrappedStripedLock.class); + private final Striped fileStripedLock; + private final long lockTimeout; + + public WrappedStripedLock(int stripLockSize, long timeout, boolean fair) { + fileStripedLock = SimpleStriped.readWriteLock(stripLockSize, fair); + lockTimeout = timeout; + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean writeLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for keys are failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, rwLock.readLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * lock single key. + * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock writeLock(String key) throws InterruptedException { + LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key)); + try { + Lock lockObj = fileStripedLock.get(key).writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Write lock for the key is failed for the instance {} after wait of {}ms, read lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Write lock for the key is interrupted for the instance {}", this); + throw e; + } + } + + /** + * lock the list of keys in order. + * Sample code for lock and unlock handling: + * + * try { + * if (!wrappedStripedLock.lock(keyList, locks)) { + * // timeout occurred, release lock if any in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * // perform business logic + * } finally { + * // to be released in reverse order + * Collections.reverse(locks); + * locks.forEach(Lock::unlock); + * } + * + * + * @param keyList key list which needs to be locked + * @param locks successful lock object returned which will be used to release lock + * @return boolean true if success, else false + * @throws InterruptedException exception on interrupt + */ + public boolean readLock(List keyList, List locks) throws InterruptedException { + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for keys are failed for the instance {} after wait of {}ms, write lock info: {}", this, + lockTimeout, rwLock.writeLock()); + return false; + } + locks.add(lockObj); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for keys are interrupted for the instance {}", this); + throw e; + } + return true; + } + + /** + * read lock single key. + * @param key object for which lock to be taken + * @return lock object to be used to release lock, null if unable to take lock due to timeout + * @throws InterruptedException exception on interrupt + */ + public Lock readLock(String key) throws InterruptedException { + try { + LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key)); + Lock lockObj = fileStripedLock.get(key).readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Read lock for the key is failed for the instance {} after wait of {}ms, write lock info: {}", this, + lockTimeout, fileStripedLock.get(key).readLock()); + return null; + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Read lock for the key is interrupted for the instance {}", this); + throw e; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java index 9753fa3e0a0e..4297e8930f5d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java @@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.response.OMClientResponse; @@ -120,6 +121,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) return omRequest; } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + } + /** * Performs any request specific failure handling during request * submission. An example of this would be an undo of any steps diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java new file mode 100644 index 000000000000..8186d107e874 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +/** + * Test for TestOmLockOpr. + */ +public class TestOmLockOpr { + + @Test + public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.obsLock("bucket", "testkey"); + assertEquals(2, omLockInfo.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOpr.OmLockInfo lockInfoAgain = omLockOpr.obsLock("bucket", "testkey"); + omLockOpr.writeUnlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.writeUnlock(omLockInfo); + rst.get(); + } + + @Test + public void testObsLockOprListKeyRepeated() throws IOException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.obsLock("bucket", Arrays.asList("testkey", "testkey2")); + assertEquals(3, omLockInfo.getLocks().size()); + + omLockOpr.writeUnlock(omLockInfo); + + omLockInfo = omLockOpr.obsLock("bucket", Arrays.asList("testkey", "testkey2")); + assertEquals(3, omLockInfo.getLocks().size()); + omLockOpr.writeUnlock(omLockInfo); + } + + @Test + public void testBucketReadLock() throws IOException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.bucketReadLock("bucket"); + assertEquals(1, omLockInfo.getLocks().size()); + + omLockOpr.readUnlock(omLockInfo); + } + + @Test + public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.bucketReadLock("bucket"); + assertEquals(1, omLockInfo.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOpr.OmLockInfo lockInfoAgain = omLockOpr.bucketWriteLock("bucket"); + omLockOpr.writeUnlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.writeUnlock(omLockInfo); + rst.get(); + } + + @Test + public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.volumeReadLock("vol1"); + assertEquals(1, omLockInfo.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOpr.OmLockInfo lockInfoAgain = omLockOpr.volumeWriteLock("vol1"); + omLockOpr.writeUnlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.writeUnlock(omLockInfo); + rst.get(); + } + + @Test + public void testVolWriteWithVolBucketRWParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOpr omLockOpr = new OmLockOpr(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.volumeWriteLock("vol1"); + assertEquals(1, omLockInfo.getLocks().size()); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOpr.OmLockInfo lockInfoAgain = omLockOpr.volBucketRWLock("vol1", "buck1"); + omLockOpr.writeUnlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + + // parallel lock wait should fail as previous lock not released + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + // after unlock, the thread should be able to get lock + omLockOpr.writeUnlock(omLockInfo); + rst.get(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java new file mode 100644 index 000000000000..4492014b29d7 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestWrappedStripedLock.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import org.junit.jupiter.api.Test; + +/** + * Test for WrappedStripedLock. + */ +public class TestWrappedStripedLock { + @Test + public void testWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testWriteThenReadLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.writeLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testReadThenWriteLock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = wrappedStripedLock.readLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testReadLockListOrderSame() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test", "test1"), locks); + locks.forEach(Lock::unlock); + List lockReverseOrder = new ArrayList<>(); + wrappedStripedLock.readLock(Arrays.asList("test1", "test2"), lockReverseOrder); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testLockListFailureOnRelock() throws InterruptedException { + WrappedStripedLock wrappedStripedLock = new WrappedStripedLock(1, 100, false); + List locks = new ArrayList<>(); + wrappedStripedLock.writeLock(Arrays.asList("test", "test1"), locks); + + // test write lock failure + CompletableFuture rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.writeLock("test")); + assertNull(out[0]); + }); + rst.join(); + + // test read lock failure + rst = CompletableFuture.runAsync(() -> { + Lock[] out = new Lock[1]; + assertDoesNotThrow(() -> out[0] = wrappedStripedLock.readLock("test1")); + assertNull(out[0]); + }); + rst.join(); + + locks.forEach(Lock::unlock); + + // verify if lock is success after unlock + Lock lock = wrappedStripedLock.readLock("test"); + lock.unlock(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/package-info.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/package-info.java new file mode 100644 index 000000000000..cf03c3558af0 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * test package for test lock. + */ +package org.apache.hadoop.ozone.om.lock;