KAFKA-9088: Consolidate InternalMockProcessorContext and MockInternalProcessorContext#8157
KAFKA-9088: Consolidate InternalMockProcessorContext and MockInternalProcessorContext#8157pierDipi wants to merge 45 commits intoapache:trunkfrom
Conversation
Demonstrate the consolidation approach via: migrating KStreamSessionWindowAggregateProcessorTest to use MockInternalProcessorContext instead of InternalMockProcessorContext
…to internal-processor-context-test
- Migrated AbstractKeyValueStoreTest - Migrated CachingKeyValueStoreTest - Migrated InMemoryKeyValueLoggedStoreTest - Migrated InMemoryKeyValueStoreTest - Migrated InMemoryLRUCacheStoreTest - Migrated RocksDBKeyValueStoreTest
- Migrated RocksDBTimestampedStoreTest
- Migrated InMemorySessionStoreTest - Migrated RocksDBSessionStoreTest
…ernalProcessorContext
|
Call for review: @cadonna @vvcephei @guozhangwang |
|
test this please |
|
@pierDipi Sorry for the late reply. I will try to review this PR this week. |
| } | ||
|
|
||
| private void init(final Properties config) { | ||
| init(config, createMetrics()); |
There was a problem hiding this comment.
req: Could you try to re-use the metrics that are created in the MockProcessorContext instead of creating an additional instance here?
You probably also need to modify StreamsTestUtils#containsMetric() to use the map returned from StreamsMetrics#metrics() where the StreamsMetrics instance is returned from InternalProcessorContext#metrics().
| init(config); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final StreamsMetricsImpl metrics, final ThreadCache threadCache) { |
There was a problem hiding this comment.
req: I looked a bit into the usages of this constructor and I think we can get rid of it. If we can get rid of it, we could also simply reuse the metrics created in MockProcessorContext instead of creating here another instance.
| public MockInternalProcessorContext() { | ||
| super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory()); | ||
| init(StreamsTestUtils.getStreamsConfig()); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final Properties config) { | ||
| this(config, createStateDir(config)); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final File stateDir) { | ||
| this(StreamsTestUtils.getStreamsConfig(), stateDir); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final Properties config, final File stateDir) { | ||
| super(config, DEFAULT_TASK_ID, stateDir); | ||
| init(config); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final StreamsMetricsImpl metrics, final ThreadCache threadCache) { | ||
| super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory()); | ||
| init(metrics, threadCache); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final LogContext logContext, final long maxCacheSizeBytes) { | ||
| super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory()); | ||
| final Metrics metrics = createMetrics(); | ||
| final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, DEFAULT_CLIENT_ID, DEFAULT_METRICS_VERSION); | ||
| final ThreadCache threadCache = createThreadCache(StreamsTestUtils.getStreamsConfig(), logContext, maxCacheSizeBytes, metrics); | ||
| init(streamsMetrics, threadCache); | ||
| } | ||
|
|
||
| public MockInternalProcessorContext(final Properties config, final TaskId taskId, final Metrics metrics) { | ||
| super(config, taskId, createStateDir(config)); | ||
| init(config, metrics); | ||
| } |
There was a problem hiding this comment.
req: Could try to reduce the number of constructors in general? I guess their number grew with the time and maybe you are able to eliminate some of them.
| recordContext.topic(), | ||
| recordContext.partition(), | ||
| recordContext.offset(), | ||
| recordContext.headers(), | ||
| recordContext.timestamp() |
There was a problem hiding this comment.
req: The indentation was correct here. Please remove the superfluous spaces.
| private List<KeyValueTimestamp<Object, Object>> results() { | ||
| return context.forwarded().stream().map(cf -> new KeyValueTimestamp<>(cf.keyValue().key, cf.keyValue().value, cf.timestamp())).collect(Collectors.toList()); | ||
| } |
There was a problem hiding this comment.
prop: Here you could fix the generics, but that would mean a bit of work in MockProcessorContext, I guess. This is not required for this PR, but if you want you are very welcome to fix this in new PR. I opened ticket KAFKA-9738 to document this issue.
| private boolean containsMetric(final StreamsMetricsImpl streamsMetrics, | ||
| final String name, | ||
| final Map<String, String> tags) { | ||
| final MetricName metricName = new MetricName(name, METRIC_GROUP, "", tags); | ||
| return streamsMetrics.metrics().containsKey(metricName); | ||
| } |
There was a problem hiding this comment.
req: Add this to StreamsTestUtils, please. This is the method you need that I mentioned in a comment on MockInternalProcessorContext.
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
| ); | ||
| final Properties props = StreamsTestUtils.getStreamsConfig(); | ||
| props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); | ||
| context = new MockInternalProcessorContext(props, metrics); |
There was a problem hiding this comment.
I didn't find a way of re-using the MockProcessorContext's StreamsMetricsImpl instance because of this test that needs to call Metrics#addReporter().
If we find a way we can get rid of the constructor MockInternalProcessorContext(Properties, Metrics)
comment: #8157 (comment)
There was a problem hiding this comment.
Thank you for pointing this out. I will look into it.
|
Hi @cadonna, the PR is ready for another pass. |
|
@pierDipi Thank you for the updates. I will try to review it next week. |
cadonna
left a comment
There was a problem hiding this comment.
Thank you for the updates @pierDipi !
Sorry that I haven't reviewed your PR before but I simply did not find the time.
I think the PR evolves nicely.
I saw a lot of wrong indentations throughout the PR. Could you please fix those?
You also need to rebase. Be aware that we needed to revert a change regarding the ProcessorContext that might affect this PR. So you might get some compilation error after rebasing.
Looking forward to your updates!
| public static final long DEFAULT_OFFSET = 0L; | ||
| public static final long DEFAULT_TIMESTAMP = 0L; | ||
| public static final String DEFAULT_CLIENT_ID = "client-id"; | ||
| public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache "; |
There was a problem hiding this comment.
| public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache "; | |
| public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache"; |
| public static final long DEFAULT_OFFSET = 0L; | ||
| public static final long DEFAULT_TIMESTAMP = 0L; |
There was a problem hiding this comment.
prop: Could you set these two default values to something not equal 0, e.g., 42? Just to avoid the ambiguity when something in a test is initialized to 0. Also the two values should be distinct. In that way, we would immediately discover if we compare offsets with timestamps by mistake.
|
|
||
| public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0); | ||
| public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders(); | ||
| public static final String DEFAULT_TOPIC = ""; |
There was a problem hiding this comment.
prop: Could you give the default topic a non-empty string like "test-topic"?
| public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> { | ||
|
|
||
| public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0); | ||
| public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders(); |
There was a problem hiding this comment.
prop:
| public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders(); | |
| public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders( | |
| Collections.singletonList(new RecordHeader("headerKey", "headerValue".getBytes())) | |
| ); |
| verifyDefaultMetricsVersion(context); | ||
| verifyDefaultRecordCollector(context); | ||
| verifyDefaultTaskId(context); | ||
| verifyDefaultTopic(context); | ||
| verifyDefaultPartition(context); | ||
| verifyDefaultTimestamp(context); | ||
| verifyDefaultOffset(context); | ||
| verifyDefaultHeaders(context); | ||
| verifyDefaultProcessorNodeName(context); |
There was a problem hiding this comment.
req: Please do not add a method for each default value. IMO using assertThat() as I did in the following is readable enough.
| verifyDefaultMetricsVersion(context); | |
| verifyDefaultRecordCollector(context); | |
| verifyDefaultTaskId(context); | |
| verifyDefaultTopic(context); | |
| verifyDefaultPartition(context); | |
| verifyDefaultTimestamp(context); | |
| verifyDefaultOffset(context); | |
| verifyDefaultHeaders(context); | |
| verifyDefaultProcessorNodeName(context); | |
| assertThat(context.metrics().version(), is(Version.LATEST)); | |
| assertThat(context.recordCollector(), notNullValue()); | |
| assertThat(context.taskId(), is(MockInternalProcessorContext.DEFAULT_TASK_ID)); | |
| assertThat(context.recordContext().topic(), is(MockInternalProcessorContext.DEFAULT_TOPIC)); | |
| assertThat(context.recordContext().partition(), is(MockInternalProcessorContext.DEFAULT_PARTITION)); | |
| assertThat(context.recordContext().timestamp(), is(MockInternalProcessorContext.DEFAULT_TIMESTAMP)); | |
| assertThat(context.recordContext().offset(), is(MockInternalProcessorContext.DEFAULT_OFFSET)); | |
| assertThat(context.recordContext().headers(), is(MockInternalProcessorContext.DEFAULT_HEADERS)); | |
| assertThat(context.currentNode().name(), is(MockInternalProcessorContext.DEFAULT_PROCESSOR_NODE_NAME)); |
| final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder( | ||
| Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), | ||
| Serdes.String(), | ||
| Serdes.String()) | ||
| .withCachingEnabled(); | ||
| Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), | ||
| Serdes.String(), | ||
| Serdes.String()) | ||
| .withCachingEnabled(); |
There was a problem hiding this comment.
prop: Should be
final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
Serdes.String(),
Serdes.String()
).withCachingEnabled();
| Consumed.with(Serdes.String(), Serdes.String())) | ||
| .transform(() -> new Transformer<String, String, KeyValue<String, String>>() { | ||
| private WindowStore<String, String> store; | ||
| private int numRecordsProcessed; | ||
| private ProcessorContext context; | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Override | ||
| public void init(final ProcessorContext processorContext) { | ||
| this.context = processorContext; | ||
| this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name"); | ||
| int count = 0; | ||
|
|
||
| final KeyValueIterator<Windowed<String>, String> all = store.all(); | ||
| while (all.hasNext()) { | ||
| count++; | ||
| all.next(); | ||
| Consumed.with(Serdes.String(), Serdes.String())) | ||
| .transform(() -> new Transformer<String, String, KeyValue<String, String>>() { | ||
| private WindowStore<String, String> store; | ||
| private int numRecordsProcessed; | ||
| private ProcessorContext context; | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Override | ||
| public void init(final ProcessorContext processorContext) { | ||
| this.context = processorContext; | ||
| this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name"); | ||
| int count = 0; | ||
|
|
||
| final KeyValueIterator<Windowed<String>, String> all = store.all(); | ||
| while (all.hasNext()) { | ||
| count++; | ||
| all.next(); | ||
| } | ||
|
|
||
| assertThat(count, equalTo(0)); | ||
| } | ||
|
|
||
| assertThat(count, equalTo(0)); | ||
| } | ||
| @Override | ||
| public KeyValue<String, String> transform(final String key, final String value) { | ||
| int count = 0; | ||
|
|
||
| @Override | ||
| public KeyValue<String, String> transform(final String key, final String value) { | ||
| int count = 0; | ||
| final KeyValueIterator<Windowed<String>, String> all = store.all(); | ||
| while (all.hasNext()) { | ||
| count++; | ||
| all.next(); | ||
| } | ||
| assertThat(count, equalTo(numRecordsProcessed)); | ||
|
|
||
| final KeyValueIterator<Windowed<String>, String> all = store.all(); | ||
| while (all.hasNext()) { | ||
| count++; | ||
| all.next(); | ||
| } | ||
| assertThat(count, equalTo(numRecordsProcessed)); | ||
| store.put(value, value, context.timestamp()); | ||
|
|
||
| store.put(value, value, context.timestamp()); | ||
| numRecordsProcessed++; | ||
|
|
||
| numRecordsProcessed++; | ||
|
|
||
| return new KeyValue<>(key, value); | ||
| } | ||
| return new KeyValue<>(key, value); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() {} | ||
| }, "store-name"); | ||
| @Override | ||
| public void close() { | ||
| } |
There was a problem hiding this comment.
req: It seems something happened with the indentation. I think before it was OK. In any case indentation should be 4 spaces.
| Serdes.String().serializer(), | ||
| Serdes.String().serializer(), | ||
| initialWallClockTime, | ||
| Duration.ZERO); |
There was a problem hiding this comment.
| Serdes.String().serializer(), | |
| Serdes.String().serializer(), | |
| initialWallClockTime, | |
| Duration.ZERO); | |
| Serdes.String().serializer(), | |
| Serdes.String().serializer(), | |
| initialWallClockTime, | |
| Duration.ZERO | |
| ); |
There was a problem hiding this comment.
The rest of the class seem also to have the wrong indentation.
| ); | ||
| final Properties props = StreamsTestUtils.getStreamsConfig(); | ||
| props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); | ||
| context = new MockInternalProcessorContext(props, metrics); |
There was a problem hiding this comment.
Thank you for pointing this out. I will look into it.
|
|
||
| public MockInternalProcessorContext(final Properties config, final TaskId taskId, final File stateDir) { | ||
| super(config, taskId, stateDir); | ||
| public MockInternalProcessorContext(final Properties config, final File stateDir, final ThreadCache cache) { |
There was a problem hiding this comment.
Q: Do you think we can eliminate the stateDir parameter in this constructor and just take the state dir in the config parameter?
|
Sorry, I haven't had time to continue this contribution. :( |
These changes migrate usages of
InternalMockProcessorContextto use a new version ofMockInternalProcessorContext.Committer Checklist (excluded from commit message)