diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 26b466199a..35ddcab50b 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -448,9 +448,11 @@

Samza Configuration Reference

job.coordinator.factory - + org.apache.samza.zk.ZkJobCoordinatorFactory - Class to use for job coordination. Currently available values are: + The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator. + The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors.
+ Samza supports the following coordination modes out of the box.
org.apache.samza.standalone.PassthroughJobCoordinatorFactory
Fixed partition mapping. No Zoookeeper.
@@ -461,20 +463,6 @@

Samza Configuration Reference

Required only for non-cluster-managed applications. Please see the required value for task-name-grouper-factory - - job.coordination.utils.factory - org.apache.samza.zk.ZkCoordinationUtilsFactory - - Class to use to create CoordinationUtils. Currently available values are: -
-
org.apache.samza.zk.ZkCoordinationUtilsFactory
-
ZooKeeper based coordination utils.
-
org.apache.samza.coordinator.AzureCoordinationUtilsFactory
-
Azure based coordination utils.
- These coordination utils are currently used for intermediate stream creation. -
- - job.logged.store.base.dir @@ -539,7 +527,7 @@

Samza Configuration Reference

job.debounce.time.ms - 2000 + 20000 How long the Leader processor will wait before recalculating the JobModel on change of registered processors. diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index 23227276c1..60c43c34b2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -22,33 +22,44 @@ import com.google.common.base.Strings; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.CoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.util.Util; import org.apache.samza.zk.ZkCoordinationUtilsFactory; +import org.apache.samza.zk.ZkJobCoordinatorFactory; public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; - public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory"; - public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName(); + public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName(); + private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory"; + private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory"; public JobCoordinatorConfig(Config config) { super(config); } public String getJobCoordinationUtilsFactoryClassName() { - String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY); + String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY); - if (Strings.isNullOrEmpty(className)) { - throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className); + String coordinationUtilsFactory; + if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) { + coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY; + } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { + coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName(); + } else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { + coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName(); + } else { + throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY)); } try { - Class.forName(className); + Class.forName(coordinationUtilsFactory); } catch (ClassNotFoundException e) { throw new SamzaException( - "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e); + "Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e); } - return className; + return coordinationUtilsFactory; } public CoordinationUtilsFactory getCoordinationUtilsFactory() { @@ -61,10 +72,9 @@ public CoordinationUtilsFactory getCoordinationUtilsFactory() { public String getJobCoordinatorFactoryClassName() { String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY); if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) { - throw new ConfigException( - String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY)); + return ZkJobCoordinatorFactory.class.getName(); + } else { + return jobCoordinatorFactoryClassName; } - - return jobCoordinatorFactoryClassName; } } diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java deleted file mode 100644 index 2ef92b5c14..0000000000 --- a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config; - -import java.util.HashMap; -import java.util.Map; -import junit.framework.Assert; -import org.apache.samza.SamzaException; -import org.apache.samza.zk.ZkCoordinationUtilsFactory; -import org.junit.Test; - - -public class TestJobCoordinatorConfig { - - private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory"; - private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name - - @Test - public void testJobCoordinationUtilsFactoryConfig() { - - Map map = new HashMap<>(); - JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map)); - - // test default value - Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName()); - - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName()); - - // failure case - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - try { - jConfig.getJobCoordinationUtilsFactoryClassName(); - Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS); - } catch (SamzaException e) { - // expected - } - } -} diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index a96fd0893c..97168d8f16 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -74,7 +74,6 @@ public static Map fetchStaticConfigsWithFactories(Map - *
  • "job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}
  • *
  • "job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}
  • *
  • "task.name.grouper.factory" = {@link SingleContainerGrouperFactory}
  • *
  • "job.name" = "test-samza"
  • @@ -98,7 +96,6 @@ private TestRunner() { this.inMemoryScope = RandomStringUtils.random(10, true, true); configs.put(JobConfig.JOB_NAME(), JOB_NAME); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index d2aab116f5..136507453f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -39,7 +39,6 @@ import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.test.controlmessages.TestData.PageView; import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; @@ -81,7 +80,6 @@ public void testPipeline() throws Exception { configs.put(JobConfig.JOB_NAME(), "test-eos-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 05818e96b9..6e60f467b5 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -58,7 +58,6 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerdeFactory; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemAdmin; @@ -133,7 +132,6 @@ public void testWatermark() throws Exception { configs.put(JobConfig.JOB_NAME(), "test-watermark-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java index 658492a7a8..7e89fa930a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -52,7 +52,6 @@ public void testJob() throws InterruptedException { configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); runApplication(new TestSchedulingApp(), "SchedulingTest", configs); diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 144f1253a2..340f0e75a0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -82,7 +82,6 @@ public void testRepartitionJoinWindowAppWithoutDeletionOnCommit() throws Excepti String appName = "UserPageAdClickCounter"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "false"); @@ -112,7 +111,6 @@ public void testRepartitionJoinWindowAppAndDeleteMessagesOnCommit() throws Excep final String appName = "UserPageAdClickCounter2"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "true"); @@ -160,7 +158,6 @@ public void testBroadcastApp() { String outputTopicName = "user-ad-click-counts"; Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1); diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index 2e1de96394..2f08fede28 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -61,7 +61,6 @@ public void testRepartitionedSessionWindowCounter() throws Exception { Map configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 419f6c87ec..46cbdad9bf 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -48,7 +48,6 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; @@ -308,7 +307,6 @@ static Map getBaseJobConfig(String bootstrapUrl, String zkConnec configs.put(JobConfig.JOB_NAME(), "test-table-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());