From 5c439628022b0a35d96409f866389d77388be5df Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Mon, 6 Mar 2023 22:42:46 -0800 Subject: [PATCH 1/2] Refactoring ContainerStorageManager for readability and manageability. --- ...nTransactionalStateTaskRestoreManager.java | 8 +- .../TransactionalStateTaskRestoreManager.java | 8 +- .../storage/ContainerStorageManager.java | 813 ++---------------- .../storage/ContainerStorageManagerUtil.java | 396 +++++++++ .../samza/storage/SideInputsManager.java | 499 +++++++++++ .../storage/TestContainerStorageManager.java | 61 +- 6 files changed, 1010 insertions(+), 775 deletions(-) create mode 100644 samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java create mode 100644 samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index 941731187b..e2a3742823 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -344,9 +344,11 @@ private Map createStoreEngines(Set storeNames, Jo File storeBaseDir = isLogged ? 
this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory; File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); - StorageEngine engine = ContainerStorageManager.createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, - storageEngineFactories, serdes, metricsRegistry, messageCollector, - StorageEngineFactory.StoreMode.BulkLoad, this.storeChangelogs, this.config); + StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory, + StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs, + taskModel, jobContext, containerContext, + serdes, metricsRegistry, messageCollector, + this.config); storageEngines.put(storeName, engine); }); return storageEngines; diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 45efcb60d8..1b3762a0ca 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -197,9 +197,11 @@ private Map createStoreEngines(Set storeNames, Jo File storeBaseDir = isLogged ? 
this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory; File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); - StorageEngine engine = ContainerStorageManager.createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, - storageEngineFactories, serdes, metricsRegistry, messageCollector, - StorageEngineFactory.StoreMode.BulkLoad, this.storeChangelogs, this.config); + StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory, + StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs, + taskModel, jobContext, containerContext, + serdes, metricsRegistry, messageCollector, + this.config); storageEngines.put(storeName, engine); }); return storageEngines; diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index e58b8c301c..6ca64787fe 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -19,13 +19,10 @@ package org.apache.samza.storage; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.File; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -33,26 +30,17 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; 
import java.util.stream.Collectors; -import org.apache.commons.collections4.MapUtils; import org.apache.samza.SamzaException; -import org.apache.samza.application.ApplicationUtil; import org.apache.samza.checkpoint.Checkpoint; import org.apache.samza.checkpoint.CheckpointManager; -import org.apache.samza.checkpoint.CheckpointV2; -import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.StorageConfig; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.container.RunLoop; -import org.apache.samza.container.RunLoopTask; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; @@ -60,36 +48,18 @@ import org.apache.samza.context.JobContext; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskMode; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.Gauge; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeManager; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemConsumers; -import org.apache.samza.system.SystemConsumersMetrics; import org.apache.samza.system.SystemFactory; import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamMetadata; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.system.chooser.DefaultChooser; -import org.apache.samza.system.chooser.MessageChooser; -import 
org.apache.samza.system.chooser.RoundRobinChooserFactory; -import org.apache.samza.table.utils.SerdeUtils; -import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskInstanceCollector; import org.apache.samza.util.Clock; -import org.apache.samza.util.ReflectionUtil; -import org.apache.samza.util.ScalaJavaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; /** @@ -112,32 +82,25 @@ public class ContainerStorageManager { private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class); private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d"; - private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread"; - private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-"; - // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer - - // Timeout with which sideinput thread checks for exceptions and for whether SSPs as caught up - private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10; - private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60; - private static final int RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS = 60; - private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1; - /** Maps containing relevant per-task objects */ private final Map taskInstanceMetrics; private final Map taskInstanceCollectors; private final Map> inMemoryStores; // subset of taskStores after #start() - private Map> taskStores; // Will be available after #start() + private Map> taskStores; // Will be available in #start() after #restoreStores() private final Map storeConsumers; // Mapping from store name to SystemConsumers private final Map> storageEngineFactories; // Map of storageEngineFactories indexed by store name - private final Map changelogSystemStreams; // Map of changelog system-streams indexed by store name + private final Map activeTaskChangelogSystemStreams; // Map 
of changelog system-streams indexed by store name private final Map> serdes; // Map of Serde objects indexed by serde name (specified in config) + private final SerdeManager serdeManager; private final SystemAdmins systemAdmins; + private final Map systemFactories; private final Clock clock; private final Map restoreStateBackendFactories; + private final Map> sideInputSystemStreams; private final StreamMetadataCache streamMetadataCache; private final SamzaContainerMetrics samzaContainerMetrics; @@ -151,27 +114,14 @@ public class ContainerStorageManager { private final File nonLoggedStoreBaseDirectory; private final Set storeDirectoryPaths; // the set of store directory paths, used by SamzaContainer to initialize its disk-space-monitor - /* Sideinput related parameters */ - private final boolean hasSideInputs; - private final Map> sideInputStores; // subset of taskStores after #start() - // side inputs indexed first by task, then store name - private final Map>> taskSideInputStoreSSPs; - private final Set sideInputStoreNames; - private final Map sspSideInputHandlers; - private SystemConsumers sideInputSystemConsumers; - private final Map sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread - private volatile boolean shouldShutdown = false; - private RunLoop sideInputRunLoop; - - private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_THREAD_NAME).build()); private final ExecutorService restoreExecutor; - private volatile Throwable sideInputException = null; - private final Config config; private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + private final Set sideInputStoreNames; + private SideInputsManager sideInputsManager; // created in start() after restoreStores() for regular stores is complete. 
+ private boolean isStarted = false; public ContainerStorageManager( @@ -197,24 +147,31 @@ public ContainerStorageManager( Clock clock) { this.checkpointManager = checkpointManager; this.containerModel = containerModel; - this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, sideInputSystemStreams, changelogSystemStreams); - this.sideInputStoreNames = getSideInputStores(containerModel, sideInputSystemStreams, changelogSystemStreams); - this.sideInputTaskLatches = new HashMap<>(); - this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream() - .flatMap(m -> m.values().stream()) - .flatMap(Collection::stream) - .findAny() - .isPresent(); - this.changelogSystemStreams = getActiveTaskChangelogSystemStreams(containerModel, changelogSystemStreams); - - LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs = {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs); - - this.clock = clock; - this.restoreStateBackendFactories = restoreStateBackendFactories; + this.streamMetadataCache = streamMetadataCache; + this.systemAdmins = systemAdmins; + this.sideInputSystemStreams = sideInputSystemStreams; this.storageEngineFactories = storageEngineFactories; + this.systemFactories = systemFactories; this.serdes = serdes; + this.config = config; + this.taskInstanceMetrics = taskInstanceMetrics; + this.samzaContainerMetrics = samzaContainerMetrics; + this.jobContext = jobContext; + this.containerContext = containerContext; + this.restoreStateBackendFactories = restoreStateBackendFactories; + this.taskInstanceCollectors = taskInstanceCollectors; this.loggedStoreBaseDirectory = loggedStoreBaseDirectory; this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory; + this.serdeManager = serdeManager; + this.clock = clock; + + this.sideInputStoreNames = + ContainerStorageManagerUtil.getSideInputStoreNames(sideInputSystemStreams, changelogSystemStreams, containerModel); + this.activeTaskChangelogSystemStreams = + 
ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel); + + LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams = {}", + this.activeTaskChangelogSystemStreams, sideInputSystemStreams); if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) { LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure" @@ -222,47 +179,17 @@ public ContainerStorageManager( loggedStoreBaseDirectory); } - // set the config - this.config = config; - - this.taskInstanceMetrics = taskInstanceMetrics; - - // Setting the metrics registry - this.samzaContainerMetrics = samzaContainerMetrics; - - this.jobContext = jobContext; - this.containerContext = containerContext; - - this.taskInstanceCollectors = taskInstanceCollectors; - - // initializing the set of store directory paths this.storeDirectoryPaths = new HashSet<>(); - this.streamMetadataCache = streamMetadataCache; - this.systemAdmins = systemAdmins; - - // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories - this.sideInputStores = createTaskStores(sideInputStoreNames, containerModel, jobContext, containerContext, - storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors); - StorageConfig storageConfig = new StorageConfig(config); - Set inMemoryStoreNames = storageEngineFactories.keySet().stream() - .filter(storeName -> { - Optional storeFactory = storageConfig.getStorageFactoryClassName(storeName); - return storeFactory.isPresent() && storeFactory.get() - .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY); - }) - .collect(Collectors.toSet()); - this.inMemoryStores = createTaskStores(inMemoryStoreNames, - this.containerModel, jobContext, containerContext, storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors); - - Set containerChangelogSystems = 
this.changelogSystemStreams.values().stream() - .map(SystemStream::getSystem) - .collect(Collectors.toSet()); + this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(changelogSystemStreams, storageEngineFactories, sideInputStoreNames, storeDirectoryPaths, containerModel, + jobContext, containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config + ); - // create system consumers (1 per store system in changelogSystemStreams), and index it by storeName - Map storeSystemConsumers = createConsumers( - containerChangelogSystems, systemFactories, config, this.samzaContainerMetrics.registry()); - this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers); + // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams' + // (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams. 
+ // create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams) + this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(activeTaskChangelogSystemStreams, + systemFactories, samzaContainerMetrics.registry(), config); JobConfig jobConfig = new JobConfig(config); int restoreThreadPoolSize = @@ -273,471 +200,13 @@ public ContainerStorageManager( ); this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build()); - - this.sspSideInputHandlers = createSideInputHandlers(clock); - - // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used - if (this.hasSideInputs) { - Set containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream() - .flatMap(map -> map.values().stream()) - .flatMap(Set::stream) - .map(SystemStreamPartition::getSystemStream) - .collect(Collectors.toSet()); - - Set containerSideInputSystems = containerSideInputSystemStreams.stream() - .map(SystemStream::getSystem) - .collect(Collectors.toSet()); - - // create sideInput consumers indexed by systemName - // Mapping from storeSystemNames to SystemConsumers - Map sideInputConsumers = - createConsumers(containerSideInputSystems, systemFactories, config, this.samzaContainerMetrics.registry()); - - scala.collection.immutable.Map inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false); - - SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX); - // we use the same registry as samza-container-metrics - - MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config, - sideInputSystemConsumersMetrics.registry(), systemAdmins); - - ApplicationConfig applicationConfig = new 
ApplicationConfig(config); - - sideInputSystemConsumers = - new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager, - sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), - TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()), - JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId()); - } - - } - - /** - * Remove changeLogSSPs that are associated with standby tasks from changelogSSP map and only return changelogSSPs - * associated with the active tasks. - * The standby changelogs will be consumed and restored as side inputs. - * - * @param containerModel the container's model - * @param changelogSystemStreams the passed in set of changelogSystemStreams - * @return A map of changeLogSSP to storeName across all tasks, assuming no two stores have the same changelogSSP - */ - @VisibleForTesting - Map getActiveTaskChangelogSystemStreams(ContainerModel containerModel, - Map changelogSystemStreams) { - if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) { - throw new SamzaException("Two stores cannot have the same changelog system-stream"); - } - - Map changelogSSPToStore = new HashMap<>(); - changelogSystemStreams.forEach((storeName, systemStream) -> - containerModel.getTasks().forEach((taskName, taskModel) -> - changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName)) - ); - - getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { - changelogSystemStreams.forEach((storeName, systemStream) -> { - SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()); - changelogSSPToStore.remove(ssp); - }); - }); - - // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to sideInputs above) - return 
MapUtils.invertMap(changelogSSPToStore).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream())); - } - - /** - * Fetch the side input stores. For active containers, the stores correspond to the side inputs and for standbys, they - * include the durable stores. - * @param containerModel the container's model - * @param sideInputSystemStreams the map of store to side input system streams - * @param changelogSystemStreams the map of store to changelog system streams - * @return A set of side input stores - */ - @VisibleForTesting - Set getSideInputStores(ContainerModel containerModel, - Map> sideInputSystemStreams, Map changelogSystemStreams) { - // add all the side input stores by default regardless of active vs standby - Set sideInputStores = new HashSet<>(sideInputSystemStreams.keySet()); - - // In case of standby tasks, we treat the stores that have changelogs as side input stores for bootstrapping state - if (getTasks(containerModel, TaskMode.Standby).size() > 0) { - sideInputStores.addAll(changelogSystemStreams.keySet()); - } - return sideInputStores; - } - - /** - * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name. 
- * - * @param containerModel the containerModel to use - * @param sideInputSystemStreams the map of store to sideInput system stream - * @param changelogSystemStreams the map of store to changelog system stream - * @return taskSideInputSSPs map - */ - @VisibleForTesting - Map>> getTaskSideInputSSPs(ContainerModel containerModel, - Map> sideInputSystemStreams, Map changelogSystemStreams) { - Map>> taskSideInputSSPs = new HashMap<>(); - - containerModel.getTasks().forEach((taskName, taskModel) -> { - taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>()); - sideInputSystemStreams.keySet().forEach(storeName -> { - Set taskSideInputs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet()); - taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs); - }); - }); - - getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { - taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>()); - changelogSystemStreams.forEach((storeName, systemStream) -> { - SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()); - taskSideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp)); - }); - }); - - return taskSideInputSSPs; - } - - /** - * Creates SystemConsumer objects for store restoration, creating one consumer per system. 
- */ - private static Map createConsumers(Set storeSystems, - Map systemFactories, Config config, MetricsRegistry registry) { - // Create one consumer for each system in use, map with one entry for each such system - Map consumers = new HashMap<>(); - - // Iterate over the list of storeSystems and create one sysConsumer per system - for (String storeSystemName : storeSystems) { - SystemFactory systemFactory = systemFactories.get(storeSystemName); - if (systemFactory == null) { - throw new SamzaException("System " + storeSystemName + " does not exist in config"); - } - consumers.put(storeSystemName, systemFactory.getConsumer(storeSystemName, config, registry)); - } - - return consumers; - } - - private static Map createStoreIndexedMap(Map changelogSystemStreams, - Map systemNameToSystemConsumers) { - // Map of each storeName to its respective systemConsumer - Map storeConsumers = new HashMap<>(); - - // Populate the map of storeName to its relevant systemConsumer - for (String storeName : changelogSystemStreams.keySet()) { - storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem())); - } - return storeConsumers; - } - - private Map createTaskRestoreManagers(Map factories, - Map> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName, - TaskModel taskModel) { - // Get the factories for the task based on the stores of the tasks to be restored from the factory - Map backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager - MetricsRegistry taskMetricsRegistry = - taskInstanceMetrics.get(taskName) != null ? 
taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap(); - - backendFactoryStoreNames.forEach((factoryName, storeNames) -> { - StateBackendFactory factory = factories.get(factoryName); - if (factory == null) { - throw new SamzaException( - String.format("Required restore state backend factory: %s not found in configured factories %s", - factoryName, String.join(", ", factories.keySet()))); - } - KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers, - inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes, - taskInstanceCollectors.get(taskName)); - TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor, - taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, - kafkaChangelogRestoreParams); - - backendFactoryRestoreManagers.put(factoryName, restoreManager); - }); - samzaContainerMetrics.addStoresRestorationGauge(taskName); - return backendFactoryRestoreManagers; - } - - /** - * Return a map of backend factory names to set of stores that should be restored using it - */ - @VisibleForTesting - Map> getBackendFactoryStoreNames(Checkpoint checkpoint, Set storeNames, - StorageConfig storageConfig) { - Map> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames) - - if (checkpoint != null && checkpoint.getVersion() == 1) { - // Only restore stores with changelog streams configured - Set changelogStores = storeNames.stream() - .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent()) - .collect(Collectors.toSet()); - // Default to changelog backend factory when using checkpoint v1 for backwards compatibility - if (!changelogStores.isEmpty()) { - backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores); - } - if (storeNames.size() > changelogStores.size()) { - Set 
nonChangelogStores = storeNames.stream() - .filter(storeName -> !changelogStores.contains(storeName)) - .collect(Collectors.toSet()); - LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1," - + "restore for the store will be skipped", - nonChangelogStores); - } - } else if (checkpoint == null || checkpoint.getVersion() == 2) { - // Extract the state checkpoint markers if checkpoint exists - Map> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() : - ((CheckpointV2) checkpoint).getStateCheckpointMarkers(); - - // Find stores associated to each state backend factory - storeNames.forEach(storeName -> { - List storeFactories = storageConfig.getStoreRestoreFactories(storeName); - - if (storeFactories.isEmpty()) { - // If the restore factory is not configured for the store and the store does not have a changelog topic - LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs," - + "restore for the store will be skipped", - storeName); - } else { - // Search the ordered list for the first matched state backend factory in the checkpoint - // If the checkpoint does not exist or state checkpoint markers does not exist, we match the first configured - // restore manager - Optional factoryNameOpt = storeFactories.stream() - .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) && - stateCheckpointMarkers.get(factoryName).containsKey(storeName)) - .findFirst(); - String factoryName; - if (factoryNameOpt.isPresent()) { - factoryName = factoryNameOpt.get(); - } else { // Restore factories configured but no checkpoints found - // Use first configured restore factory - factoryName = storeFactories.get(0); - LOG.warn("No matching checkpoints found for configured factories: {}, " + - "defaulting to using the first configured factory with no checkpoints", storeFactories); - } - if (!backendFactoryStoreNames.containsKey(factoryName)) { - 
backendFactoryStoreNames.put(factoryName, new HashSet<>()); - } - backendFactoryStoreNames.get(factoryName).add(storeName); - } - }); - } else { - throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion())); - } - return backendFactoryStoreNames; - } - - // Helper method to filter active Tasks from the container model - private static Map getTasks(ContainerModel containerModel, TaskMode taskMode) { - return containerModel.getTasks().entrySet().stream() - .filter(x -> x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - /** - * Create taskStores for all stores in storesToCreate. - * The store mode is chosen as read-write mode. - */ - private Map> createTaskStores(Set storesToCreate, - ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, - Map> storageEngineFactories, Map> serdes, - Map taskInstanceMetrics, - Map taskInstanceCollectors) { - Map> taskStores = new HashMap<>(); - StorageConfig storageConfig = new StorageConfig(config); - - // iterate over each task and each storeName - for (Map.Entry task : containerModel.getTasks().entrySet()) { - TaskName taskName = task.getKey(); - TaskModel taskModel = task.getValue(); - if (!taskStores.containsKey(taskName)) { - taskStores.put(taskName, new HashMap<>()); - } - - for (String storeName : storesToCreate) { - List storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); - // A store is considered durable if it is backed by a changelog or another backupManager factory - boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); - boolean isSideInput = this.sideInputStoreNames.contains(storeName); - // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir - // for non logged stores - File storeBaseDir = isDurable || isSideInput ? 
this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory; - File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, - taskModel.getTaskMode()); - this.storeDirectoryPaths.add(storeDirectory.toPath()); - - // if taskInstanceMetrics are specified use those for store metrics, - // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap - MetricsRegistry storeMetricsRegistry = - taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap(); - - StorageEngine storageEngine = - createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, storageEngineFactories, - serdes, storeMetricsRegistry, taskInstanceCollectors.get(taskName), - StorageEngineFactory.StoreMode.ReadWrite, this.changelogSystemStreams, this.config); - - // add created store to map - taskStores.get(taskName).put(storeName, storageEngine); - - LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath()); - } - } - return taskStores; - } - - /** - * Method to instantiate a StorageEngine with the given parameters, and populate the storeDirectory paths (used to monitor - * disk space). - */ - public static StorageEngine createStore( - String storeName, - File storeDirectory, - TaskModel taskModel, - JobContext jobContext, - ContainerContext containerContext, - Map> storageEngineFactories, - Map> serdes, - MetricsRegistry storeMetricsRegistry, - MessageCollector messageCollector, - StorageEngineFactory.StoreMode storeMode, - Map changelogSystemStreams, - Config config) { - - StorageConfig storageConfig = new StorageConfig(config); - SystemStreamPartition changeLogSystemStreamPartition = changelogSystemStreams.containsKey(storeName) ? 
- new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()) : null; - - Optional storageKeySerde = storageConfig.getStorageKeySerde(storeName); - Serde keySerde = null; - if (storageKeySerde.isPresent()) { - keySerde = serdes.get(storageKeySerde.get()); - } - Optional storageMsgSerde = storageConfig.getStorageMsgSerde(storeName); - Serde messageSerde = null; - if (storageMsgSerde.isPresent()) { - messageSerde = serdes.get(storageMsgSerde.get()); - } - - return storageEngineFactories.get(storeName) - .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, messageCollector, - storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode); - } - - - // Create sideInput store processors, one per store per task - private Map> createSideInputProcessors(StorageConfig config, - ContainerModel containerModel, Map taskInstanceMetrics) { - - Map> sideInputStoresToProcessors = new HashMap<>(); - containerModel.getTasks().forEach((taskName, taskModel) -> { - sideInputStoresToProcessors.put(taskName, new HashMap<>()); - TaskMode taskMode = taskModel.getTaskMode(); - - for (String storeName : this.taskSideInputStoreSSPs.get(taskName).keySet()) { - - SideInputsProcessor sideInputsProcessor; - Optional sideInputsProcessorSerializedInstance = - config.getSideInputsProcessorSerializedInstance(storeName); - - if (sideInputsProcessorSerializedInstance.isPresent()) { - - sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get()); - LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName); - - } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) { - String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get(); - SideInputsProcessorFactory sideInputsProcessorFactory = - ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, 
SideInputsProcessorFactory.class); - sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()); - LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName); - - } else { - // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy - - // if this is a standby-task and the store is a non-side-input changelog store - // we creating identity sideInputProcessor for stores of standbyTasks - // have to use the right serde because the sideInput stores are created - - Serde keySerde = serdes.get(config.getStorageKeySerde(storeName) - .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName))); - Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName) - .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName))); - sideInputsProcessor = new SideInputsProcessor() { - @Override - public Collection> process(IncomingMessageEnvelope message, KeyValueStore store) { - // Ignore message if the key is null - if (message.getKey() == null) { - return ImmutableList.of(); - } else { - // Skip serde if the message is null - return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()), - message.getMessage() == null ? 
null : msgSerde.fromBytes((byte[]) message.getMessage()))); - } - } - }; - LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName); - } - - sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor); - } - }); - - return sideInputStoresToProcessors; - } - - // Create task sideInput storage managers, one per task, index by the SSP they are responsible for consuming - private Map createSideInputHandlers(Clock clock) { - // creating sideInput store processors, one per store per task - Map> taskSideInputProcessors = - createSideInputProcessors(new StorageConfig(config), this.containerModel, this.taskInstanceMetrics); - - Map handlers = new HashMap<>(); - - if (this.hasSideInputs) { - containerModel.getTasks().forEach((taskName, taskModel) -> { - - Map taskSideInputStores = sideInputStores.get(taskName); - Map> sideInputStoresToSSPs = new HashMap<>(); - boolean taskHasSideInputs = false; - for (String storeName : taskSideInputStores.keySet()) { - Set storeSSPs = this.taskSideInputStoreSSPs.get(taskName).get(storeName); - taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty(); - sideInputStoresToSSPs.put(storeName, storeSSPs); - } - - if (taskHasSideInputs) { - CountDownLatch taskCountDownLatch = new CountDownLatch(1); - this.sideInputTaskLatches.put(taskName, taskCountDownLatch); - - TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName, - taskModel.getTaskMode(), - loggedStoreBaseDirectory, - taskSideInputStores, - sideInputStoresToSSPs, - taskSideInputProcessors.get(taskName), - this.systemAdmins, - this.streamMetadataCache, - taskCountDownLatch, - clock); - - sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> { - handlers.put(ssp, taskSideInputHandler); - }); - - LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}", - taskName, taskSideInputStores, loggedStoreBaseDirectory); - } - }); - } - return 
handlers; - } - - private Set getSideInputHandlers() { - return this.sspSideInputHandlers.values().stream().collect(Collectors.toSet()); - } public void start() throws SamzaException, InterruptedException { - // Restores and recreates + // Restores and recreates stores. restoreStores(); + // Shutdown restore executor since it will no longer be used try { restoreExecutor.shutdown(); @@ -745,19 +214,38 @@ public void start() throws SamzaException, InterruptedException { restoreExecutor.shutdownNow(); } } catch (Exception e) { - LOG.error(e.getMessage()); - } - if (this.hasSideInputs) { - startSideInputs(); + LOG.error("Error shutting down restore executor", e); } + + // create and restore side input stores + this.sideInputsManager = new SideInputsManager( + sideInputSystemStreams, systemFactories, activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager, + serdes, + storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock + ); + + // blocks until initial side inputs restore is complete + sideInputsManager.start(); + + // Refactor Note (prateekm): In the previous version, side input stores were created in constructor, + // and were added to taskStores (which are created in restoreStores()) at the end of restoreStores(). + // In this version, side input stores are created much later during start() (this method), + // in the SideInputsManager constructor call above, and are added to taskStores here after side input restore + // (in sideInputsManager.start()) is complete. Completing the side input restore first isn't strictly necessary. 
+ sideInputsManager.getSideInputStores().forEach((taskName, stores) -> { + if (!this.taskStores.containsKey(taskName)) { + taskStores.put(taskName, new HashMap<>()); + } + taskStores.get(taskName).putAll(stores); + }); + isStarted = true; } // Restoration of all stores, in parallel across tasks private void restoreStores() throws InterruptedException { LOG.info("Store Restore started"); - Set activeTasks = getTasks(containerModel, TaskMode.Active).keySet(); - // TODO HIGH dchen verify davinci lifecycle + Set activeTasks = ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet(); // Find all non-side input stores Set nonSideInputStoreNames = storageEngineFactories.keySet() .stream() @@ -775,10 +263,13 @@ private void restoreStores() throws InterruptedException { LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName); } taskCheckpoints.put(taskName, taskCheckpoint); - Map> backendFactoryStoreNames = getBackendFactoryStoreNames(taskCheckpoint, nonSideInputStoreNames, - new StorageConfig(config)); - Map taskStoreRestoreManagers = createTaskRestoreManagers(restoreStateBackendFactories, - backendFactoryStoreNames, clock, samzaContainerMetrics, taskName, taskModel); + Map> backendFactoryStoreNames = + ContainerStorageManagerUtil.getBackendFactoryStoreNames(nonSideInputStoreNames, taskCheckpoint, + new StorageConfig(config)); + Map taskStoreRestoreManagers = + ContainerStorageManagerUtil.createTaskRestoreManagers(taskName, backendFactoryStoreNames, restoreStateBackendFactories, + storageEngineFactories, storeConsumers, inMemoryStores, systemAdmins, restoreExecutor, taskModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock + ); taskRestoreManagers.put(taskName, taskStoreRestoreManagers); }); @@ -859,8 +350,11 @@ private void restoreStores() throws InterruptedException { 
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop); // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is - this.taskStores = createTaskStores(nonSideInputStoreNames, this.containerModel, jobContext, containerContext, - storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors); + this.taskStores = ContainerStorageManagerUtil.createTaskStores(nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, this.containerModel, + this.jobContext, this.containerContext, this.serdes, this.taskInstanceMetrics, + this.taskInstanceCollectors, this.storageManagerUtil, + this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config); + // Add in memory stores this.inMemoryStores.forEach((taskName, stores) -> { if (!this.taskStores.containsKey(taskName)) { @@ -868,145 +362,10 @@ private void restoreStores() throws InterruptedException { } taskStores.get(taskName).putAll(stores); }); - // Add side input stores - this.sideInputStores.forEach((taskName, stores) -> { - if (!this.taskStores.containsKey(taskName)) { - taskStores.put(taskName, new HashMap<>()); - } - taskStores.get(taskName).putAll(stores); - }); LOG.info("Store Restore complete"); } - // Read sideInputs until all sideInputStreams are caughtup, so start() can return - private void startSideInputs() { - - LOG.info("SideInput Restore started"); - - // initialize the sideInputStorageManagers - getSideInputHandlers().forEach(TaskSideInputHandler::init); - - Map taskSideInputHandlers = this.sspSideInputHandlers.values().stream() - .distinct() - .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity())); - - Map sideInputTaskMetrics = new HashMap<>(); - Map sideInputTasks = new HashMap<>(); - this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> { - Set taskSSPs = 
this.taskSideInputStoreSSPs.get(taskName).values().stream() - .flatMap(Set::stream) - .collect(Collectors.toSet()); - - if (!taskSSPs.isEmpty()) { - String sideInputSource = SIDEINPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source(); - TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics(sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDEINPUTS_METRICS_PREFIX); - sideInputTaskMetrics.put(taskName, sideInputMetrics); - - RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName)); - sideInputTasks.put(taskName, sideInputTask); - } - }); - - // register all sideInput SSPs with the consumers - for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) { - String startingOffset = this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp); - - if (startingOffset == null) { - throw new SamzaException( - "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". 
Consumer cannot start."); - } - - // register startingOffset with the sysConsumer and register a metric for it - sideInputSystemConsumers.register(ssp, startingOffset); - taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( - ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); - sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( - ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); - } - - // start the systemConsumers for consuming input - this.sideInputSystemConsumers.start(); - - TaskConfig taskConfig = new TaskConfig(this.config); - SamzaContainerMetrics sideInputContainerMetrics = - new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(), - this.samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX); - - final ApplicationConfig applicationConfig = new ApplicationConfig(config); - - this.sideInputRunLoop = new RunLoop(sideInputTasks, - null, // all operations are executed in the main runloop thread - this.sideInputSystemConsumers, - 1, // single message in flight per task - -1, // no windowing - taskConfig.getCommitMs(), - taskConfig.getCallbackTimeoutMs(), - taskConfig.getDrainCallbackTimeoutMs(), - // TODO consolidate these container configs SAMZA-2275 - this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)), - taskConfig.getMaxIdleMs(), - sideInputContainerMetrics, - System::nanoTime, - false, - DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR, - applicationConfig.getRunId(), - ApplicationUtil.isHighLevelApiJob(config) - ); // commit must be synchronous to ensure integrity of state flush - - try { - sideInputsExecutor.submit(() -> { - try { - sideInputRunLoop.run(); - } catch (Exception e) { - LOG.error("Exception in reading sideInputs", e); - sideInputException = e; - } - }); - - // Make the main thread wait until 
all sideInputs have been caughtup or an exception was thrown - while (!shouldShutdown && sideInputException == null && - !awaitSideInputTasks()) { - LOG.debug("Waiting for SideInput bootstrap to complete"); - } - - if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs - throw new SamzaException("Exception in restoring sideInputs", sideInputException); - } - - } catch (InterruptedException e) { - LOG.warn("Received an interrupt during side inputs store restoration." - + " Exiting prematurely without completing store restore."); - /* - * We want to stop side input restoration and rethrow the exception upstream. Container should handle the - * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the - * resources prematurely here. - */ - shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence? - throw new SamzaException("Side inputs read was interrupted", e); - } - - LOG.info("SideInput Restore complete"); - } - - /** - * Waits for all side input tasks to catch up until a timeout. - * - * @return False if waiting on any latch timed out, true otherwise - * - * @throws InterruptedException if waiting any of the latches is interrupted - */ - private boolean awaitSideInputTasks() throws InterruptedException { - long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(SIDE_INPUT_CHECK_TIMEOUT_SECONDS); - for (CountDownLatch latch : this.sideInputTaskLatches.values()) { - long remainingMillisToWait = endTime - System.currentTimeMillis(); - if (remainingMillisToWait <= 0 || !latch.await(remainingMillisToWait, TimeUnit.MILLISECONDS)) { - return false; - } - } - return true; - } - /** * Get the {@link StorageEngine} instance with a given name for a given task. * @param taskName the task name for which the storage engine is desired. 
@@ -1049,7 +408,7 @@ public void stopStores() { } public void shutdown() { - // stop all nonsideinputstores including persistent and non-persistent stores + // stop all non side input stores including persistent and non-persistent stores if (taskStores != null) { this.containerModel.getTasks() .forEach((taskName, taskModel) -> taskStores.get(taskName) @@ -1058,23 +417,7 @@ public void shutdown() { .forEach(e -> e.getValue().stop())); } - this.shouldShutdown = true; - - // stop all sideinput consumers and stores - if (this.hasSideInputs) { - this.sideInputRunLoop.shutdown(); - this.sideInputsExecutor.shutdown(); - try { - this.sideInputsExecutor.awaitTermination(SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new SamzaException("Exception while shutting down sideInputs", e); - } - - this.sideInputSystemConsumers.stop(); - - // stop all sideInputStores -- this will perform one last flush on the KV stores, and write the offset file - this.getSideInputHandlers().forEach(TaskSideInputHandler::stop); - } + this.sideInputsManager.shutdown(); LOG.info("Shutdown complete"); } } diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java new file mode 100644 index 0000000000..24430aa8c1 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage; + +import java.io.File; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import org.apache.commons.collections4.MapUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointV2; +import org.apache.samza.config.Config; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.container.TaskInstanceMetrics; +import org.apache.samza.container.TaskName; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskInstanceCollector; +import org.apache.samza.util.Clock; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContainerStorageManagerUtil { + private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManagerUtil.class); + + /** + * Create taskStores for all stores in storesToCreate. + * The store mode is chosen as read-write mode. + * Adds newly created stores to storeDirectoryPaths. + */ + public static Map> createTaskStores(Set storesToCreate, + Map> storageEngineFactories, + Set sideInputStoreNames, + Map changelogSystemStreams, + Set storeDirectoryPaths, + ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, + Map> serdes, + Map taskInstanceMetrics, + Map taskInstanceCollectors, + StorageManagerUtil storageManagerUtil, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, + Config config) { + Map> taskStores = new HashMap<>(); + StorageConfig storageConfig = new StorageConfig(config); + + // iterate over each task and each storeName + for (Map.Entry task : containerModel.getTasks().entrySet()) { + TaskName taskName = task.getKey(); + TaskModel taskModel = task.getValue(); + if (!taskStores.containsKey(taskName)) { + taskStores.put(taskName, new HashMap<>()); + } + + for (String storeName : storesToCreate) { + List storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); + // A store is considered durable if it is backed by a changelog or another backupManager factory + boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); + boolean isSideInput = sideInputStoreNames.contains(storeName); + // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir + // for non logged stores + File storeBaseDir = isDurable || isSideInput ? 
loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory; + File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, + taskModel.getTaskMode()); + storeDirectoryPaths.add(storeDirectory.toPath()); + + // if taskInstanceMetrics are specified use those for store metrics, + // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap + MetricsRegistry storeMetricsRegistry = + taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap(); + + StorageEngine storageEngine = + createStore(storeName, storeDirectory, StorageEngineFactory.StoreMode.ReadWrite, storageEngineFactories, changelogSystemStreams, taskModel, jobContext, containerContext, + serdes, storeMetricsRegistry, taskInstanceCollectors.get(taskName), + config); + + // add created store to map + taskStores.get(taskName).put(storeName, storageEngine); + + LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath()); + } + } + return taskStores; + } + + /** + * Method to instantiate a StorageEngine with the given parameters. It does not itself populate the storeDirectory + * paths (used to monitor disk space); createTaskStores adds the store directory before calling this method. + */ + public static StorageEngine createStore( + String storeName, + File storeDirectory, + StorageEngineFactory.StoreMode storeMode, + Map> storageEngineFactories, + Map changelogSystemStreams, + TaskModel taskModel, JobContext jobContext, ContainerContext containerContext, + Map> serdes, + MetricsRegistry storeMetricsRegistry, + MessageCollector messageCollector, + Config config) { + + StorageConfig storageConfig = new StorageConfig(config); + SystemStreamPartition changeLogSystemStreamPartition = changelogSystemStreams.containsKey(storeName) ? 
+ new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()) : null; + + Optional storageKeySerde = storageConfig.getStorageKeySerde(storeName); + Serde keySerde = null; + if (storageKeySerde.isPresent()) { + keySerde = serdes.get(storageKeySerde.get()); + } + Optional storageMsgSerde = storageConfig.getStorageMsgSerde(storeName); + Serde messageSerde = null; + if (storageMsgSerde.isPresent()) { + messageSerde = serdes.get(storageMsgSerde.get()); + } + + return storageEngineFactories.get(storeName) + .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, messageCollector, + storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode); + } + + public static Map> createInMemoryStores( + Map changelogSystemStreams, + Map> storageEngineFactories, + Set sideInputStoreNames, + Set storeDirectoryPaths, + ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, + Map taskInstanceMetrics, + Map taskInstanceCollectors, + Map> serdes, + StorageManagerUtil storageManagerUtil, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, + Config config) { + StorageConfig storageConfig = new StorageConfig(config); + Set inMemoryStoreNames = storageEngineFactories.keySet().stream() + .filter(storeName -> { + Optional storeFactory = storageConfig.getStorageFactoryClassName(storeName); + return storeFactory.isPresent() && storeFactory.get() + .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY); + }) + .collect(Collectors.toSet()); + return ContainerStorageManagerUtil.createTaskStores( + inMemoryStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel, jobContext, containerContext, serdes, + taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); + } + + /** + * Returns a map from store names to their corresponding 
changelog system consumers. + * @return map of store name to its changelog system consumer. + */ + public static Map createStoreChangelogConsumers( + Map activeTaskChangelogSystemStreams, + Map systemFactories, MetricsRegistry registry, Config config) { + Set activeTaskChangelogSystems = activeTaskChangelogSystemStreams.values().stream() + .map(SystemStream::getSystem) + .collect(Collectors.toSet()); + + Map systemNameToSystemConsumers = ContainerStorageManagerUtil.createSystemConsumers( + activeTaskChangelogSystems, systemFactories, registry, config); + + // Map of each storeName to its respective systemConsumer + Map storeConsumers = new HashMap<>(); + + // Populate the map of storeName to its relevant systemConsumer + for (String storeName : activeTaskChangelogSystemStreams.keySet()) { + storeConsumers.put(storeName, systemNameToSystemConsumers.get(activeTaskChangelogSystemStreams.get(storeName).getSystem())); + } + return storeConsumers; + } + + /** + * Creates SystemConsumers for store restore, one consumer per system. + * @return map of system name to its system consumer. 
+ */ + public static Map createSystemConsumers(Set storeSystems, + Map systemFactories, MetricsRegistry registry, Config config) { + // Create one consumer for each system in use, map with one entry for each such system + Map consumers = new HashMap<>(); + + // Iterate over the list of storeSystems and create one sysConsumer per system + for (String storeSystemName : storeSystems) { + SystemFactory systemFactory = systemFactories.get(storeSystemName); + if (systemFactory == null) { + throw new SamzaException("System " + storeSystemName + " does not exist in config"); + } + consumers.put(storeSystemName, systemFactory.getConsumer(storeSystemName, config, registry)); + } + + return consumers; + } + + public static Map createTaskRestoreManagers(TaskName taskName, + Map> backendFactoryStoreNames, + Map stateBackendFactories, + Map> storageEngineFactories, + Map storeConsumers, + Map> inMemoryStores, + SystemAdmins systemAdmins, + ExecutorService restoreExecutor, + TaskModel taskModel, JobContext jobContext, ContainerContext containerContext, + SamzaContainerMetrics samzaContainerMetrics, + Map taskInstanceMetrics, + Map taskInstanceCollectors, + Map> serdes, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, + Config config, Clock clock) { + // Get the factories for the task based on the stores of the tasks to be restored from the factory + Map backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager + MetricsRegistry taskMetricsRegistry = + taskInstanceMetrics.get(taskName) != null ? 
+ taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap(); + + backendFactoryStoreNames.forEach((factoryName, storeNames) -> { + StateBackendFactory factory = stateBackendFactories.get(factoryName); + if (factory == null) { + throw new SamzaException( + String.format("Required restore state backend factory: %s not found in configured factories %s", + factoryName, String.join(", ", stateBackendFactories.keySet()))); + } + KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers, + inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes, + taskInstanceCollectors.get(taskName)); + TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor, + taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, + kafkaChangelogRestoreParams); + + backendFactoryRestoreManagers.put(factoryName, restoreManager); + }); + samzaContainerMetrics.addStoresRestorationGauge(taskName); + return backendFactoryRestoreManagers; + } + + /** + * Returns a map of storeNames to changelogSSPs associated with the active tasks. + * The standby changelogs will be consumed and restored as side inputs. 
+ * + * @param changelogSystemStreams the passed in map of storeName to changelogSystemStreams + * @param containerModel the container's model + * @return A map of storeName to changelogSSPs across all active tasks, assuming no two stores have the same changelogSSP + */ + public static Map getActiveTaskChangelogSystemStreams(Map changelogSystemStreams, ContainerModel containerModel) { + if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) { + throw new SamzaException("Two stores cannot have the same changelog system-stream"); + } + + Map changelogSSPToStore = new HashMap<>(); + changelogSystemStreams.forEach((storeName, systemStream) -> + containerModel.getTasks().forEach((taskName, taskModel) -> + changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName)) + ); + + getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { + changelogSystemStreams.forEach((storeName, systemStream) -> { + SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()); + changelogSSPToStore.remove(ssp); + }); + }); + + // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to sideInputs above) + return MapUtils.invertMap(changelogSSPToStore).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream())); + } + + /** + * Return a map of backend factory names to set of stores that should be restored using it + */ + public static Map> getBackendFactoryStoreNames( + Set storeNames, Checkpoint checkpoint, StorageConfig storageConfig) { + Map> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames) + + if (checkpoint != null && checkpoint.getVersion() == 1) { + // Only restore stores with changelog streams configured + Set changelogStores = storeNames.stream() + .filter(storeName -> 
storageConfig.getChangelogStream(storeName).isPresent()) + .collect(Collectors.toSet()); + // Default to changelog backend factory when using checkpoint v1 for backwards compatibility + if (!changelogStores.isEmpty()) { + backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores); + } + if (storeNames.size() > changelogStores.size()) { + Set nonChangelogStores = storeNames.stream() + .filter(storeName -> !changelogStores.contains(storeName)) + .collect(Collectors.toSet()); + LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1," + + "restore for the store will be skipped", + nonChangelogStores); + } + } else if (checkpoint == null || checkpoint.getVersion() == 2) { + // Extract the state checkpoint markers if checkpoint exists + Map> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() : + ((CheckpointV2) checkpoint).getStateCheckpointMarkers(); + + // Find stores associated to each state backend factory + storeNames.forEach(storeName -> { + List storeFactories = storageConfig.getStoreRestoreFactories(storeName); + + if (storeFactories.isEmpty()) { + // If the restore factory is not configured for the store and the store does not have a changelog topic + LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs," + + "restore for the store will be skipped", + storeName); + } else { + // Search the ordered list for the first matched state backend factory in the checkpoint + // If the checkpoint does not exist or state checkpoint markers does not exist, we match the first configured + // restore manager + Optional factoryNameOpt = storeFactories.stream() + .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) && + stateCheckpointMarkers.get(factoryName).containsKey(storeName)) + .findFirst(); + String factoryName; + if (factoryNameOpt.isPresent()) { + factoryName = factoryNameOpt.get(); + } else { // Restore factories 
configured but no checkpoints found + // Use first configured restore factory + factoryName = storeFactories.get(0); + LOG.warn("No matching checkpoints found for configured factories: {}, " + + "defaulting to using the first configured factory with no checkpoints", storeFactories); + } + if (!backendFactoryStoreNames.containsKey(factoryName)) { + backendFactoryStoreNames.put(factoryName, new HashSet<>()); + } + backendFactoryStoreNames.get(factoryName).add(storeName); + } + }); + } else { + throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion())); + } + return backendFactoryStoreNames; + } + + // Helper method to filter tasks of the given TaskMode from the container model + public static Map getTasks(ContainerModel containerModel, TaskMode taskMode) { + return containerModel.getTasks().entrySet().stream() + .filter(x -> x.getValue().getTaskMode().equals(taskMode)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * Fetch the side input stores. For active containers, the stores correspond to the side inputs and for standbys, they + * include the durable stores. 
+ * @param sideInputSystemStreams the map of store to side input system streams + * @param changelogSystemStreams the map of store to changelog system streams + * @param containerModel the container's model + * @return A set of side input stores + */ + public static Set getSideInputStoreNames( + Map> sideInputSystemStreams, + Map changelogSystemStreams, + ContainerModel containerModel) { + // add all the side input stores by default regardless of active vs standby + Set sideInputStores = new HashSet<>(sideInputSystemStreams.keySet()); + + // In case of standby tasks, we treat the stores that have changelogs as side input stores for bootstrapping state + if (getTasks(containerModel, TaskMode.Standby).size() > 0) { + sideInputStores.addAll(changelogSystemStreams.keySet()); + } + return sideInputStores; + } +} diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java new file mode 100644 index 0000000000..01b3d20bd4 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import scala.collection.JavaConversions; + +import java.io.File; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationUtil; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.RunLoop; +import org.apache.samza.container.RunLoopTask; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.container.TaskInstanceMetrics; +import org.apache.samza.container.TaskName; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.JobContext; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeManager; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemConsumers; +import 
org.apache.samza.system.SystemConsumersMetrics; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.chooser.DefaultChooser; +import org.apache.samza.system.chooser.MessageChooser; +import org.apache.samza.system.chooser.RoundRobinChooserFactory; +import org.apache.samza.table.utils.SerdeUtils; +import org.apache.samza.task.TaskInstanceCollector; +import org.apache.samza.util.Clock; +import org.apache.samza.util.ReflectionUtil; +import org.apache.samza.util.ScalaJavaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SideInputsManager { + private static final Logger LOG = LoggerFactory.getLogger(SideInputsManager.class); + + private static final String SIDE_INPUTS_THREAD_NAME = "SideInputs Thread"; + // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer + private static final String SIDE_INPUTS_METRICS_PREFIX = "side-inputs-"; + + // Timeout with which sideinput thread checks for exceptions and for whether SSPs as caught up + private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10; + private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60; + private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1; + + private final SamzaContainerMetrics samzaContainerMetrics; + private final Map taskInstanceMetrics; + private final Config config; + + /* Sideinput related parameters */ + private final boolean hasSideInputs; + private final Map> sideInputStores; + // side inputs indexed first by task, then store name + private final Map>> taskSideInputStoreSSPs; + private final Set sideInputStoreNames; + private final Map sspSideInputHandlers; + private SystemConsumers sideInputSystemConsumers; + + // Used by the sideInput-read thread to signal to the main thread. Map's contents are mutated. 
+ private final Map sideInputTaskLatches; + private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDE_INPUTS_THREAD_NAME).build()); + private RunLoop sideInputRunLoop; // created in start() + + private volatile boolean shouldShutdown = false; + private volatile Throwable sideInputException = null; + + public SideInputsManager(Map> sideInputSystemStreams, + Map systemFactories, + Map changelogSystemStreams, + Map> storageEngineFactories, + Set storeDirectoryPaths, + ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, + SamzaContainerMetrics samzaContainerMetrics, + Map taskInstanceMetrics, + Map taskInstanceCollectors, + StreamMetadataCache streamMetadataCache, + SystemAdmins systemAdmins, + SerdeManager serdeManager, Map> serdes, + StorageManagerUtil storageManagerUtil, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, + Config config, Clock clock) { + this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel); + this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames( + sideInputSystemStreams, changelogSystemStreams, containerModel); + this.sideInputTaskLatches = new HashMap<>(); + this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream() + .flatMap(m -> m.values().stream()) + .flatMap(Collection::stream) + .findAny() + .isPresent(); + + this.taskInstanceMetrics = taskInstanceMetrics; + this.samzaContainerMetrics = samzaContainerMetrics; + this.config = config; + + // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories + this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel, + jobContext, containerContext, serdes, taskInstanceMetrics, 
taskInstanceCollectors, + storageManagerUtil, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); + + this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock + ); + + // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used + if (this.hasSideInputs) { + Set containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream() + .flatMap(map -> map.values().stream()) + .flatMap(Set::stream) + .map(SystemStreamPartition::getSystemStream) + .collect(Collectors.toSet()); + + Set containerSideInputSystems = containerSideInputSystemStreams.stream() + .map(SystemStream::getSystem) + .collect(Collectors.toSet()); + + // create sideInput consumers indexed by systemName + // Mapping from storeSystemNames to SystemConsumers + Map sideInputConsumers = + ContainerStorageManagerUtil.createSystemConsumers(containerSideInputSystems, systemFactories, + samzaContainerMetrics.registry(), config); + + scala.collection.immutable.Map inputStreamMetadata = + streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false); + + // we use the same registry as samza-container-metrics + SystemConsumersMetrics sideInputSystemConsumersMetrics = + new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX); + + MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config, + sideInputSystemConsumersMetrics.registry(), systemAdmins); + + ApplicationConfig applicationConfig = new ApplicationConfig(config); + + this.sideInputSystemConsumers = + new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager, + sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), + SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), + TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()), + JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId()); + } + } + + // read sideInputs until all sideInputStreams are caught up, then return + public void start() { + if (this.hasSideInputs) { + LOG.info("SideInput Restore started"); + + // initialize the sideInputStorageManagers + this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init); + + Map taskSideInputHandlers = this.sspSideInputHandlers.values().stream() + .distinct() + .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity())); + + Map sideInputTaskMetrics = new HashMap<>(); + Map sideInputTasks = new HashMap<>(); + this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> { + Set taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + if (!taskSSPs.isEmpty()) { + String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source(); + TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics( + sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDE_INPUTS_METRICS_PREFIX); + sideInputTaskMetrics.put(taskName, sideInputMetrics); + + RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, + taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName)); + sideInputTasks.put(taskName, sideInputTask); + } + }); + + // register all sideInput SSPs with the consumers + for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) { + String startingOffset = this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp); + + if (startingOffset == null) { + throw new SamzaException( + "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". 
Consumer cannot start."); + } + + // register startingOffset with the sysConsumer and register a metric for it + sideInputSystemConsumers.register(ssp, startingOffset); + taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( + ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); + sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( + ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); + } + + // start the systemConsumers for consuming input + this.sideInputSystemConsumers.start(); + + TaskConfig taskConfig = new TaskConfig(this.config); + SamzaContainerMetrics sideInputContainerMetrics = + new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(), + this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX); + + final ApplicationConfig applicationConfig = new ApplicationConfig(config); + + this.sideInputRunLoop = new RunLoop(sideInputTasks, + null, // all operations are executed in the main runloop thread + this.sideInputSystemConsumers, + 1, // single message in flight per task + -1, // no windowing + taskConfig.getCommitMs(), + taskConfig.getCallbackTimeoutMs(), + taskConfig.getDrainCallbackTimeoutMs(), + // TODO consolidate these container configs SAMZA-2275 + this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)), + taskConfig.getMaxIdleMs(), + sideInputContainerMetrics, + System::nanoTime, + false, + DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR, + applicationConfig.getRunId(), + ApplicationUtil.isHighLevelApiJob(config) + ); // commit must be synchronous to ensure integrity of state flush + + try { + sideInputsExecutor.submit(() -> { + try { + sideInputRunLoop.run(); + } catch (Exception e) { + LOG.error("Exception in reading sideInputs", e); + sideInputException = e; + } + }); + + // Make the main thread wait until 
all sideInputs have been caughtup or an exception was thrown + while (!shouldShutdown && sideInputException == null && + !awaitSideInputTasks(sideInputTaskLatches)) { + LOG.debug("Waiting for SideInput bootstrap to complete"); + } + + if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs + throw new SamzaException("Exception in restoring sideInputs", sideInputException); + } + + } catch (InterruptedException e) { + LOG.warn("Received an interrupt during side inputs store restoration." + + " Exiting prematurely without completing store restore."); + /* + * We want to stop side input restoration and rethrow the exception upstream. Container should handle the + * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the + * resources prematurely here. + */ + shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence? + throw new SamzaException("Side inputs read was interrupted", e); + } + + LOG.info("SideInput Restore complete"); + } + } + + public Map> getSideInputStores() { + return ImmutableMap.copyOf(this.sideInputStores); + } + + public void shutdown() { + // stop all side input consumers and stores + if (this.hasSideInputs) { + this.sideInputRunLoop.shutdown(); + this.sideInputsExecutor.shutdown(); + try { + this.sideInputsExecutor.awaitTermination(SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new SamzaException("Exception while shutting down sideInputs", e); + } + + this.sideInputSystemConsumers.stop(); + + // stop all side input handlers -- this will perform one last flush on the KV stores, and write the offset file + this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::stop); + } + } + + /** + * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name. 
+ * + * @param sideInputSystemStreams the map of store to sideInput system stream + * @param changelogSystemStreams the map of store to changelog system stream + * @param containerModel the containerModel to use + * @return taskSideInputSSPs map + */ + @VisibleForTesting + static Map>> getTaskSideInputSSPs( + Map> sideInputSystemStreams, + Map changelogSystemStreams, + ContainerModel containerModel) { + Map>> taskSideInputSSPs = new HashMap<>(); + + containerModel.getTasks().forEach((taskName, taskModel) -> { + taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>()); + sideInputSystemStreams.keySet().forEach(storeName -> { + Set taskSideInputs = taskModel.getSystemStreamPartitions().stream() + .filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())) + .collect(Collectors.toSet()); + taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs); + }); + }); + + ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { + taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>()); + changelogSystemStreams.forEach((storeName, systemStream) -> { + SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()); + taskSideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp)); + }); + }); + + return taskSideInputSSPs; + } + + // Create task sideInput storage managers, one per task, index by the SSP they are responsible for consuming + // Mutates (creates and adds to) sideInputTaskLatches. 
+ private static Map createSideInputHandlers( + boolean hasSideInputs, Map> sideInputStores, + Map>> taskSideInputStoreSSPs, + Map sideInputTaskLatches, Map taskInstanceMetrics, + ContainerModel containerModel, StreamMetadataCache streamMetadataCache, SystemAdmins systemAdmins, + Map> serdes, File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, + Config config, Clock clock) { + // creating sideInput store processors, one per store per task + Map> taskSideInputProcessors = + createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config) + ); + + Map handlers = new HashMap<>(); + + if (hasSideInputs) { + containerModel.getTasks().forEach((taskName, taskModel) -> { + + Map taskSideInputStores = sideInputStores.get(taskName); + Map> sideInputStoresToSSPs = new HashMap<>(); + boolean taskHasSideInputs = false; + for (String storeName : taskSideInputStores.keySet()) { + Set storeSSPs = taskSideInputStoreSSPs.get(taskName).get(storeName); + taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty(); + sideInputStoresToSSPs.put(storeName, storeSSPs); + } + + if (taskHasSideInputs) { + CountDownLatch taskCountDownLatch = new CountDownLatch(1); + sideInputTaskLatches.put(taskName, taskCountDownLatch); + + TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName, + taskModel.getTaskMode(), + loggedStoreBaseDirectory, + taskSideInputStores, + sideInputStoresToSSPs, + taskSideInputProcessors.get(taskName), + systemAdmins, + streamMetadataCache, + taskCountDownLatch, + clock); + + sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> { + handlers.put(ssp, taskSideInputHandler); + }); + + LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}", + taskName, taskSideInputStores, loggedStoreBaseDirectory); + } + }); + } + return handlers; + } + + // Create sideInput store processors, one per store per task + private 
static Map> createSideInputProcessors( + Map taskInstanceMetrics, + Map>> taskSideInputStoreSSPs, + ContainerModel containerModel, Map> serdes, StorageConfig config) { + + Map> sideInputStoresToProcessors = new HashMap<>(); + containerModel.getTasks().forEach((taskName, taskModel) -> { + sideInputStoresToProcessors.put(taskName, new HashMap<>()); + TaskMode taskMode = taskModel.getTaskMode(); + + for (String storeName : taskSideInputStoreSSPs.get(taskName).keySet()) { + SideInputsProcessor sideInputsProcessor; + Optional sideInputsProcessorSerializedInstance = + config.getSideInputsProcessorSerializedInstance(storeName); + + if (sideInputsProcessorSerializedInstance.isPresent()) { + + sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get()); + LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName); + + } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) { + String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get(); + SideInputsProcessorFactory sideInputsProcessorFactory = + ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class); + sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()); + LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName); + + } else { + // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy + + // if this is a standby-task and the store is a non-side-input changelog store + // we creating identity sideInputProcessor for stores of standbyTasks + // have to use the right serde because the sideInput stores are created + + Serde keySerde = 
serdes.get(config.getStorageKeySerde(storeName) + .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName))); + Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName) + .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName))); + sideInputsProcessor = new SideInputsProcessor() { + @Override + public Collection> process(IncomingMessageEnvelope message, KeyValueStore store) { + // Ignore message if the key is null + if (message.getKey() == null) { + return ImmutableList.of(); + } else { + // Skip serde if the message is null + return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()), + message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage()))); + } + } + }; + LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName); + } + + sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor); + } + }); + + return sideInputStoresToProcessors; + } + + /** + * Waits for all side input tasks to catch up until a timeout. 
+ * + * @return False if waiting on any latch timed out, true otherwise + * + * @throws InterruptedException if waiting any of the latches is interrupted + */ + private static boolean awaitSideInputTasks(Map sideInputTaskLatches) throws InterruptedException { + long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(SIDE_INPUT_CHECK_TIMEOUT_SECONDS); + for (CountDownLatch latch : sideInputTaskLatches.values()) { + long remainingMillisToWait = endTime - System.currentTimeMillis(); + if (remainingMillisToWait <= 0 || !latch.await(remainingMillisToWait, TimeUnit.MILLISECONDS)) { + return false; + } + } + return true; + } +} diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index 810433258e..15785644e4 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -393,15 +393,14 @@ public void testCheckpointBasedRestoreFactoryCreation() { CheckpointV1 checkpointV1 = mock(CheckpointV1.class); when(checkpointV1.getVersion()).thenReturn((short) 1); - Map> factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV1, storeNames, mockConfig); + Map> factoriesToStores = + ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV1, mockConfig); Assert.assertEquals(1, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName1", "storeName2"), factoriesToStores.get(StorageConfig.KAFKA_STATE_BACKEND_FACTORY)); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(null, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, null, mockConfig); Assert.assertEquals(2, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0"), @@ -437,8 
+436,8 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""), "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""))); - Map> factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + Map> factoriesToStores = + ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(2, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0"), factoriesToStores.get("factory0")); @@ -448,8 +447,7 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { when(checkpointV2.getStateCheckpointMarkers()) .thenReturn(ImmutableMap.of( "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""))); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(1, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName1", "storeName0"), factoriesToStores.get("factory2")); @@ -458,8 +456,7 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { .thenReturn(ImmutableMap.of( "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""), "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""))); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(2, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0"), factoriesToStores.get("factory1")); @@ -470,8 +467,7 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { 
.thenReturn(ImmutableMap.of( "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""), "factory2", ImmutableMap.of("storeName0", "", "storeName2", ""))); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(1, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"), factoriesToStores.get("factory1")); @@ -479,16 +475,14 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { when(checkpointV2.getStateCheckpointMarkers()) .thenReturn(ImmutableMap.of( "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""))); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(1, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"), factoriesToStores.get("factory1")); when(checkpointV2.getStateCheckpointMarkers()) .thenReturn(Collections.emptyMap()); - factoriesToStores = this.containerStorageManager - .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(2, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName0"), factoriesToStores.get("factory0")); @@ -500,8 +494,7 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { "factory0", ImmutableMap.of("storeName1", "", "storeName2", ""), "factory1", ImmutableMap.of("storeName1", "", "storeName2", ""), "factory2", ImmutableMap.of("storeName0", "", "storeName2", ""))); - factoriesToStores = this.containerStorageManager - 
.getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig); + factoriesToStores = ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpointV2, mockConfig); Assert.assertEquals(2, factoriesToStores.size()); Assert.assertEquals(ImmutableSet.of("storeName1"), factoriesToStores.get("factory1")); @@ -512,8 +505,8 @@ public void testCheckpointV2BasedRestoreFactoryCreation() { @Test public void getActiveTaskChangelogSystemStreams() { Map storeToChangelogSystemStreams = - containerStorageManager.getActiveTaskChangelogSystemStreams(testContext.standbyContainerModel, - testContext.storesToSystemStreams); + ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(testContext.storesToSystemStreams, testContext.standbyContainerModel + ); assertEquals("Standby container should have no active change log", Collections.emptyMap(), storeToChangelogSystemStreams); @@ -523,8 +516,8 @@ public void getActiveTaskChangelogSystemStreams() { public void getActiveTaskChangelogSystemStreamsForActiveAndStandbyContainer() { Map expectedStoreToChangelogSystemStreams = testContext.storesToSystemStreams; - Map storeToChangelogSystemStreams = containerStorageManager.getActiveTaskChangelogSystemStreams( - testContext.activeAndStandbyContainerModel, testContext.storesToSystemStreams); + Map storeToChangelogSystemStreams = ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams( + testContext.storesToSystemStreams, testContext.activeAndStandbyContainerModel); assertEquals("Active and standby container model should have non empty store to changelog mapping", expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams); @@ -534,8 +527,8 @@ public void getActiveTaskChangelogSystemStreamsForActiveAndStandbyContainer() { public void getActiveTaskChangelogSystemStreamsForStandbyContainer() { Map expectedStoreToChangelogSystemStreams = testContext.storesToSystemStreams; - Map storeToChangelogSystemStreams = 
containerStorageManager.getActiveTaskChangelogSystemStreams( - testContext.activeContainerModel, testContext.storesToSystemStreams); + Map storeToChangelogSystemStreams = ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams( + testContext.storesToSystemStreams, testContext.activeContainerModel); assertEquals("Active container model should have non empty store to changelog mapping", expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams); @@ -545,8 +538,8 @@ public void getActiveTaskChangelogSystemStreamsForStandbyContainer() { public void getSideInputStoresForActiveContainer() { Set expectedSideInputStores = testContext.activeStores; Set actualSideInputStores = - containerStorageManager.getSideInputStores(testContext.activeContainerModel, - testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams); + ContainerStorageManagerUtil.getSideInputStoreNames(testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams, testContext.activeContainerModel + ); assertEquals("Mismatch in stores", expectedSideInputStores, actualSideInputStores); } @@ -555,8 +548,8 @@ public void getSideInputStoresForActiveContainer() { public void getSideInputStoresForStandbyContainer() { final Set expectedSideInputStores = testContext.standbyStores; Set actualSideInputStores = - containerStorageManager.getSideInputStores(testContext.standbyContainerModel, - testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams); + ContainerStorageManagerUtil.getSideInputStoreNames(testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams, testContext.standbyContainerModel + ); assertEquals("Mismatch in side input stores", expectedSideInputStores, actualSideInputStores); } @@ -565,8 +558,8 @@ public void getSideInputStoresForStandbyContainer() { public void getTaskSideInputSSPsForActiveContainer() { Map>> expectedSideInputSSPs = testContext.activeSideInputSSPs; Map>> actualSideInputSSPs = - 
containerStorageManager.getTaskSideInputSSPs(testContext.activeContainerModel, - Collections.emptyMap(), testContext.storesToSystemStreams); + SideInputsManager.getTaskSideInputSSPs(Collections.emptyMap(), testContext.storesToSystemStreams, testContext.activeContainerModel + ); assertEquals("Mismatch in task name --> store --> SSP mapping", expectedSideInputSSPs, actualSideInputSSPs); } @@ -575,8 +568,8 @@ public void getTaskSideInputSSPsForActiveContainer() { public void getTaskSideInputSSPsForStandbyContainerWithSideInput() { Map>> expectedSideInputSSPs = testContext.standbyWithSideInputSSPs; Map>> actualSideInputSSPs = - containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModelWithSideInputs, - testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams); + SideInputsManager.getTaskSideInputSSPs(testContext.sideInputStoresToSystemStreams, testContext.storesToSystemStreams, testContext.standbyContainerModelWithSideInputs + ); assertEquals("Mismatch in task name --> store --> SSP mapping", expectedSideInputSSPs, actualSideInputSSPs); } @@ -585,8 +578,8 @@ public void getTaskSideInputSSPsForStandbyContainerWithSideInput() { public void getTaskSideInputSSPsForStandbyContainerWithoutSideInputs() { Map>> expectedSideInputSSPs = testContext.standbyChangelogSSPs; Map>> actualSideInputSSPs = - containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModel, - Collections.emptyMap(), testContext.storesToSystemStreams); + SideInputsManager.getTaskSideInputSSPs(Collections.emptyMap(), testContext.storesToSystemStreams, testContext.standbyContainerModel + ); assertEquals("Mismatch in task name --> store --> SSP mapping", expectedSideInputSSPs, actualSideInputSSPs); } From 69851fbb0e3894c1d7defe995fc9263ac9680b19 Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Wed, 8 Mar 2023 09:15:02 -0800 Subject: [PATCH 2/2] Updated based on PR feedback. 
--- .../storage/ContainerStorageManager.java | 10 ++++-- .../storage/ContainerStorageManagerUtil.java | 31 ++++++++++++------- .../samza/storage/SideInputsManager.java | 16 +++++++--- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 6ca64787fe..4b38fd8559 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -92,6 +92,7 @@ public class ContainerStorageManager { private final Map storeConsumers; // Mapping from store name to SystemConsumers private final Map> storageEngineFactories; // Map of storageEngineFactories indexed by store name + private final Map changelogSystemStreams; private final Map activeTaskChangelogSystemStreams; // Map of changelog system-streams indexed by store name private final Map> serdes; // Map of Serde objects indexed by serde name (specified in config) private final SerdeManager serdeManager; @@ -149,6 +150,7 @@ public ContainerStorageManager( this.containerModel = containerModel; this.streamMetadataCache = streamMetadataCache; this.systemAdmins = systemAdmins; + this.changelogSystemStreams = changelogSystemStreams; this.sideInputSystemStreams = sideInputSystemStreams; this.storageEngineFactories = storageEngineFactories; this.systemFactories = systemFactories; @@ -219,8 +221,12 @@ public void start() throws SamzaException, InterruptedException { // create and restore side input stores this.sideInputsManager = new SideInputsManager( - sideInputSystemStreams, systemFactories, activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager, - serdes, + 
sideInputSystemStreams, systemFactories, + changelogSystemStreams, activeTaskChangelogSystemStreams, + storageEngineFactories, storeDirectoryPaths, + containerModel, jobContext, containerContext, + samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, + streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock ); diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java index 24430aa8c1..5b3d90a708 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java @@ -68,7 +68,7 @@ public class ContainerStorageManagerUtil { public static Map> createTaskStores(Set storesToCreate, Map> storageEngineFactories, Set sideInputStoreNames, - Map changelogSystemStreams, + Map activeTaskChangelogSystemStreams, Set storeDirectoryPaths, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, Map> serdes, @@ -91,7 +91,7 @@ public static Map> createTaskStores(Set storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); // A store is considered durable if it is backed by a changelog or another backupManager factory - boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); + boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); boolean isSideInput = sideInputStoreNames.contains(storeName); // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir // for non logged stores @@ -103,17 +103,21 @@ public static Map> createTaskStores(Set> createInMemoryStores( - Map changelogSystemStreams, + Map activeTaskChangelogSystemStreams, Map> 
storageEngineFactories, Set sideInputStoreNames, Set storeDirectoryPaths, @@ -176,7 +180,9 @@ public static Map> createInMemoryStores( }) .collect(Collectors.toSet()); return ContainerStorageManagerUtil.createTaskStores( - inMemoryStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel, jobContext, containerContext, serdes, + inMemoryStoreNames, storageEngineFactories, sideInputStoreNames, + activeTaskChangelogSystemStreams, storeDirectoryPaths, + containerModel, jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); } @@ -200,7 +206,8 @@ public static Map createStoreChangelogConsumers( // Populate the map of storeName to its relevant systemConsumer for (String storeName : activeTaskChangelogSystemStreams.keySet()) { - storeConsumers.put(storeName, systemNameToSystemConsumers.get(activeTaskChangelogSystemStreams.get(storeName).getSystem())); + storeConsumers.put(storeName, + systemNameToSystemConsumers.get(activeTaskChangelogSystemStreams.get(storeName).getSystem())); } return storeConsumers; } @@ -257,8 +264,9 @@ public static Map createTaskRestoreManagers(TaskName KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers, inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes, taskInstanceCollectors.get(taskName)); - TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor, - taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, + TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, + restoreExecutor, taskMetricsRegistry, storeNames, config, clock, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, kafkaChangelogRestoreParams); 
backendFactoryRestoreManagers.put(factoryName, restoreManager); @@ -275,7 +283,8 @@ public static Map createTaskRestoreManagers(TaskName * @param containerModel the container's model * @return A map of storeName to changelogSSPs across all active tasks, assuming no two stores have the same changelogSSP */ - public static Map getActiveTaskChangelogSystemStreams(Map changelogSystemStreams, ContainerModel containerModel) { + public static Map getActiveTaskChangelogSystemStreams( + Map changelogSystemStreams, ContainerModel containerModel) { if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) { throw new SamzaException("Two stores cannot have the same changelog system-stream"); } diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java index 01b3d20bd4..2707ecff81 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java @@ -117,6 +117,7 @@ public class SideInputsManager { public SideInputsManager(Map> sideInputSystemStreams, Map systemFactories, Map changelogSystemStreams, + Map activeTaskChangelogSystemStreams, Map> storageEngineFactories, Set storeDirectoryPaths, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, @@ -144,12 +145,15 @@ public SideInputsManager(Map> sideInputSystemStreams, this.config = config; // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories - this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel, + this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, + sideInputStoreNames, activeTaskChangelogSystemStreams, 
storeDirectoryPaths, containerModel, jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); - this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock + this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, + sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock ); // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used @@ -309,6 +313,8 @@ public Map> getSideInputStores() { } public void shutdown() { + this.shouldShutdown = true; + // stop all side input consumers and stores if (this.hasSideInputs) { this.sideInputRunLoop.shutdown(); @@ -442,8 +448,10 @@ private static Map> createSideInputPr String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get(); SideInputsProcessorFactory sideInputsProcessorFactory = ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class); - sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()); - LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName); + sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor( + config, taskInstanceMetrics.get(taskName).registry()); + LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", + config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName); } else { // if this is 
a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy