diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java index bfbc2979e9..7b44a3a08f 100644 --- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java @@ -29,7 +29,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; /** * Config helper methods related to systems. @@ -117,7 +117,7 @@ public Map getSystemFactories() { systemName -> { String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException( String.format("A stream uses system %s, which is missing from the configuration.", systemName))); - return Util.getObj(systemFactoryClassName, SystemFactory.class); + return ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class); })); return systemFactories; diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java index 902ea50fc9..000e55a4e0 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -36,7 +36,7 @@ import org.apache.samza.metadatastore.MetadataStoreFactory; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.StreamSpec; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +210,8 @@ private MetadataStore getMetadataStore() { if (metadataStoreFactoryClass == null) { metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY; } - MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + MetadataStoreFactory metadataStoreFactory = + ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 44adfde3a9..bf6dfce2d8 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -64,7 +64,6 @@ import org.apache.samza.task.TaskFactoryUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.ReflectionUtil; -import org.apache.samza.util.Util; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; @@ -384,7 +383,8 @@ private void cleanup() { */ private MetadataStore getMetadataStoreForRunID() { String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY); - MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + MetadataStoreFactory metadataStoreFactory = + ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); } diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java index 8567ecdb7e..7d86bf52a0 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java @@ -19,16 +19,18 @@ package org.apache.samza.util; -import java.util.HashMap; -import java.util.Map; +import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigRewriter; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConfigUtil { + private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class); + /** * Re-writes configuration using a ConfigRewriter, if one is defined. If * there is no ConfigRewriter defined for the job, then this method is a @@ -37,28 +39,32 @@ public class ConfigUtil { * @param config The config to re-write * @return rewrited configs */ - static public Config rewriteConfig(Config config) { - try { - final String rewriters = config.get(JobConfig.CONFIG_REWRITERS, ""); - if (!rewriters.isEmpty()) { - Map resultConfig = new HashMap<>(config); - for (String rewriter : rewriters.split(",")) { - String rewriterClassCfg = String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriter); - String rewriterClass = config.get(rewriterClassCfg, ""); - if (rewriterClass.isEmpty()) { - throw new SamzaException( - "Unable to find class config for config rewriter: " + rewriterClassCfg); - } - ConfigRewriter configRewriter = (ConfigRewriter) Class.forName(rewriterClass).newInstance(); - Config rewritedConfig = configRewriter.rewrite(rewriter, config); - resultConfig.putAll(rewritedConfig); - } - return new MapConfig(resultConfig); - } else { - return config; + public static Config rewriteConfig(Config config) { + Optional configRewriterNamesOptional = new JobConfig(config).getConfigRewriters(); + if (configRewriterNamesOptional.isPresent()) { + String[] configRewriterNames = configRewriterNamesOptional.get().split(","); + Config rewrittenConfig = config; + for (String configRewriterName : configRewriterNames) { + rewrittenConfig = applyRewriter(rewrittenConfig, configRewriterName); } - } catch (Exception e) { - throw new RuntimeException(e); + return rewrittenConfig; + } else { + return config; } } + + /** + * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. + * @param config the config to re-write + * @param rewriterName the name of the rewriter to apply + * @return the rewritten config + */ + public static Config applyRewriter(Config config, String rewriterName) { + String rewriterClassName = new JobConfig(config).getConfigRewriterClass(rewriterName) + .orElseThrow(() -> new SamzaException( + String.format("Unable to find class config for config rewriter %s.", rewriterName))); + ConfigRewriter rewriter = ReflectionUtil.getObj(rewriterClassName, ConfigRewriter.class); + LOG.info("Re-writing config with {}", rewriter); + return rewriter.rewrite(rewriterName, config); + } } diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index 4d2a3bcc1e..e3ff250812 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -132,7 +132,7 @@ public static Optional> buildD } // Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager - SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class); + SystemFactory systemFactory = ReflectionUtil.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class); SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap()); DiagnosticsManager diagnosticsManager = diff --git a/samza-core/src/main/java/org/apache/samza/util/Util.java b/samza-core/src/main/java/org/apache/samza/util/Util.java new file mode 100644 index 0000000000..875b6da6d9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/Util.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Util { + private static final Logger LOG = LoggerFactory.getLogger(Util.class); + + static final String FALLBACK_VERSION = "0.0.1"; + + /** + * Make an environment variable string safe to pass. + */ + public static String envVarEscape(String str) { + return str.replace("\"", "\\\"").replace("'", "\\'"); + } + + public static String getSamzaVersion() { + return Optional.ofNullable(Util.class.getPackage().getImplementationVersion()).orElseGet(() -> { + LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION); + return FALLBACK_VERSION; + }); + } + + public static String getTaskClassVersion(Config config) { + try { + Optional appClass = Optional.ofNullable(new ApplicationConfig(config).getAppClass()); + if (appClass.isPresent()) { + return Optional.ofNullable(Class.forName(appClass.get()).getPackage().getImplementationVersion()) + .orElse(FALLBACK_VERSION); + } else { + Optional taskClass = new TaskConfig(config).getTaskClass(); + if (taskClass.isPresent()) { + return Optional.ofNullable(Class.forName(taskClass.get()).getPackage().getImplementationVersion()) + .orElse(FALLBACK_VERSION); + } else { + LOG.warn("Unable to find app class or task class. Defaulting to {}", FALLBACK_VERSION); + return FALLBACK_VERSION; + } + } + } catch (Exception e) { + LOG.warn(String.format("Ran into exception while trying to get version of app or task. Defaulting to %s", + FALLBACK_VERSION), e); + return FALLBACK_VERSION; + } + } + + /** + * Returns the the first host address which is not the loopback address, or {@link InetAddress#getLocalHost} as a + * fallback. + * + * @return the {@link InetAddress} which represents the localhost + */ + public static InetAddress getLocalHost() { + try { + return doGetLocalHost(); + } catch (Exception e) { + throw new SamzaException("Error while getting localhost", e); + } + } + + private static InetAddress doGetLocalHost() throws UnknownHostException, SocketException { + InetAddress localHost = InetAddress.getLocalHost(); + if (localHost.isLoopbackAddress()) { + LOG.debug("Hostname {} resolves to a loopback address, trying to resolve an external IP address.", + localHost.getHostName()); + List networkInterfaces; + if (System.getProperty("os.name").startsWith("Windows")) { + networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + } else { + networkInterfaces = Lists.reverse(Collections.list(NetworkInterface.getNetworkInterfaces())); + } + for (NetworkInterface networkInterface : networkInterfaces) { + List addresses = Collections.list(networkInterface.getInetAddresses()) + .stream() + .filter(address -> !(address.isLinkLocalAddress() || address.isLoopbackAddress())) + .collect(Collectors.toList()); + if (!addresses.isEmpty()) { + InetAddress address = addresses.stream() + .filter(addr -> addr instanceof Inet4Address) + .findFirst() + .orElseGet(() -> addresses.get(0)); + LOG.debug("Found an external IP address {} which represents the localhost.", address.getHostAddress()); + return InetAddress.getByAddress(address.getAddress()); + } + } + } + return localHost; + } +} \ No newline at end of file diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 74ab562b07..eff1e73955 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -32,7 +32,7 @@ import org.apache.samza.container.TaskName import org.apache.samza.job.JobRunner.info import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.{CommandLine, CoordinatorStreamUtil, Logging, ReflectionUtil, Util} +import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging, ReflectionUtil, Util} import org.apache.samza.Partition import org.apache.samza.SamzaException @@ -138,7 +138,7 @@ object CheckpointTool { val options = cmdline.parser.parse(args: _*) val userConfig = cmdline.loadConfig(options) val jobConfig = JobPlanner.generateSingleJobConfig(userConfig) - val rewrittenConfig = Util.rewriteConfig(jobConfig) + val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig) info(s"Using the rewritten config: $rewrittenConfig") val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets) tool.run() diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 76245b84ec..07055de1fc 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -47,7 +47,7 @@ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.runtime.LocationId import org.apache.samza.system._ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals -import org.apache.samza.util.{Logging, ReflectionUtil, Util} +import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil, Util} import scala.collection.JavaConverters import scala.collection.JavaConversions._ @@ -274,7 +274,7 @@ object JobModelManager extends Logging { filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)). - foldLeft(config)(Util.applyRewriter(_, _)) + foldLeft(config)(ConfigUtil.applyRewriter(_, _)) case _ => config } } diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java index 80ddf0dc5f..0900c138fd 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java @@ -21,11 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; @@ -36,11 +34,9 @@ import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import scala.collection.JavaConverters; /** @@ -146,24 +142,15 @@ public DiagnosticsManager(String jobName, resetTime = Instant.now(); try { - - Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", - JavaConverters.collectionAsScalaIterableConverter( - Collections.singletonList(new Tuple2, Object>(DiagnosticsManager.class, this))) - .asScala() - .toSeq()); - + ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", + Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); LOG.info("Attached log4j diagnostics appender."); - } catch (ClassNotFoundException | InstantiationException | InvocationTargetException e) { + } catch (Exception e) { try { - Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", - JavaConverters.collectionAsScalaIterableConverter( - Collections.singletonList(new Tuple2, Object>(DiagnosticsManager.class, this))) - .asScala() - .toSeq()); - LOG.info("Attached log4j diagnostics appender."); - } catch (ClassNotFoundException | InstantiationException | InvocationTargetException ex) { - + ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", + Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class)); + LOG.info("Attached log4j2 diagnostics appender."); + } catch (Exception ex) { LOG.warn( "Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.", ex); diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index ec5f4d971f..441d834f9c 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -19,7 +19,7 @@ package org.apache.samza.metrics.reporter -import org.apache.samza.util.{Logging, StreamUtil, Util} +import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util} import org.apache.samza.SamzaException import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig} import org.apache.samza.metrics.MetricsReporter @@ -53,7 +53,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName)) - val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) + val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory]) info("Got system factory %s." format systemFactory) @@ -71,7 +71,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val serde = if (serdeName != null) { JavaOptionals.toRichOptional(serializerConfig.getSerdeFactoryClass(serdeName)).toOption match { case Some(serdeClassName) => - Util.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config) + ReflectionUtil.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config) case _ => null } } else { diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala index 14dd1c915f..b97afad119 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala @@ -33,19 +33,19 @@ import scala.collection.JavaConverters._ */ class CommandLine { val parser = new OptionParser() - val configFactoryOpt = + val configFactoryOpt = parser.accepts("config-factory", "The config factory to use to read your config file.") .withRequiredArg .ofType(classOf[java.lang.String]) .describedAs("com.foo.bar.ClassName") .defaultsTo(classOf[PropertiesConfigFactory].getName) val configPathOpt = - parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + + parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + "If multiple files are given they are all used with later files overriding any values that appear in earlier files.") .withRequiredArg .ofType(classOf[URI]) .describedAs("path") - val configOverrideOpt = + val configOverrideOpt = parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.") .withRequiredArg .ofType(classOf[KeyValuePair]) @@ -63,7 +63,7 @@ class CommandLine { // Set up the job parameters. val configFactoryClassName = options.valueOf(configFactoryOpt) val configPaths = options.valuesOf(configPathOpt) - configFactory = Util.getObj(configFactoryClassName, classOf[ConfigFactory]) + configFactory = ReflectionUtil.getObj(configFactoryClassName, classOf[ConfigFactory]) val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv => (kv.key, kv.value)).toMap val configs: Buffer[java.util.Map[String, String]] = configPaths.asScala.map(configFactory.getConfig) diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index 810345e005..f108387a2c 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -86,7 +86,7 @@ object CoordinatorStreamUtil extends Logging { val systemConfig = new SystemConfig(config) val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName)) - Util.getObj(systemFactoryClassName, classOf[SystemFactory]) + ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory]) } /** diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index f45f28e901..d416340686 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -25,8 +25,6 @@ import java.io._ import java.nio.file._ import java.util.zip.CRC32 -import org.apache.samza.util.Util.info - class FileUtil extends Logging { /** * Writes checksum & data to a file diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index ea5eb5a8cc..577bba63da 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -25,9 +25,8 @@ import java.io.{BufferedReader, IOException, InputStream, InputStreamReader} import java.net.{HttpURLConnection, URL} import org.apache.samza.SamzaException -import org.apache.samza.util.Util.{error, warn} -object HttpUtil { +object HttpUtil extends Logging { /** * Reads a URL and returns the response body as a string. Retries in an exponential backoff, but does no other error handling. diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala deleted file mode 100644 index 1323cd2ec6..0000000000 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util - - -import java.lang.reflect.InvocationTargetException - -import org.apache.samza.config._ -import org.apache.samza.SamzaException -import java.net.Inet4Address -import java.net.InetAddress -import java.net.NetworkInterface -import java.util.Random - -import org.apache.samza.util.ScalaJavaUtil.JavaOptionals - -import scala.collection.JavaConverters._ - - -object Util extends Logging { - private val FALLBACK_VERSION = "0.0.1" - val Random = new Random - - /** - * Make an environment variable string safe to pass. - */ - def envVarEscape(str: String) = str.replace("\"", "\\\"").replace("'", "\\'") - - /** - * Get a random number >= startInclusive, and < endExclusive. - */ - def randomBetween(startInclusive: Int, endExclusive: Int) = - startInclusive + Random.nextInt(endExclusive - startInclusive) - - /** - * Instantiate an object of type T from a given className. - * - * Deprecated: Use [[ReflectionUtil.getObj(String, Class)]] instead. - */ - @Deprecated - def getObj[T](className: String, clazz: Class[T]) = { - try { - Class - .forName(className) - .newInstance - .asInstanceOf[T] - } catch { - case e: Throwable => { - error("Unable to create an instance for class %s." format className, e) - throw e - } - } - } - - def getSamzaVersion(): String = { - Option(this.getClass.getPackage.getImplementationVersion) - .getOrElse({ - warn("Unable to find implementation samza version in jar's meta info. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - }) - } - - def getTaskClassVersion(config: Config): String = { - try { - val appClass = Option(new ApplicationConfig(config).getAppClass) - if (appClass.isDefined) { - Option.apply(Class.forName(appClass.get).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION) - } else { - val taskClass = new TaskConfig(config).getTaskClass - if (taskClass.isPresent) { - Option.apply(Class.forName(taskClass.get()).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION) - } else { - warn("Unable to find app class or task class. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - } - } - } catch { - case e: Exception => { - warn("Unable to find implementation version in jar's meta info. Defaulting to %s" format FALLBACK_VERSION) - FALLBACK_VERSION - } - } - } - - /** - * Instantiate an object from given className, and given constructor parameters. - * - * Deprecated: Use [[ReflectionUtil.getObjWithArgs(String, Class, ConstructorArgument...)]] instead. - */ - @Deprecated - @throws[ClassNotFoundException] - @throws[InstantiationException] - @throws[InvocationTargetException] - def getObj(className: String, constructorParams: (Class[_], Object)*) = { - try { - Class.forName(className).getDeclaredConstructor(constructorParams.map(x => x._1): _*) - .newInstance(constructorParams.map(x => x._2): _*) - } catch { - case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => { - warn("Could not instantiate an instance for class %s." format className, e) - throw e - } - } - } - - /** - * Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback - * - * @return the [[java.net.InetAddress]] which represents the localhost - */ - def getLocalHost: InetAddress = { - val localHost = InetAddress.getLocalHost - if (localHost.isLoopbackAddress) { - debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName)) - val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) { - NetworkInterface.getNetworkInterfaces.asScala.toList - } else { - NetworkInterface.getNetworkInterfaces.asScala.toList.reverse - } - for (networkInterface <- networkInterfaces) { - val addresses = networkInterface.getInetAddresses.asScala.toList - .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress) - if (addresses.nonEmpty) { - val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) - debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress)) - return InetAddress.getByAddress(address.getAddress) - } - } - } - localHost - } - - /** - * Re-writes configuration using a ConfigRewriter, if one is defined. If - * there is no ConfigRewriter defined for the job, then this method is a - * no-op. - * - * @param config The config to re-write - * @return re-written config - */ - def rewriteConfig(config: Config): Config = { - JavaOptionals.toRichOptional(new JobConfig(config).getConfigRewriters).toOption match { - case Some(rewriters) => rewriters.split(",").foldLeft(config)(applyRewriter(_, _)) - case _ => config - } - } - - /** - * Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config. - * @param config the config to re-write - * @param rewriterName the name of the rewriter to apply - * @return the rewritten config - */ - def applyRewriter(config: Config, rewriterName: String): Config = { - val rewriterClassName = JavaOptionals.toRichOptional(new JobConfig(config).getConfigRewriterClass(rewriterName)) - .toOption - .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) - val rewriter = ReflectionUtil.getObj(rewriterClassName, classOf[ConfigRewriter]) - info("Re-writing config with " + rewriter) - rewriter.rewrite(rewriterName, config) - } - -} diff --git a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java index f771a99010..218eb179fd 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java @@ -21,52 +21,186 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class TestConfigUtil { - Map configMap = new HashMap<>(); + private static final String CONFIG_KEY = "config.key"; + private static final String CONFIG_VALUE = "value"; + private static final String NEW_CONFIG_KEY = "new.rewritten.config.key"; + private static final String REWRITER_NAME = "propertyRewriter"; + private static final String OTHER_REWRITER_NAME = "otherPropertyRewriter"; - @Before - public void setup() { - configMap.put("job.config.rewriter.testRewriter.class", TestConfigRewriter.class.getName()); - configMap.put("job.config.rewriter.testNoneRewriter.class", ""); + @Test + public void testRewriteConfig() { + Map baseConfigMap = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + + // no rewriters + Map fullConfig = new HashMap<>(baseConfigMap); + assertEquals(fullConfig, ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that adds property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that updates property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // rewriter that removes property + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + DeletePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.remove(CONFIG_KEY); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + + // only apply rewriters from rewriters list + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); + // two rewriters; second rewriter overwrites configs from first + fullConfig = new HashMap<>(baseConfigMap); + fullConfig.put(JobConfig.CONFIG_REWRITERS, REWRITER_NAME + "," + OTHER_REWRITER_NAME); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), NewPropertyRewriter.class.getName()); + fullConfig.put(String.format(JobConfig.CONFIG_REWRITER_CLASS, OTHER_REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.rewriteConfig(new MapConfig(fullConfig))); } - @Test - public void testRewriterWithConfigRewriter() { - configMap.put("job.config.rewriters", "testRewriter"); - configMap.put("job.config.rewriter.testRewriter.value", "rewrittenTest"); + /** + * This fails because Util will interpret the empty string value as a single rewriter which has the empty string as a + * name, and there is no rewriter class config for a rewriter name which is the empty string. + * TODO: should this be fixed to interpret the empty string as an empty list? + */ + @Test(expected = SamzaException.class) + public void testRewriteConfigConfigRewritersEmptyString() { + Config config = new MapConfig(ImmutableMap.of(JobConfig.CONFIG_REWRITERS, "")); + ConfigUtil.rewriteConfig(config); + } + + @Test(expected = SamzaException.class) + public void testRewriteConfigNoClassForConfigRewriterName() { + Config config = + new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, "unknownRewriter")); + ConfigUtil.rewriteConfig(config); + } - Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); - assertEquals("rewrittenTest", config.get("value")); + @Test(expected = SamzaException.class) + public void testRewriteConfigRewriterClassDoesNotExist() { + Config config = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, JobConfig.CONFIG_REWRITERS, REWRITER_NAME, + String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), "not_a_class")); + ConfigUtil.rewriteConfig(config); } @Test - public void testGetRewriterWithoutConfigRewriter() { - Config config = ConfigUtil.rewriteConfig(new MapConfig(configMap)); - assertEquals(config, new MapConfig(configMap)); + public void testApplyRewriter() { + // new property + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + NewPropertyRewriter.class.getName()); + Map expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(NEW_CONFIG_KEY, CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // update property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + UpdatePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.put(CONFIG_KEY, CONFIG_VALUE + CONFIG_VALUE); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + + // remove property + fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + DeletePropertyRewriter.class.getName()); + expectedConfigMap = new HashMap<>(fullConfig); + expectedConfigMap.remove(CONFIG_KEY); + assertEquals(new MapConfig(expectedConfigMap), ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); } - @Test (expected = RuntimeException.class) - public void testGetRewriterWithExceptoion() { - configMap.put("job.config.rewriters", "testNoneRewriter"); - ConfigUtil.rewriteConfig(new MapConfig(configMap)); + @Test(expected = SamzaException.class) + public void testApplyRewriterNoClassForConfigRewriterName() { + Map fullConfig = ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE); + ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME); + } + + @Test(expected = SamzaException.class) + public void testApplyRewriterClassDoesNotExist() { + Map fullConfig = + ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, String.format(JobConfig.CONFIG_REWRITER_CLASS, REWRITER_NAME), + "not_a_class"); + Config expectedConfig = new MapConfig(ImmutableMap.of(CONFIG_KEY, CONFIG_VALUE, NEW_CONFIG_KEY, CONFIG_VALUE)); + assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME)); + } + + /** + * Adds a new config entry for the key {@link #NEW_CONFIG_KEY} which has the same value as {@link #CONFIG_KEY}. + */ + public static class NewPropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + ImmutableMap.Builder newConfigMapBuilder = new ImmutableMap.Builder<>(); + newConfigMapBuilder.putAll(config); + newConfigMapBuilder.put(NEW_CONFIG_KEY, config.get(CONFIG_KEY)); + return new MapConfig(newConfigMapBuilder.build()); + } + } + + /** + * If an entry at {@link #NEW_CONFIG_KEY} exists, overwrites it to be the value concatenated with itself. Otherwise, + * updates the entry at {@link #CONFIG_KEY} to be the value concatenated to itself. + */ + public static class UpdatePropertyRewriter implements ConfigRewriter { + @Override + public Config rewrite(String name, Config config) { + Map newConfigMap = new HashMap<>(config); + if (config.containsKey(NEW_CONFIG_KEY)) { + // for testing overwriting of new configs + newConfigMap.put(NEW_CONFIG_KEY, config.get(NEW_CONFIG_KEY) + config.get(NEW_CONFIG_KEY)); + } else { + newConfigMap.put(CONFIG_KEY, config.get(CONFIG_KEY) + config.get(CONFIG_KEY)); + } + return new MapConfig(newConfigMap); + } } - public static class TestConfigRewriter implements ConfigRewriter { + /** + * Removes config entry for the key {@link #CONFIG_KEY} and {@link #NEW_CONFIG_KEY}. + */ + public static class DeletePropertyRewriter implements ConfigRewriter { @Override public Config rewrite(String name, Config config) { - Map configMap = new HashMap<>(config); - configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", name))); - return new MapConfig(configMap); + Map newConfigMap = new HashMap<>(config); + newConfigMap.remove(CONFIG_KEY); + return new MapConfig(newConfigMap); } } } diff --git a/samza-core/src/test/java/org/apache/samza/util/TestUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java new file mode 100644 index 0000000000..2eb2e89833 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestUtil.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import com.google.common.collect.ImmutableMap; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalMatchers.aryEq; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest(Util.class) // need this to be able to use powermock with system classes like InetAddress +public class TestUtil { + @Test + public void testEnvVarEscape() { + // no special characters in original + String noSpecialCharacters = "hello world 123 .?!"; + assertEquals(noSpecialCharacters, Util.envVarEscape(noSpecialCharacters)); + + String withSpecialCharacters = "quotation \" apostrophe '"; + String escaped = "quotation \\\" apostrophe \\'"; + assertEquals(escaped, Util.envVarEscape(withSpecialCharacters)); + } + + /** + * It's difficult to explicitly test having an actual version and using the fallback, due to the usage of methods of + * Class. + */ + @Test + public void testGetSamzaVersion() { + String utilImplementationVersion = Util.class.getPackage().getImplementationVersion(); + String expectedVersion = + (utilImplementationVersion != null) ? utilImplementationVersion : Util.FALLBACK_VERSION; + assertEquals(expectedVersion, Util.getSamzaVersion()); + } + + /** + * It's difficult to explicitly test having an actual version and using the fallback, due to the usage of methods of + * Class. + */ + @Test + public void testGetTaskClassVersion() { + // cannot find app nor task + assertEquals(Util.FALLBACK_VERSION, Util.getTaskClassVersion(new MapConfig())); + + // only app + String appClassVersion = MyAppClass.class.getPackage().getImplementationVersion(); + String expectedAppClassVersion = (appClassVersion != null) ? appClassVersion : Util.FALLBACK_VERSION; + Config config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName())); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + + // only task + String taskClassVersion = MyTaskClass.class.getPackage().getImplementationVersion(); + String expectedTaskClassVersion = (taskClassVersion != null) ? taskClassVersion : Util.FALLBACK_VERSION; + config = new MapConfig(ImmutableMap.of(TaskConfig.TASK_CLASS, MyTaskClass.class.getName())); + assertEquals(expectedTaskClassVersion, Util.getTaskClassVersion(config)); + + // both app and task; choose app + config = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_CLASS, MyAppClass.class.getName(), + // shouldn't even try to load the task class + TaskConfig.TASK_CLASS, "this_is_not_a_class")); + assertEquals(expectedAppClassVersion, Util.getTaskClassVersion(config)); + } + + @Test + public void testGetLocalHostNotLoopbackAddress() throws UnknownHostException { + mockStatic(InetAddress.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(false); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + assertEquals(inetAddressLocalHost, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostLoopbackAddressNoExternalAddressFound() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + // network interfaces return addresses which are not external + InetAddress linkLocalAddress = mock(InetAddress.class); + when(linkLocalAddress.isLinkLocalAddress()).thenReturn(true); + InetAddress loopbackAddress = mock(InetAddress.class); + when(loopbackAddress.isLinkLocalAddress()).thenReturn(false); + when(loopbackAddress.isLoopbackAddress()).thenReturn(true); + NetworkInterface networkInterface0 = mock(NetworkInterface.class); + when(networkInterface0.getInetAddresses()).thenReturn( + Collections.enumeration(Arrays.asList(linkLocalAddress, loopbackAddress))); + NetworkInterface networkInterface1 = mock(NetworkInterface.class); + when(networkInterface1.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(loopbackAddress))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Arrays.asList(networkInterface0, networkInterface1))); + + assertEquals(inetAddressLocalHost, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostExternalInet4Address() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + InetAddress linkLocalAddress = mock(InetAddress.class); + when(linkLocalAddress.isLinkLocalAddress()).thenReturn(true); + Inet4Address externalInet4Address = mock(Inet4Address.class); + when(externalInet4Address.isLinkLocalAddress()).thenReturn(false); + when(externalInet4Address.isLoopbackAddress()).thenReturn(false); + byte[] externalInet4AddressBytes = new byte[]{0, 1, 2, 3}; + when(externalInet4Address.getAddress()).thenReturn(externalInet4AddressBytes); + InetAddress otherExternalAddress = mock(InetAddress.class); // not Inet4Address + when(otherExternalAddress.isLinkLocalAddress()).thenReturn(false); + when(otherExternalAddress.isLoopbackAddress()).thenReturn(false); + + NetworkInterface networkInterfaceLinkLocal = mock(NetworkInterface.class); + when(networkInterfaceLinkLocal.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(linkLocalAddress))); + NetworkInterface networkInterfaceExternal = mock(NetworkInterface.class); + when(networkInterfaceExternal.getInetAddresses()).thenReturn( + Collections.enumeration(Arrays.asList(otherExternalAddress, externalInet4Address))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Arrays.asList(networkInterfaceLinkLocal, networkInterfaceExternal))); + + InetAddress finalInetAddress = mock(InetAddress.class); + when(InetAddress.getByAddress(aryEq(externalInet4AddressBytes))).thenReturn(finalInetAddress); + + assertEquals(finalInetAddress, Util.getLocalHost()); + } + + @Test + public void testGetLocalHostExternalAddressNotInet4Address() throws Exception { + mockStatic(InetAddress.class, NetworkInterface.class); + InetAddress inetAddressLocalHost = mock(InetAddress.class); + when(inetAddressLocalHost.isLoopbackAddress()).thenReturn(true); + when(InetAddress.getLocalHost()).thenReturn(inetAddressLocalHost); + + byte[] externalAddressBytes = new byte[]{0, 1, 2, 3, 4, 5}; + InetAddress externalAddress = mock(InetAddress.class); + when(externalAddress.isLinkLocalAddress()).thenReturn(false); + when(externalAddress.isLoopbackAddress()).thenReturn(false); + when(externalAddress.getAddress()).thenReturn(externalAddressBytes); + + NetworkInterface networkInterface = mock(NetworkInterface.class); + when(networkInterface.getInetAddresses()).thenReturn( + Collections.enumeration(Collections.singletonList(externalAddress))); + when(NetworkInterface.getNetworkInterfaces()).thenReturn( + Collections.enumeration(Collections.singletonList(networkInterface))); + + InetAddress finalInetAddress = mock(InetAddress.class); + when(InetAddress.getByAddress(aryEq(externalAddressBytes))).thenReturn(finalInetAddress); + + assertEquals(finalInetAddress, Util.getLocalHost()); + } + + /** + * No requirement for this test that this extends any other class. Just need some placeholder class. + */ + public static class MyAppClass { + } + + /** + * No requirement for this test that this extends any other class. Just need some placeholder class. + */ + public static class MyTaskClass { + } +} diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala deleted file mode 100644 index ba3b5df483..0000000000 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util - -import org.junit.Assert._ -import org.junit.Test -import org.apache.samza.config.MapConfig - -class TestUtil { - @Test - def testGetLocalHost(): Unit = { - assertNotNull(Util.getLocalHost) - } - - @Test - def testGetObjExistingClass() { - val obj = Util.getObj("org.apache.samza.config.MapConfig", classOf[MapConfig]) - assertNotNull(obj) - assertEquals(classOf[MapConfig], obj.getClass()) - } - - @Test(expected = classOf[ClassNotFoundException]) - def testGetObjNonexistentClass() { - Util.getObj("this.class.does.NotExist", classOf[Object]) - assert(false, "This should not get hit.") - } -} diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index b20704a61e..706ae65d34 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -26,7 +26,7 @@ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.{StreamSpec, SystemFactory} import org.apache.samza.system.kafka.KafkaStreamSpec import org.apache.samza.util.ScalaJavaUtil.JavaOptionals -import org.apache.samza.util.{KafkaUtil, Logging, Util, _} +import org.apache.samza.util.{KafkaUtil, Logging, _} class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { @@ -45,7 +45,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin .toOption .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format checkpointSystemName)) - val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory]) + val checkpointSystemFactory = ReflectionUtil.getObj(checkpointSystemFactoryName, classOf[SystemFactory]) val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config) info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic") diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 376e113051..bbcc976386 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -33,25 +33,21 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.samza.SamzaException; -import org.apache.samza.clustermanager.ClusterBasedJobCoordinator; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; -import org.apache.samza.coordinator.stream.CoordinatorStreamManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; -import org.apache.samza.job.model.JobModel; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.MetricsValidator; import org.apache.samza.storage.ChangelogStreamManager; +import org.apache.samza.util.CommandLine; import org.apache.samza.util.CoordinatorStreamUtil; -import org.apache.samza.util.Util; +import org.apache.samza.util.ReflectionUtil; import org.apache.samza.util.hadoop.HttpFileSystem; -import org.apache.samza.util.CommandLine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,7 +192,7 @@ public static void main(String [] args) throws Exception { MetricsValidator validator = null; if (options.has(validatorOpt)) { String validatorClass = options.valueOf(validatorOpt); - validator = Util.getObj(validatorClass, MetricsValidator.class); + validator = ReflectionUtil.getObj(validatorClass, MetricsValidator.class); } YarnConfiguration hadoopConfig = new YarnConfiguration(); diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index 5c31ccacb6..7fe6305138 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -25,7 +25,7 @@ -@ val appMasterClasspath: String = scala.util.Properties.javaClassPath -@ val javaVmVersion: String = scala.util.Properties.javaVmVersion -@ val javaVmName: String = scala.util.Properties.javaVmName --@ val samzaVersion: String = org.apache.samza.util.Util.getClass.getPackage.getImplementationVersion +-@ val samzaVersion: String = classOf[org.apache.samza.util.Util].getPackage.getImplementationVersion - attributes("title") = jobName %div.col-xs-2.menu