From 5ed6e7816a397f3f9b6ce41a2ffc2176522fda45 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 20 Apr 2023 15:03:57 -0700 Subject: [PATCH 1/2] docs for kip-914 --- docs/streams/core-concepts.html | 14 ++-- docs/streams/developer-guide/dsl-api.html | 68 +++++++++++++++++++ .../developer-guide/processor-api.html | 2 +- docs/streams/upgrade-guide.html | 5 +- docs/upgrade.html | 1 + 5 files changed, 82 insertions(+), 8 deletions(-) diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html index 884b39898ed9b..dec11905b45f6 100644 --- a/docs/streams/core-concepts.html +++ b/docs/streams/core-concepts.html @@ -309,7 +309,7 @@

Out-of-Order Handling

- Besides the guarantee that each record will be processed exactly-once, another issue that many stream processing application will face is how to + Besides the guarantee that each record will be processed exactly-once, another issue that many stream processing applications will face is how to handle out-of-order data that may impact their business logic. In Kafka Streams, there are two causes that could potentially result in out-of-order data arrivals with respect to their timestamps:

@@ -328,13 +328,17 @@

< for stateful operations such as aggregations and joins, however, out-of-order data could cause the processing logic to be incorrect. If users want to handle such out-of-order data, generally they need to allow their applications to wait for longer time while bookkeeping their states during the wait time, i.e. making trade-off decisions between latency, cost, and correctness. In Kafka Streams specifically, users can configure their window operators for windowed aggregations to achieve such trade-offs (details can be found in Developer Guide). - As for Joins, users have to be aware that some of the out-of-order data cannot be handled by increasing on latency and cost in Streams yet: + As for Joins, users may use versioned state stores to address concerns with out-of-order data, but out-of-order data will not be handled by default:

@@ -2261,6 +2265,9 @@

