Skip to content

KAFKA-14834: [8/N] Propagate isLatest as part of Change#13564

Merged
mjsax merged 3 commits intoapache:trunkfrom
vcrfxia:kip-914-boolean-with-change
Apr 14, 2023
Merged

KAFKA-14834: [8/N] Propagate isLatest as part of Change#13564
mjsax merged 3 commits intoapache:trunkfrom
vcrfxia:kip-914-boolean-with-change

Conversation

@vcrfxia
Copy link
Copy Markdown
Contributor

@vcrfxia vcrfxia commented Apr 13, 2023

This PR adds an additional boolean isLatest into Change which specifies whether the new value is the latest for its key. For un-versioned stores, isLatest is always true. For versioned stores, isLatest is true if the value has the latest timestamp seen for the key, else false. This boolean will be used by processors such as the table repartition map processor to determine when a record is out-of-order and should be dropped (when processing a versioned table). See KIP-914 for details.

This PR updates the table repartition map processor accordingly, and also adds test coverage for table filter.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence about updating the equals method here. I'd like for it to be updated in this way but because serialization does not depend on this new boolean, it could be the case that round-trip serialization causes equals() to no longer evaluate to true, which seems confusing.

Maybe it doesn't matter if the equals method isn't called anywhere (I couldn't find any usages). Curious to hear reviewer opinions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tricky one -- if we don't use it, maybe we should just delete it (including hashCode) and sidestep the question?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I should clarify -- there are test usages where Change values are compared, e.g., KTableFilterTest#doTestSkipNullOnMaterialization() (which was also modified as part of this PR), so including isLatest matters in these cases. I could not find any usages in production code for checking equality of Change objects, though.

If we remove this check from the equality check, we wouldn't be able to use helper methods such as MockApiProcessor#checkAndClearProcessResult() in order to validate records with Change value type, for tests where isLatest is relevant. Not the end of the world but annoying for sure.

The flipside would be if a user has code which checks for equality of Change objects but doesn't care about isLatest, and the comparison is thrown off as a result. I guess that shouldn't happen though, since the class is internal, in which case perhaps it's better to leave this as is?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Let's leave it as-is. -- Thanks for explaining.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some processors, including this one, guarantee that their timestamps are in non-decreasing order (per key), which means putReturnCode will always equal PUT_RETURN_CODE_IS_LATEST, so having these extra comparisons here is technically unnecessary, but it seems nice to have them anyway for consistency with other processors (and in case this guarantee ever changes, though it's unlikely). Happy to remove the redundancy if that's preferable, though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cf my comments above -- not 100% sure if keeping the code is better or worse; tend to prefer to remove it and just add a comment about it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid for a computed aggregation to be null? If it is, I think it's actually not guaranteed that aggregation result timestamps are always nondecreasing. If an out-of-order record arrives after the newAgg value is set to null, then the aggregation result will be re-initialized, and the timestamp will be the timestamp of the out-of-order record, which is earlier than the previous aggregation timestamp.

If that scenario is possible, then it's a bit unclear which of the two should be considered the "latest" aggregation value, and we should chose whether to keep the current code or to always set isLatest = true depending on what semantics we want.

If that scenario is not possible, i.e., if it is not valid for an aggregation value to be null, then the two are equivalent and I can update this code to clean it up per your suggestion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This two-arg constructor is currently still being called from places such as the windowed aggregate processors and caching state stores, since versioned tables do not come into play here and therefore all Changes are considered the latest. If we think it'd be better to encode true in these places and remove the two-arg constructor, I can do this cleanup in a follow-up PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tricky one -- if we don't use it, maybe we should just delete it (including hashCode) and sidestep the question?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we need this variable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically no since we could just use PUT_RETURN_CODE_VALID_TO_UNDEFINED directly instead. I just thought it might be confusing for readers to reason about why we set isLatest = putReturnCode == PUT_RETURN_CODE_VALID_TO_UNDEFINED; everywhere, especially since PUT_RETURN_CODE_VALID_TO_UNDEFINED is defined in VersionedKeyValueStore and appears to be specific to versioned stores on first glance, whereas KeyValueStoreWrapper#put() is more general (supports timestamped stores too). If you think this indirection is even more confusing, though, I can remove it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure -- it's internal code only so not a big deal and easy to change in the future if we want

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we compute newTimestamp as max, it should only move forward, even for out-of-order records, so I think this processor does not need an update?

It actually also implies, that we don't really "fix the history" for this case either (what I think is fine), but we can simplify this code (we might also want to call this out on the KIP?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do not attempt to "fix" older aggregation results. I can add a note in the KIP.

See my comment above about whether we can actually guarantee that result timestamps are nondecreasing: #13564 (comment) Same question applies for both table aggregations and stream aggregations.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putReturnCode == PUT_RETURN_CODE_IS_LATEST should always be true ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above -- I'll simplify this if it's correct to.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as KStreamAggregate.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we drop out-of-order for versioned stores, can we just pass in true as "is latest"?

Guess I am also ok to keep the code as-is -- might be more future prove in case we support "repairing history" in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought of an interesting edge case earlier, has to do with nulls. Doesn't apply to inner joins but suppose we have a left join:

B: (k, b, ts=3)
A: (k, a1, ts=1) --> emit join result (a1, b, ts=3)
B: (k, null, ts=4) --> emit join result (a1, null, ts=4)
A: (k, a2, ts=2) --> emit join result (a2, null, ts=2)

The reason that the last join result is emitted with timestamp 2 instead of 4 is because when the null is looked up as the latest value from the B store, there is no timestamp associated with it, and therefore the A record timestamp is used.

This example is interesting because the final join result (a2, null) is the "most recent" join result in the sense that it is the join of the latest record from the A side (a2) with the latest record from the B side (null), but it does not have the latest timestamp of the join result records (because the previous join result has timestamp 4). So, which join result should be considered the latest? It should probably be the last one, right? In that case it would be extra wrong to materialize the join result as a versioned store, since a versioned store would think that the second-to-last join result is the latest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to your question on this PR, yes isLatest will always be true in this join processor, but it's not necessarily the case in the join merge processor. It would not necessarily be true in the case where a user materialized the join result as a versioned store (even though they shouldn't, semantically), if they encounter this edge case with nulls.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this ever happen?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally responded above in #13564 (comment), instead of here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this before the if and use instead of !record.value().isLatest ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in #13615.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup: all other processors use queryableName != null -- not sure why it's reversed here. Kinda annoying.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in #13615.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we also need to change the caching layer to store the boolean isLatest flag? Otherwise, it just set it to false using the existing ChangeValue constructor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the default Change constructor? That sets isLatest to true, which is why it's fine to not touch anything in the caching layer at the moment -- caching is only ever enabled for unversioned stores, where we always have isLatest = true anyway.

You're right that this new functionality is not fully wired up, though. See #13564 (comment). I'm happy to finish the plumbing if we think it's valuable even though it's not in use today.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Yes, versioned stores always disable caching, so we are fine. Thanks for the reminder.

@mjsax mjsax force-pushed the kip-914-boolean-with-change branch from 8009e77 to 196199c Compare April 14, 2023 04:18
@mjsax mjsax added streams kip Requires or implements a KIP labels Apr 14, 2023
@mjsax mjsax merged commit a87edf1 into apache:trunk Apr 14, 2023
@vcrfxia vcrfxia deleted the kip-914-boolean-with-change branch April 14, 2023 04:52
@rishiraj88
Copy link
Copy Markdown

Thanks, @vcrfxia and @mjsax for insights.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants