From 550195a696077254b190fff6096e75550c06af6b Mon Sep 17 00:00:00 2001 From: Harshpreet Singh Date: Mon, 22 Jun 2020 19:06:01 -0700 Subject: [PATCH 1/4] retry 500 and 503 errors against kinesis --- .../apache/druid/indexing/kinesis/KinesisRecordSupplier.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 65ae1f62ba07..d29d0851be01 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -110,7 +110,8 @@ private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { final boolean isIOException = ex.getCause() instanceof IOException; final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode()); - return isIOException || isTimeout; + final boolean isInternalError = ex.getStatusCode() == 500 || ex.getStatusCode() == 503; + return isIOException || isTimeout || isInternalError; } /** From facb7dd15ab2cdef4cbfc787495e4ef926787ea8 Mon Sep 17 00:00:00 2001 From: Harshpreet Singh Date: Mon, 22 Jun 2020 22:06:53 -0700 Subject: [PATCH 2/4] add test that exercises retry logic --- .../kinesis/KinesisRecordSupplierTest.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 6b2f32f6cdda..3f7b6957e85b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; @@ -316,6 +317,88 @@ public void testPoll() throws InterruptedException Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } + @Test + public void testPollWithKinesisInternalFailure() throws InterruptedException + { + recordsPerFetch = 100; + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult1).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult0) + .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult1) + .anyTimes(); + AmazonServiceException getException = new AmazonServiceException("InternalFailure"); + getException.setErrorCode("InternalFailure"); + getException.setStatusCode(500); + getException.setServiceName("AmazonKinesis"); + EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once(); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + + replayAll(); + + Set> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0), + StreamPartition.of(STREAM, SHARD_ID1) + ); + + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 2, + false, + 100, + 5000, + 5000, + 60000, + 100, + true + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + while (recordSupplier.bufferSize() < 12) { + Thread.sleep(100); + } + + List> polledRecords = cleanRecords(recordSupplier.poll( + POLL_TIMEOUT_MILLIS)); + + verifyAll(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); + } + @Test public void testSeek() throws InterruptedException From 9119d898dc3b1f3c0f720aeafff678e29b7fef09 Mon Sep 17 00:00:00 2001 From: Harshpreet Singh Date: Mon, 22 Jun 2020 23:27:54 -0700 Subject: [PATCH 3/4] more branch coverage --- .../druid/indexing/kinesis/KinesisRecordSupplierTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 3f7b6957e85b..881750de2a79 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -352,12 +352,18 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException getException.setServiceName("AmazonKinesis"); EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once(); EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); + AmazonServiceException getException2 = new AmazonServiceException("InternalFailure"); + getException2.setErrorCode("InternalFailure"); + getException2.setStatusCode(503); + getException2.setServiceName("AmazonKinesis"); + EasyMock.expect(getRecordsResult1.getRecords()).andThrow(getException2).once(); EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -385,7 +391,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException recordSupplier.seekToEarliest(partitions); recordSupplier.start(); - while (recordSupplier.bufferSize() < 12) { + while (recordSupplier.bufferSize() < 14) { Thread.sleep(100); } From af40b4f1d1646f70935c617942b03a3937c928fb Mon Sep 17 00:00:00 2001 From: Harshpreet Singh Date: Tue, 23 Jun 2020 14:04:18 -0700 Subject: [PATCH 4/4] retry 500 and 503 on getRecords request when fetching sequence numberu --- .../apache/druid/indexing/kinesis/KinesisRecordSupplier.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index d29d0851be01..23e5bec573f1 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -810,6 +810,10 @@ private String getSequenceNumber(StreamPartition partition, ShardIterato ); return true; } + if (throwable instanceof AmazonServiceException) { + AmazonServiceException ase = (AmazonServiceException) throwable; + return isServiceExceptionRecoverable(ase); + } return false; }, GET_SEQUENCE_NUMBER_RETRY_COUNT