null value are interpreted as tombstones for the corresponding key, which indicate the deletion of the key from the table. Right-tombstones trigger the join, but left-tombstones don't: when an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable). +
  • When joining versioned tables, + out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, + has already been processed, are ignored and do not trigger the join.
  • @@ -2308,6 +2315,9 @@

    null value are interpreted as tombstones for the corresponding key, which indicate the deletion of the key from the table. Tombstones may trigger joins, depending on the content in the left and right tables. When an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable). +
  • When joining versioned tables, + out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, + has already been processed, are ignored and do not trigger the join.
  • @@ -2547,6 +2557,9 @@

    KTable-KTable Foreign-Key result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable). +
  • When joining versioned tables, + out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, + has already been processed, are ignored and do not trigger the join.
  • @@ -2607,6 +2620,9 @@
    KTable-KTable Foreign-Key result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable). +
  • When joining versioned tables, + out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, + has already been processed, are ignored and do not trigger the join.
  • @@ -2830,6 +2846,9 @@
    KTable-KTable Foreign-Key +
  • When the table is versioned, + the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp + less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record is dropped.
  • See the semantics overview at the bottom of this section for a detailed description.

    @@ -2884,6 +2903,9 @@
    KTable-KTable Foreign-Key
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.

  • +
  • When the table is versioned, + the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp + less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record that is joined will be null.
  • See the semantics overview at the bottom of this section for a detailed description.

    @@ -3609,6 +3631,52 @@
    KTable-KTable Foreign-Key and KIP-328.

    +
    +

    Using timestamp-based semantics for table processors

    +

    By default, tables in Kafka Streams use offset-based semantics. When multiple records arrive for the same key, the one with the largest record offset + is considered the latest record for the key, and is the record that appears in aggregation and join results computed on the table. This is true even + in the event of out-of-order data. The record with the + largest offset is considered to be the latest record for the key, even if this record does not have the largest timestamp.

    +

    An alternative to offset-based semantics is timestamp-based semantics. With timestamp-based semantics, the record with the largest timestamp is + considered the latest record, even if there is another record with a larger offset (and smaller timestamp). If there is no out-of-order data (per key), + then offset-based semantics and timestamp-based semantics are equivalent; the difference only appears when there is out-of-order data.

    +

    Starting with Kafka Streams 3.5, Kafka Streams supports timestamp-based semantics through the use of + versioned state stores. + When a table is materialized with a versioned state store, it is a versioned table and will result in different processor semantics in the presence of + out-of-order data.

    +
      +
    • When performing a stream-table join, stream-side records will join with the latest-by-timestamp table record which has a timestamp less than or equal to + the stream record's timestamp. This is in contrast to joining a stream to an unversioned table, in which case the latest-by-offset table record will + be joined, even if the stream-side record is out-of-order and has a lower timestamp.
    • +
    • Aggregations computed on the table will include the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order + updates (per key) will not trigger a new aggregation result. This is true for count + and reduce operations as well, in addition to + aggregate operations.
    • +
    • Table joins will use the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order updates (per key) will not + trigger a new join result. This is true for both primary-key table-table joins and also foreign-key table-table joins. If a + versioned table is joined with an unversioned table, the result will be the join of the latest-by-timestamp record from the versioned table with + the latest-by-offset record from the unversioned table.
    • +
    • Table filter operations will no longer suppress consecutive tombstones, so users may observe more null + records downstream of the filter than compared to when filtering an unversioned table. This is done in order to preserve a complete version history downstream, + in the event of out-of-order data.
    • +
    • suppress operations are not allowed on versioned tables, as this would collapse the version history + and lead to undefined behavior.
    • +
    +

    Once a table is materialized with a versioned store, downstream tables are also considered versioned until any of the following occurs:

    +
      +
    • A downstream table is explicitly materialized, either with an unversioned store supplier or with no store supplier (all stores are unversioned by default, including the default store supplier)
    • +
    • Any stateful transformation occurs, including aggregations and joins
    • +
    • A table is converted to a stream and back.
    • +
    +

    The results of certain processors should not be materialized with versioned stores, as these processors do not produce a complete older version history, + and therefore materialization as a versioned table would lead to unpredictable results:

    +
      +
    • Aggregate processors, for both table and stream aggregations. This includes aggregate, + count and reduce operations.
    • +
    • Table-table join processors, including both primary-key and foreign-key joins.
    • +
    +

    For more on versioned stores and how to start using them in your application, see here.

    +

    Writing streams back to Kafka

    Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index 586e55f6d0b1a..ccb03ce7b50c1 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -437,7 +437,7 @@

    advised as performance is expected to be worse than when using non-versioned stores.

    Versioned stores do not support caching or interactive queries at this time. - Also, window stores may not be versioned.

    + Also, window stores and global tables may not be versioned.

    Upgrade note: Versioned state stores are opt-in only; no automatic upgrades from non-versioned to versioned stores will take place.

    Upgrades are supported from persistent, non-versioned key-value stores diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b668ba5c150b6..16614db7f1538 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -139,14 +139,15 @@

    Streams API

    A new state store type, versioned key-value stores, was introduced in - KIP-889. + KIP-889 and + KIP-914. Rather than storing a single record version (value and timestamp) per key, versioned state stores may store multiple record versions per key. This allows versioned state stores to support timestamped retrieval operations to return the latest record (per key) as of a specified timestamp. For more information, including how to upgrade from a non-versioned key-value store to a versioned store in an existing application, see the - Developer Guide section. + Developer Guide. Versioned key-value stores are opt-in only; existing applications will not be affected upon upgrading to 3.5 without explicit code changes.

    diff --git a/docs/upgrade.html b/docs/upgrade.html index bec4cc07c5ffa..58dd1a8e12a13 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -35,6 +35,7 @@

    Notable changes in 3 for storing multiple record versions per key, thereby enabling timestamped retrieval operations to return the latest record (per key) as of a specified timestamp. See KIP-889 + and KIP-914 for more details.
  • MirrorMaker now uses incrementalAlterConfigs API by default to synchronize topic configurations instead of the deprecated alterConfigs API. From 513c1051cf5d8a401741bd477da90573aa2239b7 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Tue, 25 Apr 2023 15:06:24 -0700 Subject: [PATCH 2/2] review feedback --- docs/streams/core-concepts.html | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html index dec11905b45f6..d9a2851e2713f 100644 --- a/docs/streams/core-concepts.html +++ b/docs/streams/core-concepts.html @@ -332,8 +332,7 @@

    <

      -
    • For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulting stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins. - This behavior is the same regardless of whether versioned stores are used.
    • +
    • For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly.
    • For Stream-Table joins, if not using versioned stores, then out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order), and hence it may produce unpredictable results. With versioned stores, stream-side out-of-order data will be properly handled by performing a timestamp-based lookup in the table. Table-side out-of-order data is still not handled.
    • For Table-Table joins, if not using versioned stores, then out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order).