diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index d9ca4d5ac6..d97a4a1e09 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -23,12 +23,18 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -47,10 +53,12 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -59,6 +67,7 @@
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.SystemClock;
@@ -95,7 +104,6 @@ public class ClusterBasedJobCoordinator {
   private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
 
   private final Config config;
-  private final ClusterManagerConfig clusterManagerConfig;
 
   /**
    * State to track container failures, host-processor mappings
@@ -154,7 +162,7 @@ public class ClusterBasedJobCoordinator {
   /**
    * Metrics to track stats around container failures, needed containers etc.
    */
   private final MetricsRegistryMap metrics;
-  private final CoordinatorStreamStore coordinatorStreamStore;
+  private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
@@ -169,40 +177,38 @@ public class ClusterBasedJobCoordinator {
   volatile private Exception coordinatorException = null;
 
   /**
-   * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
-   * run the jobcoordinator.
+   * Creates a new ClusterBasedJobCoordinator instance.
+   * Invoke run() to actually run the job coordinator.
    *
-   * @param coordinatorSystemConfig the coordinator stream config that can be used to read the
-   *                                {@link org.apache.samza.job.model.JobModel} from.
+   * @param metrics the registry for reporting metrics.
+   * @param metadataStore the metadata store holding job metadata.
+   * @param fullJobConfig the full job config.
    */
-  public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
-    metrics = new MetricsRegistryMap();
-
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-
+  private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
+    this.metrics = metrics;
+    this.metadataStore = metadataStore;
+    this.config = fullJobConfig;
     // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
-    changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE));
-    jobModelManager =
-        JobModelManager.apply(config, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metrics);
+    this.changelogStreamManager = new ChangelogStreamManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    this.jobModelManager =
+        JobModelManager.apply(config, changelogStreamManager.readPartitionMapping(), metadataStore, metrics);
 
-    hasDurableStores = new StorageConfig(config).hasDurableStores();
-    state = new SamzaApplicationState(jobModelManager);
+    this.hasDurableStores = new StorageConfig(config).hasDurableStores();
+    this.state = new SamzaApplicationState(jobModelManager);
     // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
-    systemAdmins = new SystemAdmins(config);
-    partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
+    this.systemAdmins = new SystemAdmins(config);
+    this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
 
     Set<SystemStream> inputSystemStreams = JobModelUtil.getSystemStreams(jobModelManager.jobModel());
-    inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, inputSystemStreams);
+    this.inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, inputSystemStreams);
 
-    clusterManagerConfig = new ClusterManagerConfig(config);
-    isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
-
-    jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
+    this.jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
 
     // build a container process Manager
-    containerProcessManager = createContainerProcessManager();
+    this.containerProcessManager = createContainerProcessManager();
   }
 
   /**
@@ -300,7 +306,7 @@ private void onShutDown() {
       inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
       systemAdmins.stop();
       containerProcessManager.stop();
-      coordinatorStreamStore.close();
+      metadataStore.close();
     } catch (Throwable e) {
       LOG.error("Exception while stopping cluster based job coordinator", e);
     }
@@ -386,8 +392,7 @@ public void onInputStreamsChanged(Set initialInputSet, Set cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (!StringUtils.isBlank(submissionEnv)) {
+      Config submissionConfig;
+      try {
+        // Read and parse the submission config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        // Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
     }
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
-    jc.run();
-    LOG.info("Finished running ClusterBasedJobCoordinator");
+  }
+
+  /**
+   * Initializes a {@link ClusterBasedJobCoordinator} from the coordinator stream config; the full job config will be
+   * read from the coordinator stream.
+   *
+   * @param metadataStoreConfig config used to initialize the {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  @VisibleForTesting
+  static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();
+    Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+
+    return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
+  }
+
+  /**
+   * Initializes a {@link ClusterBasedJobCoordinator} from the submission config; the full job config will be loaded
+   * with the specified {@link org.apache.samza.config.ConfigLoaderFactory}.
+   *
+   * @param submissionConfig submission config, which specifies the {@link org.apache.samza.config.ConfigLoaderFactory}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  @VisibleForTesting
+  static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
+    JobConfig jobConfig = new JobConfig(submissionConfig);
+
+    if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+      throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
+    }
+
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+    // load full job config with ConfigLoader
+    Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
+
+    // Execute planning
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
+        ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+    RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+    List<JobConfig> jobConfigs = planner.prepareJobs();
+
+    if (jobConfigs.size() != 1) {
+      throw new SamzaException("Only a single remote job is supported.");
+    }
+
+    Config config = jobConfigs.get(0);
+
+    // This needs to be consistent with RemoteApplicationRunner#run, where JobRunner#submit is called instead of JobRunner#run
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
+    DiagnosticsUtil.createDiagnosticsStream(config);
+    MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
+    metadataStore.init();
+
+    return new ClusterBasedJobCoordinator(
+        metrics,
+        metadataStore,
+        config);
+  }
 }
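Note: the submission config that createFromConfigLoader consumes is deliberately small; everything else comes from the loader. A minimal sketch of what a submitter would ship, mirroring the tests below — the properties path is a hypothetical placeholder, and the factory method is package-private, so in production it is only reached through runClusterBasedJobCoordinator:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.JobConfig;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;

    // Minimal submission config for the new config-loader bootstrap path.
    Map<String, String> submission = new HashMap<>();
    submission.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
    // Hypothetical path; the loader reads the full job config from this file.
    submission.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", "/path/to/job.properties");

    ClusterBasedJobCoordinator jc = ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig(submission));
    jc.run();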
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
index 7d86bf52a0..7206fe2848 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
@@ -19,11 +19,17 @@
 package org.apache.samza.util;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.ConfigLoader;
+import org.apache.samza.config.ConfigLoaderFactory;
 import org.apache.samza.config.ConfigRewriter;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,4 +73,41 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Loads the full job config with the {@link ConfigLoaderFactory} specified in the original config.
+   *
+   * @param original original config, which must specify a {@link ConfigLoaderFactory}
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+
+    if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+      throw new ConfigException("Missing key " + JobConfig.CONFIG_LOADER_FACTORY + ".");
+    }
+
+    ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+    ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+    // Override the loaded config with the original config, which may contain overridden values, then apply rewriters.
+    return rewriteConfig(override(loader.getConfig(), original));
+  }
+
+  /**
+   * Overrides the original config with the given overridden values.
+   *
+   * @param original config to be overridden.
+   * @param overrides overridden values.
+   * @return the overridden config.
+   */
+  @SafeVarargs
+  private static Config override(Config original, Map<String, String>... overrides) {
+    Map<String, String> map = new HashMap<>(original);
+
+    for (Map<String, String> override : overrides) {
+      map.putAll(override);
+    }
+
+    return new MapConfig(map);
+  }
 }
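Note: the net effect of ConfigUtil.loadConfig is a three-layer precedence — the loader's config is the base, the original (submission-time) config overrides it, and config rewriters run last. A minimal sketch, assuming a hypothetical /path/to/job.properties that sets job.name=from-file:

    Map<String, String> submission = new HashMap<>();
    submission.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
    submission.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", "/path/to/job.properties");
    submission.put(JobConfig.JOB_NAME, "from-submission");

    Config full = ConfigUtil.loadConfig(new MapConfig(submission));
    // The submission-time value wins over the loaded one:
    // full.get(JobConfig.JOB_NAME) evaluates to "from-submission", not "from-file".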
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 07055de1fc..b8c18fe768 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -42,6 +42,7 @@ import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.job.model.TaskModel
+import org.apache.samza.metadatastore.MetadataStore
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.LocationId
@@ -79,13 +80,13 @@ object JobModelManager extends Logging {
    * @return the instantiated {@see JobModelManager}.
    */
   def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer],
-            coordinatorStreamStore: CoordinatorStreamStore,
+            metadataStore: MetadataStore,
             metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): JobModelManager = {
 
     // Instantiate the respective metadata store util classes which uses the same coordinator metadata store.
-    val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE))
-    val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE))
-    val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskPartitionMapping.TYPE))
+    val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE))
+    val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE))
+    val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE))
 
     val systemAdmins = new SystemAdmins(config)
     try {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index f108387a2c..c9aad0b8bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -41,7 +41,7 @@ object CoordinatorStreamUtil extends Logging {
    */
   def buildCoordinatorStreamConfig(config: Config): MapConfig = {
     val jobConfig = new JobConfig(config)
-    val buildConfigFactory = jobConfig.getCoordinatorStreamFactory()
+    val buildConfigFactory = jobConfig.getCoordinatorStreamFactory
     val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
 
     new MapConfig(coordinatorSystemConfig);
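Note: widening JobModelManager.apply to the MetadataStore interface decouples callers from the concrete CoordinatorStreamStore; each manager still gets a view scoped to a single message type through NamespaceAwareCoordinatorStreamStore. A sketch of the Java call site under this patch (config and changelogStreamManager are assumed to be in scope; both bootstrap paths in this change still pass a CoordinatorStreamStore):

    // Any MetadataStore implementation can now back the job model manager.
    MetadataStore metadataStore = new CoordinatorStreamStore(
        CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), new MetricsRegistryMap());
    metadataStore.init();
    JobModelManager jobModelManager = JobModelManager.apply(
        config, changelogStreamManager.readPartitionMapping(), metadataStore, new MetricsRegistryMap());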
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 8c51c642f0..787edf217a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -21,21 +21,29 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.MockSystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -51,7 +59,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.AdditionalMatchers.aryEq;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -59,16 +67,21 @@
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
 
 /**
  * Tests for {@link ClusterBasedJobCoordinator}
 */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({CoordinatorStreamUtil.class, ClusterBasedJobCoordinator.class})
+@PrepareForTest({
+    CoordinatorStreamUtil.class,
+    ClusterBasedJobCoordinator.class,
+    CoordinatorStreamStore.class,
+    RemoteJobPlanner.class})
 public class TestClusterBasedJobCoordinator {
 
-  Map<String, String> configMap;
+  private Map<String, String> configMap;
 
   @Before
   public void setUp() {
@@ -106,7 +119,7 @@ public void testPartitionCountMonitorWithDurableStates() {
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
-    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
 
     // change the input system stream metadata
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -126,7 +139,7 @@ public void testPartitionCountMonitorWithoutDurableStates() {
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
-    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
 
     // change the input system stream metadata
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -144,7 +157,7 @@ public void testVerifyStartpointManagerFanOut() throws IOException {
     Config config = new MapConfig(configMap);
     MockitoException stopException = new MockitoException("Stop");
 
-    ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(new ClusterBasedJobCoordinator(config));
+    ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinator.createFromMetadataStore(config));
     ContainerProcessManager mockContainerProcessManager = mock(ContainerProcessManager.class);
     doReturn(true).when(mockContainerProcessManager).shouldShutdown();
     StartpointManager mockStartpointManager = mock(StartpointManager.class);
@@ -198,4 +211,33 @@ public void testRunWithClassLoader() throws Exception {
     // make sure runClusterBasedJobCoordinator only got called once
     verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
   }
+
+  @Test(expected = SamzaException.class)
+  public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
+    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
+  }
+
+  @Test
+  public void testCreateFromConfigLoader() throws Exception {
+    Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
+    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
+    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+        getClass().getResource("/test.properties").getPath());
+    Config submissionConfig = new MapConfig(config);
+    JobConfig fullJobConfig = new JobConfig(ConfigUtil.loadConfig(submissionConfig));
+
+    RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
+    CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
+
+    PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
+    PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
+    PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
+    PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
+    when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+
+    ClusterBasedJobCoordinator.createFromConfigLoader(submissionConfig);
+
+    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java
index 218eb179fd..c005f37a35 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java
@@ -24,9 +24,11 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.ConfigRewriter;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -161,6 +163,34 @@ public void testApplyRewriterClassDoesNotExist() {
     assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME));
   }
 
+  @Test(expected = ConfigException.class)
+  public void testLoadConfigWithoutLoader() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+
+    ConfigUtil.loadConfig(new MapConfig(config));
+  }
+
+  @Test
+  public void testLoadConfigWithOverridesAndRewrites() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+    config.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME);
+    config.put(CONFIG_KEY, CONFIG_VALUE);
+    config.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName());
+    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath());
+
+    Config actual = ConfigUtil.loadConfig(new MapConfig(config));
+
+    assertEquals("org.apache.samza.job.MockJobFactory", actual.get("job.factory.class"));
+    assertEquals("bar", actual.get("foo"));
+    // overridden value
+    assertEquals("new-test-job", actual.get("job.name"));
+    // rewritten value
+    assertEquals(CONFIG_VALUE, actual.get(NEW_CONFIG_KEY));
+  }
+
   /**
    * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}.
    */
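Note: for context on the rewrite step exercised above, ConfigRewriter is a single-method interface. A minimal sketch of a rewriter in the spirit of the NewPropertyRewriter described by the trailing javadoc — the key names are hypothetical stand-ins for the test's CONFIG_KEY and NEW_CONFIG_KEY constants:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.ConfigRewriter;
    import org.apache.samza.config.MapConfig;

    // Copies the value of one key to a new key and leaves everything else untouched.
    public class CopyKeyRewriter implements ConfigRewriter {
      @Override
      public Config rewrite(String name, Config config) {
        Map<String, String> map = new HashMap<>(config);
        map.put("new.config.key", config.get("config.key")); // hypothetical key names
        return new MapConfig(map);
      }
    }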