Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public class LocalApplicationRunner implements ApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
private static final String PROCESSOR_ID = UUID.randomUUID().toString();
private final static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
private static final String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();

private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
private final Set<Pair<StreamProcessor, MetadataStore>> processors = ConcurrentHashMap.newKeySet();
Expand All @@ -88,9 +88,9 @@ public class LocalApplicationRunner implements ApplicationRunner {
private final boolean isAppModeBatch;
private final Optional<CoordinationUtils> coordinationUtils;
private final Optional<MetadataStoreFactory> metadataStoreFactory;

private Optional<String> runId = Optional.empty();
private Optional<RunIdGenerator> runIdGenerator = Optional.empty();

private ApplicationStatus appStatus = ApplicationStatus.New;

/**
Expand All @@ -111,23 +111,28 @@ public LocalApplicationRunner(SamzaApplication app, Config config) {
* @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream.
*/
public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = getCoordinationUtils(config);
this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
this(ApplicationDescriptorUtil.getAppDescriptor(app, config), getCoordinationUtils(config), metadataStoreFactory);
}

/**
* Constructor only used in unit test to allow injection of {@link LocalJobPlanner}
*/
@VisibleForTesting
LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
this(appDesc, coordinationUtils, getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
}

private LocalApplicationRunner(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
Optional<CoordinationUtils> coordinationUtils,
MetadataStoreFactory metadataStoreFactory) {
this.appDesc = appDesc;
this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.isAppModeBatch = isAppModeBatch(appDesc.getConfig());
this.coordinationUtils = coordinationUtils;
this.metadataStoreFactory = Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
}

@VisibleForTesting
static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(jobConfig);
Expand All @@ -144,8 +149,8 @@ static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jo
return null;
}

private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
if (!isAppModeBatch) {
private static Optional<CoordinationUtils> getCoordinationUtils(Config config) {
if (!isAppModeBatch(config)) {
return Optional.empty();
}
JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
Expand All @@ -154,6 +159,10 @@ private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
return Optional.ofNullable(coordinationUtils);
}

private static boolean isAppModeBatch(Config config) {
return new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
}

/**
* @return LocalJobPlanner created
*/
Expand Down Expand Up @@ -185,7 +194,7 @@ private void initializeRunId() {
runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
} catch (Exception e) {
LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e);
LOG.warn("Failed to generate run id. Will continue execution without a run id.", e);
}
}

Expand Down Expand Up @@ -278,7 +287,7 @@ public boolean waitForFinish(Duration timeout) {

@VisibleForTesting
protected Set<StreamProcessor> getProcessors() {
return processors.stream().map(sp -> sp.getLeft()).collect(Collectors.toSet());
return processors.stream().map(Pair::getLeft).collect(Collectors.toSet());
}

@VisibleForTesting
Expand Down Expand Up @@ -340,10 +349,8 @@ StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<?
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
reporters.put(name, factory.getMetricsReporter(name, processorId, config)));

StreamProcessor streamProcessor = new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null, coordinatorStreamStore);

return streamProcessor;
}

/**
Expand Down