Skip to content

KAFKA-3452 Follow-up: Optimize ByteStore Scenarios#2333

Closed
guozhangwang wants to merge 20 commits intoapache:trunkfrom
guozhangwang:K3452-followup-state-store-refactor
Closed

KAFKA-3452 Follow-up: Optimize ByteStore Scenarios#2333
guozhangwang wants to merge 20 commits intoapache:trunkfrom
guozhangwang:K3452-followup-state-store-refactor

Conversation

@guozhangwang
Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang commented Jan 9, 2017

This is a refactoring follow-up of #2166. Main refactoring changes:

  1. Extract InMemoryKeyValueStore out of InMemoryKeyValueStoreSupplier and remove its duplicates in test package.

  2. Add two abstract classes AbstractKeyValueIterator and AbstractKeyValueStore to collapse common functional logics.

  3. Added specialized BytesXXStore to accommodate cases where key value types are Bytes / byte[] so that we can save calling the dummy serdes.

  4. Make the key type in ThreadCache from byte[] to Bytes, as SessionStore / WindowStore's result serialized bytes are in the form of Bytes anyways, so that we can save unnecessary Bytes.get() and Bytes.wrap(bytes).

Each of these should arguably be a separate PR and I apologize for the mess, this is because this branch was extracted from a rather large diff that has multiple refactoring mingled together and @dguy and myself have already put lots of efforts to break it down to a few separate PRs, and this is the only left-over work. Such PR won't happen in the future.

Ping @dguy @enothereska @mjsax for reviews

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/606/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/607/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/608/
Test FAILed (JDK 8 and Scala 2.11).

@guozhangwang guozhangwang changed the title KAFKA-3452 Follow-up: Refactoring StateStore hierarchies [WIP] KAFKA-3452 Follow-up: Refactoring StateStore hierarchies Jan 9, 2017
@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/611/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/612/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/610/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Copy link
Copy Markdown
Contributor Author

@dguy @mjsax for reviews (this is still WIP):

  1. Follow the refactoring ideas in KAFKA-3452: Support session windows #2166 (review). Now the hierarchies can be viewed in RocksDBKeyValueStoreSupplier, RocksDBWindowStoreSupplier, RocksDBSessionStoreSupplier. Note that when caching or logging is enabled, all the underlying stores will be typed as <Bytes, byte[]> for performance optimizations.

  2. Added a few AbstractXXX classes to collapse common functions across some peer classes.

  3. Refactored Stores a bit to better aligned with the supplier class.

  4. Removed some redundant inner maps in ProcessorTopology which is only used ProcessorStateManager for flushing stores and remembering the changelog topic of the stores, instead let the ProcessorTopology to construct the full map from stores to their corresponding changelog topics if they are logging enabled within build(). It also allows to extract logging out of RocksDB.

Some more thoughts

  1. With WrapperStateStore, we can now call context.register(..) in the outer-most store and call its inner().put(...) in the restoreCallback instead of calling it in the inner-most store, and hence can further get rid of the root parameter.

  2. We can let users-customized aggregation / join operators to be per-store configurable so that we can also remove the TupleForwarder interface. I.e.:

StateStoreSupplier<...> mySupplier = new MyStateStoreSupplier(...)

// this means library will not add any wrappers for logging / caching / etc
// if users have caching mechanism in its only supplier, she needs to consider 
// flushing / eviction / forwarding herself.
stream.groupByKey(...).aggregate(..., supplier);   

// this means library will wrap user-provided underlying store engine as required
// users do not need to consider the above and our wrapper will do that without hacking forwarder.
StateStoreSupplier<...> myWrappedSupplier = Stores.create(storeName)
                .withKeys(keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .enableCaching();
                .wrap(mySupplier);
                .build();

stream.groupByKey(...).aggregate(..., supplier);   

InMemoryWrapperFactory<K, V> disableLogging();
}

private static abstract class AbstractInMemoryFactory<K, V> implements InMemoryWrapperFactory<K, V> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is necessary? The only sub-class is InMemoryKeyValueFactory.

StateStoreSupplier build();
PersistentWrapperFactory<K, V> disableCaching();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be an inheritance hierarchy. This class seems like it should be a standalone class that is used to collect the decisions/conifg etc for logging and caching. It could be constructed and passed in to what are currently its subclasses.
With inheritance we end up with competing interfaces in the hierarchy, i.e., the same methods on PersistentWrapperFactory and PersistentKeyValueFactory

this.valueSerde = valueSerde;
}

@Override
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of how the inheritance doesn't really work. It actually needs to return this. I think it would be much better to get rid of the inheritance and use delegation instead.

@dguy
Copy link
Copy Markdown
Contributor

dguy commented Jan 9, 2017

@guozhangwang - i've made a quick pass to try and understand where this is heading. I left a couple of comments, but i largely decided not to comment too much as this is a WIP and i wasn't sure what you are after.

Anyway, i think the refactoring of the StateStore hierarchy generally looks good. The introduction of the WrapperStateStore interface and the abstract class makes the code a lot tidier. I'm not so sure about AbstractSessionStore and AbstractWindowStore as they don't offer much value, and i have a tendency to avoid unnecessarily deep inheritance (well, i'd actually prefer no impl inheritance at all!).

In Stores i think the addition of PersistentSessionWindowedFactory and PersistentTimeWindowedFactory are good. But i'm not really getting why we need PersistentWrapperFactory and the abstract classes that implement it? It seems to me that we just need a class that captures whether or not the stores are logged/cached etc. This could just be a shared class among all the factories that is created and passed to the constructor of the appropriate factory.

@dguy
Copy link
Copy Markdown
Contributor

dguy commented Jan 9, 2017

Something i forgot to add. In ProcessorStateManager you have removed the below block of code in flush

final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store);
                if (processorNode != null) {
                    context.setCurrentNode(processorNode);
                }

It isn't clear to me where you intend for this logic to go now?

@dguy
Copy link
Copy Markdown
Contributor

dguy commented Jan 9, 2017

Another thing. Point 6 you mention that we can get rid of TupleForwarder, yet i still don't see how that is possible? i.e, a user can still provide a store that is not cached - in which case we would still need it.
Perhaps you are suggesting the Store or WrappedStore takes care of the forwarding?

@guozhangwang
Copy link
Copy Markdown
Contributor Author

About wappers in Stores: I think I agree. The original motivation was that we will let for example XXXStoreSupplier.build() to return XXXStore instead of not only StateStore, and hence in the DSL we will enforce Window/SessionStoreSupplier in aggregation / joins instead of just StateStoreSupplier, which actually allow users to pass a key-value store supplier that will only cause a RTE. But while implementing it I feel there maybe better ways to achieve this. So feel free to change.

About stateStoreProcessorNodeMap, my observation is that it is neither correct nor necessary: it links a store to the FIRST processor node that links to it, while multiple processors could link to this store, and it is used to set the currentProcessorNode in context when we call store.flush, but since we have already stored that information in dirtyEntry and always set/reset it in the flushing listener I feel it is not needed any more. Maybe I miss anything?

About TupleForwarder: my rationale is that, we can add some functions like wrap like I showed in the example to allow users to wrap their underlying storage engines with library-provided metering / logging / caching functionalities, AND if they don't do it, they either 1) do not really want caching or logging, which is fine; 2) they use their only logging / caching mechanism, in which case they should implement the loggingEnabled / etc functions to be false. Then in KStream/KTableXXXAggregateProcessor.process() implementation, we can decide if windowStore isInstanceOf CachingWindowStore to decide whether or not call context.foward() or not knowing that the caching store will execute this step itself; in this case we just won't do any coupling of dedupping with caching if user's customized store does that internally. And in the future, if we completely change KIP-63 to not coupling dedupping with caching but let user specifies triggers, then we can get rid of the windowStore isInstanceOf hacky logic as well.

@dguy BTW I think my refactoring is mingled with too many independent features and each of the above refactoring can be done in a single PR so we can make the reviewing process easier.

@dguy
Copy link
Copy Markdown
Contributor

dguy commented Jan 10, 2017

w.r.t the TupleForwarder bit - the reason it exists at the moment is to avoid duplicating the decision etc of whether context.forward() should or shouldn't be called. I don't think this is actually changed by what is suggested, but maybe i'm just not seeing it at the moment. Anyway, i'll have a look and see where we get to

@guozhangwang
Copy link
Copy Markdown
Contributor Author

w.r.t the TupleForwarder bit

I think you are right. As long as we do not have the full decision on which stores should always / never be caching enabled, we cannot drop it. I guess we can only revisit it if we consider reverting the unification of caching and dedupping.

@dguy
Copy link
Copy Markdown
Contributor

dguy commented Jan 11, 2017

@guozhangwang - i've been trying to work with this, but it is an uphill battle. There are changes in here that definitely need a KIP, i.e, the refactoring in Stores, so that should be done separately. So i'd need to revert all those changes. Then i am having put a bunch of effort in to understand where you are heading, get it to compile, get tests to pass etc. I feel like i've been spinning my wheels and not getting anywhere.
So, i can't continue with this PR. I think the best course of action is to do this in smaller PRs. I.e, first do the StateStore hierarchy refactor. I can start that now. We need a KIP for the changes to Stores, so that is going to need to wait. There is also the TopologyBuilder etc stuff you were doing.

