Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public void process(final Record<K, Change<V>> record) {
throw new StreamsException("Record key for the grouping KTable should not be null.");
}

if (useVersionedSemantics && !record.value().isLatest) {
final boolean isLatest = record.value().isLatest;
if (useVersionedSemantics && !isLatest) {
// skip out-of-order records when aggregating a versioned table, since the
// aggregate should include latest-by-timestamp records only. as an optimization,
// do not forward the out-of-order record downstream to the repartition topic either.
Expand All @@ -154,8 +155,6 @@ public void process(final Record<K, Change<V>> record) {
final KeyValue<? extends K1, ? extends V1> oldPair = record.value().oldValue == null ? null :
mapper.apply(record.key(), record.value().oldValue);

final boolean isLatest = record.value().isLatest;

// if the selected repartition key or value is null, skip
// forward oldPair first, to be consistent with reduce and aggregate
final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ public void init(final ProcessorContext<K, Change<VOut>> context) {
public void process(final Record<K, Change<V>> record) {
final VOut newValue = valueTransformer.transform(record.key(), record.value().newValue);

if (queryableName == null) {
final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null;
context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)));
} else {
if (queryableName != null) {
final VOut oldValue = sendOldValues ? getValueOrNull(store.get(record.key())) : null;
final long putReturnCode = store.put(record.key(), newValue, record.timestamp());
// if not put to store, do not forward downstream either
if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue, putReturnCode == PUT_RETURN_CODE_IS_LATEST)));
}
} else {
final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null;
context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

import java.util.List;
Expand All @@ -36,6 +37,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
Copy link
Copy Markdown
Contributor Author

@vcrfxia vcrfxia Apr 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went through and added this additional javadoc line to the methods where it seemed to be missing (as requested in #13188 (comment)). Looks like the usage throughout the codebase is very inconsistent though:

  • RocksDBStore enforces it on all methods even though it wasn't documented in KeyValueStore. The in-memory implementations do not enforce it.
  • WindowStore has docs about this but the actual implementations do not enforce it.
  • SessionStore neither has the annotations nor implements it.
  • VersionedKeyValueStore has docs but the implementation was not enforcing it. I've reconciled this inconsistency in this PR, but the others are larger changes that I'd like to leave for later.

*/
void put(K key, V value);

Expand All @@ -47,6 +49,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
V putIfAbsent(K key, V value);

Expand All @@ -56,6 +59,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param entries A list of entries to put into the store;
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
void putAll(List<KeyValue<K, V>> entries);

Expand All @@ -65,6 +69,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* @param key The key
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
V delete(K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ default KeyValueIterator<K, V> reverseAll() {
* @param <PS> Prefix Serializer type
* @param <P> Prefix Type.
* @return The iterator for keys having the specified prefix.
* @throws InvalidStateStoreException if the store is not initialized
*/
default <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
* validTo timestamp is undefined. {@code Long.MIN_VALUE} indicates that the record
* was not put, due to grace period having been exceeded.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
long put(K key, V value, long timestamp);

Expand Down Expand Up @@ -96,6 +97,7 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
* returned {@link VersionedRecord} may be smaller than the provided deletion
* timestamp.
* @throws NullPointerException If {@code null} is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
VersionedRecord<V> delete(K key, long timestamp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value, long windowStartTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ private class MeteredVersionedKeyValueStoreInternal
extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {

private final VersionedBytesStore inner;
private final Serde<V> rawValueSerde;
private StateSerdes<K, V> rawValueSerdes;
private final Serde<V> plainValueSerde;
private StateSerdes<K, V> plainValueSerdes;

MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
final String metricScope,
Expand All @@ -104,13 +104,13 @@ private class MeteredVersionedKeyValueStoreInternal
: new ValueAndTimestampSerde<>(valueSerde)
);
this.inner = inner;
this.rawValueSerde = valueSerde;
this.plainValueSerde = valueSerde;
}

public long put(final K key, final V value, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor);
final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);
maybeRecordE2ELatency();
return validTo;
} catch (final ProcessorStateException e) {
Expand Down Expand Up @@ -178,10 +178,10 @@ protected void initStoreSerde(final ProcessorContext context) {
// additionally init raw value serde
final String storeName = super.name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
rawValueSerdes = new StateSerdes<>(
plainValueSerdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(rawValueSerde, new SerdeGetter(context))
prepareValueSerde(plainValueSerde, new SerdeGetter(context))
);
}

Expand All @@ -192,10 +192,10 @@ protected void initStoreSerde(final StateStoreContext context) {
// additionally init raw value serde
final String storeName = super.name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
rawValueSerdes = new StateSerdes<>(
plainValueSerdes = new StateSerdes<>(
changelogTopic,
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(rawValueSerde, new SerdeGetter(context))
prepareValueSerde(plainValueSerde, new SerdeGetter(context))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte

@Override
public long put(final Bytes key, final byte[] value, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();

if (timestamp < observedStreamTime - gracePeriod) {
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
Expand All @@ -135,6 +139,9 @@ public long put(final Bytes key, final byte[] value, final long timestamp) {

@Override
public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();

if (timestamp < observedStreamTime - gracePeriod) {
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
LOG.warn("Skipping record for expired delete.");
Expand All @@ -157,6 +164,9 @@ public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {

@Override
public VersionedRecord<byte[]> get(final Bytes key) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();

// latest value (if present) is guaranteed to be in the latest value store
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
if (rawLatestValueAndTimestamp != null) {
Expand All @@ -171,6 +181,8 @@ public VersionedRecord<byte[]> get(final Bytes key) {

@Override
public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();

if (asOfTimestamp < observedStreamTime - historyRetention) {
// history retention exceeded. we still check the latest value store in case the
Expand Down Expand Up @@ -373,6 +385,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
}
}

private void validateStoreOpen() {
if (!open) {
throw new InvalidStateStoreException("Store " + name + " is currently closed");
}
}

/**
* Generic interface for segment stores. See {@link VersionedStoreClient} for use.
*/
Expand Down Expand Up @@ -847,7 +865,6 @@ private <T extends VersionedStoreSegment> long finishPut(
segment.put(key, segmentValue.serialize());
}
}
return foundTs;
} else {
// insert into segment corresponding to foundTs, as foundTs represents the validTo
// timestamp of the current put.
Expand Down Expand Up @@ -891,8 +908,8 @@ private <T extends VersionedStoreSegment> long finishPut(
segment.put(key, segmentValue.serialize());
}
}
return foundTs;
}
return foundTs;
}

/**
Expand Down
Loading