From 3889f1418cbe213d86ddde4604ce5f724c877dc4 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 11:30:40 -0800 Subject: [PATCH 1/2] Refactor LocalApplicationRunner Issues: LocalApplicationRunner has multiple constructors and follows different initialization logic. Changes: 1. Refactor all constructors to end up with the same private constructor. 2. Align "static final" and "final static" to "static final" 3. Group class variales based on accessibility 4. Minor fixs on inconsistent styling, return etc. Tests: Unit Tests --- .../samza/runtime/LocalApplicationRunner.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index bf6dfce2d8..8a98ec70b0 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -76,9 +76,9 @@ public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); private static final String PROCESSOR_ID = UUID.randomUUID().toString(); - private final static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore"; + private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore"; private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory"; - public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); + private static final String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); private final ApplicationDescriptorImpl appDesc; private final Set> processors = ConcurrentHashMap.newKeySet(); @@ -88,9 +88,9 @@ public class LocalApplicationRunner implements ApplicationRunner { private final boolean isAppModeBatch; private final Optional coordinationUtils; private final Optional metadataStoreFactory; + private Optional runId = Optional.empty(); private Optional runIdGenerator = Optional.empty(); - private ApplicationStatus appStatus = ApplicationStatus.New; /** @@ -110,11 +110,9 @@ public LocalApplicationRunner(SamzaApplication app, Config config) { * @param config configuration for the application * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream. */ - public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) { - this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); - this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; - this.coordinationUtils = getCoordinationUtils(config); - this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory); + @VisibleForTesting + LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) { + this(ApplicationDescriptorUtil.getAppDescriptor(app, config), getCoordinationUtils(config), metadataStoreFactory); } /** @@ -122,12 +120,20 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore */ @VisibleForTesting LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { + this(appDesc, coordinationUtils, getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig()))); + } + + private LocalApplicationRunner( + ApplicationDescriptorImpl appDesc, + Optional coordinationUtils, + MetadataStoreFactory metadataStoreFactory) { this.appDesc = appDesc; - this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + this.isAppModeBatch = isAppModeBatch(appDesc.getConfig()); this.coordinationUtils = coordinationUtils; - this.metadataStoreFactory = Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig()))); + this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory); } + @VisibleForTesting static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) { String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull(); JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(jobConfig); @@ -144,8 +150,8 @@ static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jo return null; } - private Optional getCoordinationUtils(Config config) { - if (!isAppModeBatch) { + private static Optional getCoordinationUtils(Config config) { + if (!isAppModeBatch(config)) { return Optional.empty(); } JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); @@ -154,6 +160,10 @@ private Optional getCoordinationUtils(Config config) { return Optional.ofNullable(coordinationUtils); } + private static boolean isAppModeBatch(Config config) { + return new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + } + /** * @return LocalJobPlanner created */ @@ -185,7 +195,7 @@ private void initializeRunId() { runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); runId = runIdGenerator.flatMap(RunIdGenerator::getRunId); } catch (Exception e) { - LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e); + LOG.warn("Failed to generate run id. Will continue execution without a run id.", e); } } @@ -278,7 +288,7 @@ public boolean waitForFinish(Duration timeout) { @VisibleForTesting protected Set getProcessors() { - return processors.stream().map(sp -> sp.getLeft()).collect(Collectors.toSet()); + return processors.stream().map(Pair::getLeft).collect(Collectors.toSet()); } @VisibleForTesting @@ -340,10 +350,8 @@ StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl reporters.put(name, factory.getMetricsReporter(name, processorId, config))); - StreamProcessor streamProcessor = new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), + return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null, coordinatorStreamStore); - - return streamProcessor; } /** From 5f527f4fb37598d0a091c019cbbea2b1736a6e04 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Fri, 10 Jan 2020 11:48:09 -0800 Subject: [PATCH 2/2] Revert LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) from package private back to public --- .../java/org/apache/samza/runtime/LocalApplicationRunner.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 8a98ec70b0..9ad52b1f1d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -110,8 +110,7 @@ public LocalApplicationRunner(SamzaApplication app, Config config) { * @param config configuration for the application * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream. */ - @VisibleForTesting - LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) { + public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) { this(ApplicationDescriptorUtil.getAppDescriptor(app, config), getCoordinationUtils(config), metadataStoreFactory); }