diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 8e7a81eb48dd..e02701e90397 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -113,12 +113,8 @@ public void testWriteAndHappy() throws IOException { chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 25; i++) { bb.clear(); bb.putInt(i); @@ -135,12 +131,8 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws { chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES * 3); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { bb.clear(); bb.putInt(1); bb.putInt(2); @@ -156,12 +148,8 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws public void testWriteSmallBufferShouldSucceed() throws IOException { chunkSize = 128; - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 600; i++) { out.write(i); } @@ -171,6 +159,22 @@ public void testWriteSmallBufferShouldSucceed() throws IOException s3.assertCompleted(chunkSize, 600); } + @Test + public void testWriteSmallBufferExactChunkSizeShouldSucceed() throws IOException + { + chunkSize = 128; + final int fileSize = 128 * 5; + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { + for (int i = 0; i < fileSize; i++) { + out.write(i); + } + } + // each chunk 128 bytes, so there should be 5 chunks. + Assert.assertEquals(5, s3.partRequests.size()); + s3.assertCompleted(chunkSize, fileSize); + } + @Test public void testSuccessToUploadAfterRetry() throws IOException { @@ -178,12 +182,8 @@ public void testSuccessToUploadAfterRetry() throws IOException chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 25; i++) { bb.clear(); bb.putInt(i); @@ -201,12 +201,8 @@ public void testFailToUploadAfterRetries() throws IOException final TestAmazonS3 s3 = new TestAmazonS3(3); ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - s3UploadManager - )) { + try (RetryableS3OutputStream out = + new RetryableS3OutputStream(config, s3, path, s3UploadManager)) { for (int i = 0; i < 2; i++) { bb.clear(); bb.putInt(i); @@ -284,13 +280,16 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); - for (int i = 0; i < partRequests.size(); i++) { - if (i < partRequests.size() - 1) { - Assert.assertEquals(chunkSize, partRequests.get(i).getPartSize()); - } else { - Assert.assertTrue(chunkSize >= partRequests.get(i).getPartSize()); + // Verify sizes of uploaded chunks + int numSmallerChunks = 0; + for (UploadPartRequest part : partRequests) { + Assert.assertTrue(part.getPartSize() <= chunkSize); + if (part.getPartSize() < chunkSize) { + ++numSmallerChunks; } } + Assert.assertTrue(numSmallerChunks <= 1); + final List eTags = completeRequest.getPartETags(); Assert.assertEquals(partRequests.size(), eTags.size()); Assert.assertEquals(