Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,17 @@ public void process(String dummy, String line) {

@Override
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");

System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();

while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");

System.out.println("[" + entry.key + ", " + entry.value + "]");

context.forward(entry.key, entry.value.toString());
context.forward(entry.key, entry.value.toString());
}
}

iter.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.Iterator;

class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {

Expand Down Expand Up @@ -76,15 +75,15 @@ public void process(K key, V1 value) {
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);

Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
while (iter.hasNext()) {
needOuterJoin = false;
context().forward(key, joiner.apply(value, iter.next().value));
}
try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
context().forward(key, joiner.apply(value, iter.next().value));
}

if (needOuterJoin)
context().forward(key, joiner.apply(value, null));
if (needOuterJoin)
context().forward(key, joiner.apply(value, null));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.Iterator;
import java.util.Map;

public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
Expand Down Expand Up @@ -90,38 +89,37 @@ public void process(K key, V value) {
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}

WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) {

// for each matching window, try to update the corresponding key and send to the downstream
while (iter.hasNext()) {
KeyValue<Long, T> entry = iter.next();
W window = matchedWindows.get(entry.key);
// for each matching window, try to update the corresponding key and send to the downstream
while (iter.hasNext()) {
KeyValue<Long, T> entry = iter.next();
W window = matchedWindows.get(entry.key);

if (window != null) {
if (window != null) {

T oldAgg = entry.value;
T oldAgg = entry.value;

if (oldAgg == null)
oldAgg = initializer.apply();
if (oldAgg == null)
oldAgg = initializer.apply();

// try to add the new new value (there will never be old value)
T newAgg = aggregator.apply(key, value, oldAgg);
// try to add the new new value (there will never be old value)
T newAgg = aggregator.apply(key, value, oldAgg);

// update the store with the new value
windowStore.put(key, newAgg, window.start());
// update the store with the new value
windowStore.put(key, newAgg, window.start());

// forward the aggregated change pair
if (sendOldValues)
context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
else
context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
// forward the aggregated change pair
if (sendOldValues)
context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
else
context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));

matchedWindows.remove(entry.key);
matchedWindows.remove(entry.key);
}
}
}

iter.close();

// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
T oldAgg = initializer.apply();
Expand Down Expand Up @@ -167,10 +165,9 @@ public T get(Windowed<K> windowedKey) {
W window = (W) windowedKey.window();

// this iterator should contain at most one element
Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());

return iter.hasNext() ? iter.next().value : null;
try (WindowStoreIterator<T> iter = windowStore.fetch(key, window.start(), window.start())) {
return iter.hasNext() ? iter.next().value : null;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.Iterator;
import java.util.Map;

public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
Expand Down Expand Up @@ -88,40 +87,38 @@ public void process(K key, V value) {
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}

WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) {
// for each matching window, try to update the corresponding key and send to the downstream
while (iter.hasNext()) {
KeyValue<Long, V> entry = iter.next();
W window = matchedWindows.get(entry.key);

// for each matching window, try to update the corresponding key and send to the downstream
while (iter.hasNext()) {
KeyValue<Long, V> entry = iter.next();
W window = matchedWindows.get(entry.key);
if (window != null) {

if (window != null) {
V oldAgg = entry.value;
V newAgg = oldAgg;

V oldAgg = entry.value;
V newAgg = oldAgg;
// try to add the new new value (there will never be old value)
if (newAgg == null) {
newAgg = value;
} else {
newAgg = reducer.apply(newAgg, value);
}

// try to add the new new value (there will never be old value)
if (newAgg == null) {
newAgg = value;
} else {
newAgg = reducer.apply(newAgg, value);
}

// update the store with the new value
windowStore.put(key, newAgg, window.start());
// update the store with the new value
windowStore.put(key, newAgg, window.start());

// forward the aggregated change pair
if (sendOldValues)
context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
else
context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
// forward the aggregated change pair
if (sendOldValues)
context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
else
context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));

matchedWindows.remove(entry.key);
matchedWindows.remove(entry.key);
}
}
}

iter.close();

// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
windowStore.put(key, value, windowStartMs);
Expand Down Expand Up @@ -161,10 +158,9 @@ public V get(Windowed<K> windowedKey) {
W window = (W) windowedKey.window();

// this iterator should only contain one element
Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());

return iter.next().value;
try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) {
return iter.next().value;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/**
* Iterator interface of {@link KeyValue}.
*
* Users need to call its {@code close} method explicitly upon completeness to release resources,
* or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
*
* @param <K> Type of keys
* @param <V> Type of values
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@

import org.apache.kafka.streams.KeyValue;

import java.io.Closeable;
import java.util.Iterator;

/**
* Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}.
*
* Users need to call its {@code close} method explicitly upon completeness to release resources,
* or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
*
* @param <E> Type of values
*/
public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> {
public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, Closeable {
Copy link
Copy Markdown
Member

@ijuma ijuma May 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should tell users that they should call close (as we did for KeyValueIterator).


@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,18 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final String name;
private final String parentDir;

private final Options options;
private final WriteOptions wOptions;
private final FlushOptions fOptions;

protected File dbDir;
private StateSerdes<K, V> serdes;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;

private StateSerdes<K, V> serdes;
protected File dbDir;
private RocksDB db;

// the following option objects will be created at constructor and disposed at close()
private Options options;
private WriteOptions wOptions;
private FlushOptions fOptions;

private boolean loggingEnabled = false;
private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;

Expand Down Expand Up @@ -313,14 +314,16 @@ public void putAll(List<KeyValue<K, V>> entries) {
private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
WriteBatch batch = new WriteBatch();

for (KeyValue<byte[], byte[]> entry : entries) {
batch.put(entry.key, entry.value);
}

try {
for (KeyValue<byte[], byte[]> entry : entries) {
batch.put(entry.key, entry.value);
}

db.write(wOptions, batch);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
} finally {
batch.dispose();
}
}

Expand Down Expand Up @@ -425,7 +428,15 @@ public void flushInternal() {
@Override
public void close() {
flush();
options.dispose();
wOptions.dispose();
fOptions.dispose();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to null out these fields after you call dispose because using them after this will lead to undefined behaviour (a NPE is preferable in case of misuse).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed you nulled out db, but don't you need to do the same for the 3 options above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think db is sufficient to trigger NPE upon unexpected usage after close, I have other objects defined final and would prefer not to change it; another way is to have an isClosed flag and check that flag on all operations.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not sufficient if one calls openDB which is a public method. isClosed could work too, but it seems like it would affect more code (unless we only checked openDB). I'm a fan of final fields (as you know), but maybe making them not final and adding a comment about the lifecycle makes it clearer in this case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, will do.

db.close();

options = null;
wOptions = null;
fOptions = null;
db = null;
}

private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,11 @@ public void process(String key, String value) {
@Override
public void punctuate(long streamTime) {
int count = 0;
for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
iter.next();
++count;
try (KeyValueIterator<String, String> iter = store.all()) {
while (iter.hasNext()) {
iter.next();
++count;
}
}
context().forward(Long.toString(streamTime), count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,11 @@ public int checkForRestoredEntries(KeyValueStore<K, V> store) {
*/
public int sizeOf(KeyValueStore<K, V> store) {
int size = 0;
for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
iterator.next();
++size;
try (KeyValueIterator<K, V> iterator = store.all()) {
while (iterator.hasNext()) {
iterator.next();
++size;
}
}
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,10 @@ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySeria
segmentDirs(baseDir)
);

WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
while (iter.hasNext()) {
iter.next();
try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
while (iter.hasNext()) {
iter.next();
}
}

assertEquals(
Expand Down