From ee4ff068207ef2a9218312c4cd7754f5c46d6574 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 22 Jun 2024 10:37:02 +0530 Subject: [PATCH 1/5] Fix flaky test in RetryableS3OutputStreamTest --- .../output/RetryableS3OutputStreamTest.java | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 8e7a81eb48dd..70bcf6617ed6 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -171,6 +171,27 @@ public void testWriteSmallBufferShouldSucceed() throws IOException s3.assertCompleted(chunkSize, 600); } + @Test + public void testWriteSmallBufferExactChunkSizeShouldSucceed() throws IOException + { + chunkSize = 128; + final int numChunks = 5; + final long fileSize = chunkSize * numChunks; + try (RetryableS3OutputStream out = new RetryableS3OutputStream( + config, + s3, + path, + s3UploadManager + )) { + for (int i = 0; i < fileSize; i++) { + out.write(i); + } + } + // each chunk 128 bytes, so there should be 5 chunks. + Assert.assertEquals(numChunks, s3.partRequests.size()); + s3.assertCompleted(chunkSize, fileSize); + } + @Test public void testSuccessToUploadAfterRetry() throws IOException { @@ -284,13 +305,22 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); - for (int i = 0; i < partRequests.size(); i++) { - if (i < partRequests.size() - 1) { - Assert.assertEquals(chunkSize, partRequests.get(i).getPartSize()); - } else { - Assert.assertTrue(chunkSize >= partRequests.get(i).getPartSize()); - } + int numChunksOfChunkSize = 0; + long fileSize = expectedFileSize; + while (fileSize - chunkSize >= 0) { + fileSize -= chunkSize; + numChunksOfChunkSize++; } + + int numChunksOfSmallerSize = fileSize == 0 ? 0 : 1; + + // Validate part sizes + long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); + long numOfSmallerChunks = partRequests.stream().filter(part -> part.getPartSize() < chunkSize).count(); + + Assert.assertEquals(numChunksOfChunkSize, numOfExactChunks); + Assert.assertEquals(numChunksOfSmallerSize, numOfSmallerChunks); + final List eTags = completeRequest.getPartETags(); Assert.assertEquals(partRequests.size(), eTags.size()); Assert.assertEquals( From 1695588a189d8883c943cab949b7e55f27d879dc Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 22 Jun 2024 11:28:04 +0530 Subject: [PATCH 2/5] Address CodeQL --- .../s3/output/RetryableS3OutputStreamTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 70bcf6617ed6..01968620845e 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -175,8 +175,7 @@ public void testWriteSmallBufferShouldSucceed() throws IOException public void testWriteSmallBufferExactChunkSizeShouldSucceed() throws IOException { chunkSize = 128; - final int numChunks = 5; - final long fileSize = chunkSize * numChunks; + final int fileSize = 128 * 5; try (RetryableS3OutputStream out = new RetryableS3OutputStream( config, s3, @@ -188,7 +187,7 @@ public void testWriteSmallBufferExactChunkSizeShouldSucceed() throws IOException } } // each chunk 128 bytes, so there should be 5 chunks. - Assert.assertEquals(numChunks, s3.partRequests.size()); + Assert.assertEquals(5, s3.partRequests.size()); s3.assertCompleted(chunkSize, fileSize); } @@ -312,11 +311,11 @@ private void assertCompleted(long chunkSize, long expectedFileSize) numChunksOfChunkSize++; } - int numChunksOfSmallerSize = fileSize == 0 ? 0 : 1; + final int numChunksOfSmallerSize = fileSize == 0 ? 0 : 1; // Validate part sizes - long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); - long numOfSmallerChunks = partRequests.stream().filter(part -> part.getPartSize() < chunkSize).count(); + final long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); + final long numOfSmallerChunks = partRequests.stream().filter(part -> part.getPartSize() < chunkSize).count(); Assert.assertEquals(numChunksOfChunkSize, numOfExactChunks); Assert.assertEquals(numChunksOfSmallerSize, numOfSmallerChunks); From 8780efb11c4b7f72dab3959e06f5b8e67856c081 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 22 Jun 2024 14:26:05 +0530 Subject: [PATCH 3/5] Simplify assertion logic --- .../storage/s3/output/RetryableS3OutputStreamTest.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 01968620845e..04eca858921d 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -304,14 +304,8 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); - int numChunksOfChunkSize = 0; - long fileSize = expectedFileSize; - while (fileSize - chunkSize >= 0) { - fileSize -= chunkSize; - numChunksOfChunkSize++; - } - - final int numChunksOfSmallerSize = fileSize == 0 ? 0 : 1; + final long numChunksOfChunkSize = expectedFileSize / chunkSize; + final int numChunksOfSmallerSize = expectedFileSize % chunkSize == 0 ? 0 : 1; // Validate part sizes final long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); From bee45c00bd034f84000fd756865b9ba4fa0caded Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 22 Jun 2024 14:27:49 +0530 Subject: [PATCH 4/5] Change comment --- .../druid/storage/s3/output/RetryableS3OutputStreamTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 04eca858921d..3bddb797c554 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -304,10 +304,9 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); + // Validate number of chunks uploaded. final long numChunksOfChunkSize = expectedFileSize / chunkSize; final int numChunksOfSmallerSize = expectedFileSize % chunkSize == 0 ? 0 : 1; - - // Validate part sizes final long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); final long numOfSmallerChunks = partRequests.stream().filter(part -> part.getPartSize() < chunkSize).count(); From 123f279225d0d528b6bddd24867b8882c1031541 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 22 Jun 2024 16:57:00 +0530 Subject: [PATCH 5/5] Address review comments --- .../output/RetryableS3OutputStreamTest.java | 65 ++++++------------- 1 file changed, 21 insertions(+), 44 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 3bddb797c554..e02701e90397 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -113,12 +113,8 @@ public void testWriteAndHappy() throws IOException { chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 25; i++) { bb.clear(); bb.putInt(i); @@ -135,12 +131,8 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws { chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES * 3); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { bb.clear(); bb.putInt(1); bb.putInt(2); @@ -156,12 +148,8 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws public void testWriteSmallBufferShouldSucceed() throws IOException { chunkSize = 128; - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 600; i++) { out.write(i); } @@ -176,12 +164,8 @@ public void testWriteSmallBufferExactChunkSizeShouldSucceed() throws IOException { chunkSize = 128; final int fileSize = 128 * 5; - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < fileSize; i++) { out.write(i); } @@ -198,12 +182,8 @@ public void testSuccessToUploadAfterRetry() throws IOException chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 25; i++) { bb.clear(); bb.putInt(i); @@ -221,12 +201,8 @@ public void testFailToUploadAfterRetries() throws IOException final TestAmazonS3 s3 = new TestAmazonS3(3); ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 2; i++) { bb.clear(); bb.putInt(i); @@ -304,14 +280,15 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); - // Validate number of chunks uploaded. - final long numChunksOfChunkSize = expectedFileSize / chunkSize; - final int numChunksOfSmallerSize = expectedFileSize % chunkSize == 0 ? 0 : 1; - final long numOfExactChunks = partRequests.stream().filter(part -> part.getPartSize() == chunkSize).count(); - final long numOfSmallerChunks = partRequests.stream().filter(part -> part.getPartSize() < chunkSize).count(); - - Assert.assertEquals(numChunksOfChunkSize, numOfExactChunks); - Assert.assertEquals(numChunksOfSmallerSize, numOfSmallerChunks); + // Verify sizes of uploaded chunks + int numSmallerChunks = 0; + for (UploadPartRequest part : partRequests) { + Assert.assertTrue(part.getPartSize() <= chunkSize); + if (part.getPartSize() < chunkSize) { + ++numSmallerChunks; + } + } + Assert.assertTrue(numSmallerChunks <= 1); final List eTags = completeRequest.getPartETags(); Assert.assertEquals(partRequests.size(), eTags.size());