Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadDirection;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
Expand Down Expand Up @@ -253,13 +254,14 @@ public V get(final K key) {

@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
return wrapped().range(from, to);
final K to,
final ReadDirection direction) {
return wrapped().range(from, to, direction);
}

@Override
public KeyValueIterator<K, V> all() {
return wrapped().all();
public KeyValueIterator<K, V> all(final ReadDirection direction) {
return wrapped().all(direction);
}

@Override
Expand Down Expand Up @@ -331,29 +333,32 @@ public V fetch(final K key,
@Deprecated
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(key, timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetch(key, timeFrom, timeTo, direction);
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetch(from, to, timeFrom, timeTo, direction);
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return wrapped().all();
public KeyValueIterator<Windowed<K>, V> all(final ReadDirection direction) {
return wrapped().all(direction);
}

@Override
@Deprecated
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetchAll(timeFrom, timeTo, direction);
}
}

Expand Down Expand Up @@ -453,13 +458,14 @@ public V get(final K key) {

@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
return wrapped().range(from, to);
final K to,
final ReadDirection direction) {
return wrapped().range(from, to, direction);
}

@Override
public KeyValueIterator<K, V> all() {
return wrapped().all();
public KeyValueIterator<K, V> all(final ReadDirection direction) {
return wrapped().all(direction);
}

@Override
Expand Down Expand Up @@ -531,29 +537,32 @@ public V fetch(final K key,
@Override
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(key, timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetch(key, timeFrom, timeTo, direction);
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetch(from, to, timeFrom, timeTo, direction);
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
final long timeTo,
final ReadDirection direction) {
return wrapped().fetchAll(timeFrom, timeTo, direction);
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return wrapped().all();
public KeyValueIterator<Windowed<K>, V> all(final ReadDirection direction) {
return wrapped().all(direction);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

public enum ReadDirection {
FORWARD, BACKWARD
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public interface ReadOnlyKeyValueStore<K, V> {
* @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> range(K from, K to);
default KeyValueIterator<K, V> range(K from, K to) {
return range(from, to, ReadDirection.FORWARD);
}

KeyValueIterator<K, V> range(K from, K to, ReadDirection direction);

/**
* Return an iterator over all keys in this store. This iterator must be closed after use.
Expand All @@ -62,7 +66,11 @@ public interface ReadOnlyKeyValueStore<K, V> {
* @return An iterator of all key/value pairs in the store.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> all();
default KeyValueIterator<K, V> all() {
return all(ReadDirection.FORWARD);
}

KeyValueIterator<K, V> all(ReadDirection direction);

/**
* Return an approximate count of key-value mappings in this store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,21 @@ public interface ReadOnlyWindowStore<K, V> {
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
* @param key the key to fetch
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @param key the key to fetch
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
* @throws NullPointerException If {@code null} is used for key.
* @deprecated Use {@link #fetch(Object, Instant, Instant)} instead
*/
@Deprecated
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo, ReadDirection direction);

default WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
return fetch(key, timeFrom, timeTo, ReadDirection.FORWARD);
}


/**
* Get all the key-value pairs with the given key and the time range from all the existing windows.
Expand Down Expand Up @@ -104,32 +109,40 @@ public interface ReadOnlyWindowStore<K, V> {
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
*
* @param key the key to fetch
* @param from time range start (inclusive)
* @param to time range end (inclusive)
* @param key the key to fetch
* @param from time range start (inclusive)
* @param to time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
* @throws NullPointerException If {@code null} is used for key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
WindowStoreIterator<V> fetch(K key, Instant from, Instant to, ReadDirection direction) throws IllegalArgumentException;

default WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException {
return fetch(key, from, to, ReadDirection.FORWARD);
}

/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
* <p>
* This iterator must be closed after use.
*
* @param from the first key in the range
* @param to the last key in the range
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @param from the first key in the range
* @param to the last key in the range
* @param timeFrom time range start (inclusive)
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException If {@code null} is used for any key.
* @deprecated Use {@link #fetch(Object, Object, Instant, Instant)} instead
*/
@Deprecated
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo, ReadDirection direction);

default KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
return fetch(from, to, timeFrom, timeTo, ReadDirection.FORWARD);
}

/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
Expand All @@ -145,39 +158,58 @@ public interface ReadOnlyWindowStore<K, V> {
* @throws NullPointerException If {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime, ReadDirection direction)
throws IllegalArgumentException;

default KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime)
throws IllegalArgumentException {
return fetch(from, to, fromTime, toTime, ReadDirection.FORWARD);
}

/**
* Gets all the key-value pairs in the existing windows.
*
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<Windowed<K>, V> all();

* Gets all the key-value pairs in the existing windows.
*
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<Windowed<K>, V> all(ReadDirection direction);

default KeyValueIterator<Windowed<K>, V> all() {
return all(ReadDirection.FORWARD);
}


/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
* @param timeFrom the beginning of the time slot from which to search (inclusive)
* @param timeTo the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
* @throws NullPointerException if {@code null} is used for any key
* @deprecated Use {@link #fetchAll(Instant, Instant)} instead
*/
@Deprecated
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo, ReadDirection direction);

default KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
return fetchAll(timeFrom, timeTo, ReadDirection.FORWARD);
}

/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
* @param from the beginning of the time slot from which to search (inclusive)
* @param to the end of the time slot from which to search (inclusive)
* @param direction direction to read iterator results
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
* @throws NullPointerException if {@code null} is used for any key
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to, ReadDirection direction) throws IllegalArgumentException;

default KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException {
return fetchAll(from, to, ReadDirection.FORWARD);
}
}
Loading