StandbyContextImpl.java

@@ -29,7 +29,7 @@

import java.time.Duration;

-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {

    StandbyContextImpl(final TaskId id,
                       final StreamsConfig config,
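The only change to StandbyContextImpl itself is the widened visibility: the new test added below lives in a different package (it imports org.apache.kafka.streams.processor.internals.StandbyContextImpl) and class-mocks it with EasyMock, and EasyMock generates a subclass of the mocked class, so the class must be visible from the test's package. A minimal, self-contained sketch of that class-mocking pattern; Widget is a hypothetical stand-in, only niceMock/expect/replay are real EasyMock API:

```java
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;

public class ClassMockSketch {

    // Hypothetical stand-in for StandbyContextImpl: to be class-mocked from
    // another package it must be public, with overridable methods.
    public static class Widget {
        public String name() {
            return "real";
        }
    }

    public static void main(final String[] args) {
        // A nice mock returns defaults for anything that is not stubbed,
        // so the test only needs to stub the calls it actually exercises.
        final Widget mock = niceMock(Widget.class);
        expect(mock.name()).andStubReturn("stubbed");
        replay(mock);
        System.out.println(mock.name()); // prints "stubbed"
    }
}
```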

AbstractRocksDBSegmentedBytesStore.java

@@ -26,6 +26,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;

@@ -251,7 +252,7 @@ Map<S, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
            // This handles the case that the state store is moved to a new client and does not
            // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
            // will only close the database and open it again with bulk loading enabled.
-            if (!bulkLoadSegments.contains(segment)) {
+            if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
                segment.toggleDbForBulkLoading(true);
                // If the store does not exist yet, getOrCreateSegmentIfLive will call openDB,
                // which sets the open flag for the newly created store.

@@ -270,6 +271,10 @@ Map<S, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
        return writeBatchMap;
    }

+    private boolean isStoreForActiveTask() {
+        return context instanceof ProcessorContextImpl;
+    }
+
    private void toggleForBulkLoading(final boolean prepareForBulkload) {
        for (final S segment : segments.allSegments()) {
            segment.toggleDbForBulkLoading(prepareForBulkload);
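The fix hinges on a runtime-type check: during restoration the store sees its owner only as an InternalProcessorContext, and the concrete type reveals whether that owner is an active task (ProcessorContextImpl) or a standby replica (StandbyContextImpl). A minimal sketch of that dispatch, using simplified stand-in types rather than the real Kafka Streams hierarchy:

```java
public class TaskTypeSketch {

    // Simplified stand-ins for the context types named in the diff above.
    interface InternalProcessorContext { }
    static final class ProcessorContextImpl implements InternalProcessorContext { }  // active task
    static final class StandbyContextImpl implements InternalProcessorContext { }    // standby task

    // Mirrors the isStoreForActiveTask() check added above: only stores owned
    // by active tasks should toggle RocksDB into bulk-loading mode.
    static boolean isStoreForActiveTask(final InternalProcessorContext context) {
        return context instanceof ProcessorContextImpl;
    }

    public static void main(final String[] args) {
        System.out.println(isStoreForActiveTask(new ProcessorContextImpl())); // true
        System.out.println(isStoreForActiveTask(new StandbyContextImpl()));   // false
    }
}
```

Note the trade-off in this dispatch style: any context type other than ProcessorContextImpl silently counts as standby, but the check avoids widening the context interface just for this fix.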

AbstractRocksDBSegmentedBytesStoreTest.java

@@ -29,10 +29,16 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;

@@ -64,6 +70,9 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;

@@ -355,7 +364,43 @@ public void shouldCreateWriteBatches() {
    }

    @Test
-    public void shouldRestoreToByteStore() {
+    public void shouldRestoreToByteStoreForActiveTask() {
+        final ProcessorContextImpl context = niceMock(ProcessorContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is enabled during recovery for active tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824).
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStoreForStandbyTask() {
+        final StandbyContextImpl context = niceMock(StandbyContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is disabled during recovery for standby tasks, so the
+        // compaction trigger for level 0 keeps its RocksDB default of 4.
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
+    private void verifyRestoreToByteStore(final InternalProcessorContext context) {
+        bytesStore = getBytesStore();
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST);
+        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
+        expect(context.metrics()).andStubReturn(streamsMetrics);
+        final TaskId taskId = new TaskId(0, 0);
+        expect(context.taskId()).andStubReturn(taskId);
+        final Map<String, Object> config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals();
+        expect(context.appConfigs()).andStubReturn(config);
+        expect(context.stateDir()).andStubReturn(stateDir);
+        replay(context);
+        bytesStore.init(context, bytesStore);
+
        // 0 segments initially.
        assertEquals(0, bytesStore.getSegments().size());
        final String key = "a";

@@ -367,11 +412,6 @@ public void shouldRestoreToByteStore() {
        // 2 segments are created during restoration.
        assertEquals(2, bytesStore.getSegments().size());

-        // Bulk loading is enabled during recovery.
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        }
-
        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
        expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
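For context on the values the two new tests assert: bulk-loading mode here means raising RocksDB's level-0 compaction trigger from its default of 4 to 1 << 30, so compactions effectively never run while restoration floods the store with sequential writes. A minimal sketch of that setting through the RocksJava API directly; the database path is arbitrary and this is not how Kafka Streams wires up its segments:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class BulkLoadSketch {

    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true)) {
            // With a trigger of 1 << 30 files, level-0 compaction is effectively
            // disabled; this is the value the active-task test asserts.
            options.setLevel0FileNumCompactionTrigger(1 << 30);
            try (final RocksDB db = RocksDB.open(options, "/tmp/bulk-load-sketch")) {
                db.put("key".getBytes(), "value".getBytes());
            }
        }
    }
}
```

RocksJava also offers Options#prepareForBulkLoad(), which applies a broader set of bulk-load tweaks; the tests above check only the compaction trigger, via getOptions(segment).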