
KAFKA-20173: Ensure Metered kv-stores pass headers correctly #21768

Merged
mjsax merged 1 commit into apache:trunk from mjsax:kafka-20173-passing-headers-metered-stores on Mar 17, 2026
Conversation

@mjsax
Member

@mjsax mjsax commented Mar 16, 2026

Ensures that all Metered KV-stores (plain, ts, headers, version) pass
headers into de/serializers.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, TengYao Chi <frankvicky@apache.org>, Uladzislau Blok <blokv75@gmail.com>, Bill Bejeck <bill@confluent.io>

@mjsax mjsax added streams kip Requires or implements a KIP labels Mar 16, 2026
  protected Sensor getSensor;
  protected Sensor deleteSensor;
- private Sensor putAllSensor;
+ protected Sensor putAllSensor;
Member Author

Need access in MeteredKeyValueTimestampStoreWithHeaders

  if (rawResult.isSuccess()) {
      final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
-     final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
+     final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueStoreIterator(
Member Author

Side cleanup: I think there is no reason to have MeteredKeyValueTimestampedIterator (removing below), and we can just use the existing MeteredKeyValueStoreIterator. -- We don't have the case to bridge between different value types in MeteredKeyValueStore.

  Objects.requireNonNull(prefix, "prefix cannot be null");
  Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
- return new MeteredKeyValueIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
+ return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix, prefixKeySerializer), prefixScanSensor);
Member Author

Just renaming, to align with the established naming patterns across the different metered kv-stores.

public ValueTimestampHeaders<V> get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> deserializeValue(wrapped().get(serializeKey(key, new RecordHeaders()))), time, getSensor);
Member Author

We need to overwrite get() to be able to pass headers into serializeKey(...) -- For get() it might not be strictly necessary, but it forces us to make a conscious decision about what to pass -- it's a side effect of disallowing calls to serializeKey without headers.

Member Author

Wondering if we would want to pass context.headers() instead of new RecordHeaders(), similar to what we did in #21684

If yes, applies elsewhere in this class, too.

Contributor

I think the same argument applies here: serdes should have access to record context.

Contributor
@frankvicky frankvicky Mar 16, 2026

+1 to context.headers()

Member

+1 for context.headers()

Contributor

I think context.headers() is better, but I still have a question, though.
If we do it this way, effectively this is the same as


Can we just reuse it?

Member Author

SG -- seems it only affects get() -- so yes, it's the same as serializeKey(key), but we make a conscious decision about what is desired.

I'll update it for delete in the PR for now, too, but after #21639 is merged this will change (or if Frank's PR is merged first, it will go away after rebasing).

  try {
      final Headers headers = value != null ? value.headers() : new RecordHeaders();
-     maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), serdes.rawValue(value, headers)), time, putSensor);
+     maybeMeasureLatency(() -> wrapped().put(serializeKey(key, headers), serdes.rawValue(value, headers)), time, putSensor);
Member Author

Unifying the existing keyBytes method with the naming of MeteredKeyValueStore, which uses serializeKey.

  if (rawResult.isSuccess()) {
      final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
-     final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+     final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
Member Author

Just renaming to align naming patterns.

  Objects.requireNonNull(prefix, "prefix cannot be null");
  Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
Member Author

Just renaming to align naming patterns


final KeyValue<Bytes, byte[]> keyValue = iter.next();
final ValueTimestampHeaders<V> valueTimestampHeaders = valueTimestampHeadersDeserializer.apply(keyValue.value);
final Headers headers = valueTimestampHeaders != null ? valueTimestampHeaders.headers() : new RecordHeaders();
Member Author

This can never be null. We know that iter.next() cannot return null (that would be a "non-existing row" -- null values are deletes).

Thus simplifying the code (we don't have any null checks for this case in the older code for the plain- and ts-stores either).

Similar below
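To illustrate the convention this comment relies on, here is a minimal, hypothetical sketch (TinyStore is an illustrative stand-in, not an actual Kafka Streams class): because a null value is a delete, a null value is never stored, so any row an iterator actually returns has a non-null value and needs no null check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store demonstrating the null-value-is-delete convention.
class TinyStore {
    private final Map<String, String> rows = new HashMap<>();

    void put(final String key, final String value) {
        if (value == null) {
            rows.remove(key);   // null values are deletes, never stored rows
        } else {
            rows.put(key, value);
        }
    }

    boolean containsNullValue() {
        return rows.containsValue(null);
    }

    int size() {
        return rows.size();
    }
}
```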

protected Bytes keyBytes(final K key, final Headers headers) {
@Override
protected Bytes serializeKey(final K key) {
throw new UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders required to pass in Headers when serializing a key.");
Member Author

This was actually surfacing the missing overrides, highlighting the bug of not overriding delete. So it's a good guard to have in place (a slightly annoying side effect is that it forces us to override put/putAll, too, but I think it's worth the price to pay).

Contributor

Maybe we don't even need this. See my comment https://github.com/apache/kafka/pull/21768/changes#r2941312324

Member Author

The goal is to force us to make a conscious decision for each serializeKey call, because every case is different. So we cannot accidentally call the non-header variant and introduce a bug without noticing it.
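A minimal sketch of the guard pattern being discussed (class and method names here are illustrative, not the actual Kafka Streams code): the headers-aware store overrides the no-headers serializeKey to throw, so the compiler/runtime forces every call site to choose headers explicitly via the two-argument overload.

```java
import java.nio.charset.StandardCharsets;

// Illustrative base class with a no-headers serialization path.
class BaseStore {
    protected byte[] serializeKey(final String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }
}

// The headers-aware variant disallows the no-headers path entirely, so a caller
// cannot accidentally drop headers -- it must use the two-argument overload.
class HeadersAwareStore extends BaseStore {
    @Override
    protected byte[] serializeKey(final String key) {
        throw new UnsupportedOperationException(
            "headers-aware store requires headers when serializing a key");
    }

    protected byte[] serializeKey(final String key, final String[] headers) {
        // A real implementation would forward the headers to the key serializer.
        return key.getBytes(StandardCharsets.UTF_8);
    }
}
```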

Contributor

Yeah, I see now that in many places we propagate headers from ValueTimestampHeaders, not from the internal context (I don't know how I missed this 🤦‍♂️). In that case it makes sense to go with this guard, to force the developer to think about what to propagate.


@Override
protected K deserializeKey(final byte[] rawKey) {
throw new UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders required to pass in Headers when deserializing a key.");
Member Author

Similar guard -- this did not surface any issues.

Comment on lines -297 to +341
- (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
Contributor
@UladzislauBlok UladzislauBlok Mar 16, 2026

Shouldn't it be a KeyValueIterator<K, ValueTimestampHeaders<V>> resultIterator instead of KeyValueIterator<K, ValueAndTimestamp<V>> ?
Why do we return just ValueAndTimestamp<V> as a query result?
UPD: I see that this is what iterator returns: https://github.com/apache/kafka/pull/21768/changes#diff-43268e867244a98e6614710d9f2a3be2c519345e1f81204b91414ee1db77c754R464 , but still why?

Member Author

It's because of backward compatibility in the DSL. There is existing code which uses IQ to access the state stores, and it expects to get back the ValueAndTimestamp type. We change all the DSL code to use metered/caching/changelogging header-stores and only keep the inner-most byte-store as the default ts-store.

Thus, if users upgrade to 4.3, nothing should change for them, and thus we cannot just start to return the ValueTimestampHeaders type -- this would break apps even if they do not enable header stores in the DSL.

Does this make sense? -- It's a known feature gap that IQ will not allow querying headers with AK 4.3, and it's something we need a followup KIP to add.
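A sketch of the compatibility argument (the record shapes below are simplified stand-ins for the real ValueAndTimestamp / ValueTimestampHeaders types, not the actual Kafka classes): the richer header-carrying value is narrowed to the legacy shape before it crosses the IQ boundary, so existing apps keep seeing the same result type after upgrading.

```java
import java.util.List;

// Simplified stand-in for the legacy IQ result type.
record ValueAndTimestamp<V>(V value, long timestamp) { }

// Simplified stand-in for the richer header-carrying type; the headers are
// dropped when narrowing to the legacy shape, which keeps IQ results
// type-compatible for existing applications.
record ValueTimestampHeaders<V>(V value, long timestamp, List<String> headers) {
    ValueAndTimestamp<V> toValueAndTimestamp() {
        return new ValueAndTimestamp<>(value, timestamp);
    }
}
```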

Contributor
@UladzislauBlok UladzislauBlok Mar 17, 2026

Makes sense. Thanks for the clarification.



Member
@bbejeck bbejeck left a comment

Made a pass @mjsax - I have one additional question

final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
final Headers headers = entry.value != null ? entry.value.headers() : new RecordHeaders();
byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers), serializeValue(entry.value)));
Member

This is different from put -> wrapped().put(serializeKey(key, headers), serdes.rawValue(value, headers))

But here it does serializeValue(entry.value) -- shouldn't this use the same approach as put?

Member Author

It doesn't really matter, because for the header case the passed-in headers are ignored -- but yes, there is some code inconsistency... Let me unify it.
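The unification mentioned above could look like this minimal sketch (class and method names are illustrative, not the actual PR code): putAll loops over its entries and serializes each one through the exact same helper as put, so both write paths treat headers identically.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative store where put and putAll share one serialization helper,
// so key and value serialization always see the same headers on both paths.
class UnifiedStore {
    final List<String> written = new ArrayList<>();

    private String serialize(final String key, final String value, final String headers) {
        return key + "|" + value + "|" + headers;
    }

    void put(final String key, final String value, final String headers) {
        written.add(serialize(key, value, headers));
    }

    void putAll(final List<String[]> entries) {
        for (final String[] e : entries) {
            put(e[0], e[1], e[2]);   // same path as put, same headers handling
        }
    }
}
```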

public ValueTimestampHeaders<V> delete(final K key) {
Objects.requireNonNull(key, "key cannot be null");
try {
return maybeMeasureLatency(() -> deserializeValue(wrapped().delete(serializeKey(key, new RecordHeaders()))), time, deleteSensor);
Member

I think we need to update new RecordHeaders() to context.headers() here and below based on comments up to this point.

Member Author

Yes, but it will be moot with Frank's fix anyway: #21639


  protected K deserializeKey(final byte[] rawKey) {
-     return rawKey != null ? serdes.keyFrom(rawKey, internalContext.headers()) : null;
+     return serdes.keyFrom(rawKey, internalContext.headers());
Member Author

The key can actually never be null -- that would be a bug that we should not mask with a null-check, but surface directly.

Member
@bbejeck bbejeck left a comment

Thanks for the PR @mjsax LGTM modulo fixing relevant failing tests

Ensures that all Metered KV-stores (plain, ts, headers, version)
pass headers into de/serializers.
@mjsax mjsax force-pushed the kafka-20173-passing-headers-metered-stores branch from 7831aa0 to 40439ac on March 17, 2026 20:58
@mjsax mjsax merged commit e952ccd into apache:trunk Mar 17, 2026
20 checks passed
@mjsax mjsax deleted the kafka-20173-passing-headers-metered-stores branch March 17, 2026 22:38
Shekharrajak pushed a commit to Shekharrajak/kafka that referenced this pull request Mar 31, 2026
…21768)

Ensures that all Metered KV-stores (plain, ts, headers, version) pass
headers into de/serializers.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, TengYao Chi
 <frankvicky@apache.org>, Uladzislau Blok, <blokv75@gmail.com>, Bill
 Bejeck <bill@confluent.io>
nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026
…21768)

Ensures that all Metered KV-stores (plain, ts, headers, version) pass
headers into de/serializers.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, TengYao Chi
 <frankvicky@apache.org>, Uladzislau Blok, <blokv75@gmail.com>, Bill
 Bejeck <bill@confluent.io>