Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.util.Util;
import org.apache.samza.util.ReflectionUtil;

/**
* Config helper methods related to systems.
Expand Down Expand Up @@ -117,7 +117,7 @@ public Map<String, SystemFactory> getSystemFactories() {
systemName -> {
String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException(
String.format("A stream uses system %s, which is missing from the configuration.", systemName)));
return Util.getObj(systemFactoryClassName, SystemFactory.class);
return ReflectionUtil.getObj(systemFactoryClassName, SystemFactory.class);
}));

return systemFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.Util;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -210,7 +210,8 @@ private MetadataStore getMetadataStore() {
if (metadataStoreFactoryClass == null) {
metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY;
}
MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
MetadataStoreFactory metadataStoreFactory =
ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -384,7 +383,8 @@ private void cleanup() {
*/
private MetadataStore getMetadataStoreForRunID() {
String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY);
MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
MetadataStoreFactory metadataStoreFactory =
ReflectionUtil.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
}

Expand Down
54 changes: 30 additions & 24 deletions samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@

package org.apache.samza.util;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigRewriter;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ConfigUtil {
private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);

/**
* Re-writes configuration using a ConfigRewriter, if one is defined. If
* there is no ConfigRewriter defined for the job, then this method is a
Expand All @@ -37,28 +39,32 @@ public class ConfigUtil {
* @param config The config to re-write
* @return rewrited configs
*/
static public Config rewriteConfig(Config config) {
try {
final String rewriters = config.get(JobConfig.CONFIG_REWRITERS, "");
if (!rewriters.isEmpty()) {
Map<String, String> resultConfig = new HashMap<>(config);
for (String rewriter : rewriters.split(",")) {
String rewriterClassCfg = String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriter);
String rewriterClass = config.get(rewriterClassCfg, "");
if (rewriterClass.isEmpty()) {
throw new SamzaException(
"Unable to find class config for config rewriter: " + rewriterClassCfg);
}
ConfigRewriter configRewriter = (ConfigRewriter) Class.forName(rewriterClass).newInstance();
Config rewritedConfig = configRewriter.rewrite(rewriter, config);
resultConfig.putAll(rewritedConfig);
}
return new MapConfig(resultConfig);
} else {
return config;
public static Config rewriteConfig(Config config) {
Optional<String> configRewriterNamesOptional = new JobConfig(config).getConfigRewriters();
if (configRewriterNamesOptional.isPresent()) {
String[] configRewriterNames = configRewriterNamesOptional.get().split(",");
Config rewrittenConfig = config;
for (String configRewriterName : configRewriterNames) {
rewrittenConfig = applyRewriter(rewrittenConfig, configRewriterName);
}
} catch (Exception e) {
throw new RuntimeException(e);
return rewrittenConfig;
} else {
return config;
}
}

/**
* Re-writes configuration using a ConfigRewriter, defined with the given rewriterName in config.
* @param config the config to re-write
* @param rewriterName the name of the rewriter to apply
* @return the rewritten config
*/
public static Config applyRewriter(Config config, String rewriterName) {
String rewriterClassName = new JobConfig(config).getConfigRewriterClass(rewriterName)
.orElseThrow(() -> new SamzaException(
String.format("Unable to find class config for config rewriter %s.", rewriterName)));
ConfigRewriter rewriter = ReflectionUtil.getObj(rewriterClassName, ConfigRewriter.class);
LOG.info("Re-writing config with {}", rewriter);
return rewriter.rewrite(rewriterName, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildD
}

// Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager
SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
SystemFactory systemFactory = ReflectionUtil.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
SystemProducer systemProducer =
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager =
Expand Down
123 changes: 123 additions & 0 deletions samza-core/src/main/java/org/apache/samza/util/Util.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Util {
private static final Logger LOG = LoggerFactory.getLogger(Util.class);

static final String FALLBACK_VERSION = "0.0.1";

/**
* Make an environment variable string safe to pass.
*/
public static String envVarEscape(String str) {
return str.replace("\"", "\\\"").replace("'", "\\'");
}

public static String getSamzaVersion() {
return Optional.ofNullable(Util.class.getPackage().getImplementationVersion()).orElseGet(() -> {
LOG.warn("Unable to find implementation samza version in jar's meta info. Defaulting to {}", FALLBACK_VERSION);
return FALLBACK_VERSION;
});
}

public static String getTaskClassVersion(Config config) {
try {
Optional<String> appClass = Optional.ofNullable(new ApplicationConfig(config).getAppClass());
if (appClass.isPresent()) {
return Optional.ofNullable(Class.forName(appClass.get()).getPackage().getImplementationVersion())
.orElse(FALLBACK_VERSION);
} else {
Optional<String> taskClass = new TaskConfig(config).getTaskClass();
if (taskClass.isPresent()) {
return Optional.ofNullable(Class.forName(taskClass.get()).getPackage().getImplementationVersion())
.orElse(FALLBACK_VERSION);
} else {
LOG.warn("Unable to find app class or task class. Defaulting to {}", FALLBACK_VERSION);
return FALLBACK_VERSION;
}
}
} catch (Exception e) {
LOG.warn(String.format("Ran into exception while trying to get version of app or task. Defaulting to %s",
FALLBACK_VERSION), e);
return FALLBACK_VERSION;
}
}

/**
* Returns the the first host address which is not the loopback address, or {@link InetAddress#getLocalHost} as a
* fallback.
*
* @return the {@link InetAddress} which represents the localhost
*/
public static InetAddress getLocalHost() {
try {
return doGetLocalHost();
} catch (Exception e) {
throw new SamzaException("Error while getting localhost", e);
}
}

private static InetAddress doGetLocalHost() throws UnknownHostException, SocketException {
InetAddress localHost = InetAddress.getLocalHost();
if (localHost.isLoopbackAddress()) {
LOG.debug("Hostname {} resolves to a loopback address, trying to resolve an external IP address.",
localHost.getHostName());
List<NetworkInterface> networkInterfaces;
if (System.getProperty("os.name").startsWith("Windows")) {
networkInterfaces = Collections.list(NetworkInterface.getNetworkInterfaces());
} else {
networkInterfaces = Lists.reverse(Collections.list(NetworkInterface.getNetworkInterfaces()));
}
for (NetworkInterface networkInterface : networkInterfaces) {
List<InetAddress> addresses = Collections.list(networkInterface.getInetAddresses())
.stream()
.filter(address -> !(address.isLinkLocalAddress() || address.isLoopbackAddress()))
.collect(Collectors.toList());
if (!addresses.isEmpty()) {
InetAddress address = addresses.stream()
.filter(addr -> addr instanceof Inet4Address)
.findFirst()
.orElseGet(() -> addresses.get(0));
LOG.debug("Found an external IP address {} which represents the localhost.", address.getHostAddress());
return InetAddress.getByAddress(address.getAddress());
}
}
}
return localHost;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.samza.container.TaskName
import org.apache.samza.job.JobRunner.info
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{CommandLine, CoordinatorStreamUtil, Logging, ReflectionUtil, Util}
import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging, ReflectionUtil, Util}
import org.apache.samza.Partition
import org.apache.samza.SamzaException

Expand Down Expand Up @@ -138,7 +138,7 @@ object CheckpointTool {
val options = cmdline.parser.parse(args: _*)
val userConfig = cmdline.loadConfig(options)
val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
val rewrittenConfig = Util.rewriteConfig(jobConfig)
val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig)
info(s"Using the rewritten config: $rewrittenConfig")
val tool = CheckpointTool(rewrittenConfig, cmdline.newOffsets)
tool.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Logging, ReflectionUtil, Util}
import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil, Util}

import scala.collection.JavaConverters
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -274,7 +274,7 @@ object JobModelManager extends Logging {
filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
.getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
.equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
foldLeft(config)(Util.applyRewriter(_, _))
foldLeft(config)(ConfigUtil.applyRewriter(_, _))
case _ => config
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
Expand All @@ -36,11 +34,9 @@
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Util;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters;


/**
Expand Down Expand Up @@ -146,24 +142,15 @@ public DiagnosticsManager(String jobName,
resetTime = Instant.now();

try {

Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender",
JavaConverters.collectionAsScalaIterableConverter(
Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
.asScala()
.toSeq());

ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender",
Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class));
LOG.info("Attached log4j diagnostics appender.");
} catch (ClassNotFoundException | InstantiationException | InvocationTargetException e) {
} catch (Exception e) {
try {
Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender",
JavaConverters.collectionAsScalaIterableConverter(
Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
.asScala()
.toSeq());
LOG.info("Attached log4j diagnostics appender.");
} catch (ClassNotFoundException | InstantiationException | InvocationTargetException ex) {

ReflectionUtil.getObjWithArgs("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender",
Object.class, ReflectionUtil.constructorArgument(this, DiagnosticsManager.class));
LOG.info("Attached log4j2 diagnostics appender.");
} catch (Exception ex) {
LOG.warn(
"Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.",
ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.samza.metrics.reporter

import org.apache.samza.util.{Logging, StreamUtil, Util}
import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
import org.apache.samza.SamzaException
import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
import org.apache.samza.metrics.MetricsReporter
Expand Down Expand Up @@ -53,7 +53,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
.getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))

val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])

info("Got system factory %s." format systemFactory)

Expand All @@ -71,7 +71,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
val serde = if (serdeName != null) {
JavaOptionals.toRichOptional(serializerConfig.getSerdeFactoryClass(serdeName)).toOption match {
case Some(serdeClassName) =>
Util.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config)
ReflectionUtil.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config)
case _ => null
}
} else {
Expand Down
Loading