-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-14834: [13/N] Docs updates for versioned store semantics #13622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,7 @@ | |
| </li> | ||
| <li><a class="reference internal" href="#naming-a-streams-app" id="id33">Naming Operators in a Streams DSL application</a></li> | ||
| <li><a class="reference internal" href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li> | ||
| <li><a class="reference internal" href="#versioned-state-stores" id="id36">Using timestamp-based semantics for table processors</a></li> | ||
| <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li> | ||
| <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li> | ||
| <li><a class="reference internal" href="#scala-dsl" id="id27">Kafka Streams DSL for Scala</a></li> | ||
|
|
@@ -2217,6 +2218,9 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l | |
| <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not | ||
| trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding | ||
| key actually exists already in the join result KTable).</li> | ||
| <li>When joining <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned tables</span></a>, | ||
| out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, | ||
| has already been processed, are ignored and do not trigger the join.</li> | ||
| </ul> | ||
| </div></blockquote> | ||
| </li> | ||
|
|
@@ -2261,6 +2265,9 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l | |
| <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Right-tombstones trigger the join, | ||
| but left-tombstones don't: when an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding | ||
| key actually exists already in the join result KTable).</li> | ||
| <li>When joining <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned tables</span></a>, | ||
| out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, | ||
| has already been processed, are ignored and do not trigger the join.</li> | ||
| </ul> | ||
| </div></blockquote> | ||
| </li> | ||
|
|
@@ -2308,6 +2315,9 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l | |
| <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. Tombstones may trigger joins, | ||
| depending on the content in the left and right tables. When an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding | ||
| key actually exists already in the join result KTable).</li> | ||
| <li>When joining <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned tables</span></a>, | ||
| out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, | ||
| has already been processed, are ignored and do not trigger the join.</li> | ||
| </ul> | ||
| </div></blockquote> | ||
| </li> | ||
|
|
@@ -2547,6 +2557,9 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key | |
| result KTable if required (i.e. only if the | ||
| corresponding key actually exists already in | ||
| the join result KTable).</li> | ||
| <li>When joining <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned tables</span></a>, | ||
| out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, | ||
| has already been processed, are ignored and do not trigger the join.</li> | ||
| </ul> | ||
| </div> | ||
| </blockquote> | ||
|
|
@@ -2607,6 +2620,9 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key | |
| result KTable if required (i.e. only if the | ||
| corresponding key actually exists already in | ||
| the join result KTable).</li> | ||
| <li>When joining <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned tables</span></a>, | ||
| out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, | ||
| has already been processed, are ignored and do not trigger the join.</li> | ||
| </ul> | ||
| </div> | ||
| </blockquote> | ||
|
|
@@ -2830,6 +2846,9 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key | |
| </ul> | ||
| </div></blockquote> | ||
| </li> | ||
| <li>When the table is <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned</span></a>, | ||
| the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp | ||
| less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record is dropped.</li> | ||
| </ul> | ||
| <p class="last">See the semantics overview at the bottom of this section for a detailed description.</p> | ||
| </td> | ||
|
|
@@ -2884,6 +2903,9 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key | |
| <li><p class="first">For each input record on the left side that does not have any match on the right side, the <code class="docutils literal"><span class="pre">ValueJoiner</span></code> will be called with <code class="docutils literal"><span class="pre">ValueJoiner#apply(leftRecord.value,</span> <span class="pre">null)</span></code>; | ||
| this explains the row with timestamp=3 in the table below, which lists <code class="docutils literal"><span class="pre">[A,</span> <span class="pre">null]</span></code> in the LEFT JOIN column.</p> | ||
| </li> | ||
| <li>When the table is <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned</span></a>, | ||
| the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp | ||
| less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record that is joined will be <code class="docutils literal"><span class="pre">null</span></code>.</li> | ||
| </ul> | ||
| <p class="last">See the semantics overview at the bottom of this section for a detailed description.</p> | ||
| </td> | ||
|
|
@@ -3609,6 +3631,52 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key | |
| and <a href="https://cwiki.apache.org/confluence/x/sQU0BQ" title="KIP-328">KIP-328</a>. | ||
| </p> | ||
| </div> | ||
| <div class="section" id="versioned-state-stores"> | ||
| <h2><a class="headerlink" href="#versioned-state-stores" title="Permalink to this headline">Using timestamp-based semantics for table processors</a></h2> | ||
| <p>By default, tables in Kafka Streams use offset-based semantics. When multiple records arrive for the same key, the one with the largest record offset | ||
| is considered the latest record for the key, and is the record that appears in aggregation and join results computed on the table. This is true even | ||
| in the event of <a href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">out-of-order data</a>. The record with the | ||
| largest offset is considered to be the latest record for the key, even if this record does not have the largest timestamp.</p> | ||
| <p>An alternative to offset-based semantics is timestamp-based semantics. With timestamp-based semantics, the record with the largest timestamp is | ||
| considered the latest record, even if there is another record with a larger offset (and smaller timestamp). If there is no out-of-order data (per key), | ||
| then offset-based semantics and timestamp-based semantics are equivalent; the difference only appears when there is out-of-order data.</p> | ||
| <p>Starting with Kafka Streams 3.5, Kafka Streams supports timestamp-based semantics through the use of | ||
| <a href="/{{version}}/documentation/streams/developer-guide/processor-api.html#versioned-state-stores">versioned state stores</a>. | ||
| When a table is materialized with a versioned state store, it is a versioned table and will result in different processor semantics in the presence of | ||
| out-of-order data.</p> | ||
| <ul class="simple"> | ||
| <li>When performing a stream-table join, stream-side records will join with the latest-by-timestamp table record which has a timestamp less than or equal to | ||
| the stream record's timestamp. This is in contrast to joining a stream to an unversioned table, in which case the latest-by-offset table record will | ||
| be joined, even if the stream-side record is out-of-order and has a lower timestamp.</li> | ||
| <li>Aggregations computed on the table will include the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order | ||
| updates (per key) will not trigger a new aggregation result. This is true for <code class="docutils literal"><span class="pre">count</span></code> | ||
| and <code class="docutils literal"><span class="pre">reduce</span></code> operations as well, in addition to | ||
| <code class="docutils literal"><span class="pre">aggregate</span></code> operations.</li> | ||
| <li>Table joins will use the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order updates (per key) will not | ||
| trigger a new join result. This is true for both primary-key table-table joins and also foreign-key table-table joins. If a | ||
| versioned table is joined with an unversioned table, the result will be the join of the latest-by-timestamp record from the versioned table with | ||
| the latest-by-offset record from the unversioned table.</li> | ||
| <li>Table filter operations will no longer suppress consecutive tombstones, so users may observe more <code class="docutils literal"><span class="pre">null</span></code> | ||
| records downstream of the filter than compared to when filtering an unversioned table. This is done in order to preserve a complete version history downstream, | ||
| in the event of out-of-order data.</li> | ||
| <li><code class="docutils literal"><span class="pre">suppress</span></code> operations are not allowed on versioned tables, as this would collapse the version history | ||
| and lead to undefined behavior.</li> | ||
| </ul> | ||
| <p>Once a table is materialized with a versioned store, downstream tables are also considered versioned until any of the following occurs:</p> | ||
| <ul class="simple"> | ||
| <li>A downstream table is explicitly materialized, either with an unversioned store supplier or with no store supplier (all stores are unversioned by default, including the default store supplier)</li> | ||
| <li>Any stateful transformation occurs, including aggregations and joins</li> | ||
| <li>A table is converted to a stream and back.</li> | ||
| </ul> | ||
| <p>The results of certain processors should not be materialized with versioned stores, as these processors do not produce a complete older version history, | ||
| and therefore materialization as a versioned table would lead to unpredictable results:</p> | ||
| <ul class="simple"> | ||
| <li>Aggregate processors, for both table and stream aggregations. This includes <code class="docutils literal"><span class="pre">aggregate</span></code>, | ||
| <code class="docutils literal"><span class="pre">count</span></code> and <code class="docutils literal"><span class="pre">reduce</span></code> operations.</li> | ||
| <li>Table-table join processors, including both primary-key and foreign-key joins.</li> | ||
| </ul> | ||
| <p>For more on versioned stores and how to start using them in your application, see <a href="/{{version}}/documentation/streams/developer-guide/processor-api.html#versioned-state-stores">here</a>.</p> | ||
| </div> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we mention global stores, too?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The section that this line links to (processor API topic on versioned stores) mentions that global tables are not allowed to be materialized with versioned stores. I think since most of the nuances/details are only in the processor API topic, and the DSL API topic links there, it should be OK as is? |
||
| <div class="section" id="writing-streams-back-to-kafka"> | ||
| <span id="streams-developer-guide-dsl-destinations"></span><h2><a class="toc-backref" href="#id25">Writing streams back to Kafka</a><a class="headerlink" href="#writing-streams-back-to-kafka" title="Permalink to this headline"></a></h2> | ||
| <p>Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this here, but not for the table joins above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only the stream-table join performs timestamped-lookups; table-table joins drop out-of-order records but only ever call
get(key)and notget(key, asOfTimestamp).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Stream record.