From 652250344fcb6d59bd291d4588e41a84d6087470 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 6 Jul 2020 21:37:17 +0200 Subject: [PATCH 1/2] KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 KAFKA-9603 reports that the number of open files keeps increasing in RocksDB. The reason is that bulk loading is turned on but never turned off in segmented state stores for standby tasks. This bug was fixed in 2.6 by using code that is not present in 2.5. This PR backports the fix to 2.5. --- .../internals/StandbyContextImpl.java | 2 +- .../AbstractRocksDBSegmentedBytesStore.java | 3 +- ...bstractRocksDBSegmentedBytesStoreTest.java | 52 ++++++++++++++++--- 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 9a94ad6a6faf7..2d0d3d3af8dfb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -29,7 +29,7 @@ import java.time.Duration; -class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { +public class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { StandbyContextImpl(final TaskId id, final StreamsConfig config, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 4c321042fba80..a81666444586e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -26,6 +26,7 @@ import 
org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -251,7 +252,7 @@ Map getWriteBatches(final Collection> re // This handles the case that state store is moved to a new client and does not // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading // will only close the database and open it again with bulk loading enabled. - if (!bulkLoadSegments.contains(segment)) { + if (!bulkLoadSegments.contains(segment) && context instanceof ProcessorContextImpl) { segment.toggleDbForBulkLoading(true); // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that // makes the open flag for the newly created store. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 7c4f44394ea69..6a4b9c20ce6cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -29,10 +29,16 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.StandbyContextImpl; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.StreamsTestUtils; @@ -64,6 +70,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.niceMock; +import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static 
org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; @@ -355,7 +364,43 @@ public void shouldCreateWriteBatches() { } @Test - public void shouldRestoreToByteStore() { + public void shouldRestoreToByteStoreForActiveTask() { + final ProcessorContextImpl context = niceMock(ProcessorContextImpl.class); + verifyRestoreToByteStore(context); + + // Bulk loading is enabled during recovery for active tasks + // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824) + for (final S segment : bytesStore.getSegments()) { + assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30)); + } + } + + @Test + public void shouldRestoreToByteStoreForStandbyTask() { + final StandbyContextImpl context = niceMock(StandbyContextImpl.class); + verifyRestoreToByteStore(context); + + // Bulk loading is disabled during recovery for standby tasks + // (bulk loading would set the compaction trigger for level 0 to 1 << 30, i.e. 1,073,741,824; here it must stay at the default of 4) + for (final S segment : bytesStore.getSegments()) { + assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4)); + } + } + + private void verifyRestoreToByteStore(final InternalProcessorContext context) { + bytesStore = getBytesStore(); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST); + streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger()); + expect(context.metrics()).andStubReturn(streamsMetrics); + final TaskId taskId = new TaskId(0, 0); + expect(context.taskId()).andStubReturn(taskId); + final Map config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals(); + expect(context.appConfigs()).andStubReturn(config); + expect(context.stateDir()).andStubReturn(stateDir); + replay(context); + bytesStore.init(context, bytesStore); + // 0 segments initially.
assertEquals(0, bytesStore.getSegments().size()); final String key = "a"; @@ -367,11 +412,6 @@ public void shouldRestoreToByteStore() { // 2 segments are created during restoration. assertEquals(2, bytesStore.getSegments().size()); - // Bulk loading is enabled during recovery. - for (final S segment : bytesStore.getSegments()) { - assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30)); - } - final List, Long>> expected = new ArrayList<>(); expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L)); From f4ae8ea190c2cddb65066bbbe3054f40fe7bcee7 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 7 Jul 2020 18:25:54 +0200 Subject: [PATCH 2/2] Make code more readable --- .../state/internals/AbstractRocksDBSegmentedBytesStore.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index a81666444586e..dd0feec17c3e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -252,7 +252,7 @@ Map getWriteBatches(final Collection> re // This handles the case that state store is moved to a new client and does not // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading // will only close the database and open it again with bulk loading enabled. 
- if (!bulkLoadSegments.contains(segment) && context instanceof ProcessorContextImpl) { + if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) { segment.toggleDbForBulkLoading(true); // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that // makes the open flag for the newly created store. @@ -271,6 +271,10 @@ Map getWriteBatches(final Collection> re return writeBatchMap; } + private boolean isStoreForActiveTask() { + return context instanceof ProcessorContextImpl; + } + private void toggleForBulkLoading(final boolean prepareForBulkload) { for (final S segment : segments.allSegments()) { segment.toggleDbForBulkLoading(prepareForBulkload);