Closed
55 commits
8e6fc2d
Remove all usages of StreamSpec and ApplicationRunner from the operat…
prateekm Jun 9, 2018
7d7aa50
Updated with Cameron and Daniel's feedback.
Jun 13, 2018
8561930
Merge branch 'master' into stream-spec-cleanup
prateekm Jun 15, 2018
7e71dc7
Merge with master
Jun 27, 2018
f1cb8f0
Merge branch 'master' into single-app-api-May-21-18
nickpan47 Jul 10, 2018
d43e923
WIP: proto-type with ApplicationRunnable and no ApplicationRunner exp…
nickpan47 Jul 11, 2018
42782d8
Merge branch 'prateek-remove-app-runner-stream-spec' into app-spec-wi…
nickpan47 Jul 17, 2018
95577b7
WIP: trying to figure out the two interface classes for spec: a) spec…
nickpan47 Jul 18, 2018
30a4e5f
WIP: application runner refactor - proto-type for SEP-13
nickpan47 Jul 23, 2018
fb1bc49
Merge branch 'master' into app-spec-with-app-runtime-Jul-16-18
nickpan47 Jul 30, 2018
973eb52
WIP: compiles, still working on LocalContainerRunner refactor
nickpan47 Aug 3, 2018
f20cdcd
WIP: adding unit tests. Pending update on StreamProcessorLifecycleLis…
nickpan47 Aug 6, 2018
c4bb0dc
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 6, 2018
a82708b
SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for hig…
nickpan47 Aug 13, 2018
0143371
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 13, 2018
db96da8
SAMZA-1789: WIP - revision to address review feedbacks.
Aug 17, 2018
3b2f04d
SAMZA-1789: moved all impl classes from samza-api to samza-core.
Aug 19, 2018
4382d45
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 19, 2018
6e446fe
SAMZA-1789: address Cameron's review comments.
nickpan47 Aug 23, 2018
1621c4d
SAMZA-1789: a few more fixes to address Cameron's reviews
nickpan47 Aug 23, 2018
247dcff
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 23, 2018
055bd91
SAMZA-1789: fix unit test with ThreadJobFactory
nickpan47 Aug 23, 2018
8d4d3ff
Merge with master
nickpan47 Aug 24, 2018
e7af693
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 24, 2018
a072118
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 27, 2018
12c09af
SAMZA-1789: Fix a merging error (with SAMZA-1813)
nickpan47 Aug 27, 2018
33753f7
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 29, 2018
f04404c
SAMZA-1789: move createStreams out of the loop in prepareJobs
nickpan47 Aug 29, 2018
f2969f8
SAMZA-1789: fixed ApplicationDescriptor to use InputDescriptor and Ou…
nickpan47 Aug 31, 2018
f4b3d43
SAMZA-1789: Fxing TaskApplication examples and some checkstyle errors
nickpan47 Aug 31, 2018
9997b98
SAMZA-1789: renamed all ApplicationDescriptor classes with full-spell…
nickpan47 Aug 31, 2018
7a73992
SAMZA-1789: fixing checkstyle and javadoc errors
nickpan47 Aug 31, 2018
222abf2
SAMZA-1789: added a constructor to StreamProcessor to take a StreamPr…
nickpan47 Aug 31, 2018
02076c8
SAMZA-1789: fixed the modifier for the mandatory constructor of Appli…
nickpan47 Aug 31, 2018
34ffda8
SAMZA-1789: disabling tests due to SAMZA-1836
nickpan47 Aug 31, 2018
91fcd73
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 31, 2018
9c89c63
Merge branch 'master' into app-runtime-with-processor-callbacks
nickpan47 Aug 31, 2018
ec4bb1d
SAMZA-1789: merge with fix for SAMZA-1836
Sep 1, 2018
66af5b7
SAMZA-1789: addressing Cameron's review comments.
nickpan47 Sep 5, 2018
16bef1b
SAMZA-1814: WIP fixing the task application configuration generation …
nickpan47 Sep 7, 2018
9d56464
Merge branch 'master' into SAMZA-1814
nickpan47 Sep 7, 2018
05637e6
SAMZA-1814: WIP consolidate all JobGraph and JobNode Json and Config …
nickpan47 Sep 11, 2018
97c00a2
SAMZA-1814: WIP unit tests fixed for configure generation.
nickpan47 Sep 14, 2018
3a91b9a
SAMZA-1814: moving some logic to ApplicationDescriptorImpl to simplif…
nickpan47 Sep 16, 2018
dae98ce
SAMZA-1814: consolidate the configure generation between high and low…
nickpan47 Sep 17, 2018
8797cdd
SAMZA-1814: merge with master
nickpan47 Sep 17, 2018
4484463
SAMZA-1814: merge with master
nickpan47 Sep 17, 2018
c7fde4a
Merge branch 'master' into SAMZA-1814
nickpan47 Sep 17, 2018
ffc6f1a
SAMZA-1814: consolidate configuration generation in ExecutionPlanner …
nickpan47 Sep 17, 2018
2c856c5
SAMZA-1814: consolidate configuration generation for high and low-lev…
nickpan47 Sep 17, 2018
0db5068
SAMZA-1814: fix merge issue and consolidated some test classes
nickpan47 Sep 18, 2018
b66b9fa
SAMZA-1814: moving serde generation to a single top-level configurati…
nickpan47 Sep 21, 2018
c8681a0
Merge branch 'master' into SAMZA-1814
nickpan47 Sep 21, 2018
f8c8108
SAMZA-1814: Fix merging errors.
nickpan47 Sep 21, 2018
2143739
Merge branch 'master' into SAMZA-1814. With minor fixes to allow merg…
nickpan47 Sep 26, 2018
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ project(":samza-core_$scalaVersion") {
testCompile "org.powermock:powermock-core:$powerMockVersion"
testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
}

checkstyle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@
package org.apache.samza.application;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -46,10 +54,15 @@
*/
public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
implements ApplicationDescriptor<S> {
+private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);

-final Config config;
private final Class<? extends SamzaApplication> appClass;
private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+// serdes used by input/output/intermediate streams, keyed by streamId
+private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
+// serdes used by tables, keyed by tableId
+private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
+final Config config;

// Default to no-op functions in ContextManager
// TODO: this should be replaced by shared context factory defined in SAMZA-1714
Expand Down Expand Up @@ -141,6 +154,35 @@ public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
return Optional.empty();
}

/**
* Get the key and value {@link Serde}s registered for the stream with the given {@code streamId}
*
* @param streamId id of the stream
* @return the key and value {@link Serde} pair for the stream; null if no serde is registered for {@code streamId}
*/
public KV<Serde, Serde> getStreamSerdes(String streamId) {
return streamSerdes.get(streamId);
}

/**
* Get the key and value {@link Serde}s registered for the table with the given {@code tableId}
*
* @param tableId id of the table
* @return the key and value {@link Serde} pair for the table; null if no serde is registered for {@code tableId}
*/
public KV<Serde, Serde> getTableSerdes(String tableId) {
return tableSerdes.get(tableId);
}

/**
* Get the map of all {@link InputOperatorSpec}s in this application
*
* @return an immutable map from streamId to {@link InputOperatorSpec}. Defaults to an empty map for a low-level {@link TaskApplication}
*/
public Map<String, InputOperatorSpec> getInputOperators() {
return Collections.emptyMap();
}

/**
* Get all the {@link InputDescriptor}s to this application
*
Expand Down Expand Up @@ -176,4 +218,66 @@ public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
*/
public abstract Set<SystemDescriptor> getSystemDescriptors();

/**
* Get all the unique input streamIds in this application
*
* @return an immutable set of input streamIds
*/
public abstract Set<String> getInputStreamIds();

/**
* Get all the unique output streamIds in this application
*
* @return an immutable set of output streamIds
*/
public abstract Set<String> getOutputStreamIds();

KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
Serde keySerde, valueSerde;

KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);

if (serde instanceof KVSerde) {
keySerde = ((KVSerde) serde).getKeySerde();
valueSerde = ((KVSerde) serde).getValueSerde();
} else {
keySerde = new NoOpSerde();
valueSerde = serde;
}

if (currentSerdePair == null) {
if (keySerde instanceof NoOpSerde) {
LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
". Keys will not be (de)serialized");
}
if (valueSerde instanceof NoOpSerde) {
LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
". Values will not be (de)serialized");
}
streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
} else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+ "different serdes.", streamId));
}
return streamSerdes.get(streamId);
}

KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
Serde keySerde, valueSerde;
keySerde = kvSerde.getKeySerde();
valueSerde = kvSerde.getValueSerde();

if (!tableSerdes.containsKey(tableId)) {
tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
return tableSerdes.get(tableId);
}

KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
+ "different serdes.", tableId));
}
return tableSerdes.get(tableId);
}

}
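
The get-or-create behavior of `getOrCreateStreamSerdes` can be tried outside Samza. The following is a minimal sketch, not the PR's code: `Codec`, `NoOpCodec`, `NamedCodec`, `PairCodec`, and `SerdeRegistrySketch` are hypothetical stand-ins for `Serde`, `NoOpSerde`, a concrete serde, `KVSerde`, and the descriptor. The stand-ins are given value-based `equals` as an assumption; whether Samza's serde classes override `equals` is not visible in this diff, and with identity equality a second registration using a fresh `NoOpSerde` instance would be rejected.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Samza's Serde.
interface Codec { }

// Hypothetical stand-in for NoOpSerde; all instances compare equal (assumption).
final class NoOpCodec implements Codec {
  @Override public boolean equals(Object o) { return o instanceof NoOpCodec; }
  @Override public int hashCode() { return 0; }
}

// Hypothetical concrete codec with value-based equality on its name.
final class NamedCodec implements Codec {
  private final String name;
  NamedCodec(String name) { this.name = name; }
  @Override public boolean equals(Object o) {
    return o instanceof NamedCodec && ((NamedCodec) o).name.equals(name);
  }
  @Override public int hashCode() { return name.hashCode(); }
}

// Hypothetical stand-in for KVSerde: wraps a key codec and a value codec.
final class PairCodec implements Codec {
  final Codec key;
  final Codec value;
  PairCodec(Codec key, Codec value) { this.key = key; this.value = value; }
}

final class SerdeRegistrySketch {
  // Key/value codec pairs, keyed by streamId.
  private final Map<String, Map.Entry<Codec, Codec>> streamCodecs = new HashMap<>();

  Map.Entry<Codec, Codec> getOrCreate(String streamId, Codec codec) {
    Codec key;
    Codec value;
    if (codec instanceof PairCodec) {
      // A pair codec is split into its key and value parts.
      key = ((PairCodec) codec).key;
      value = ((PairCodec) codec).value;
    } else {
      // Any other codec applies to the value only; the key is left untouched.
      key = new NoOpCodec();
      value = codec;
    }

    Map.Entry<Codec, Codec> current = streamCodecs.get(streamId);
    if (current == null) {
      // First registration for this streamId: store the pair.
      current = new SimpleImmutableEntry<>(key, value);
      streamCodecs.put(streamId, current);
    } else if (!current.getKey().equals(key) || !current.getValue().equals(value)) {
      // A later registration may only repeat the stored pair, never change it.
      throw new IllegalArgumentException(
          "Serde for stream " + streamId + " is already defined. Cannot change it to different serdes.");
    }
    return current;
  }
}
```

Registering the same stream twice with equal codecs returns the stored pair, while a second registration with a different codec throws `IllegalArgumentException`.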
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
Expand All @@ -78,7 +77,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
// We use a LHM for deterministic order in initializing and closing operators.
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
-private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+private final Map<String, TableImpl> tables = new LinkedHashMap<>();
private final Set<String> operatorIds = new HashSet<>();

private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
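
The comment in this hunk says a `LinkedHashMap` is used for deterministic order. As a small illustration of that property (a sketch only; `InsertionOrderSketch` and its string values are hypothetical, not Samza classes), a `LinkedHashMap` iterates its keys in insertion order:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InsertionOrderSketch {
  // Registers one placeholder operator per id and returns the iteration order of the map.
  static List<String> initOrder(List<String> ids) {
    Map<String, String> operators = new LinkedHashMap<>();
    for (String id : ids) {
      operators.put(id, "operator-for-" + id);
    }
    // LinkedHashMap iterates keys in the order they were first inserted.
    return new ArrayList<>(operators.keySet());
  }
}
```

A plain `HashMap` makes no such ordering guarantee, so initialization and close order could differ between runs or JVM versions.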
Expand Down Expand Up @@ -125,7 +124,7 @@ public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor
"getInputStream must not be called multiple times with the same streamId: " + streamId);

Serde serde = inputDescriptor.getSerde();
-KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
if (outputStreams.containsKey(streamId)) {
OutputStreamImpl outputStream = outputStreams.get(streamId);
Serde keySerde = outputStream.getKeySerde();
Expand Down Expand Up @@ -156,7 +155,7 @@ public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescript
"getOutputStream must not be called multiple times with the same streamId: " + streamId);

Serde serde = outputDescriptor.getSerde();
-KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
if (inputOperators.containsKey(streamId)) {
InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
Serde keySerde = inputOperatorSpec.getKeySerde();
Expand Down Expand Up @@ -186,13 +185,15 @@ public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor)
String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);

-TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
-if (tables.containsKey(tableSpec)) {
+BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+TableSpec tableSpec = baseTableDescriptor.getTableSpec();
+if (tables.containsKey(tableSpec.getId())) {
throw new IllegalStateException(
String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
}
-tables.put(tableSpec, new TableImpl(tableSpec));
-return tables.get(tableSpec);
+tables.put(tableSpec.getId(), new TableImpl(tableSpec));
+getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
+return tables.get(tableSpec.getId());
}

/**
Expand Down Expand Up @@ -247,6 +248,16 @@ public Set<SystemDescriptor> getSystemDescriptors() {
return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
}

@Override
public Set<String> getInputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
}

@Override
public Set<String> getOutputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
}
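
Both new getters wrap a copy of the key set rather than the key set itself. A minimal sketch of what that implies, using a hypothetical `StreamIdSnapshotSketch` class with string placeholders instead of Samza descriptors: the returned set is a read-only snapshot, so later registrations do not appear in a set handed out earlier.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class StreamIdSnapshotSketch {
  // Placeholder for the descriptor map, keyed by streamId.
  private final Map<String, String> inputs = new LinkedHashMap<>();

  void addInput(String streamId) {
    inputs.put(streamId, "descriptor-for-" + streamId);
  }

  Set<String> getInputStreamIds() {
    // Copy first, then wrap: callers get a read-only snapshot, not a live view of the map.
    return Collections.unmodifiableSet(new HashSet<>(inputs.keySet()));
  }
}
```

Returning `Collections.unmodifiableSet(inputs.keySet())` without the copy would also be read-only, but it would keep reflecting later changes to the map.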

/**
* Get the default {@link SystemDescriptor} in this application
*
Expand Down Expand Up @@ -306,7 +317,7 @@ public Map<String, OutputStreamImpl> getOutputStreams() {
return Collections.unmodifiableMap(outputStreams);
}

-public Map<TableSpec, TableImpl> getTables() {
+public Map<String, TableImpl> getTables() {
return Collections.unmodifiableMap(tables);
}

Expand Down Expand Up @@ -342,7 +353,7 @@ public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamI
kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
} else {
isKeyed = serde instanceof KVSerde;
-kvSerdes = getKVSerdes(streamId, serde);
+kvSerdes = getOrCreateStreamSerdes(streamId, serde);
}

InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
Expand All @@ -356,29 +367,6 @@ public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamI
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
}

-private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
-Serde keySerde, valueSerde;
-
-if (serde instanceof KVSerde) {
-keySerde = ((KVSerde) serde).getKeySerde();
-valueSerde = ((KVSerde) serde).getValueSerde();
-} else {
-keySerde = new NoOpSerde();
-valueSerde = serde;
-}
-
-if (keySerde instanceof NoOpSerde) {
-LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
-". Keys will not be (de)serialized");
-}
-if (valueSerde instanceof NoOpSerde) {
-LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
-". Values will not be (de)serialized");
-}
-
-return KV.of(keySerde, valueSerde);
-}

// check uniqueness of the {@code systemDescriptor} and add if it is unique
private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void addInputStream(InputDescriptor inputDescriptor) {
// TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
addSystemDescriptor(inputDescriptor.getSystemDescriptor());
}
Expand All @@ -73,6 +75,7 @@ public void addInputStream(InputDescriptor inputDescriptor) {
public void addOutputStream(OutputDescriptor outputDescriptor) {
Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
addSystemDescriptor(outputDescriptor.getSystemDescriptor());
}
Expand All @@ -81,6 +84,7 @@ public void addOutputStream(OutputDescriptor outputDescriptor) {
public void addTable(TableDescriptor tableDescriptor) {
Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
}

Expand Down Expand Up @@ -111,6 +115,16 @@ public Set<SystemDescriptor> getSystemDescriptors() {
return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
}

@Override
public Set<String> getInputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
}

@Override
public Set<String> getOutputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
}

/**
* Get the user-defined {@link TaskFactory}
* @return the {@link TaskFactory} object
Expand Down