KAFKA-13785: [6/N][Emit final] emit final for TimeWindowedKStreamImpl #11896

Closed
lihaosky wants to merge 45 commits into apache:trunk from lihaosky:time-window-emit-final

Contributor

@lihaosky commented Mar 15, 2022

Description

Initial implementation of emit final for TimeWindowedKStreamImpl. This PR is built on top of #12030.

Testing

Unit tests and integration tests.

Contributor

@guozhangwang left a comment

Made a pass; we need to rebase this branch a bit and after that I will review again.

Contributor

typo

Contributor

Should this be updated?

Contributor

Should hasIndex be true here?

Contributor Author

It seems windowStore in the window aggregate is only used via windowStore.fetch(key, window.start()), so I don't need the index: fetching from the time-ordered schema is also a point get.

Contributor

This class's changes should be reverted.

Contributor Author

Yes. It was previously on top of another change.

Contributor

nit: would it be better named emitWindowStart?

Contributor

This is not a suggestion: we are sending null with the guarantee that we have never forwarded anything for this key before. I think a good test case would be a windowed aggregation with emit final, followed by a join. The join needs both the old and new values to be able to correct its results, and with emit final we should emit only once, with the old value set to null.
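
To illustrate the old/new value point, here is a minimal sketch. It is not code from this PR: `ChangeSketch`, its nested `Change` class and `apply` are hypothetical names, and the "downstream" operator is simplified to a running total that subtracts the old value and adds the new one. It only shows why a single emission with a null old value leaves a correcting downstream operator in the same state as a sequence of eager updates.

```java
class ChangeSketch {
    // Hypothetical stand-in for an old/new value pair forwarded downstream.
    static final class Change {
        final Long newValue;
        final Long oldValue; // null means nothing was forwarded for this key before

        Change(Long newValue, Long oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // A downstream operator that corrects its result: retract the old value, add the new one.
    static long apply(long total, Change change) {
        if (change.oldValue != null) {
            total -= change.oldValue;
        }
        if (change.newValue != null) {
            total += change.newValue;
        }
        return total;
    }

    public static void main(String[] args) {
        // Eager emit: two updates for the same window, the second one retracts the first.
        long eager = apply(apply(0L, new Change(1L, null)), new Change(3L, 1L));
        // Emit final: one update when the window closes, old value is null.
        long emitFinal = apply(0L, new Change(3L, null));
        System.out.println(eager + " " + emitFinal);
    }
}
```

Both paths end with the same downstream total, which is the property a join-after-emit-final test would want to check.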

Contributor Author

@lihaosky left a comment

@guozhangwang Yeah, I'll need to rebase and add unit tests. Merging this needs #11829, which is also in review. I'll rebase this on top of #11829 and add unit tests. I can start perf testing before this is merged.

@mjsax added the streams and kip (Requires or implements a KIP) labels Mar 31, 2022
@lihaosky force-pushed the time-window-emit-final branch from 8c40ed1 to 2f4550b Mar 31, 2022 06:50
@lihaosky changed the title from "[Emit final][5/N] emit final for TimeWindowedKStreamImpl" to "KAFKA-13785: [Emit final][5/N] emit final for TimeWindowedKStreamImpl" Mar 31, 2022
@lihaosky force-pushed the time-window-emit-final branch from 2f4550b to 39ed91b April 1, 2022 22:10
Contributor Author

@guozhangwang, should we call this another name to avoid overriding the existing emit eager store?

Member

In what scenario would we end up with two stores with the same name? (I believe there is no problem and the code is fine, but maybe I am missing something?)

Contributor Author

If a user just enables emit final for an existing topology that uses emit eager, will it pick up the existing state store, which has the wrong data format etc.?

Member

I think it's ok to declare this a non-supported pattern? We should call it out in the docs.

Contributor

Sorry I got to this late --- also sgtm.

Member

Can we add details on what "closes" means, i.e., when a window is closed?

Should we also mention caching? Or would this go into the weeds?

Contributor Author

Sure. I feel this is not related to caching.

Member

What happens if it's used anyway?

(nit: missing . at the end of the sentence)

Contributor Author

Will update with details.

Member

Might debug level be better?

Contributor Author

I guess this won't be called many times and flood the log while this is helpful information. Will this be printed somewhere else (i.e. topology)?

Member

@mjsax Apr 12, 2022

"while this is helpful information"

Well, is it? To me, it sounds as if a filter logged: "you are using a filter". The concern is not about spamming the logs; it just seems to be noise rather than helpful information. I am also ok to leave it in.

Contributor Author

Is filter logged in the topology? This isn't logged in topology though :)

Member

maybe better to use debug level?

Contributor Author

Similar to the previous comment, I feel this won't be called many times (just once when creating the processor?). Also, the config won't be printed when we print all configs, since this is an internal config. This will be convenient for debugging.

Member

It's not about spamming; the question is whether it's useful. It also seems to leak an internal config that we might want to keep hidden?

Contributor Author

I put it here because we don't log it anywhere else, since it's an internal config... I'm ok to delete this though.

Member

Did we do the same thing for stream-stream join? I don't think we did. Might be worth doing there, too? (Not part of this PR...)

Contributor Author

The stream-stream join doesn't do this. Created https://issues.apache.org/jira/browse/KAFKA-13817 to track it.

Member

Why would this record be skipped? You define a hopping window, and while window [0,10) is closed, window [5,15) is still open and the record should still go into this second window?

Contributor Author

Yeah. It does go to the second window. Will update the comment.
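
The hopping-window case discussed here can be sketched as follows. This is a minimal, self-contained illustration and not the Kafka Streams implementation: `HoppingWindowSketch` and its methods are hypothetical names, and it assumes a window [start, start + size) counts as closed once stream time reaches start + size + grace. Window starts are clamped at 0, so no window with a negative start is produced.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {
    // Start times of all hopping windows [start, start + size) that contain the timestamp.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest candidate start, aligned to the advance interval and clamped at 0.
        long start = Math.max(0L, timestamp - size + advance) / advance * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    // Subset of those windows that are still open at the given stream time (assumed rule, see lead-in).
    static List<Long> openWindowStartsFor(long timestamp, long size, long advance,
                                          long grace, long streamTime) {
        List<Long> open = new ArrayList<>();
        for (long start : windowStartsFor(timestamp, size, advance)) {
            if (start + size + grace > streamTime) {
                open.add(start);
            }
        }
        return open;
    }

    public static void main(String[] args) {
        // Size 10, advance 5, no grace, stream time already at 10.
        System.out.println(windowStartsFor(7L, 10L, 5L));
        System.out.println(openWindowStartsFor(7L, 10L, 5L, 0L, 10L));
    }
}
```

With size 10, advance 5, no grace and stream time 10, a record at timestamp 7 belongs to both [0,10) and [5,15); only [5,15) is still open, so the record is not skipped entirely.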

Member

The output might be easier to read/understand if we use unique values, 1 to 7 for the seven input records.

Member

Why does the order change?

Contributor Author

Didn't get your question. The fetch for emit final is based on windowStart order.
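
As a rough illustration of why the output order follows window start, here is a small sketch that uses a TreeMap as a stand-in for a time-ordered store. It is not the store used in this PR: `EmitFinalBufferSketch`, `add` and `emitClosed` are hypothetical names, aggregation is a simple sum, and a window is assumed to be closed once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EmitFinalBufferSketch {
    private final long windowSize;
    private final long grace;
    // windowStart -> (key -> aggregate); sorted by window start first, then by key.
    private final TreeMap<Long, TreeMap<String, Long>> buffer = new TreeMap<>();

    EmitFinalBufferSketch(long windowSize, long grace) {
        this.windowSize = windowSize;
        this.grace = grace;
    }

    // Fold a value into the aggregate of (key, window); nothing is emitted here.
    void add(String key, long windowStart, long value) {
        buffer.computeIfAbsent(windowStart, s -> new TreeMap<>()).merge(key, value, Long::sum);
    }

    // Emit every window with windowStart + windowSize + grace <= streamTime, exactly once.
    List<String> emitClosed(long streamTime) {
        List<String> out = new ArrayList<>();
        long maxClosedStart = streamTime - grace - windowSize;
        Map<Long, TreeMap<String, Long>> closed = buffer.headMap(maxClosedStart, true);
        for (Map.Entry<Long, TreeMap<String, Long>> window : closed.entrySet()) {
            long start = window.getKey();
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                out.add(agg.getKey() + "@[" + start + "," + (start + windowSize) + ")=" + agg.getValue());
            }
        }
        closed.clear(); // the view is backed by the buffer, so emitted windows are removed
        return out;
    }
}
```

Because the scan walks the map in key order, results come out grouped by window start regardless of the order in which records arrived, which differs from the per-record order seen with eager emission.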

Member

Why do we get one more output record?

Contributor Author

I added one more input in processData: inputTopic.pipeInput("2", "30", 1000L);

Member

Does [-5,5) exist?

Contributor Author

Not really 🥲 . Will update

@lihaosky changed the title from "KAFKA-13785: [Emit final][5/N] emit final for TimeWindowedKStreamImpl" to "KAFKA-13785: [6/N][Emit final] emit final for TimeWindowedKStreamImpl" Apr 11, 2022
@lihaosky force-pushed the time-window-emit-final branch from 6ad18b5 to e1ce13d April 11, 2022 19:15
@lihaosky force-pushed the time-window-emit-final branch from 4543a23 to 36b789d April 15, 2022 19:10
cmccabe and others added 6 commits April 19, 2022 13:17
…pache#12063)

Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker 
configs, and never for topic configs or cluster configs.

The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.

Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>
…12004)


Reviewers: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>
Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer.

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
…pache#12030)

A new cache for RocksDBTimeOrderedWindowStore. Need this because RocksDBTimeOrderedWindowStore's key ordering is different from CachingWindowStore which has issues for MergedSortedCacheWindowStoreIterator

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
…ation of internal topic names (apache#11703)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
mjsax and others added 18 commits April 20, 2022 14:39
…pache#12074)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, David Arthur <mumrah@gmail.com>
In the drainBatchesForOneNode method, it is possible that some partitions on a node never get picked. Fix this issue by maintaining a drainIndex for each node.

Reviewers: Luke Chen <showuon@gmail.com>, RivenSun <91005273+RivenSun2@users.noreply.github.com>
* Fix UP-TO-DATE check in `create*VersionFile` tasks

`create*VersionFile` tasks explicitly declared output UP-TO-DATE status
 as being false. This change properly sets the inputs to
`create*VersionFile` tasks to the `commitId` and `version` values and
sets `receiptFile` locally rather than in an extra property.

* Enable output caching for `process*Messages` tasks

`process*Messages` tasks did not have output caching enabled. This
change enables that caching, as well as setting a property name and
RELATIVE path sensitivity.

* Fix existing Gradle deprecations

Replaces `JavaExec#main` with `JavaExec#mainClass`

Replaces `Report#destination` with `Report#outputLocation`

Adds a `generator` configuration to projects that need to resolve
the `generator` project (rather than referencing the runtimeClasspath
of the `generator` project from other project contexts.

Reviewers: Mickael Maison <mickael.maison@gmail.com>

When LeaderRecoveryState was added to the PartitionChangeRecord, the
check for being a noop was not updated. This commit fixes that and
improves the associated test to avoid this oversight in the future.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
…rs in KRaft mode (apache#12075)

This PR fixes a case where we were unable to place on fenced brokers in KRaft mode. Specifically,
if we had a broker registration in the metadata log, but no associated heartbeat, previously the
HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the
metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
This patch does some initial cleanups in the context of KAFKA-13790. Mainly, it renames `ZkVersion` field to `PartitionEpoch` in the `LeaderAndIsrRequest`, the `LeaderAndIsr` and the `Partition`.

Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>
When the wildcard * is used in a command without being wrapped in single quotes, bash expands it to the file names in the current folder. So we need to wrap it in single quotes. Update the doc and command option description.

Reviewers: dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
Using enums instead of Strings for auto.offset.reset configuration

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Luke Chen <showuon@gmail.com>
…pache#12032)

Currently we validate recovery state before checking leader epoch in `KafkaController`. It seems more intuitive to validate leader epoch first since the leader might be working with stale state, which is what we do in KRaft. This patch fixes this and adds a couple additional validations to make the behavior consistent. 

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
This patch refactors kafka.cluster.Replica, its usages and tests. This is part of the work in KAFKA-13790.

Reviewers: Jason Gustafson <jason@confluent.io>
Adding KRaft and ZK params to ConfigCommandIntegrationTest wherever appropriate.

Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
…2064)

The bug was introduced in apache#11689 that an additional onAcknowledgement was made using the InterceptorCallback class. This is undesirable since onSendError will attempt to call onAcknowledgement once more.

Reviewers: Jun Rao <junrao@gmail.com>
)

Since we have changed the `AlterIsr` API to `AlterPartition`, it makes sense to rename `AlterIsrManager` as well and some of the associated classes.

Reviewers: dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
The html document generation has some errors in it, specifically related to protocols. The two issues identified and resolved are:

* Missing </tbody> closing tags added
* Invalid usage of a <p> tag as a wrapper element for <table> elements. Changed the <p> tag to be a <div>.

Tested by running ./gradlew siteDocsTar and observing that the output was properly formed.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Contributor

@guozhangwang left a comment

I incorporated the latest comments and pushed a new commit, but it seems I messed up the PR on GitHub by pulling in other commits from trunk as well.

Contributor

I will rephrase the comment a bit to make it clearer.

}
}

final long startNs = time.nanoseconds();
Contributor

If we do intend to use nanoseconds instead of milliseconds, then we should name the metric "...-latency-ns" and also emphasize in the description that it is measured in nanos, since by default all latencies are measured in millis across the AK package unless explicitly named / described otherwise.

Personally I think it's sufficient to measure in millis. WDYT @lihaosky @mjsax?
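
For reference, a minimal sketch of the millisecond option: keep the nanosecond clock for measuring, but convert before recording so the metric matches the default unit. `EmitLatencySketch` and `elapsedMillis` are hypothetical names and not part of this PR; `System.nanoTime()` stands in for the `time.nanoseconds()` call in the diff.

```java
import java.util.concurrent.TimeUnit;

class EmitLatencySketch {
    // Convert a nanosecond interval to whole milliseconds (truncating), ready to record as a millis latency.
    static long elapsedMillis(long startNs, long endNs) {
        return TimeUnit.NANOSECONDS.toMillis(endNs - startNs);
    }

    public static void main(String[] args) throws InterruptedException {
        long startNs = System.nanoTime();
        Thread.sleep(5); // placeholder for the emit-final work being timed
        long latencyMs = elapsedMillis(startNs, System.nanoTime());
        System.out.println("emit final latency (ms): " + latencyMs);
    }
}
```

The trade-off is resolution: sub-millisecond emits are recorded as 0, which is the reason one might keep nanoseconds and rename the metric instead.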

private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

private static final String EMIT_FINAL_LATENCY = "window-aggregate-final-emit" + LATENCY_SUFFIX;
Contributor

Please see my other comment --- if we measure in nanos we'd need to explicitly add that in the name and in the description.

RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

private static final String EMIT_FINAL_LATENCY = "window-aggregate-final-emit" + LATENCY_SUFFIX;
private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";
Contributor

We can replace with EMITTED_RECORDS + LATENCY_SUFFIX.


builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()))
.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
Contributor

I think all these transforms can be replaced with process, since there is no resulting stream to continue.

Contributor

EDIT: never mind, I saw that you've done so in the other PR...

public void shouldSkipNonExistBaseKeyInCache() {
cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);

final SegmentedCacheFunction baseCacheFunction = new SegmentedCacheFunction(new TimeFirstWindowKeySchema(), SEGMENT_INTERVAL);
Contributor

This is never used.

@guozhangwang
Contributor

Sorry about that @lihaosky , I just created a copy PR from the branch in order to merge sooner: #12100

@mjsax could you take a look at that PR?

guozhangwang added a commit that referenced this pull request May 3, 2022
…amImpl (#12100)

This is a copy PR of #11896, authored by @lihaosky (Hao Li): Initial implementation to emit final for TimeWindowedKStreamImpl. This PR is on top of #12030 

Author: Hao Li
Reviewers: John Roesler <vvcephei@apache.org>