From 542cd48c75c6b4e697ee460e5f0724a90c0f0400 Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Fri, 10 Mar 2023 12:48:29 -0800 Subject: [PATCH] Fixed line-length related formatting issues. --- ...nTransactionalStateTaskRestoreManager.java | 5 +-- .../TransactionalStateTaskRestoreManager.java | 5 +-- .../storage/ContainerStorageManager.java | 43 +++++++++++-------- .../samza/storage/SideInputsManager.java | 16 ++++--- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index e2a3742823..88e4a5bce1 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -346,9 +346,8 @@ private Map createStoreEngines(Set storeNames, Jo taskModel.getTaskMode()); StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory, StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs, - taskModel, jobContext, containerContext, - serdes, metricsRegistry, messageCollector, - this.config); + taskModel, jobContext, containerContext, serdes, + metricsRegistry, messageCollector, this.config); storageEngines.put(storeName, engine); }); return storageEngines; diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 1b3762a0ca..104bc2c5f2 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -199,9 +199,8 @@ private Map createStoreEngines(Set storeNames, Jo taskModel.getTaskMode()); StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory, StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs, - taskModel, jobContext, containerContext, - serdes, metricsRegistry, messageCollector, - this.config); + taskModel, jobContext, containerContext, serdes, + metricsRegistry, messageCollector, this.config); storageEngines.put(storeName, engine); }); return storageEngines; diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 4b38fd8559..64e52c5954 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -172,8 +172,8 @@ public ContainerStorageManager( this.activeTaskChangelogSystemStreams = ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel); - LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams = {}", - this.activeTaskChangelogSystemStreams, sideInputSystemStreams); + LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}", + changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams); if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) { LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure" @@ -183,15 +183,17 @@ public ContainerStorageManager( this.storeDirectoryPaths = new HashSet<>(); - this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(changelogSystemStreams, storageEngineFactories, sideInputStoreNames, storeDirectoryPaths, containerModel, - jobContext, containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config - ); + this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores( + activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, + storeDirectoryPaths, containerModel, jobContext, containerContext, + taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams' // (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams. // create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams) - this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(activeTaskChangelogSystemStreams, - systemFactories, samzaContainerMetrics.registry(), config); + this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers( + activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config); JobConfig jobConfig = new JobConfig(config); int restoreThreadPoolSize = @@ -226,9 +228,8 @@ public void start() throws SamzaException, InterruptedException { storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, - streamMetadataCache, systemAdmins, serdeManager, serdes, - storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock - ); + streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock); // blocks until initial side inputs restore is complete sideInputsManager.start(); @@ -270,12 +271,16 @@ private void restoreStores() throws InterruptedException { } taskCheckpoints.put(taskName, taskCheckpoint); Map> backendFactoryStoreNames = - ContainerStorageManagerUtil.getBackendFactoryStoreNames(nonSideInputStoreNames, taskCheckpoint, - new StorageConfig(config)); + ContainerStorageManagerUtil.getBackendFactoryStoreNames( + nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config)); Map taskStoreRestoreManagers = - ContainerStorageManagerUtil.createTaskRestoreManagers(taskName, backendFactoryStoreNames, restoreStateBackendFactories, - storageEngineFactories, storeConsumers, inMemoryStores, systemAdmins, restoreExecutor, taskModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock - ); + ContainerStorageManagerUtil.createTaskRestoreManagers( + taskName, backendFactoryStoreNames, restoreStateBackendFactories, + storageEngineFactories, storeConsumers, + inMemoryStores, systemAdmins, restoreExecutor, + taskModel, jobContext, containerContext, + samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock); taskRestoreManagers.put(taskName, taskStoreRestoreManagers); }); @@ -356,9 +361,11 @@ private void restoreStores() throws InterruptedException { this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop); // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is - this.taskStores = ContainerStorageManagerUtil.createTaskStores(nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, this.containerModel, - this.jobContext, this.containerContext, this.serdes, this.taskInstanceMetrics, - this.taskInstanceCollectors, this.storageManagerUtil, + this.taskStores = ContainerStorageManagerUtil.createTaskStores( + nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, + this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, + this.containerModel, this.jobContext, this.containerContext, + this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil, this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config); // Add in memory stores diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java index 2707ecff81..96390ba3bb 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java @@ -145,10 +145,11 @@ public SideInputsManager(Map> sideInputSystemStreams, this.config = config; // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories - this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, - sideInputStoreNames, activeTaskChangelogSystemStreams, storeDirectoryPaths, containerModel, - jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors, - storageManagerUtil, + this.sideInputStores = ContainerStorageManagerUtil.createTaskStores( + sideInputStoreNames, storageEngineFactories, sideInputStoreNames, + activeTaskChangelogSystemStreams, storeDirectoryPaths, + containerModel, jobContext, containerContext, serdes, + taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, @@ -379,8 +380,8 @@ private static Map createSideInputH Config config, Clock clock) { // creating sideInput store processors, one per store per task Map> taskSideInputProcessors = - createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config) - ); + createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, + containerModel, serdes, new StorageConfig(config)); Map handlers = new HashMap<>(); @@ -454,7 +455,8 @@ private static Map> createSideInputPr config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName); } else { - // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy + // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, + // we rely on upstream validations to fail the deploy // if this is a standby-task and the store is a non-side-input changelog store // we creating identity sideInputProcessor for stores of standbyTasks