From 1aeef71b75c8615663b4d87258ec2068286ec8b7 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 2 Nov 2017 12:11:45 -0600 Subject: [PATCH] Kafka: Fixes needlessly low interpretation of maxRowsInMemory. AppenderatorImpl already applies maxRowsInMemory across all sinks. So dividing by the number of Kafka partitions is pointless and effectively makes the interpretation of maxRowsInMemory lower than expected. This undoes one of the two changes from #3284, which fixed the original bug twice. In this, that's worse than fixing it once. --- .../io/druid/indexing/kafka/KafkaIndexTask.java | 5 +---- .../druid/indexing/kafka/KafkaTuningConfig.java | 16 ---------------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 02c11333d678..fdbb42c93e5d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -957,12 +957,9 @@ private boolean isPaused() private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { - final int maxRowsInMemoryPerPartition = (tuningConfig.getMaxRowsInMemory() / - ioConfig.getStartPartitions().getPartitionOffsetMap().size()); return Appenderators.createRealtime( dataSchema, - tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()) - .withMaxRowsInMemory(maxRowsInMemoryPerPartition), + tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), metrics, toolbox.getSegmentPusher(), toolbox.getObjectMapper(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index e9dc4463f80e..a1792c92a5e2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -185,22 +185,6 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) ); } - public KafkaTuningConfig withMaxRowsInMemory(int rows) - { - return new KafkaTuningConfig( - rows, - maxRowsPerSegment, - intermediatePersistPeriod, - basePersistDirectory, - maxPendingPersists, - indexSpec, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically - ); - } - @Override public boolean equals(Object o) {