From 754a2e1dc8c617238a454a6d6edd80c988c799f8 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 11:04:00 -0800 Subject: [PATCH 01/13] SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic to read config from loader Design: https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner Changes: 1. Based on the existence of ENV_SUBMISSION_CONFIG, ClusterBasedJobCoordinator will alternatively deserilize ENV_SUBMISSION_CONFIG and load full job config from config loader factory. 2. Execute planning, create diagnostics stream and persist full job config back to coordinator stream when loading full job config from config loader. API Changes: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Upgrade Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Usage Instructions: N/A. This is part of a series PRs, detailed information will be provided in the last/main PR. Tests: End to end tested with Hello Samza job. --- .../ClusterBasedJobCoordinator.java | 87 +++++++++++++++---- .../org/apache/samza/util/ConfigUtil.java | 21 +++++ 2 files changed, 90 insertions(+), 18 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index d9ca4d5ac6..b183af5cf2 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -23,15 +23,22 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationUtil; +import org.apache.samza.application.descriptors.ApplicationDescriptor; +import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; +import org.apache.samza.application.descriptors.ApplicationDescriptorUtil; import org.apache.samza.classloader.IsolatingClassLoaderFactory; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigLoader; +import org.apache.samza.config.ConfigLoaderFactory; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; @@ -47,6 +54,7 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; +import org.apache.samza.execution.RemoteJobPlanner; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.JobModelUtil; @@ -59,8 +67,10 @@ import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; +import org.apache.samza.util.ConfigUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.DiagnosticsUtil; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.util.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,9 +188,30 @@ public class ClusterBasedJobCoordinator { public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { metrics = new MetricsRegistryMap(); - coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics); - coordinatorStreamStore.init(); - config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); + JobConfig jobConfig = new JobConfig(coordinatorSystemConfig); + + if (jobConfig.getConfigLoaderFactory().isPresent()) { + ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); + ConfigLoader loader = factory.getLoader(coordinatorSystemConfig.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); + Config originalConfig = ConfigUtil.rewriteConfig(loader.getConfig()); + + ApplicationDescriptorImpl + appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig); + RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); + List jobConfigs = planner.prepareJobs(); + + config = ConfigUtil.override(jobConfigs.get(0), CoordinatorStreamUtil.buildCoordinatorStreamConfig(jobConfigs.get(0))); + + coordinatorStreamStore = new CoordinatorStreamStore(config, metrics); + coordinatorStreamStore.init(); + CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); + DiagnosticsUtil.createDiagnosticsStream(config); + } else { + // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics); + coordinatorStreamStore.init(); + config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); + } // build a JobModelManager and ChangelogStreamManager and perform partition assignments. changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE)); @@ -386,8 +417,7 @@ public void onInputStreamsChanged(Set initialInputSet, Set cluste * {@link #main(String[])} so that it can be executed directly or from a separate classloader. */ private static void runClusterBasedJobCoordinator(String[] args) { - Config coordinatorSystemConfig; final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()); - try { - //Read and parse the coordinator system config. - LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv); - coordinatorSystemConfig = - new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); - LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig); - } catch (IOException e) { - LOG.error("Exception while reading coordinator stream config", e); - throw new SamzaException(e); + final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG()); + + if (submissionEnv != null) { + Config submissionConfig; + try { + //Read and parse the coordinator system config. + LOG.info("Parsing submission config {}", submissionEnv); + submissionConfig = + new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class)); + LOG.info("Using the submission config: {}.", submissionEnv); + } catch (IOException e) { + LOG.error("Exception while reading submission config", e); + throw new SamzaException(e); + } + + ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(submissionConfig); + jc.run(); + LOG.info("Finished running ClusterBasedJobCoordinator"); + } else { + // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + Config coordinatorSystemConfig; + try { + //Read and parse the coordinator system config. + LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv); + coordinatorSystemConfig = + new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); + LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig); + } catch (IOException e) { + LOG.error("Exception while reading coordinator stream config", e); + throw new SamzaException(e); + } + ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig); + jc.run(); + LOG.info("Finished running ClusterBasedJobCoordinator"); } - ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig); - jc.run(); - LOG.info("Finished running ClusterBasedJobCoordinator"); } } diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index 7d86bf52a0..24f2009168 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -19,11 +19,14 @@ package org.apache.samza.util; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,4 +70,22 @@ public static Config applyRewriter(Config config, String rewriterName) { LOG.info("Re-writing config with {}", rewriter); return rewriter.rewrite(rewriterName, config); } + + /** + * Overrides original config with overridden values. + * + * @param original config to be overridden. + * @param overrides overridden values. + * @return the overridden config. + */ + @SafeVarargs + public static Config override(Config original, Map... overrides) { + Map map = new HashMap<>(original); + + for (Map override : overrides) { + map.putAll(override); + } + + return new MapConfig(map); + } } From aa8e1ac3cfb9b870a95c42ea6e5a95db25812910 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 16:22:40 -0800 Subject: [PATCH 02/13] Refactor the common logic to ConfigUtil, which will be used in LocalApplicationRunner as well. --- .../ClusterBasedJobCoordinator.java | 11 +++++++--- .../org/apache/samza/util/ConfigUtil.java | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index b183af5cf2..4c4f5fcb6b 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -191,15 +191,20 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { JobConfig jobConfig = new JobConfig(coordinatorSystemConfig); if (jobConfig.getConfigLoaderFactory().isPresent()) { - ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); - ConfigLoader loader = factory.getLoader(coordinatorSystemConfig.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); - Config originalConfig = ConfigUtil.rewriteConfig(loader.getConfig()); + // load full job config with ConfigLoader + Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig); + // Execute planning ApplicationDescriptorImpl appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig); RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); List jobConfigs = planner.prepareJobs(); + if (jobConfigs.size() != 1) { + throw new SamzaException("Only support single remote job is supported."); + } + + // Merge with default coordinator stream config config = ConfigUtil.override(jobConfigs.get(0), CoordinatorStreamUtil.buildCoordinatorStreamConfig(jobConfigs.get(0))); coordinatorStreamStore = new CoordinatorStreamStore(config, metrics); diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index 24f2009168..a687fc36b6 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -24,6 +24,8 @@ import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigLoader; +import org.apache.samza.config.ConfigLoaderFactory; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -71,6 +73,25 @@ public static Config applyRewriter(Config config, String rewriterName) { return rewriter.rewrite(rewriterName, config); } + /** + * Load full job config with {@link ConfigLoaderFactory} when present. + * + * @param original config + * @return full job config + */ + public static Config loadConfig(Config original) { + JobConfig jobConfig = new JobConfig(original); + Config fullConfig = original; + + if (jobConfig.getConfigLoaderFactory().isPresent()) { + ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); + ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); + fullConfig = ConfigUtil.rewriteConfig(loader.getConfig()); + } + + return fullConfig; + } + /** * Overrides original config with overridden values. * From 0d65377e8dacfe79f5a32b6ab196066e8a674e9f Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 16:33:39 -0800 Subject: [PATCH 03/13] Remove unused imports --- .../samza/clustermanager/ClusterBasedJobCoordinator.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 4c4f5fcb6b..101cda8615 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -37,8 +37,6 @@ import org.apache.samza.classloader.IsolatingClassLoaderFactory; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigLoader; -import org.apache.samza.config.ConfigLoaderFactory; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; @@ -70,7 +68,6 @@ import org.apache.samza.util.ConfigUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.DiagnosticsUtil; -import org.apache.samza.util.ReflectionUtil; import org.apache.samza.util.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From a7b0d0f6c7b4be861fef8200040e0d8dbd67d30b Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 17:26:34 -0800 Subject: [PATCH 04/13] Update ConfigUtil to handle config overrides, update ClusterBasedJobCoordinator not to merge final config with coordinator stream config to keep consistent with before. --- .../samza/clustermanager/ClusterBasedJobCoordinator.java | 8 +++----- .../src/main/java/org/apache/samza/util/ConfigUtil.java | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 101cda8615..4f0f2e3316 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -201,10 +201,8 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { throw new SamzaException("Only support single remote job is supported."); } - // Merge with default coordinator stream config - config = ConfigUtil.override(jobConfigs.get(0), CoordinatorStreamUtil.buildCoordinatorStreamConfig(jobConfigs.get(0))); - - coordinatorStreamStore = new CoordinatorStreamStore(config, metrics); + config = jobConfigs.get(0); + coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics); coordinatorStreamStore.init(); CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); DiagnosticsUtil.createDiagnosticsStream(config); @@ -522,7 +520,7 @@ private static void runClusterBasedJobCoordinator(String[] args) { LOG.info("Parsing submission config {}", submissionEnv); submissionConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class)); - LOG.info("Using the submission config: {}.", submissionEnv); + LOG.info("Using the submission config: {}.", submissionConfig); } catch (IOException e) { LOG.error("Exception while reading submission config", e); throw new SamzaException(e); diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index a687fc36b6..a3cc3fa1e4 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -86,7 +86,8 @@ public static Config loadConfig(Config original) { if (jobConfig.getConfigLoaderFactory().isPresent()) { ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); - fullConfig = ConfigUtil.rewriteConfig(loader.getConfig()); + // overrides config loaded with original config, which may contain overridden values. + fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original); } return fullConfig; From 86032a6feaa1e25db3e18e18a1dd1f019ec473b7 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Tue, 14 Jan 2020 12:31:11 -0800 Subject: [PATCH 05/13] Add ticket for clean up --- .../apache/samza/clustermanager/ClusterBasedJobCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 4f0f2e3316..cb51ab6663 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -207,7 +207,7 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); DiagnosticsUtil.createDiagnosticsStream(config); } else { - // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics); coordinatorStreamStore.init(); config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); From c234721851151e9c1c3d192d98b518b59abbba2b Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Tue, 14 Jan 2020 15:21:35 -0800 Subject: [PATCH 06/13] Rename coordinatorSystemConfig to jobCoordinatorConfig to be more generic as it may contain config loader properties instead. --- .../clustermanager/ClusterBasedJobCoordinator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index cb51ab6663..da4a9c81f3 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -179,17 +179,17 @@ public class ClusterBasedJobCoordinator { * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually * run the jobcoordinator. * - * @param coordinatorSystemConfig the coordinator stream config that can be used to read the - * {@link org.apache.samza.job.model.JobModel} from. + * @param jobCoordinatorConfig job coordinator config that either contains coordinator stream properties + * or config loader properties to load full job config. */ - public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { + public ClusterBasedJobCoordinator(Config jobCoordinatorConfig) { metrics = new MetricsRegistryMap(); - JobConfig jobConfig = new JobConfig(coordinatorSystemConfig); + JobConfig jobConfig = new JobConfig(jobCoordinatorConfig); if (jobConfig.getConfigLoaderFactory().isPresent()) { // load full job config with ConfigLoader - Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig); + Config originalConfig = ConfigUtil.loadConfig(jobCoordinatorConfig); // Execute planning ApplicationDescriptorImpl @@ -208,7 +208,7 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { DiagnosticsUtil.createDiagnosticsStream(config); } else { // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. - coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics); + coordinatorStreamStore = new CoordinatorStreamStore(jobCoordinatorConfig, metrics); coordinatorStreamStore.init(); config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); } From a90387a1a7c89abf58ad84875dade3f9fe2e5361 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Wed, 15 Jan 2020 15:54:01 -0800 Subject: [PATCH 07/13] Add extra comments Override config first before applying config rewriters --- .../apache/samza/clustermanager/ClusterBasedJobCoordinator.java | 2 ++ samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index da4a9c81f3..28e1cdbe88 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -204,6 +204,8 @@ public ClusterBasedJobCoordinator(Config jobCoordinatorConfig) { config = jobConfigs.get(0); coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics); coordinatorStreamStore.init(); + + // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); DiagnosticsUtil.createDiagnosticsStream(config); } else { diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index a3cc3fa1e4..86a5f52b5c 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -87,7 +87,7 @@ public static Config loadConfig(Config original) { ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); // overrides config loaded with original config, which may contain overridden values. - fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original); + fullConfig = ConfigUtil.rewriteConfig(override(loader.getConfig(), original)); } return fullConfig; From c004a97d60e9bff9e5a39328ec9153c014174232 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Wed, 15 Jan 2020 18:26:36 -0800 Subject: [PATCH 08/13] Add unit tests Refactor ClusterBasedJobCoordinator to be testable --- .../ClusterBasedJobCoordinator.java | 145 +++++++++++------- .../samza/coordinator/JobModelManager.scala | 9 +- .../samza/util/CoordinatorStreamUtil.scala | 2 +- .../TestClusterBasedJobCoordinator.java | 39 ++++- .../org/apache/samza/util/TestConfigUtil.java | 26 ++++ 5 files changed, 156 insertions(+), 65 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 28e1cdbe88..d22e694d73 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -57,6 +57,7 @@ import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.JobModelUtil; import org.apache.samza.job.model.TaskModel; +import org.apache.samza.metadatastore.MetadataStore; import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.model.SamzaObjectMapper; @@ -102,7 +103,6 @@ public class ClusterBasedJobCoordinator { private final static String METRICS_SOURCE_NAME = "ApplicationMaster"; private final Config config; - private final ClusterManagerConfig clusterManagerConfig; /** * State to track container failures, host-processor mappings @@ -161,7 +161,7 @@ public class ClusterBasedJobCoordinator { * Metrics to track stats around container failures, needed containers etc. */ private final MetricsRegistryMap metrics; - private final CoordinatorStreamStore coordinatorStreamStore; + private final MetadataStore metadataStore; private final SystemAdmins systemAdmins; @@ -176,66 +176,39 @@ public class ClusterBasedJobCoordinator { volatile private Exception coordinatorException = null; /** - * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually - * run the jobcoordinator. + * Creates a new ClusterBasedJobCoordinator instance. + * Invoke run() to actually run the job coordinator. * - * @param jobCoordinatorConfig job coordinator config that either contains coordinator stream properties - * or config loader properties to load full job config. + * @param metrics the registry for reporting metrics. + * @param metadataStore metadata store to hold metadata. + * @param fullJobConfig full job config. */ - public ClusterBasedJobCoordinator(Config jobCoordinatorConfig) { - metrics = new MetricsRegistryMap(); - - JobConfig jobConfig = new JobConfig(jobCoordinatorConfig); - - if (jobConfig.getConfigLoaderFactory().isPresent()) { - // load full job config with ConfigLoader - Config originalConfig = ConfigUtil.loadConfig(jobCoordinatorConfig); - - // Execute planning - ApplicationDescriptorImpl - appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig); - RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); - List jobConfigs = planner.prepareJobs(); - - if (jobConfigs.size() != 1) { - throw new SamzaException("Only support single remote job is supported."); - } - - config = jobConfigs.get(0); - coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics); - coordinatorStreamStore.init(); - - // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run - CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); - DiagnosticsUtil.createDiagnosticsStream(config); - } else { - // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. - coordinatorStreamStore = new CoordinatorStreamStore(jobCoordinatorConfig, metrics); - coordinatorStreamStore.init(); - config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); - } - + private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) { + this.metrics = metrics; + this.metadataStore = metadataStore; + this.metadataStore.init(); + this.config = fullJobConfig; // build a JobModelManager and ChangelogStreamManager and perform partition assignments. - changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE)); - jobModelManager = - JobModelManager.apply(config, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metrics); + this.changelogStreamManager = new ChangelogStreamManager( + new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE)); + this.jobModelManager = + JobModelManager.apply(config, changelogStreamManager.readPartitionMapping(), metadataStore, metrics); - hasDurableStores = new StorageConfig(config).hasDurableStores(); - state = new SamzaApplicationState(jobModelManager); + this.hasDurableStores = new StorageConfig(config).hasDurableStores(); + this.state = new SamzaApplicationState(jobModelManager); // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped. - systemAdmins = new SystemAdmins(config); - partitionMonitor = getPartitionCountMonitor(config, systemAdmins); + this.systemAdmins = new SystemAdmins(config); + this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins); Set inputSystemStreams = JobModelUtil.getSystemStreams(jobModelManager.jobModel()); - inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, inputSystemStreams); - - clusterManagerConfig = new ClusterManagerConfig(config); - isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator(); + this.inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, inputSystemStreams); - jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval(); + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator(); + this.jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval(); // build a container process Manager - containerProcessManager = createContainerProcessManager(); + this.containerProcessManager = createContainerProcessManager(); } /** @@ -333,7 +306,7 @@ private void onShutDown() { inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop); systemAdmins.stop(); containerProcessManager.stop(); - coordinatorStreamStore.close(); + metadataStore.close(); } catch (Throwable e) { LOG.error("Exception while stopping cluster based job coordinator", e); } @@ -429,7 +402,7 @@ StreamPartitionCountMonitor getPartitionMonitor() { @VisibleForTesting StartpointManager createStartpointManager() { - return new StartpointManager(coordinatorStreamStore); + return new StartpointManager(metadataStore); } @VisibleForTesting @@ -528,7 +501,7 @@ private static void runClusterBasedJobCoordinator(String[] args) { throw new SamzaException(e); } - ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(submissionConfig); + ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig); jc.run(); LOG.info("Finished running ClusterBasedJobCoordinator"); } else { @@ -544,9 +517,69 @@ private static void runClusterBasedJobCoordinator(String[] args) { LOG.error("Exception while reading coordinator stream config", e); throw new SamzaException(e); } - ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig); + ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig); jc.run(); LOG.info("Finished running ClusterBasedJobCoordinator"); } } + + /** + * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from + * coordinator stream. + * + * @param metadataStoreConfig to initialize {@link MetadataStore} + * @return {@link ClusterBasedJobCoordinator} + */ + // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. + public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) { + MetricsRegistryMap metrics = new MetricsRegistryMap(); + + CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics); + coordinatorStreamStore.init(); + Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore); + + return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config); + } + + /** + * Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using + * specified {@link org.apache.samza.config.ConfigLoaderFactory} + * + * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory} + * @return {@link ClusterBasedJobCoordinator} + */ + public static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) { + JobConfig jobConfig = new JobConfig(submissionConfig); + + if (!jobConfig.getConfigLoaderFactory().isPresent()) { + throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader"); + } + + MetricsRegistryMap metrics = new MetricsRegistryMap(); + // load full job config with ConfigLoader + Config config = prepareJob(ConfigUtil.loadConfig(submissionConfig)); + + // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run + CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); + DiagnosticsUtil.createDiagnosticsStream(config); + + return new ClusterBasedJobCoordinator( + metrics, + new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics), + config); + } + + private static Config prepareJob(Config config) { + // Execute planning + ApplicationDescriptorImpl + appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config); + RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); + List jobConfigs = planner.prepareJobs(); + + if (jobConfigs.size() != 1) { + throw new SamzaException("Only support single remote job is supported."); + } + + return jobConfigs.get(0); + } } diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 07055de1fc..b8c18fe768 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -42,6 +42,7 @@ import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskMode import org.apache.samza.job.model.TaskModel +import org.apache.samza.metadatastore.MetadataStore import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.LocationId @@ -79,13 +80,13 @@ object JobModelManager extends Logging { * @return the instantiated {@see JobModelManager}. */ def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer], - coordinatorStreamStore: CoordinatorStreamStore, + metadataStore: MetadataStore, metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): JobModelManager = { // Instantiate the respective metadata store util classes which uses the same coordinator metadata store. - val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE)) - val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE)) - val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskPartitionMapping.TYPE)) + val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE)) + val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE)) + val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE)) val systemAdmins = new SystemAdmins(config) try { diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index f108387a2c..c9aad0b8bd 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -41,7 +41,7 @@ object CoordinatorStreamUtil extends Logging { */ def buildCoordinatorStreamConfig(config: Config): MapConfig = { val jobConfig = new JobConfig(config) - val buildConfigFactory = jobConfig.getCoordinatorStreamFactory() + val buildConfigFactory = jobConfig.getCoordinatorStreamFactory val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config) new MapConfig(coordinatorSystemConfig); diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java index 8c51c642f0..7df0945f87 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -25,10 +25,14 @@ import java.util.Map; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.application.MockStreamApplication; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory; import org.apache.samza.coordinator.StreamPartitionCountMonitor; +import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; import org.apache.samza.metrics.MetricsRegistry; @@ -59,6 +63,7 @@ import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mock; import static org.powermock.api.mockito.PowerMockito.verifyPrivate; +import static org.powermock.api.mockito.PowerMockito.verifyNew; /** @@ -68,7 +73,7 @@ @PrepareForTest({CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class}) public class TestClusterBasedJobCoordinator { - Map configMap; + private Map configMap; @Before public void setUp() { @@ -106,7 +111,7 @@ public void testPartitionCountMonitorWithDurableStates() { CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); producer.writeConfig("test-job", config); - ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config); + ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config); // change the input system stream metadata MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>()); @@ -126,7 +131,7 @@ public void testPartitionCountMonitorWithoutDurableStates() { CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); producer.writeConfig("test-job", config); - ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config); + ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config); // change the input system stream metadata MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>()); @@ -144,7 +149,7 @@ public void testVerifyStartpointManagerFanOut() throws IOException { Config config = new MapConfig(configMap); MockitoException stopException = new MockitoException("Stop"); - ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(new ClusterBasedJobCoordinator(config)); + ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinator.createFromMetadataStore(config)); ContainerProcessManager mockContainerProcessManager = mock(ContainerProcessManager.class); doReturn(true).when(mockContainerProcessManager).shouldShutdown(); StartpointManager mockStartpointManager = mock(StartpointManager.class); @@ -198,4 +203,30 @@ public void testRunWithClassLoader() throws Exception { // make sure runClusterBasedJobCoordinator only got called once verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)}); } + + @Test(expected = SamzaException.class) + public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() { + ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig()); + } + + @Test + public void testCreateFromConfigLoader() throws Exception { + // partially mock ClusterBasedJobCoordinator (mock prepareJob method only) + PowerMockito.spy(ClusterBasedJobCoordinator.class); + + Map config = new HashMap<>(); + config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName()); + config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName()); + config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",getClass().getResource("/test.properties").getPath()); + + PowerMockito.doAnswer(invocation -> invocation.getArgumentAt(0, Config.class)) + .when(ClusterBasedJobCoordinator.class, "prepareJob", any()); + PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class)); + PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mock(CoordinatorStreamStore.class)); + + ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig(config)); + + verifyPrivate(ClusterBasedJobCoordinator.class).invoke("prepareJob", any()); + verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(), any(), any()); + } } diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java index 218eb179fd..d29296caf0 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java @@ -27,6 +27,7 @@ import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -161,6 +162,31 @@ public void testApplyRewriterClassDoesNotExist() { assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } + @Test + public void testLoadConfigWithoutLoader() { + Map config = new HashMap<>(); + config.put(JobConfig.JOB_NAME, "new-test-job"); + + Config actual = ConfigUtil.loadConfig(new MapConfig(config)); + + assertEquals(config.size(), actual.size()); + assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME)); + } + + @Test + public void testLoadConfigWithLoader() { + Map config = new HashMap<>(); + config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName()); + config.put(JobConfig.JOB_NAME, "new-test-job"); + config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath()); + + Config actual = ConfigUtil.loadConfig(new MapConfig(config)); + + assertEquals("org.apache.samza.job.MockJobFactory", actual.get("job.factory.class")); + assertEquals("new-test-job", actual.get("job.name")); + assertEquals("bar", actual.get("foo")); + } + /** * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}. */ From de8ed411e5118dce7ce4f6587379fb29a36495ef Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Thu, 16 Jan 2020 10:48:13 -0800 Subject: [PATCH 09/13] Fix style issue in test --- .../samza/clustermanager/TestClusterBasedJobCoordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java index 7df0945f87..377d6b9407 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -217,7 +217,8 @@ public void testCreateFromConfigLoader() throws Exception { Map config = new HashMap<>(); config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName()); config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName()); - config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",getClass().getResource("/test.properties").getPath()); + config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", + getClass().getResource("/test.properties").getPath()); PowerMockito.doAnswer(invocation -> invocation.getArgumentAt(0, Config.class)) .when(ClusterBasedJobCoordinator.class, "prepareJob", any()); From b00cb06351e29501269ac03ca702fba16131590e Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 17 Jan 2020 10:27:08 -0800 Subject: [PATCH 10/13] Use StringUtils.isBlank instead of null check --- .../samza/clustermanager/ClusterBasedJobCoordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index d22e694d73..da4d899c66 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.application.ApplicationUtil; import org.apache.samza.application.descriptors.ApplicationDescriptor; @@ -488,7 +489,7 @@ private static void runClusterBasedJobCoordinator(String[] args) { final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()); final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG()); - if (submissionEnv != null) { + if (!StringUtils.isBlank(submissionEnv)) { Config submissionConfig; try { //Read and parse the coordinator system config. From 27210d8aa49b6497400604aab47d722563ff59b4 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 17 Jan 2020 10:52:10 -0800 Subject: [PATCH 11/13] Update --- .../ClusterBasedJobCoordinator.java | 11 +++++++---- .../java/org/apache/samza/util/ConfigUtil.java | 16 ++++++++-------- .../org/apache/samza/util/TestConfigUtil.java | 18 +++++++++++------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index da4d899c66..5dd3a64e4b 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -187,7 +187,6 @@ public class ClusterBasedJobCoordinator { private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) { this.metrics = metrics; this.metadataStore = metadataStore; - this.metadataStore.init(); this.config = fullJobConfig; // build a JobModelManager and ChangelogStreamManager and perform partition assignments. this.changelogStreamManager = new ChangelogStreamManager( @@ -532,7 +531,8 @@ private static void runClusterBasedJobCoordinator(String[] args) { * @return {@link ClusterBasedJobCoordinator} */ // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed. - public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) { + @VisibleForTesting + static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) { MetricsRegistryMap metrics = new MetricsRegistryMap(); CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics); @@ -549,7 +549,8 @@ public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadata * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory} * @return {@link ClusterBasedJobCoordinator} */ - public static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) { + @VisibleForTesting + static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) { JobConfig jobConfig = new JobConfig(submissionConfig); if (!jobConfig.getConfigLoaderFactory().isPresent()) { @@ -563,10 +564,12 @@ public static ClusterBasedJobCoordinator createFromConfigLoader(Config submissio // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); DiagnosticsUtil.createDiagnosticsStream(config); + MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics); + metadataStore.init(); return new ClusterBasedJobCoordinator( metrics, - new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics), + metadataStore, config); } diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index 86a5f52b5c..7206fe2848 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -24,6 +24,7 @@ import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigLoader; import org.apache.samza.config.ConfigLoaderFactory; import org.apache.samza.config.ConfigRewriter; @@ -81,16 +82,15 @@ public static Config applyRewriter(Config config, String rewriterName) { */ public static Config loadConfig(Config original) { JobConfig jobConfig = new JobConfig(original); - Config fullConfig = original; - if (jobConfig.getConfigLoaderFactory().isPresent()) { - ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); - ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); - // overrides config loaded with original config, which may contain overridden values. - fullConfig = ConfigUtil.rewriteConfig(override(loader.getConfig(), original)); + if (!jobConfig.getConfigLoaderFactory().isPresent()) { + throw new ConfigException("Missing key " + JobConfig.CONFIG_LOADER_FACTORY + "."); } - return fullConfig; + ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class); + ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX)); + // overrides config loaded with original config, which may contain overridden values. + return rewriteConfig(override(loader.getConfig(), original)); } /** @@ -101,7 +101,7 @@ public static Config loadConfig(Config original) { * @return the overridden config. */ @SafeVarargs - public static Config override(Config original, Map... overrides) { + private static Config override(Config original, Map... overrides) { Map map = new HashMap<>(original); for (Map override : overrides) { diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java index d29296caf0..c005f37a35 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -162,29 +163,32 @@ public void testApplyRewriterClassDoesNotExist() { assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } - @Test + @Test(expected = ConfigException.class) public void testLoadConfigWithoutLoader() { Map config = new HashMap<>(); config.put(JobConfig.JOB_NAME, "new-test-job"); - Config actual = ConfigUtil.loadConfig(new MapConfig(config)); - - assertEquals(config.size(), actual.size()); - assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME)); + ConfigUtil.loadConfig(new MapConfig(config)); } @Test - public void testLoadConfigWithLoader() { + public void testLoadConfigWithOverridesAndRewrites() { Map config = new HashMap<>(); config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName()); config.put(JobConfig.JOB_NAME, "new-test-job"); + config.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + config.put(CONFIG_KEY, CONFIG_VALUE); + config.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath()); Config actual = ConfigUtil.loadConfig(new MapConfig(config)); assertEquals("org.apache.samza.job.MockJobFactory", actual.get("job.factory.class")); - assertEquals("new-test-job", actual.get("job.name")); assertEquals("bar", actual.get("foo")); + // overridden value + assertEquals("new-test-job", actual.get("job.name")); + // rewritten value + assertEquals(CONFIG_VALUE, actual.get(NEW_CONFIG_KEY)); } /** From 762fa7de922655c2479e44d8f9470f5944d774c4 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 17 Jan 2020 11:48:10 -0800 Subject: [PATCH 12/13] Update unit tests --- .../ClusterBasedJobCoordinator.java | 28 +++++++-------- .../TestClusterBasedJobCoordinator.java | 34 +++++++++++++------ 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 5dd3a64e4b..d97a4a1e09 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -559,7 +559,19 @@ static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig MetricsRegistryMap metrics = new MetricsRegistryMap(); // load full job config with ConfigLoader - Config config = prepareJob(ConfigUtil.loadConfig(submissionConfig)); + Config originalConfig = ConfigUtil.loadConfig(submissionConfig); + + // Execute planning + ApplicationDescriptorImpl + appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig); + RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); + List jobConfigs = planner.prepareJobs(); + + if (jobConfigs.size() != 1) { + throw new SamzaException("Only support single remote job is supported."); + } + + Config config = jobConfigs.get(0); // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true); @@ -572,18 +584,4 @@ static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig metadataStore, config); } - - private static Config prepareJob(Config config) { - // Execute planning - ApplicationDescriptorImpl - appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config); - RemoteJobPlanner planner = new RemoteJobPlanner(appDesc); - List jobConfigs = planner.prepareJobs(); - - if (jobConfigs.size() != 1) { - throw new SamzaException("Only support single remote job is supported."); - } - - return jobConfigs.get(0); - } } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java index 377d6b9407..3a02928730 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.samza.Partition; @@ -35,11 +36,15 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; +import org.apache.samza.execution.JobPlanner; +import org.apache.samza.execution.RemoteJobPlanner; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.startpoint.StartpointManager; import org.apache.samza.system.MockSystemFactory; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.ConfigUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.junit.After; import org.junit.Before; @@ -55,7 +60,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.AdditionalMatchers.aryEq; -import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -70,7 +75,12 @@ * Tests for {@link ClusterBasedJobCoordinator} */ @RunWith(PowerMockRunner.class) -@PrepareForTest({CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class}) +@PrepareForTest({ + CoordinatorStreamUtil.class, + ClusterBasedJobCoordinator.class, + CoordinatorStreamStore.class, + RemoteJobPlanner.class +}) public class TestClusterBasedJobCoordinator { private Map configMap; @@ -211,23 +221,25 @@ public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() { @Test public void testCreateFromConfigLoader() throws Exception { - // partially mock ClusterBasedJobCoordinator (mock prepareJob method only) - PowerMockito.spy(ClusterBasedJobCoordinator.class); - Map config = new HashMap<>(); config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName()); config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName()); config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath()); + Config submissionConfig = new MapConfig(config); + JobConfig fullJobConfig = new JobConfig(ConfigUtil.loadConfig(submissionConfig)); + + RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class); + CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class); - PowerMockito.doAnswer(invocation -> invocation.getArgumentAt(0, Config.class)) - .when(ClusterBasedJobCoordinator.class, "prepareJob", any()); PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class)); - PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mock(CoordinatorStreamStore.class)); + PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any()); + PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore); + PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner); + when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig)); - ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig(config)); + ClusterBasedJobCoordinator.createFromConfigLoader(submissionConfig); - verifyPrivate(ClusterBasedJobCoordinator.class).invoke("prepareJob", any()); - verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(), any(), any()); + verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig)); } } From 35f5db240c24b34dd1814930fc4ed6aa56e4b7c3 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 17 Jan 2020 13:03:39 -0800 Subject: [PATCH 13/13] Fix checkstyle in unit test --- .../samza/clustermanager/TestClusterBasedJobCoordinator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java index 3a02928730..787edf217a 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -36,7 +36,6 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; -import org.apache.samza.execution.JobPlanner; import org.apache.samza.execution.RemoteJobPlanner; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; @@ -79,8 +78,7 @@ CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class, CoordinatorStreamStore.class, - RemoteJobPlanner.class -}) + RemoteJobPlanner.class}) public class TestClusterBasedJobCoordinator { private Map configMap;