diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index bf6dfce2d8..9ad52b1f1d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -76,9 +76,9 @@ public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); private static final String PROCESSOR_ID = UUID.randomUUID().toString(); - private final static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore"; + private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore"; private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory"; - public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); + private static final String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); private final ApplicationDescriptorImpl appDesc; private final Set> processors = ConcurrentHashMap.newKeySet(); @@ -88,9 +88,9 @@ public class LocalApplicationRunner implements ApplicationRunner { private final boolean isAppModeBatch; private final Optional coordinationUtils; private final Optional metadataStoreFactory; + private Optional runId = Optional.empty(); private Optional runIdGenerator = Optional.empty(); - private ApplicationStatus appStatus = ApplicationStatus.New; /** @@ -111,10 +111,7 @@ public LocalApplicationRunner(SamzaApplication app, Config config) { * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream. */ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) { - this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); - this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; - this.coordinationUtils = getCoordinationUtils(config); - this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory); + this(ApplicationDescriptorUtil.getAppDescriptor(app, config), getCoordinationUtils(config), metadataStoreFactory); } /** @@ -122,12 +119,20 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore */ @VisibleForTesting LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { + this(appDesc, coordinationUtils, getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig()))); + } + + private LocalApplicationRunner( + ApplicationDescriptorImpl appDesc, + Optional coordinationUtils, + MetadataStoreFactory metadataStoreFactory) { this.appDesc = appDesc; - this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + this.isAppModeBatch = isAppModeBatch(appDesc.getConfig()); this.coordinationUtils = coordinationUtils; - this.metadataStoreFactory = Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig()))); + this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory); } + @VisibleForTesting static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) { String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull(); JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(jobConfig); @@ -144,8 +149,8 @@ static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jo return null; } - private Optional getCoordinationUtils(Config config) { - if (!isAppModeBatch) { + private static Optional getCoordinationUtils(Config config) { + if (!isAppModeBatch(config)) { return Optional.empty(); } JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); @@ -154,6 +159,10 @@ private Optional getCoordinationUtils(Config config) { return Optional.ofNullable(coordinationUtils); } + private static boolean isAppModeBatch(Config config) { + return new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + } + /** * @return LocalJobPlanner created */ @@ -185,7 +194,7 @@ private void initializeRunId() { runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); runId = runIdGenerator.flatMap(RunIdGenerator::getRunId); } catch (Exception e) { - LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e); + LOG.warn("Failed to generate run id. Will continue execution without a run id.", e); } } @@ -278,7 +287,7 @@ public boolean waitForFinish(Duration timeout) { @VisibleForTesting protected Set getProcessors() { - return processors.stream().map(sp -> sp.getLeft()).collect(Collectors.toSet()); + return processors.stream().map(Pair::getLeft).collect(Collectors.toSet()); } @VisibleForTesting @@ -340,10 +349,8 @@ StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl reporters.put(name, factory.getMetricsReporter(name, processorId, config))); - StreamProcessor streamProcessor = new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), + return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null, coordinatorStreamStore); - - return streamProcessor; } /**