Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,8 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
taskModel.getTaskMode());
StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
taskModel, jobContext, containerContext,
serdes, metricsRegistry, messageCollector,
this.config);
taskModel, jobContext, containerContext, serdes,
metricsRegistry, messageCollector, this.config);
storageEngines.put(storeName, engine);
});
return storageEngines;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
taskModel.getTaskMode());
StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
taskModel, jobContext, containerContext,
serdes, metricsRegistry, messageCollector,
this.config);
taskModel, jobContext, containerContext, serdes,
metricsRegistry, messageCollector, this.config);
storageEngines.put(storeName, engine);
});
return storageEngines;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public ContainerStorageManager(
this.activeTaskChangelogSystemStreams =
ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);

LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams = {}",
this.activeTaskChangelogSystemStreams, sideInputSystemStreams);
LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);

if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure"
Expand All @@ -183,15 +183,17 @@ public ContainerStorageManager(

this.storeDirectoryPaths = new HashSet<>();

this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(changelogSystemStreams, storageEngineFactories, sideInputStoreNames, storeDirectoryPaths, containerModel,
jobContext, containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config
);
this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Changed changelogSystemStreams to activeTaskChangelogSystemStreams here for correctness.

storeDirectoryPaths, containerModel, jobContext, containerContext,
taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

// Refactor Note (prateekm): in the previous version, there was a subtle difference between 'this.changelogSystemStreams'
// (which was actually activeTaskChangelogSystemStreams) and the passed-in changelogSystemStreams.
// create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams)
this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(activeTaskChangelogSystemStreams,
systemFactories, samzaContainerMetrics.registry(), config);
this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

JobConfig jobConfig = new JobConfig(config);
int restoreThreadPoolSize =
Expand Down Expand Up @@ -226,9 +228,8 @@ public void start() throws SamzaException, InterruptedException {
storageEngineFactories, storeDirectoryPaths,
containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
streamMetadataCache, systemAdmins, serdeManager, serdes,
storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
);
streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);

// blocks until initial side inputs restore is complete
sideInputsManager.start();
Expand Down Expand Up @@ -270,12 +271,16 @@ private void restoreStores() throws InterruptedException {
}
taskCheckpoints.put(taskName, taskCheckpoint);
Map<String, Set<String>> backendFactoryStoreNames =
ContainerStorageManagerUtil.getBackendFactoryStoreNames(nonSideInputStoreNames, taskCheckpoint,
new StorageConfig(config));
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
Map<String, TaskRestoreManager> taskStoreRestoreManagers =
ContainerStorageManagerUtil.createTaskRestoreManagers(taskName, backendFactoryStoreNames, restoreStateBackendFactories,
storageEngineFactories, storeConsumers, inMemoryStores, systemAdmins, restoreExecutor, taskModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
);
ContainerStorageManagerUtil.createTaskRestoreManagers(
taskName, backendFactoryStoreNames, restoreStateBackendFactories,
storageEngineFactories, storeConsumers,
inMemoryStores, systemAdmins, restoreExecutor,
taskModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
});

Expand Down Expand Up @@ -356,9 +361,11 @@ private void restoreStores() throws InterruptedException {
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);

// Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
this.taskStores = ContainerStorageManagerUtil.createTaskStores(nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames, this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, this.containerModel,
this.jobContext, this.containerContext, this.serdes, this.taskInstanceMetrics,
this.taskInstanceCollectors, this.storageManagerUtil,
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

// Add in memory stores
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
this.config = config;

// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories,
sideInputStoreNames, activeTaskChangelogSystemStreams, storeDirectoryPaths, containerModel,
jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors,
storageManagerUtil,
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs,
Expand Down Expand Up @@ -379,8 +380,8 @@ private static Map<SystemStreamPartition, TaskSideInputHandler> createSideInputH
Config config, Clock clock) {
// creating sideInput store processors, one per store per task
Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config)
);
createSideInputProcessors(taskInstanceMetrics, taskSideInputStoreSSPs,
containerModel, serdes, new StorageConfig(config));

Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();

Expand Down Expand Up @@ -454,7 +455,8 @@ private static Map<TaskName, Map<String, SideInputsProcessor>> createSideInputPr
config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);

} else {
// if this is an active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
// if this is an active-task with a side-input store but no sideinput-processor-factory defined in config,
// we rely on upstream validations to fail the deploy

// if this is a standby-task and the store is a non-side-input changelog store
// we create an identity sideInputProcessor for stores of standbyTasks
Expand Down