diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java index ca46d1c3e1..4ae4886d38 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java @@ -53,9 +53,9 @@ import org.apache.samza.table.TableConfigGenerator; import org.apache.samza.table.descriptors.LocalTableDescriptor; import org.apache.samza.table.descriptors.TableDescriptor; +import org.apache.samza.util.ConfigUtil; import org.apache.samza.util.MathUtil; import org.apache.samza.util.StreamUtil; -import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +80,7 @@ static Config mergeConfig(Map originalConfig, Map originalConfig, Map generatedConfig) { diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerUtil.java index 28dfeb1ffc..af649dd039 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerUtil.java @@ -21,7 +21,7 @@ import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; -import org.apache.samza.util.Util; +import org.apache.samza.util.ConfigUtil; /** @@ -39,7 +39,7 @@ public class ApplicationRunnerUtil { * @return the {@link ApplicationRunner} object. */ public static ApplicationRunner invoke(Config originalConfig, ApplicationRunnerOperation op) { - Config config = Util.rewriteConfig(originalConfig); + Config config = ConfigUtil.rewriteConfig(originalConfig); ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(ApplicationUtil.fromConfig(config), config); diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java new file mode 100644 index 0000000000..8567ecdb7e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; + + +public class ConfigUtil { + /** + * Re-writes configuration using a ConfigRewriter, if one is defined. If + * there is no ConfigRewriter defined for the job, then this method is a + * no-op. + * + * @param config The config to re-write + * @return rewrited configs + */ + static public Config rewriteConfig(Config config) { + try { + final String rewriters = config.get(JobConfig.CONFIG_REWRITERS, ""); + if (!rewriters.isEmpty()) { + Map resultConfig = new HashMap<>(config); + for (String rewriter : rewriters.split(",")) { + String rewriterClassCfg = String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriter); + String rewriterClass = config.get(rewriterClassCfg, ""); + if (rewriterClass.isEmpty()) { + throw new SamzaException( + "Unable to find class config for config rewriter: " + rewriterClassCfg); + } + ConfigRewriter configRewriter = (ConfigRewriter) Class.forName(rewriterClass).newInstance(); + Config rewritedConfig = configRewriter.rewrite(rewriter, config); + resultConfig.putAll(rewritedConfig); + } + return new MapConfig(resultConfig); + } else { + return config; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index cbbca1a239..b9241d14f1 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -29,7 +29,7 @@ import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLi import org.apache.samza.runtime.ApplicationRunnerOperation import org.apache.samza.system.{StreamSpec, SystemAdmins} import org.apache.samza.util.ScalaJavaUtil.JavaOptionals -import org.apache.samza.util.{CoordinatorStreamUtil, Logging, StreamUtil, Util} +import org.apache.samza.util._ import scala.collection.JavaConverters._ @@ -43,7 +43,7 @@ object JobRunner extends Logging { val config = cmdline.loadConfig(options) val operation = cmdline.getOperation(options) - val runner = new JobRunner(Util.rewriteConfig(config)) + val runner = new JobRunner(ConfigUtil.rewriteConfig(config)) doOperation(runner, operation) } diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java new file mode 100644 index 0000000000..f771a99010 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.MapConfig; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestConfigUtil { + Map configMap = new HashMap<>(); + + @Before + public void setup() { + configMap.put("job.config.rewriter.testRewriter.class", TestConfigRewriter.class.getName()); + configMap.put("job.config.rewriter.testNoneRewriter.class", ""); + + } + + @Test + public void testRewriterWithConfigRewriter() { + configMap.put("job.config.rewriters", "testRewriter"); + configMap.put("job.config.rewriter.testRewriter.value", "rewrittenTest"); + + Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); + assertEquals("rewrittenTest", config.get("value")); + } + + @Test + public void testGetRewriterWithoutConfigRewriter() { + Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); + assertEquals(config, new MapConfig(configMap)); + } + + @Test (expected = RuntimeException.class) + public void testGetRewriterWithExceptoion() { + configMap.put("job.config.rewriters", "testNoneRewriter"); + ConfigUtil.rewriteConfig(new MapConfig(configMap)); + } + + public static class TestConfigRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + Map configMap = new HashMap<>(config); + configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", name))); + return new MapConfig(configMap); + } + } +} diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java index ed73725011..8edfa27f9c 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java @@ -28,7 +28,7 @@ import org.apache.samza.runtime.ApplicationRunnerMain; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; -import org.apache.samza.util.Util; +import org.apache.samza.util.ConfigUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ public static void main(String[] args) throws Exception { ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine(); OptionSet options = cmdLine.parser().parse(args); Config orgConfig = cmdLine.loadConfig(options); - Config config = Util.rewriteConfig(orgConfig); + Config config = ConfigUtil.rewriteConfig(orgConfig); SamzaApplication app = ApplicationUtil.fromConfig(config); ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);