asfgit pushed a commit that referenced this pull request Jan 13, 2017
This PR is extracted from #2333 as an incremental fix to ease the reviewing:

1. Removed `storeToProcessorNodeMap` from ProcessorTopology since it was previously used to set the context current record, and can now be replaced with the dirty entry in the named cache.

2. Replaced `sourceStoreToSourceTopic` from ProcessorTopology with `storeToChangelogTopic` map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

3. Modified `ProcessorStateManager` to rely on `sourceStoreToSourceTopic` when retrieving the changelog topic; this makes the second parameter `loggingEnabled` in `register` not needed any more, and we can deprecate the old API with a new one.

4. Also fixed a minor issue in `KStreamBuilder`: if the storeName is not provided in the `table(..)` function, do not create the underlying materialized store. Modified the unit tests to cover this case.

5. Fixed a bunch of other unit tests failures that are exposed by this refactoring, in which we are not setting the applicationId correctly when constructing the mocking processor topology.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Ewen Cheslack-Postava

Closes #2338 from guozhangwang/KMinor-refactor-state-to-changelogtopic
@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/979/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/977/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/977/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/979/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/981/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/979/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/980/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1404/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1401/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1401/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Copy link
Copy Markdown
Contributor Author

@dguy ready for review again. Some changes:

  1. Refactored RocksDBWindowStoreTest a bit to put common variables as class variables.
  2. Use the byte stores in RocksDBSessionStoreSupplier.

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1410/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1407/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1407/
Test PASSed (JDK 7 and Scala 2.10).

Copy link
Copy Markdown
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. I left a couple of very minor nit picks.
Also, should we add tests for WrappedWindowStoreIterator and WrappedSessionStoreIterator ?

return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
}

public static Window extractWindow(final byte [] binaryKey) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the test. One nit: looks like this method doesn't need to be public.

@Override
public void remove(final Windowed<K> sessionKey) {
validateStoreOpen();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any reason for the extra line?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re One nit: looks like this method doesn't need to be public. Actually the test needs it to be package-private, and while on that, I feel we can either make it pure private and remove the corresponding test, or leave it public with the test if we speculate it to be used in the future.

I'd go with the second option if you do not have a strong opinion.

private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
private ThreadCache cache;
private final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice tidy up of this test class :-)

@guozhangwang
Copy link
Copy Markdown
Contributor Author

Also, should we add tests for WrappedWindowStoreIterator and WrappedSessionStoreIterator ?

I tried to add some tests, but while doing that I found all the logic is covered in corresponding unit tests of SessionKeySerde and WindowStoreUtils, so I decided to not introduce overlap coverage.

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1438/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1435/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1435/
Test PASSed (JDK 7 and Scala 2.10).

