From 2270cec79b3c085654468a8ad9282dcb0437c6b7 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 4 Jan 2021 22:20:50 -0800 Subject: [PATCH 1/4] AWS: Glue catalog lock interface --- .../apache/iceberg/aws/glue/GlueTestBase.java | 6 +- .../apache/iceberg/aws/s3/S3FileIOTest.java | 4 +- .../apache/iceberg/aws/glue/GlueCatalog.java | 7 +- .../iceberg/aws/glue/GlueTableOperations.java | 38 ++- .../apache/iceberg/aws/glue/LockManager.java | 50 ++++ .../apache/iceberg/aws/glue/LockManagers.java | 241 ++++++++++++++++++ .../iceberg/aws/glue/GlueCatalogTest.java | 9 +- .../aws/glue/InMemoryLockManagerTest.java | 152 +++++++++++ .../iceberg/aws/glue/LockManagersTest.java | 64 +++++ .../org/apache/iceberg/CatalogProperties.java | 22 ++ 10 files changed, 576 insertions(+), 17 deletions(-) create mode 100644 aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java create mode 100644 aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java create mode 100644 aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java create mode 100644 aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java index 48a4f860e8bb..f86890abbff1 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java @@ -68,11 +68,13 @@ public static void beforeClass() { String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix; S3FileIO fileIO = new S3FileIO(clientFactory::s3); glueCatalog = new GlueCatalog(); - glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, fileIO); + glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, + LockManagers.defaultLockManager(), fileIO); AwsProperties properties = new AwsProperties(); properties.setGlueCatalogSkipArchive(true); glueCatalogWithSkip = new GlueCatalog(); - glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, fileIO); + glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, + LockManagers.defaultLockManager(), fileIO); } @AfterClass diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java index 06e18b8426c9..04ddabfa6939 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java @@ -36,7 +36,6 @@ import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -202,8 +201,7 @@ public void testACL() throws Exception { @Test public void testClientFactorySerialization() throws Exception { - S3FileIO fileIO = new S3FileIO(); - fileIO.initialize(Maps.newHashMap()); + S3FileIO fileIO = new S3FileIO(clientFactory::s3); write(fileIO); byte [] data = SerializationUtils.serialize(fileIO); S3FileIO fileIO2 = SerializationUtils.deserialize(data); diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 2dddda9c6878..a4d407d0b8f0 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -78,6 +78,7 @@ public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, Supp private String warehousePath; private AwsProperties awsProperties; private FileIO fileIO; + private LockManager lockManager; /** * No-arg constructor to load the catalog dynamically. @@ -94,6 +95,7 @@ public void initialize(String name, Map properties) { properties.get(CatalogProperties.WAREHOUSE_LOCATION), new AwsProperties(properties), AwsClientFactories.from(properties).glue(), + LockManagers.from(properties), initializeFileIO(properties)); } @@ -109,11 +111,12 @@ private FileIO initializeFileIO(Map properties) { } @VisibleForTesting - void initialize(String name, String path, AwsProperties properties, GlueClient client, FileIO io) { + void initialize(String name, String path, AwsProperties properties, GlueClient client, LockManager lock, FileIO io) { this.catalogName = name; this.awsProperties = properties; this.warehousePath = cleanWarehousePath(path); this.glue = client; + this.lockManager = lock; this.fileIO = io; } @@ -130,7 +133,7 @@ private String cleanWarehousePath(String path) { @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { - return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier); + return new GlueTableOperations(glue, lockManager, catalogName, awsProperties, fileIO, tableIdentifier); } /** diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java index cff437b18606..832c45d4c9c7 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -57,16 +57,20 @@ class GlueTableOperations extends BaseMetastoreTableOperations { private final String databaseName; private final String tableName; private final String fullTableName; + private final String commitLockEntityId; private final FileIO fileIO; + private final LockManager lockManager; - GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties, + GlueTableOperations(GlueClient glue, LockManager lockManager, String catalogName, AwsProperties awsProperties, FileIO fileIO, TableIdentifier tableIdentifier) { this.glue = glue; this.awsProperties = awsProperties; this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier); this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier); this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName); + this.commitLockEntityId = String.format("%s.%s", databaseName, tableName); this.fileIO = fileIO; + this.lockManager = lockManager; } @Override @@ -100,10 +104,11 @@ protected void doRefresh() { protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); boolean exceptionThrown = true; - Table glueTable = getGlueTable(); - checkMetadataLocation(glueTable, base); - Map properties = prepareProperties(glueTable, newMetadataLocation); try { + lock(newMetadataLocation); + Table glueTable = getGlueTable(); + checkMetadataLocation(glueTable, base); + Map properties = prepareProperties(glueTable, newMetadataLocation); persistGlueTable(glueTable, properties); exceptionThrown = false; } catch (ConcurrentModificationException e) { @@ -114,9 +119,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } catch (SdkException e) { throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName()); } finally { - if (exceptionThrown) { - io().deleteFile(newMetadataLocation); - } + cleanupMetadataAndUnlock(exceptionThrown, newMetadataLocation); + } + } + + private void lock(String newMetadataLocation) { + if (!lockManager.acquire(commitLockEntityId, newMetadataLocation)) { + throw new IllegalStateException(String.format("Fail to acquire lock %s to commit new metadata at %s", + commitLockEntityId, newMetadataLocation)); } } @@ -180,4 +190,18 @@ private void persistGlueTable(Table glueTable, Map parameters) { .build()); } } + + private void cleanupMetadataAndUnlock(boolean exceptionThrown, String metadataLocation) { + try { + if (exceptionThrown) { + // if anything went wrong, clean up the uncommitted metadata file + io().deleteFile(metadataLocation); + } + } catch (RuntimeException e) { + LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e); + throw e; + } finally { + lockManager.release(commitLockEntityId, metadataLocation); + } + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java new file mode 100644 index 000000000000..0eb3fdb0c687 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; + +/** + * An interface for locking, used to ensure Glue catalog commit isolation. + */ +interface LockManager extends AutoCloseable { + + /** + * Try to acquire a lock + * @param entityId ID of the entity to lock + * @param ownerId ID of the owner if the lock + * @return if the lock for the entity is acquired by the owner + */ + boolean acquire(String entityId, String ownerId); + + /** + * Release a lock + * @param entityId ID of the entity to lock + * @param ownerId ID of the owner if the lock + * @throws IllegalArgumentException if lock entity not found or trying to unlock with a wrong owner ID + */ + void release(String entityId, String ownerId); + + /** + * Initialize lock manager from catalog properties. + * @param properties catalog properties + */ + void initialize(Map properties); +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java new file mode 100644 index 000000000000..5ee3ec4f3ac1 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; + +class LockManagers { + + private static final LockManager LOCK_MANAGER_DEFAULT = new InMemoryLockManager(Maps.newHashMap()); + + private LockManagers() { + } + + public static LockManager defaultLockManager() { + return LOCK_MANAGER_DEFAULT; + } + + public static LockManager from(Map properties) { + if (properties.containsKey(CatalogProperties.LOCK_IMPL)) { + return loadLockManager(properties.get(CatalogProperties.LOCK_IMPL), properties); + } else { + return defaultLockManager(); + } + } + + private static LockManager loadLockManager(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(LockManager.class).hiddenImpl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize LockManager, missing no-arg constructor: %s", impl), e); + } + + LockManager lockManager; + try { + lockManager = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize LockManager, %s does not implement LockManager.", impl), e); + } + + lockManager.initialize(properties); + return lockManager; + } + + abstract static class LockManagerBase implements LockManager { + + private static volatile ScheduledExecutorService scheduler; + + private long acquireTimeoutMs; + private long acquireIntervalMs; + private long heartbeatIntervalMs; + private long heartbeatTimeoutMs; + private int heartbeatThreads; + + public long heartbeatTimeoutMs() { + return heartbeatTimeoutMs; + } + + public long heartbeatIntervalMs() { + return heartbeatIntervalMs; + } + + public long acquireIntervalMs() { + return acquireIntervalMs; + } + + public long acquireTimeoutMs() { + return acquireTimeoutMs; + } + + public int heartbeatThreads() { + return heartbeatThreads; + } + + @SuppressWarnings("StaticGuardedByInstance") + public ScheduledExecutorService scheduler() { + if (scheduler == null) { + synchronized (this) { + if (scheduler == null) { + scheduler = Executors.newScheduledThreadPool(heartbeatThreads); + } + } + } + return scheduler; + } + + @Override + public void initialize(Map properties) { + this.acquireTimeoutMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT); + this.acquireIntervalMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT); + this.heartbeatIntervalMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT); + this.heartbeatTimeoutMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS, CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT); + this.heartbeatThreads = PropertyUtil.propertyAsInt(properties, + CatalogProperties.LOCK_HEARTBEAT_THREADS, CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT); + } + } + + /** + * Implementation of {@link LockManager} that uses an in-memory concurrent map for locking. + * This implementation should only be used for testing, + * or if the caller only needs locking within the same JVM during table commits. + */ + static class InMemoryLockManager extends LockManagerBase { + + private static final Map LOCKS = Maps.newConcurrentMap(); + private static final Map> HEARTBEATS = Maps.newHashMap(); + + InMemoryLockManager(Map properties) { + initialize(properties); + } + + @VisibleForTesting + void acquireOnce(String entityId, String ownerId) { + DefaultLockContent content = LOCKS.get(entityId); + if (content != null && content.expireMs() > System.currentTimeMillis()) { + throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s", + entityId, content.ownerId(), content.expireMs())); + } + + long expiration = System.currentTimeMillis() + heartbeatTimeoutMs(); + boolean succeed; + if (content == null) { + DefaultLockContent previous = LOCKS.putIfAbsent( + entityId, new DefaultLockContent(ownerId, expiration)); + succeed = previous == null; + } else { + succeed = LOCKS.replace(entityId, content, new DefaultLockContent(ownerId, expiration)); + } + + if (succeed) { + // cleanup old heartbeat + if (HEARTBEATS.containsKey(entityId)) { + HEARTBEATS.remove(entityId).cancel(false); + } + + HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> { + DefaultLockContent lastContent = LOCKS.get(entityId); + try { + long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs(); + LOCKS.replace(entityId, lastContent, new DefaultLockContent(ownerId, newExpiration)); + } catch (NullPointerException e) { + throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e); + } + + }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS)); + + } else { + throw new IllegalStateException("Unable to acquire lock " + entityId); + } + } + + @Override + public boolean acquire(String entityId, String ownerId) { + try { + Tasks.foreach(entityId) + .retry(Integer.MAX_VALUE - 1) + .onlyRetryOn(IllegalStateException.class) + .throwFailureWhenFinished() + .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .run(id -> acquireOnce(id, ownerId)); + return true; + } catch (IllegalStateException e) { + return false; + } + } + + @Override + public void release(String entityId, String ownerId) { + DefaultLockContent currentContent = LOCKS.get(entityId); + if (currentContent == null) { + throw new IllegalArgumentException("Cannot find lock for entity " + entityId); + } + + if (!currentContent.ownerId().equals(ownerId)) { + throw new IllegalArgumentException(String.format( + "Cannot unlock %s by %s, current owner: %s", entityId, ownerId, currentContent.ownerId())); + } + HEARTBEATS.remove(entityId).cancel(false); + LOCKS.remove(entityId); + } + + @Override + public void close() { + HEARTBEATS.values().forEach(future -> future.cancel(false)); + HEARTBEATS.clear(); + LOCKS.clear(); + } + } + + private static class DefaultLockContent { + private final String ownerId; + private final long expireMs; + + DefaultLockContent(String ownerId, long expireMs) { + this.ownerId = ownerId; + this.expireMs = expireMs; + } + + public long expireMs() { + return expireMs; + } + + public String ownerId() { + return ownerId; + } + + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java index 45329c1c1e77..95dfad687f06 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java @@ -70,7 +70,8 @@ public class GlueCatalogTest { public void before() { glue = Mockito.mock(GlueClient.class); glueCatalog = new GlueCatalog(); - glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), glue, null); + glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), glue, + LockManagers.defaultLockManager(), null); } @Test @@ -80,14 +81,16 @@ public void constructor_emptyWarehousePath() { "Cannot initialize GlueCatalog because warehousePath must not be null", () -> { GlueCatalog catalog = new GlueCatalog(); - catalog.initialize(CATALOG_NAME, null, new AwsProperties(), glue, null); + catalog.initialize(CATALOG_NAME, null, new AwsProperties(), glue, + LockManagers.defaultLockManager(), null); }); } @Test public void constructor_warehousePathWithEndSlash() { GlueCatalog catalogWithSlash = new GlueCatalog(); - catalogWithSlash.initialize(CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), glue, null); + catalogWithSlash.initialize( + CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), glue, LockManagers.defaultLockManager(), null); Mockito.doReturn(GetDatabaseResponse.builder() .database(Database.builder().name("db").build()).build()) .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java new file mode 100644 index 000000000000..b8e8896b11ee --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class InMemoryLockManagerTest { + + private LockManagers.InMemoryLockManager lockManager; + private String lockEntityId; + private String ownerId; + + @Before + public void before() { + lockEntityId = UUID.randomUUID().toString(); + ownerId = UUID.randomUUID().toString(); + lockManager = new LockManagers.InMemoryLockManager(Maps.newHashMap()); + } + + @After + public void after() { + lockManager.close(); + } + + @Test + public void testAcquireOnce_singleProcess() { + lockManager.acquireOnce(lockEntityId, ownerId); + } + + @Test + public void testAcquireOnce_multiProcess() { + List results = IntStream.range(0, 10).parallel() + .mapToObj(i -> { + try { + lockManager.acquireOnce(lockEntityId, ownerId); + return true; + } catch (IllegalStateException e) { + return false; + } + }) + .collect(Collectors.toList()); + Assert.assertEquals( + "only 1 thread should have acquired the lock", + 1, results.stream().filter(s -> s).count()); + } + + @Test + public void testReleaseAndAcquire() { + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + lockManager.release(lockEntityId, ownerId); + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + } + + @Test + public void testReleaseWithWrongOwner() { + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + AssertHelpers.assertThrows("should throw exception if ownerId is wrong", + IllegalArgumentException.class, + "current owner", + () -> lockManager.release(lockEntityId, UUID.randomUUID().toString())); + } + + @Test + public void testAcquire_singleProcess() throws Exception { + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + String oldOwner = ownerId; + + CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + lockManager.release(lockEntityId, oldOwner); + return null; + }); + + ownerId = UUID.randomUUID().toString(); + long start = System.currentTimeMillis(); + Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + Assert.assertTrue("should succeed after 5 seconds", + System.currentTimeMillis() - start >= 5000); + } + + @Test + public void testAcquire_multiProcess_allSucceed() { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500" + )); + long start = System.currentTimeMillis(); + List results = IntStream.range(0, 10).parallel() + .mapToObj(i -> { + String owner = UUID.randomUUID().toString(); + boolean succeeded = lockManager.acquire(lockEntityId, owner); + if (succeeded) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + lockManager.release(lockEntityId, owner); + } + return succeeded; + }) + .collect(Collectors.toList()); + Assert.assertEquals("all lock acquire should succeed sequentially", + 10, results.stream().filter(s -> s).count()); + Assert.assertTrue("must take more than 10 seconds", System.currentTimeMillis() - start >= 10000); + } + + @Test + public void testAcquire_multiProcess_onlyOneSucceed() { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "10000" + )); + + List results = IntStream.range(0, 10).parallel() + .mapToObj(i -> lockManager.acquire(lockEntityId, ownerId)) + .collect(Collectors.toList()); + Assert.assertEquals("only 1 thread should have acquired the lock", + 1, results.stream().filter(s -> s).count()); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java new file mode 100644 index 000000000000..028cd98bd647 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +public class LockManagersTest { + + @Test + public void testLoadDefaultLockManager() { + Assert.assertTrue(LockManagers.defaultLockManager() instanceof LockManagers.InMemoryLockManager); + } + + @Test + public void testLoadCustomLockManager() { + Map properties = Maps.newHashMap(); + properties.put(CatalogProperties.LOCK_IMPL, CustomLockManager.class.getName()); + Assert.assertTrue(LockManagers.from(properties) instanceof CustomLockManager); + } + + static class CustomLockManager implements LockManager { + + @Override + public boolean acquire(String entityId, String ownerId) { + return false; + } + + @Override + public void release(String entityId, String ownerId) { + + } + + @Override + public void close() throws Exception { + + } + + @Override + public void initialize(Map properties) { + + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index e27a507fef8b..5992810b261d 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -19,6 +19,8 @@ package org.apache.iceberg; +import java.util.concurrent.TimeUnit; + public class CatalogProperties { private CatalogProperties() { @@ -31,4 +33,24 @@ private CatalogProperties() { public static final String HIVE_URI = "uri"; public static final String HIVE_CLIENT_POOL_SIZE = "clients"; public static final int HIVE_CLIENT_POOL_SIZE_DEFAULT = 2; + + public static final String LOCK_IMPL = "lock.impl"; + + public static final String LOCK_HEARTBEAT_INTERVAL_MS = "lock.heartbeat-interval-ms"; + public static final long LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(3); + + public static final String LOCK_HEARTBEAT_TIMEOUT_MS = "lock.heartbeat-timeout-ms"; + public static final long LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15); + + public static final String LOCK_HEARTBEAT_THREADS = "lock.heartbeat-threads"; + public static final int LOCK_HEARTBEAT_THREADS_DEFAULT = 4; + + public static final String LOCK_ACQUIRE_INTERVAL_MS = "lock.acquire-interval-ms"; + public static final long LOCK_ACQUIRE_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(5); + + public static final String LOCK_ACQUIRE_TIMEOUT_MS = "lock.acquire-timeout-ms"; + public static final long LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(3); + + public static final String LOCK_TABLE = "lock.table"; + } From 2c2822cf375b9d3473fb650d23c78106a2f10635 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Mon, 4 Jan 2021 22:26:06 -0800 Subject: [PATCH 2/4] update name BaseLockManager --- .../main/java/org/apache/iceberg/aws/glue/LockManagers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java index 5ee3ec4f3ac1..738adbe64d4f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java @@ -71,7 +71,7 @@ private static LockManager loadLockManager(String impl, Map prop return lockManager; } - abstract static class LockManagerBase implements LockManager { + abstract static class BaseLockManager implements LockManager { private static volatile ScheduledExecutorService scheduler; @@ -133,7 +133,7 @@ public void initialize(Map properties) { * This implementation should only be used for testing, * or if the caller only needs locking within the same JVM during table commits. */ - static class InMemoryLockManager extends LockManagerBase { + static class InMemoryLockManager extends BaseLockManager { private static final Map LOCKS = Maps.newConcurrentMap(); private static final Map> HEARTBEATS = Maps.newHashMap(); From e46fba906da05f1efd990e3301e36089b336fef3 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 5 Jan 2021 09:58:36 -0800 Subject: [PATCH 3/4] use ExitingScheduledExecutorService --- .../org/apache/iceberg/aws/glue/LockManagers.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java index 738adbe64d4f..cf239b62ed05 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java @@ -23,11 +23,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -106,7 +109,13 @@ public ScheduledExecutorService scheduler() { if (scheduler == null) { synchronized (this) { if (scheduler == null) { - scheduler = Executors.newScheduledThreadPool(heartbeatThreads); + scheduler = MoreExecutors.getExitingScheduledExecutorService( + (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( + heartbeatThreads(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("iceberg-lock-manager-%d") + .build())); } } } From 4b953d3be031948520700b86b98e63f6b5674a49 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 5 Jan 2021 11:49:02 -0800 Subject: [PATCH 4/4] update tests --- .../apache/iceberg/aws/glue/LockManager.java | 4 +- .../apache/iceberg/aws/glue/LockManagers.java | 39 +++++++++------- .../aws/glue/InMemoryLockManagerTest.java | 46 ++++++++++++------- .../iceberg/aws/glue/LockManagersTest.java | 4 +- 4 files changed, 57 insertions(+), 36 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java index 0eb3fdb0c687..16c7e25a5359 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManager.java @@ -38,9 +38,9 @@ interface LockManager extends AutoCloseable { * Release a lock * @param entityId ID of the entity to lock * @param ownerId ID of the owner if the lock - * @throws IllegalArgumentException if lock entity not found or trying to unlock with a wrong owner ID + * @return if the lock for the entity of the owner is released */ - void release(String entityId, String ownerId); + boolean release(String entityId, String ownerId); /** * Initialize lock manager from catalog properties. diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java index cf239b62ed05..02ed234e1c00 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java @@ -33,6 +33,8 @@ import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class LockManagers { @@ -104,10 +106,9 @@ public int heartbeatThreads() { return heartbeatThreads; } - @SuppressWarnings("StaticGuardedByInstance") public ScheduledExecutorService scheduler() { if (scheduler == null) { - synchronized (this) { + synchronized (BaseLockManager.class) { if (scheduler == null) { scheduler = MoreExecutors.getExitingScheduledExecutorService( (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( @@ -119,6 +120,7 @@ public ScheduledExecutorService scheduler() { } } } + return scheduler; } @@ -144,7 +146,9 @@ public void initialize(Map properties) { */ static class InMemoryLockManager extends BaseLockManager { - private static final Map LOCKS = Maps.newConcurrentMap(); + private static final Logger LOG = LoggerFactory.getLogger(InMemoryLockManager.class); + + private static final Map LOCKS = Maps.newConcurrentMap(); private static final Map> HEARTBEATS = Maps.newHashMap(); InMemoryLockManager(Map properties) { @@ -153,7 +157,7 @@ static class InMemoryLockManager extends BaseLockManager { @VisibleForTesting void acquireOnce(String entityId, String ownerId) { - DefaultLockContent content = LOCKS.get(entityId); + InMemoryLockContent content = LOCKS.get(entityId); if (content != null && content.expireMs() > System.currentTimeMillis()) { throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s", entityId, content.ownerId(), content.expireMs())); @@ -162,11 +166,11 @@ void acquireOnce(String entityId, String ownerId) { long expiration = System.currentTimeMillis() + heartbeatTimeoutMs(); boolean succeed; if (content == null) { - DefaultLockContent previous = LOCKS.putIfAbsent( - entityId, new DefaultLockContent(ownerId, expiration)); + InMemoryLockContent previous = LOCKS.putIfAbsent( + entityId, new InMemoryLockContent(ownerId, expiration)); succeed = previous == null; } else { - succeed = LOCKS.replace(entityId, content, new DefaultLockContent(ownerId, expiration)); + succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration)); } if (succeed) { @@ -176,10 +180,10 @@ void acquireOnce(String entityId, String ownerId) { } HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> { - DefaultLockContent lastContent = LOCKS.get(entityId); + InMemoryLockContent lastContent = LOCKS.get(entityId); try { long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs(); - LOCKS.replace(entityId, lastContent, new DefaultLockContent(ownerId, newExpiration)); + LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration)); } catch (NullPointerException e) { throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e); } @@ -207,18 +211,21 @@ public boolean acquire(String entityId, String ownerId) { } @Override - public void release(String entityId, String ownerId) { - DefaultLockContent currentContent = LOCKS.get(entityId); + public boolean release(String entityId, String ownerId) { + InMemoryLockContent currentContent = LOCKS.get(entityId); if (currentContent == null) { - throw new IllegalArgumentException("Cannot find lock for entity " + entityId); + LOG.error("Cannot find lock for entity {}", entityId); + return false; } if (!currentContent.ownerId().equals(ownerId)) { - throw new IllegalArgumentException(String.format( - "Cannot unlock %s by %s, current owner: %s", entityId, ownerId, currentContent.ownerId())); + LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId()); + return false; } + HEARTBEATS.remove(entityId).cancel(false); LOCKS.remove(entityId); + return true; } @Override @@ -229,11 +236,11 @@ public void close() { } } - private static class DefaultLockContent { + private static class InMemoryLockContent { private final String ownerId; private final long expireMs; - DefaultLockContent(String ownerId, long expireMs) { + InMemoryLockContent(String ownerId, long expireMs) { this.ownerId = ownerId; this.expireMs = expireMs; } diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java index b8e8896b11ee..636dfdef1ca7 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.AssertHelpers; @@ -31,7 +32,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; public class InMemoryLockManagerTest { @@ -39,6 +42,9 @@ public class InMemoryLockManagerTest { private String lockEntityId; private String ownerId; + @Rule + public Timeout timeout = new Timeout(5, TimeUnit.SECONDS); + @Before public void before() { lockEntityId = UUID.randomUUID().toString(); @@ -54,6 +60,10 @@ public void after() { @Test public void testAcquireOnce_singleProcess() { lockManager.acquireOnce(lockEntityId, ownerId); + AssertHelpers.assertThrows("should fail when acquire again", + IllegalStateException.class, + "currently held", + () -> lockManager.acquireOnce(lockEntityId, ownerId)); } @Test @@ -76,39 +86,41 @@ public void testAcquireOnce_multiProcess() { @Test public void testReleaseAndAcquire() { Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); - lockManager.release(lockEntityId, ownerId); - Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); + Assert.assertTrue(lockManager.release(lockEntityId, ownerId)); + Assert.assertTrue("acquire after release should succeed", lockManager.acquire(lockEntityId, ownerId)); } @Test public void testReleaseWithWrongOwner() { Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); - AssertHelpers.assertThrows("should throw exception if ownerId is wrong", - IllegalArgumentException.class, - "current owner", - () -> lockManager.release(lockEntityId, UUID.randomUUID().toString())); + Assert.assertFalse("should return false if ownerId is wrong", + lockManager.release(lockEntityId, UUID.randomUUID().toString())); } @Test public void testAcquire_singleProcess() throws Exception { + lockManager.initialize(ImmutableMap.of( + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500", + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "2000" + )); Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); String oldOwner = ownerId; CompletableFuture.supplyAsync(() -> { try { - Thread.sleep(5000); + Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } - lockManager.release(lockEntityId, oldOwner); + Assert.assertTrue(lockManager.release(lockEntityId, oldOwner)); return null; }); ownerId = UUID.randomUUID().toString(); long start = System.currentTimeMillis(); Assert.assertTrue(lockManager.acquire(lockEntityId, ownerId)); - Assert.assertTrue("should succeed after 5 seconds", - System.currentTimeMillis() - start >= 5000); + Assert.assertTrue("should succeed after 200ms", + System.currentTimeMillis() - start >= 200); } @Test @@ -117,7 +129,7 @@ public void testAcquire_multiProcess_allSucceed() { CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500" )); long start = System.currentTimeMillis(); - List results = IntStream.range(0, 10).parallel() + List results = IntStream.range(0, 3).parallel() .mapToObj(i -> { String owner = UUID.randomUUID().toString(); boolean succeeded = lockManager.acquire(lockEntityId, owner); @@ -127,23 +139,25 @@ public void testAcquire_multiProcess_allSucceed() { } catch (InterruptedException e) { throw new RuntimeException(e); } - lockManager.release(lockEntityId, owner); + Assert.assertTrue(lockManager.release(lockEntityId, owner)); } return succeeded; }) .collect(Collectors.toList()); Assert.assertEquals("all lock acquire should succeed sequentially", - 10, results.stream().filter(s -> s).count()); - Assert.assertTrue("must take more than 10 seconds", System.currentTimeMillis() - start >= 10000); + 3, results.stream().filter(s -> s).count()); + Assert.assertTrue("must take more than 3 seconds", System.currentTimeMillis() - start >= 3000); } @Test public void testAcquire_multiProcess_onlyOneSucceed() { lockManager.initialize(ImmutableMap.of( - CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "10000" + CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, "100", + CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, "500", + CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, "2000" )); - List results = IntStream.range(0, 10).parallel() + List results = IntStream.range(0, 3).parallel() .mapToObj(i -> lockManager.acquire(lockEntityId, ownerId)) .collect(Collectors.toList()); Assert.assertEquals("only 1 thread should have acquired the lock", diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java index 028cd98bd647..b5bf054d2122 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/LockManagersTest.java @@ -47,8 +47,8 @@ public boolean acquire(String entityId, String ownerId) { } @Override - public void release(String entityId, String ownerId) { - + public boolean release(String entityId, String ownerId) { + return false; } @Override