Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion vi
public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
wViewByRegion.region = key.value();
wViewByRegion.region = key.key();

RegionCount rCount = new RegionCount();
rCount.region = key.value();
rCount.region = key.key();
rCount.count = value;

return new KeyValue<>(wViewByRegion, rCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
.put("region", key.value());
.put("region", key.key());

ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
valueNode.put("count", value);
Expand Down
10 changes: 8 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,21 @@ public class KafkaStreams {
// usage only and should not be exposed to users at all.
private final UUID processId;

/**
* Construct the stream instance.
*
* @param builder the processor topology builder specifying the computational logic
* @param props properties for the {@link StreamsConfig}
*/
public KafkaStreams(TopologyBuilder builder, Properties props) {
this(builder, new StreamsConfig(props));
}

/**
* Construct the stream instance.
*
* @param builder The processor topology builder specifying the computational logic
* @param config The stream configs
* @param builder the processor topology builder specifying the computational logic
* @param config the stream configs
*/
public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
// create the metrics
Expand Down
17 changes: 17 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,31 @@
*/
public class KeyValue<K, V> {

/** The key of the key-value pair. */
public final K key;
/** The value of the key-value pair. */
public final V value;

/**
* Create a new key-value pair.
*
* @param key the key
* @param value the value
*/
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}

/**
* Create a new key-value pair.
*
* @param key the key
* @param value the value
* @param <K> the type of the key
* @param <V> the type of the value
* @return a new key value pair
*/
public static <K, V> KeyValue<K, V> pair(K key, V value) {
return new KeyValue<>(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@
package org.apache.kafka.streams.kstream;

/**
* The Aggregator interface for aggregating values of the given key.
* The {@link Aggregator} interface for aggregating values of the given key.
*
* @param <K> key type
* @param <V> original value type
* @param <T> aggregate value type
*/
public interface Aggregator<K, V, T> {

/**
* Compute a new aggregate from the key and value of a record and the current aggregate of the same key.
*
* @param aggKey the key of the record
* @param value the value of the record
* @param aggregate the current aggregate value
* @return the new aggregate value
*/
T apply(K aggKey, V value, T aggregate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package org.apache.kafka.streams.kstream;



/**
* The ForeachAction interface for performing an action on a key-value pair.
* The {@link ForeachAction} interface for performing an action on a key-value pair.
* Note that this action is stateless. If stateful processing is required, consider
* using {@link KStream#transform(TransformerSupplier, String...)} or
* {@link KStream#process(ProcessorSupplier, String...)} instead.
Expand All @@ -29,6 +28,13 @@
* @param <V> original value type
*/
public interface ForeachAction<K, V> {

/**
* Perform an action for each record of a stream.
*
* @param key the key of the record
* @param value the value of the record
*/
void apply(K key, V value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
package org.apache.kafka.streams.kstream;

/**
* The Initializer interface for creating an initial value in aggregations.
* The {@link Initializer} interface for creating an initial value in aggregations.
*
* @param <T> aggregate value type
*/
public interface Initializer<T> {

/**
* Return the initial value for an aggregation.
*
* @return the initial value for an aggregation
*/
T apply();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
*/
public class JoinWindows extends Windows<TimeWindow> {

/** Maximum time difference for tuples that are before the join tuple. */
public final long before;
/** Maximum time difference for tuples that are after the join tuple. */
public final long after;

private JoinWindows(String name, long before, long after) {
Expand All @@ -41,40 +43,41 @@ public static JoinWindows of(String name) {
}

/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* timeDifference.
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows within(long timeDifference) {
return new JoinWindows(this.name, timeDifference, timeDifference);
}

/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream is
* earlier than or equal to the timestamp of a record from the first stream.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows before(long timeDifference) {
return new JoinWindows(this.name, timeDifference, this.after);
}

/**
* Specifies that records of the same key are joinable if their timestamp stamps are within
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream
* is later than or equal to the timestamp of a record from the first stream.
*
* @param timeDifference join window interval in milliseconds
* @param timeDifference join window interval
*/
public JoinWindows after(long timeDifference) {
return new JoinWindows(this.name, this.before, timeDifference);
}

/**
* Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
*/
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}

Expand All @@ -98,4 +101,4 @@ public int hashCode() {
return result;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
* KStream is an abstraction of a <i>record stream</i> of key-value pairs.
* {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
*
* @param <K> Type of keys
* @param <V> Type of values
Expand Down Expand Up @@ -510,7 +510,7 @@ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
String name);

/**
* Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}.
* Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
*
* @param windows the specification of the aggregation {@link Windows}
* @param keySerde key serdes for materializing the counting table,
Expand All @@ -519,15 +519,15 @@ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);

/**
* Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}
* Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
* with default serializers and deserializers.
*
* @param windows the specification of the aggregation {@link Windows}
*/
<W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);

/**
* Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}.
* Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
*
* @param keySerde key serdes for materializing the counting table,
* if not specified the default serdes defined in the configs will be used
Expand All @@ -536,7 +536,7 @@ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
KTable<K, Long> countByKey(Serde<K> keySerde, String name);

/**
* Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}
* Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
* with default serializers and deserializers.
*
* @param name the name of the resulted {@link KTable}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
* {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
* for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
*/
public class KStreamBuilder extends TopologyBuilder {

private final AtomicInteger index = new AtomicInteger(0);

/**
* Create a new {@link KStreamBuilder} instance.
*/
public KStreamBuilder() {
super();
}

/**
* Creates a {@link KStream} instance from the specified topics.
* Create a {@link KStream} instance from the specified topics.
* The default deserializers specified in the config are used.
*
* @param topics the topic names; must contain at least one topic name
Expand All @@ -50,7 +53,7 @@ public <K, V> KStream<K, V> stream(String... topics) {
}

/**
* Creates a {@link KStream} instance for the specified topics.
* Create a {@link KStream} instance for the specified topics.
*
* @param keySerde key serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
Expand All @@ -67,7 +70,7 @@ public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String.
}

/**
* Creates a {@link KTable} instance for the specified topic.
* Create a {@link KTable} instance for the specified topic.
* The default deserializers specified in the config are used.
*
* @param topic the topic name; cannot be null
Expand All @@ -77,7 +80,7 @@ public <K, V> KTable<K, V> table(String topic) {
}

/**
* Creates a {@link KTable} instance for the specified topic.
* Create a {@link KTable} instance for the specified topic.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
Expand All @@ -98,7 +101,7 @@ public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String to
}

/**
* Creates a new instance of {@link KStream} by merging the given streams
* Create a new instance of {@link KStream} by merging the given streams.
*
* @param streams the instances of {@link KStream} to be merged
*/
Expand Down
19 changes: 10 additions & 9 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
* KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
Expand All @@ -39,7 +39,7 @@ public interface KTable<K, V> {
KTable<K, V> filter(Predicate<K, V> predicate);

/**
* Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate
* Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
*
* @param predicate the instance of {@link Predicate}
*/
Expand All @@ -55,15 +55,15 @@ public interface KTable<K, V> {


/**
* Print the elements of this stream to System.out
* Print the elements of this stream to {@code System.out}
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
*/
void print();

/**
* Print the elements of this stream to System.out
* Print the elements of this stream to {@code System.out}
* @param keySerde key serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param valSerde value serde used to send key-value pairs,
Expand All @@ -75,24 +75,25 @@ public interface KTable<K, V> {
void print(Serde<K> keySerde, Serde<V> valSerde);

/**
* Write the elements of this stream to a file at the given path.
* Write the elements of this stream to a file at the given path using default serializers and deserializers.
* @param filePath name of file to write to
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
* Implementors will need to override {@code toString} for keys and values that are not of
* type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath);

/**
* Write the elements of this stream to a file at the given path.
*
* @param filePath name of file to write to
* @param keySerde key serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param valSerde value serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
*
* Implementors will need to override toString for keys and values that are not of
* type String, Integer etc to get meaningful information.
* Implementors will need to override {@code toString} for keys and values that are not of
* type {@link String}, {@link Integer} etc. to get meaningful information.
*/
void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);

Expand Down
Loading