@Override
public void remove() {
throw new UnsupportedOperationException("remove not supported");
throw new UnsupportedOperationException("remove() is not supported in DelegatingPeekingKeyValueIterator.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "... DelegatingPeekingKeyValueIterator" -> "..." + getClass().getName()

void initInternal(final ProcessorContext context) {
private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
this.serdes = new StateSerdes<>(underlying.name(),
Copy link
Copy Markdown
Member

@mjsax mjsax Feb 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove this
(we should avoid this whenever possible)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to leave it here for better code reading.

this.map = new TreeMap<>();
}

InMemoryKeyValueStore(String name) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add final (also all other methods below)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyValueStore interface does not mark all the parameters as final, and I think it is safer to keep it as is. As for this constructor, will remove it.


@Override
public void close() {
this.open = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do map.clear() here?


@Override
public void close() {
// do nothing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May set iter = null to ensure this iterator is not use after closing anymore?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This iterator is always wrapped in DelegatingPeekingKeyValueIterator, which already does the isOpen validation.


@Override
public KeyValue<Long, byte[]> next() {
if (!bytesIterator.hasNext()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? Should bytesIterator.next() not throw the same exception for this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the underlying iterator could be provided by users which does not validate in its own next() call, it's better doing such validation in the higher-level where the library controls.


@Override
public void remove() {
throw new UnsupportedOperationException("remove() is not supported in WrappedWindowStoreIterator.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above.

final KeyValueIterator<String, String> range = store.range(hello, "zooom");
assertThat(range.next(), equalTo(KeyValue.pair(hello, world)));
assertThat(range.next(), equalTo(KeyValue.pair(hi, there)));
assertFalse(range.hasNext());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused. If we change store.range(hello, "zooom"); should hasNext() not return true (start and end are inclusive). Thus, to me it seems the test is wrong and there must be a bug in the iterator code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch!

@Override
public Long peekNextKey() {
throw new UnsupportedOperationException("peekNextKey not supported in stub");
throw new UnsupportedOperationException("peekNextKey() not supported in ReadOnlyWindowStoreStub.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

public void setUp() {
cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
baseDir = TestUtils.tempDirectory("test");
context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we initialize both vars directly and remove this method?

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1454/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1457/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Feb 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1454/
Test PASSed (JDK 7 and Scala 2.10).

store.put(hi, there);
store.put("zooom", "home");
final KeyValueIterator<String, String> range = store.range(hello, "zooom");
final KeyValueIterator<String, String> range = store.range(hello, hi);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still puzzled why the test did not fail with "zooom" as end range. Can you explain? \cc @dguy WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in the old code of InMemoryKeyValueStore, our range function is implemented as subMap(from, true, to, false) while I changed it to (from, true, to, true) in the latest commit.

Copy link
Copy Markdown
Member

@mjsax mjsax Feb 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Overlooked this change :)

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@junrao
Copy link
Copy Markdown
Contributor

junrao commented Feb 3, 2017

Thanks for the patch. LGTM

@guozhangwang
Copy link
Copy Markdown
Contributor Author

Merged to trunk.

@asfgit asfgit closed this in 7ebc5da Feb 3, 2017
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
This PR is extracted from apache#2333 as an incremental fix to ease the reviewing:

1. Removed `storeToProcessorNodeMap` from ProcessorTopology since it was previously used to set the context current record, and can now be replaced with the dirty entry in the named cache.

2. Replaced `sourceStoreToSourceTopic` from ProcessorTopology with `storeToChangelogTopic` map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

3. Modified `ProcessorStateManager` to rely on `sourceStoreToSourceTopic` when retrieving the changelog topic; this makes the second parameter `loggingEnabled` in `register` not needed any more, and we can deprecate the old API with a new one.

4. Also fixed a minor issue in `KStreamBuilder`: if the storeName is not provided in the `table(..)` function, do not create the underlying materialized store. Modified the unit tests to cover this case.

5. Fixed a bunch of other unit tests failures that are exposed by this refactoring, in which we are not setting the applicationId correctly when constructing the mocking processor topology.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Ewen Cheslack-Postava

Closes apache#2338 from guozhangwang/KMinor-refactor-state-to-changelogtopic
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
This is a refactoring follow-up of apache#2166. Main refactoring changes:

1. Extract `InMemoryKeyValueStore` out of `InMemoryKeyValueStoreSupplier` and remove its duplicates in test package.

2. Add two abstract classes `AbstractKeyValueIterator` and `AbstractKeyValueStore` to collapse common functional logics.

3. Added specialized `BytesXXStore` to accommodate cases where key value types are Bytes / byte[] so that we can save calling the dummy serdes.

4. Make the key type in `ThreadCache` from byte[] to Bytes, as SessionStore / WindowStore's result serialized bytes are in the form of Bytes anyways, so that we can save unnecessary `Bytes.get()` and `Bytes.wrap(bytes)`.

Each of these should arguably be a separate PR and I apologize for the mess, this is because this branch was extracted from a rather large diff that has multiple refactoring mingled together and dguy and myself have already put lots of efforts to break it down to a few separate PRs, and this is the only left-over work. Such PR won't happen in the future.

Ping dguy enothereska mjsax for reviews

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Jun Rao

Closes apache#2333 from guozhangwang/K3452-followup-state-store-refactor
@guozhangwang guozhangwang deleted the K3452-followup-state-store-refactor branch July 15, 2017 22:07
valeraBr pushed a commit to valeraBr/kafka that referenced this pull request May 2, 2024
This PR is extracted from apache/kafka#2333 as an incremental fix to ease the reviewing:

1. Removed `storeToProcessorNodeMap` from ProcessorTopology since it was previously used to set the context current record, and can now be replaced with the dirty entry in the named cache.

2. Replaced `sourceStoreToSourceTopic` from ProcessorTopology with `storeToChangelogTopic` map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

3. Modified `ProcessorStateManager` to rely on `sourceStoreToSourceTopic` when retrieving the changelog topic; this makes the second parameter `loggingEnabled` in `register` not needed any more, and we can deprecate the old API with a new one.

4. Also fixed a minor issue in `KStreamBuilder`: if the storeName is not provided in the `table(..)` function, do not create the underlying materialized store. Modified the unit tests to cover this case.

5. Fixed a bunch of other unit tests failures that are exposed by this refactoring, in which we are not setting the applicationId correctly when constructing the mocking processor topology.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Ewen Cheslack-Postava

Closes #2338 from guozhangwang/KMinor-refactor-state-to-changelogtopic
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants