diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 9a94ad6a6faf7..2d0d3d3af8dfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -29,7 +29,7 @@ import java.time.Duration;
 
-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 4c321042fba80..dd0feec17c3e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -251,7 +252,7 @@ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
                 // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
+                if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
                     segment.toggleDbForBulkLoading(true);
                     // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
                     // makes the open flag for the newly created store.
@@ -270,6 +271,10 @@ Map<S, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         return writeBatchMap;
     }
 
+    private boolean isStoreForActiveTask() {
+        return context instanceof ProcessorContextImpl;
+    }
+
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
         for (final S segment : segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
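
A note on the production change above: the new guard relies on the concrete type of the store's context to tell active tasks apart from standbys. Active tasks run with a ProcessorContextImpl, while standby tasks run with a StandbyContextImpl, which is also why that class is made public so the tests below can mock it. Here is a minimal, self-contained sketch of this dispatch pattern; the class names are hypothetical stand-ins, not Kafka's real types:

    // Hypothetical stand-ins for InternalProcessorContext and its two implementations.
    interface TaskContext { }
    class ActiveTaskContext implements TaskContext { }   // plays the role of ProcessorContextImpl
    class StandbyTaskContext implements TaskContext { }  // plays the role of StandbyContextImpl

    class SegmentedStore {
        private final TaskContext context;

        SegmentedStore(final TaskContext context) {
            this.context = context;
        }

        // Mirrors isStoreForActiveTask(): the concrete context class reveals
        // whether the owning task is active, so only active tasks switch
        // RocksDB into bulk-loading mode during restoration.
        boolean isStoreForActiveTask() {
            return context instanceof ActiveTaskContext;
        }
    }

    public class ActiveTaskCheckSketch {
        public static void main(final String[] args) {
            System.out.println(new SegmentedStore(new ActiveTaskContext()).isStoreForActiveTask());  // true
            System.out.println(new SegmentedStore(new StandbyTaskContext()).isStoreForActiveTask()); // false
        }
    }
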
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7c4f44394ea69..6a4b9c20ce6cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -29,10 +29,16 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -64,6 +70,9 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -355,7 +364,43 @@ public void shouldCreateWriteBatches() {
     }
 
     @Test
-    public void shouldRestoreToByteStore() {
+    public void shouldRestoreToByteStoreForActiveTask() {
+        final ProcessorContextImpl context = niceMock(ProcessorContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is enabled during recovery for active tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824).
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStoreForStandbyTask() {
+        final StandbyContextImpl context = niceMock(StandbyContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is disabled during recovery for standby tasks, so the level-0
+        // compaction trigger stays at the RocksDB default of 4.
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
+    private void verifyRestoreToByteStore(final InternalProcessorContext context) {
+        bytesStore = getBytesStore();
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST);
+        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
+        expect(context.metrics()).andStubReturn(streamsMetrics);
+        final TaskId taskId = new TaskId(0, 0);
+        expect(context.taskId()).andStubReturn(taskId);
+        final Map<String, Object> config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals();
+        expect(context.appConfigs()).andStubReturn(config);
+        expect(context.stateDir()).andStubReturn(stateDir);
+        replay(context);
+        bytesStore.init(context, bytesStore);
+
         // 0 segments initially.
         assertEquals(0, bytesStore.getSegments().size());
         final String key = "a";
@@ -367,11 +412,6 @@ public void shouldRestoreToByteStore() {
         // 2 segments are created during restoration.
         assertEquals(2, bytesStore.getSegments().size());
 
-        // Bulk loading is enabled during recovery.
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        }
-
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
         expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
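
For context on the values the two tests assert: "bulk loading" defers RocksDB compaction during restoration by raising the level-0 file-count compaction trigger to an effectively unreachable 1 << 30 (1,073,741,824), whereas 4 is RocksDB's default trigger, which standby tasks now keep. A small sketch against the RocksJava API illustrating the two settings (this assumes org.rocksdb is on the classpath and is not Kafka's actual toggle code):

    import org.rocksdb.Options;

    public class BulkLoadTriggerSketch {
        public static void main(final String[] args) {
            try (final Options options = new Options()) {
                // RocksDB default: compact level 0 once it accumulates 4 files.
                // This is what the standby-task test asserts (equalTo(4)).
                System.out.println(options.level0FileNumCompactionTrigger());

                // Bulk-loading setting: a trigger this high postpones compaction
                // until restoration finishes and the option is reset. This is what
                // the active-task test asserts (equalTo(1 << 30)).
                options.setLevel0FileNumCompactionTrigger(1 << 30);
                System.out.println(options.level0FileNumCompactionTrigger());
            }
        }
    }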