From 21772159ab77f2b9d6b4c41c7387005c71aa3171 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 13 Mar 2019 22:07:01 -0700 Subject: [PATCH 1/4] Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest --- .../indexing/kafka/KafkaIndexTaskTest.java | 71 +++-- .../kinesis/KinesisIndexTaskTest.java | 266 ++++++++---------- .../SeekableStreamIndexTaskRunner.java | 1 + 3 files changed, 179 insertions(+), 159 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 070396f46722..675c0d618c9a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -86,6 +86,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -914,7 +915,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)); - final KafkaIndexTask task = createTask( + final KafkaIndexTask normalReplica = createTask( null, new KafkaIndexTaskIOConfig( 0, @@ -928,34 +929,68 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null ) ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != Status.PAUSED) { + final KafkaIndexTask staleReplica = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + + final ListenableFuture normalReplicaFuture = runTask(normalReplica); + // Simulating one replica is slower than the other + final ListenableFuture staleReplicaFuture = ListenableFutures.transformAsync( + taskExec.submit(() -> { + Thread.sleep(1000); + return staleReplica; + }), + this::runTask + ); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + staleReplica.getRunner().pause(); + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Map currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); - // Simulating the case when another replica has consumed up to the offset of 8 - task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false); + normalReplica.getRunner().setEndOffsets(currentOffsets, false); + staleReplica.getRunner().setEndOffsets(currentOffsets, false); - // The task is supposed to consume remaining rows up to the offset of 13 - while (task.getRunner().getStatus() != Status.PAUSED) { + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets( - ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L), - true - ); + normalReplica.getRunner().setEndOffsets(currentOffsets, true); + staleReplica.getRunner().setEndOffsets(currentOffsets, true); - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode()); - // processed count would be 8 if it stopped at it's current offsets - Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(9, normalReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway()); + + Assert.assertEquals(9, staleReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getThrownAway()); } @Test(timeout = 60_000L) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 857b4a76235c..a417d2858e7f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -91,6 +91,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -133,8 +134,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; -import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -2557,6 +2556,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final KinesisIndexTask task = createTask( "task1", + DATA_SCHEMA, new KinesisIndexTaskIOConfig( null, "sequence0", @@ -2616,125 +2616,147 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen maxRowsPerSegment = 2; - maxRecordsPerPoll = 1; - recordSupplier.assign(anyObject()); + final KinesisRecordSupplier recordSupplier1 = mock(KinesisRecordSupplier.class); + recordSupplier1.assign(anyObject()); expectLastCall().anyTimes(); - - expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); + expect(recordSupplier1.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + recordSupplier1.seek(anyObject(), anyString()); expectLastCall().anyTimes(); - - expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5)) - .once() - .andReturn(records.subList(4, 10)) - .once() - .andReturn(records.subList(9, 15)) - .once(); - - recordSupplier.close(); + expect(recordSupplier1.poll(anyLong())).andReturn(records.subList(0, 5)) + .once() + .andReturn(records.subList(4, 10)) + .once(); + recordSupplier1.close(); + expectLastCall().once(); + final KinesisRecordSupplier recordSupplier2 = mock(KinesisRecordSupplier.class); + recordSupplier2.assign(anyObject()); + expectLastCall().anyTimes(); + expect(recordSupplier2.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + recordSupplier2.seek(anyObject(), anyString()); + expectLastCall().anyTimes(); + expect(recordSupplier2.poll(anyLong())).andReturn(records.subList(0, 5)) + .once() + .andReturn(records.subList(4, 10)) + .once(); + recordSupplier2.close(); expectLastCall().once(); replayAll(); final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "0" - ) + ImmutableMap.of(shardId1, "0") ); final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "4" - ) + ImmutableMap.of(shardId1, "4") ); final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "9" - ) + ImmutableMap.of(shardId1, "9") ); final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "14" - ) + ImmutableMap.of(shardId1, "100") // simulating unlimited ); - final KinesisIndexTask task = createTask( + final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig( null, - new KinesisIndexTaskIOConfig( - null, - baseSequenceName, - startPartitions, - endPartitions, - true, - null, - null, - "awsEndpoint", - null, - null, - null, - null, - null, - false - ) + baseSequenceName, + startPartitions, + endPartitions, + true, + null, + null, + "awsEndpoint", + null, + null, + null, + null, + null, + false ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) { + final KinesisIndexTask normalReplica = createTask( + null, + DATA_SCHEMA, + ioConfig, + null + ); + ((TestableKinesisIndexTask) normalReplica).setLocalSupplier(recordSupplier1); + final KinesisIndexTask staleReplica = createTask( + null, + DATA_SCHEMA, + ioConfig, + null + ); + ((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2); + final ListenableFuture normalReplicaFuture = runTask(normalReplica); + // Simulating one replica is slower than the other + final ListenableFuture staleReplicaFuture = ListenableFutures.transformAsync( + taskExec.submit(() -> { + Thread.sleep(1000); + return staleReplica; + }), + this::runTask + ); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + staleReplica.getRunner().pause(); + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Map currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets(currentOffsets, false); - // The task is supposed to consume remaining rows up to the offset of 13 - while (task.getRunner().getStatus() != Status.PAUSED) { + normalReplica.getRunner().setEndOffsets(currentOffsets, false); + staleReplica.getRunner().setEndOffsets(currentOffsets, false); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets( - ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)), - true - ); + normalReplica.getRunner().setEndOffsets(currentOffsets, true); + staleReplica.getRunner().setEndOffsets(currentOffsets, true); - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode()); verifyAll(); Assert.assertEquals(2, checkpointRequestsHash.size()); // Check metrics - Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(10, normalReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway()); // Check published metadata final Set descriptors = new HashSet<>(); - descriptors.add(sd(task, "2008/P1D", 0)); - descriptors.add(sd(task, "2008/P1D", 1)); - descriptors.add(sd(task, "2009/P1D", 0)); - descriptors.add(sd(task, "2010/P1D", 0)); - descriptors.add(sd(task, "2010/P1D", 1)); - descriptors.add(sd(task, "2011/P1D", 0)); - descriptors.add(sd(task, "2011/P1D", 1)); - descriptors.add(sd(task, "2012/P1D", 0)); - descriptors.add(sd(task, "2013/P1D", 0)); + descriptors.add(sd(normalReplica, "2008/P1D", 0)); + descriptors.add(sd(normalReplica, "2009/P1D", 0)); + descriptors.add(sd(normalReplica, "2010/P1D", 0)); + descriptors.add(sd(normalReplica, "2010/P1D", 1)); + descriptors.add(sd(normalReplica, "2011/P1D", 0)); + descriptors.add(sd(normalReplica, "2011/P1D", 1)); + descriptors.add(sd(normalReplica, "2012/P1D", 0)); + descriptors.add(sd(normalReplica, "2013/P1D", 0)); Assert.assertEquals(descriptors, publishedDescriptors()); Assert.assertEquals( new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of( shardId1, - "10" + "9" ))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) ); @@ -2791,26 +2813,27 @@ private KinesisIndexTask createTask( final KinesisIndexTaskIOConfig ioConfig ) { - return createTask(taskId, DATA_SCHEMA, ioConfig); + return createTask(taskId, DATA_SCHEMA, ioConfig, null); } private KinesisIndexTask createTask( final String taskId, - final KinesisIndexTaskIOConfig ioConfig, - final Map context + final DataSchema dataSchema, + final KinesisIndexTaskIOConfig ioConfig ) { - return createTask(taskId, DATA_SCHEMA, ioConfig, context); + return createTask(taskId, dataSchema, ioConfig, null); } private KinesisIndexTask createTask( final String taskId, final DataSchema dataSchema, - final KinesisIndexTaskIOConfig ioConfig + final KinesisIndexTaskIOConfig ioConfig, + @Nullable final Map context ) { final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig( - 1000, + maxRowsInMemory, null, maxRowsPerSegment, maxTotalRows, @@ -2822,11 +2845,11 @@ private KinesisIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - skipAvailabilityCheck, + true, + null, null, null, null, - 5000, null, null, logParseExceptions, @@ -2835,58 +2858,20 @@ private KinesisIndexTask createTask( maxRecordsPerPoll, intermediateHandoffPeriod ); - final Map context = null; - final KinesisIndexTask task = new TestableKinesisIndexTask( - taskId, - null, - cloneDataSchema(dataSchema), - tuningConfig, - ioConfig, - context, - null, - null, - rowIngestionMetersFactory, - null - ); - - return task; + return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } - private KinesisIndexTask createTask( final String taskId, final DataSchema dataSchema, final KinesisIndexTaskIOConfig ioConfig, - final Map context + final KinesisIndexTaskTuningConfig tuningConfig, + @Nullable final Map context ) { - final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig( - maxRowsInMemory, - null, - maxRowsPerSegment, - maxTotalRows, - new Period("P1Y"), - null, - null, - null, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - true, - null, - null, - null, - null, - null, - null, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions, - maxRecordsPerPoll, - intermediateHandoffPeriod - ); - context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true); + if (context != null) { + context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true); + } final KinesisIndexTask task = new TestableKinesisIndexTask( taskId, @@ -3059,14 +3044,6 @@ public void close() final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); dataSegmentPusherConfig.storageDirectory = getSegmentDirectory(); final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); - SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public List getLocations() - { - return new ArrayList<>(); - } - }; toolboxFactory = new TaskToolboxFactory( taskConfig, taskActionClientFactory, @@ -3242,8 +3219,10 @@ private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOExcep @JsonTypeName("index_kinesis") private static class TestableKinesisIndexTask extends KinesisIndexTask { + private KinesisRecordSupplier localSupplier; + @JsonCreator - public TestableKinesisIndexTask( + private TestableKinesisIndexTask( @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("dataSchema") DataSchema dataSchema, @@ -3258,22 +3237,27 @@ public TestableKinesisIndexTask( { super( id, - taskResource, + null, dataSchema, tuningConfig, ioConfig, context, - chatHandlerProvider, - authorizerMapper, + null, + null, rowIngestionMetersFactory, - awsCredentialsConfig + null ); } + private void setLocalSupplier(KinesisRecordSupplier recordSupplier) + { + this.localSupplier = recordSupplier; + } + @Override protected KinesisRecordSupplier newTaskRecordSupplier() { - return recordSupplier; + return localSupplier == null ? recordSupplier : localSupplier; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 4fca19d12174..efa3389898ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -534,6 +534,7 @@ public void run() } else { rows = new ArrayList<>(); for (byte[] valueBytes : valueBytess) { + log.info("valueBytes len: " + valueBytes.length); rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes))); } } From 9064ebed06a5cb207e7e931322d6bf4710fc9ab1 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 13 Mar 2019 22:11:31 -0700 Subject: [PATCH 2/4] revert unnecessary change --- .../druid/indexing/kinesis/KinesisIndexTaskTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index a417d2858e7f..2aded4a410b2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -3237,15 +3237,15 @@ private TestableKinesisIndexTask( { super( id, - null, + taskResource, dataSchema, tuningConfig, ioConfig, context, - null, - null, + chatHandlerProvider, + authorizerMapper, rowIngestionMetersFactory, - null + awsCredentialsConfig ); } From ba96d7c4ebe8d10bce3d33665cafb95d396acb9f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 14 Mar 2019 10:49:46 -0700 Subject: [PATCH 3/4] fix test --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 5779f573c15a..6668ddd0c5e7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -911,7 +911,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)); final SeekableStreamPartitions checkpoint2 = - new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L)); + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)); final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)); From 7bd17495a437278f261fed0458c546b1d0c81688 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 14 Mar 2019 15:16:42 -0700 Subject: [PATCH 4/4] remove debug log --- .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index efa3389898ff..4fca19d12174 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -534,7 +534,6 @@ public void run() } else { rows = new ArrayList<>(); for (byte[] valueBytes : valueBytess) { - log.info("valueBytes len: " + valueBytes.length); rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes))); } }