Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.glue.GlueCatalog;
Expand All @@ -27,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -100,7 +103,7 @@ public void after() {
}

@Test
public void testAssumeRoleGlueCatalog() throws Exception {
public void testAssumeRoleGlueCatalog() {
String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
iam.putRolePolicy(
PutRolePolicyRequest.builder()
Expand Down Expand Up @@ -189,7 +192,19 @@ public void testAssumeRoleS3FileIO() throws Exception {
Assert.assertFalse("should be able to access file", inputFile.exists());
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(10000); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
Awaitility.await("wait for IAM role policy to update.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amogh-jahagirdar do you know if that would work for the AWS integration tests?

.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.roleName(policyName)
.build()))
.isNotNull());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.glue;

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -132,21 +134,19 @@ public void testParallelCommitMultiThreadMultiCommit() {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
table.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,6 +70,7 @@
import software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
import software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.DescribeResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
Expand Down Expand Up @@ -175,7 +179,7 @@ public static void beforeClass() throws Exception {
lfRegisterPathRoleIamPolicyName,
lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn),
lfRegisterPathRoleName);
waitForIamConsistency();
waitForIamConsistency(lfRegisterPathRoleName, lfRegisterPathRoleIamPolicyName);

// create lfPrivilegedRole
response =
Expand Down Expand Up @@ -205,7 +209,7 @@ public static void beforeClass() throws Exception {
lfPrivilegedRolePolicyName,
lfPrivilegedRolePolicyDoc(),
lfPrivilegedRoleName);
waitForIamConsistency();
waitForIamConsistency(lfPrivilegedRoleName, lfPrivilegedRolePolicyName);

// build lf and glue client with lfRegisterPathRole
lakeformation =
Expand Down Expand Up @@ -250,7 +254,6 @@ public static void beforeClass() throws Exception {
// register S3 test bucket path
deregisterResource(testBucketPath);
registerResource(testBucketPath);
waitForIamConsistency();
}

@AfterClass
Expand Down Expand Up @@ -357,8 +360,20 @@ String getRandomTableName() {
return LF_TEST_TABLE_PREFIX + UUID.randomUUID().toString().replace("-", "");
}

private static void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
private static void waitForIamConsistency(String roleName, String policyName) {
// wait to make sure IAM up to date
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
}

private static LakeFormationClient buildLakeFormationClient(
Expand Down Expand Up @@ -417,7 +432,19 @@ private static void registerResource(String s3Location) {
.build());
// when a resource is registered, LF will update SLR with necessary permissions which has a
// propagation delay
waitForIamConsistency();
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
lakeformation
.describeResource(
DescribeResourceRequest.builder().resourceArn(arn).build())
.resourceInfo()
.roleArn())
.isEqualToIgnoringCase(lfRegisterPathRoleArn));
} catch (AlreadyExistsException e) {
LOG.warn("Resource {} already registered. Error: {}", arn, e.getMessage(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.lakeformation;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.AwsIntegTestUtil;
Expand All @@ -26,6 +27,8 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -41,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;

public class TestLakeFormationAwsClientFactory {
Expand Down Expand Up @@ -128,8 +132,18 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
+ glueArnPrefix
+ ":userDefinedFunction/allowed_*/*\"]}]}")
.build());
waitForIamConsistency();

Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
GlueCatalog glueCatalog = new GlueCatalog();
assumeRoleProperties.put("warehouse", "s3://path");
glueCatalog.initialize("test", assumeRoleProperties);
Expand Down Expand Up @@ -162,8 +176,4 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
}
}
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
}
}
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ project(':iceberg-api') {
compileOnly libs.findbugs.jsr305
testImplementation libs.avro.avro
testImplementation libs.esotericsoftware.kryo
testImplementation libs.awaitility
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -497,6 +498,7 @@ project(':iceberg-aws') {
testImplementation libs.mockserver.client.java
testImplementation libs.jaxb.api
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.awaitility
}

sourceSets {
Expand Down Expand Up @@ -717,6 +719,7 @@ project(':iceberg-hive-metastore') {
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.awaitility
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -54,6 +55,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
Expand Down Expand Up @@ -435,13 +437,11 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception {
for (int numCommittedFiles = 0;
numCommittedFiles < numberOfCommitedFilesPerThread;
numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * threadsCount) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
tableWithHighRetries.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -85,21 +87,19 @@ public synchronized void testConcurrentFastAppends() throws IOException {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -35,6 +36,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

public class TestHiveTableConcurrency extends HiveTableBaseTest {
Expand All @@ -56,21 +58,19 @@ public synchronized void testConcurrentFastAppends() {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Loading