Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
Expand All @@ -47,10 +53,12 @@
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.JmxServer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
Expand All @@ -59,6 +67,7 @@
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.SystemClock;
Expand Down Expand Up @@ -95,7 +104,6 @@ public class ClusterBasedJobCoordinator {
private final static String METRICS_SOURCE_NAME = "ApplicationMaster";

private final Config config;
private final ClusterManagerConfig clusterManagerConfig;

/**
* State to track container failures, host-processor mappings
Expand Down Expand Up @@ -154,7 +162,7 @@ public class ClusterBasedJobCoordinator {
* Metrics to track stats around container failures, needed containers etc.
*/
private final MetricsRegistryMap metrics;
private final CoordinatorStreamStore coordinatorStreamStore;
private final MetadataStore metadataStore;

private final SystemAdmins systemAdmins;

Expand All @@ -169,40 +177,38 @@ public class ClusterBasedJobCoordinator {
volatile private Exception coordinatorException = null;

/**
 * Creates a new ClusterBasedJobCoordinator instance.
 * Invoke run() to actually run the job coordinator.
 *
 * @param metrics the registry for reporting metrics.
 * @param metadataStore metadata store to hold metadata.
 * @param fullJobConfig full job config.
 */
private ClusterBasedJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore, Config fullJobConfig) {
  this.metrics = metrics;
  this.metadataStore = metadataStore;
  this.config = fullJobConfig;

  // Build a ChangelogStreamManager and JobModelManager and perform partition assignments.
  this.changelogStreamManager = new ChangelogStreamManager(
      new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
  this.jobModelManager =
      JobModelManager.apply(config, changelogStreamManager.readPartitionMapping(), metadataStore, metrics);

  this.hasDurableStores = new StorageConfig(config).hasDurableStores();
  this.state = new SamzaApplicationState(jobModelManager);

  // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
  this.systemAdmins = new SystemAdmins(config);
  this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);

  Set<SystemStream> inputSystemStreams = JobModelUtil.getSystemStreams(jobModelManager.jobModel());
  this.inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, inputSystemStreams);

  // clusterManagerConfig is only needed during construction, so keep it local rather than a field.
  ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
  this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
  this.jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();

  // Build a container process manager.
  this.containerProcessManager = createContainerProcessManager();
}

/**
Expand Down Expand Up @@ -300,7 +306,7 @@ private void onShutDown() {
inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
systemAdmins.stop();
containerProcessManager.stop();
coordinatorStreamStore.close();
metadataStore.close();
} catch (Throwable e) {
LOG.error("Exception while stopping cluster based job coordinator", e);
}
Expand Down Expand Up @@ -386,8 +392,7 @@ public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemS
@VisibleForTesting
SamzaApplicationState.SamzaAppStatus getAppStatus() {
  // SamzaAppStatus is an enum, so returning the field directly exposes nothing mutable to callers.
  return state.status;
}

@VisibleForTesting
Expand All @@ -397,7 +402,7 @@ StreamPartitionCountMonitor getPartitionMonitor() {

@VisibleForTesting
StartpointManager createStartpointManager() {
  // Backed by the coordinator's metadata store so startpoints survive coordinator restarts.
  return new StartpointManager(metadataStore);
}

@VisibleForTesting
Expand Down Expand Up @@ -480,20 +485,103 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class<?> cluste
* {@link #main(String[])} so that it can be executed directly or from a separate classloader.
*/
private static void runClusterBasedJobCoordinator(String[] args) {
Config coordinatorSystemConfig;
final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
try {
//Read and parse the coordinator system config.
LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
coordinatorSystemConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
} catch (IOException e) {
LOG.error("Exception while reading coordinator stream config", e);
throw new SamzaException(e);
final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());

if (!StringUtils.isBlank(submissionEnv)) {
Config submissionConfig;
try {
//Read and parse the coordinator system config.
LOG.info("Parsing submission config {}", submissionEnv);
submissionConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
LOG.info("Using the submission config: {}.", submissionConfig);
} catch (IOException e) {
LOG.error("Exception while reading submission config", e);
throw new SamzaException(e);
}

ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
jc.run();
LOG.info("Finished running ClusterBasedJobCoordinator");
} else {
// TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
Config coordinatorSystemConfig;
try {
//Read and parse the coordinator system config.
LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
coordinatorSystemConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
} catch (IOException e) {
LOG.error("Exception while reading coordinator stream config", e);
throw new SamzaException(e);
}
ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
jc.run();
LOG.info("Finished running ClusterBasedJobCoordinator");
}
ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
jc.run();
LOG.info("Finished running ClusterBasedJobCoordinator");
}

/**
 * Initialize {@link ClusterBasedJobCoordinator} from a coordinator stream config; the full job config
 * is then read back out of the coordinator stream itself.
 *
 * @param metadataStoreConfig config used to bootstrap the {@link MetadataStore}
 * @return a fully constructed {@link ClusterBasedJobCoordinator}
 */
// TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
@VisibleForTesting
static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
  MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

  // The coordinator stream doubles as the metadata store; it must be initialized before it can be read.
  CoordinatorStreamStore store = new CoordinatorStreamStore(metadataStoreConfig, metricsRegistry);
  store.init();
  Config fullJobConfig = CoordinatorStreamUtil.readConfigFromCoordinatorStream(store);

  return new ClusterBasedJobCoordinator(metricsRegistry, store, fullJobConfig);
}

/**
 * Initialize {@link ClusterBasedJobCoordinator} with submission config; the full job config will be fetched
 * using the specified {@link org.apache.samza.config.ConfigLoaderFactory}.
 *
 * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
 * @return {@link ClusterBasedJobCoordinator}
 */
@VisibleForTesting
static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
  JobConfig jobConfig = new JobConfig(submissionConfig);

  // Fail fast with a clear message if the loader factory is missing from the submission config.
  if (!jobConfig.getConfigLoaderFactory().isPresent()) {
    throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
  }

  MetricsRegistryMap metrics = new MetricsRegistryMap();
  // load full job config with ConfigLoader
  Config originalConfig = ConfigUtil.loadConfig(submissionConfig);

  // Execute planning to derive the final per-job config.
  ApplicationDescriptorImpl<? extends ApplicationDescriptor>
      appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
  RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
  List<JobConfig> jobConfigs = planner.prepareJobs();

  if (jobConfigs.size() != 1) {
    // Fixed message: previously read "Only support single remote job is supported."
    throw new SamzaException("Only a single remote job is supported, but planning produced " + jobConfigs.size() + " jobs.");
  }

  Config config = jobConfigs.get(0);

  // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
  CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
  DiagnosticsUtil.createDiagnosticsStream(config);
  MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
  metadataStore.init();

  return new ClusterBasedJobCoordinator(metrics, metadataStore, config);
}
}
43 changes: 43 additions & 0 deletions samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@

package org.apache.samza.util;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.ConfigLoader;
import org.apache.samza.config.ConfigLoaderFactory;
import org.apache.samza.config.ConfigRewriter;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,4 +73,41 @@ public static Config applyRewriter(Config config, String rewriterName) {
LOG.info("Re-writing config with {}", rewriter);
return rewriter.rewrite(rewriterName, config);
}

/**
 * Loads the full job config using the {@link ConfigLoaderFactory} named in the given config.
 *
 * @param original submission config; must specify {@link JobConfig#CONFIG_LOADER_FACTORY}
 * @return full job config: loaded values overridden by the original config, then passed through rewriters
 * @throws ConfigException if no {@link ConfigLoaderFactory} is configured
 */
public static Config loadConfig(Config original) {
  JobConfig jobConfig = new JobConfig(original);

  // Idiomatic Optional handling instead of isPresent()/get().
  String factoryClassName = jobConfig.getConfigLoaderFactory()
      .orElseThrow(() -> new ConfigException("Missing key " + JobConfig.CONFIG_LOADER_FACTORY + "."));

  ConfigLoaderFactory factory = ReflectionUtil.getObj(factoryClassName, ConfigLoaderFactory.class);
  ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
  // The original config may contain explicit overrides, so it takes precedence over the loaded config.
  return rewriteConfig(override(loader.getConfig(), original));
}

/**
 * Merges one or more override maps on top of the original config; later overrides win.
 *
 * @param original base config whose entries may be replaced.
 * @param overrides maps whose entries replace entries of the original (and of earlier overrides).
 * @return a new {@link MapConfig} containing the merged entries.
 */
@SafeVarargs
private static Config override(Config original, Map<String, String>... overrides) {
  Map<String, String> merged = new HashMap<>(original);

  for (Map<String, String> layer : overrides) {
    merged.putAll(layer);
  }

  return new MapConfig(merged);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskMode
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metadatastore.MetadataStore
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
Expand Down Expand Up @@ -79,13 +80,13 @@ object JobModelManager extends Logging {
* @return the instantiated {@see JobModelManager}.
*/
def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer],
coordinatorStreamStore: CoordinatorStreamStore,
metadataStore: MetadataStore,
metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): JobModelManager = {

// Instantiate the respective metadata store util classes which uses the same coordinator metadata store.
val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE))
val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE))
val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskPartitionMapping.TYPE))
val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE))
val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE))
val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE))

val systemAdmins = new SystemAdmins(config)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object CoordinatorStreamUtil extends Logging {
*/
def buildCoordinatorStreamConfig(config: Config): MapConfig = {
val jobConfig = new JobConfig(config)
val buildConfigFactory = jobConfig.getCoordinatorStreamFactory()
val buildConfigFactory = jobConfig.getCoordinatorStreamFactory
val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)

new MapConfig(coordinatorSystemConfig);
Expand Down
Loading