Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ public interface StorageEngine {
* provided in one {@link java.util.Iterator} and not deserialized for
* efficiency, allowing the implementation to optimize replay, if possible.
*
* Implementers are expected to handle interrupt signals sent to the restoration thread and rethrow the exception
* upstream so that {@code TaskRestoreManager} can notify the container accordingly.
*
* @param envelopes
* An iterator of envelopes that the storage engine can read from to
* restore its state on startup.
* @throws InterruptedException if an interrupt is received during restoration
*/
void restore(ChangelogSSPIterator envelopes);
void restore(ChangelogSSPIterator envelopes) throws InterruptedException;

/**
* Flush any cached messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,18 @@
*
* Describes the valid state transitions of the {@link StreamProcessor}.
*
*
* ────────────────────────────────
* │ │
* │ │
* │ │
* │ │
* New StreamProcessor.start() Rebalance triggered V Receives JobModel │
* StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
* Creation │ │ by group leader │ and starts Container │
* │ │ │ │
* Receives another re-balance request when the container
* from the previous re-balance is still in INIT phase
* ────────────────────────────────────────────────
* │ │ │
* │ │ │
* │ │ │
* │ │ │
* New StreamProcessor.start() Rebalance triggered V Receives JobModel │ │
* StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING │
* Creation │ │ by group leader │ and starts │Container │ │
* │ │ │ │ │ │
* │ │ │ ───────────────────────────────
* Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop()
* │ │ │ │
* │ │ │ │
Expand Down Expand Up @@ -133,7 +135,6 @@ public class StreamProcessor {
private final Config config;
private final long taskShutdownMs;
private final String processorId;
private final ExecutorService containerExcecutorService;
private final Object lock = new Object();
private final MetricsRegistryMap metricsRegistry;
private final MetadataStore metadataStore;
Expand Down Expand Up @@ -176,6 +177,9 @@ public State getState() {
@VisibleForTesting
JobCoordinatorListener jobCoordinatorListener = null;

@VisibleForTesting
ExecutorService containerExecutorService;

/**
* Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except
* it creates a {@link JobCoordinator} instead of accepting it as an argument.
Expand Down Expand Up @@ -288,11 +292,15 @@ public StreamProcessor(String processorId, Config config, Map<String, MetricsRep
: createJobCoordinator(config, processorId, metricsRegistry, metadataStore);
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
this.containerExcecutorService = Executors.newSingleThreadExecutor(threadFactory);
this.containerExecutorService = createExecutorService();
this.processorListener = listenerFactory.createInstance(this);
}

/**
 * Creates a new single-threaded executor service for running the container.
 * Its thread is a daemon thread named using {@code CONTAINER_THREAD_NAME_FORMAT}.
 *
 * @return a newly created single-thread {@link ExecutorService}
 */
private ExecutorService createExecutorService() {
  ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
  factoryBuilder.setNameFormat(CONTAINER_THREAD_NAME_FORMAT);
  factoryBuilder.setDaemon(true);
  ThreadFactory containerThreadFactory = factoryBuilder.build();
  return Executors.newSingleThreadExecutor(containerThreadFactory);
}

/**
* Asynchronously starts this {@link StreamProcessor}.
* <p>
Expand Down Expand Up @@ -348,7 +356,7 @@ public void stop() {
boolean hasContainerShutdown = stopSamzaContainer();
if (!hasContainerShutdown) {
LOGGER.info("Interrupting the container: {} thread to die.", container);
containerExcecutorService.shutdownNow();
containerExecutorService.shutdownNow();
}
} catch (Throwable throwable) {
LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable);
Expand Down Expand Up @@ -444,6 +452,19 @@ private boolean stopSamzaContainer() {
return hasContainerShutdown;
}

/**
 * Shuts down the container executor service via {@code shutdownNow()} and then waits up to
 * {@code taskShutdownMs} milliseconds on {@code containerShutdownLatch}.
 *
 * If the calling thread is interrupted while waiting, the wait is abandoned, the thread's interrupt status is
 * restored so that callers further up the stack can observe it, and the latch is still checked.
 *
 * @return {@code true} if the shutdown latch count is zero after the wait, {@code false} otherwise
 */
private boolean interruptContainerAndShutdownExecutorService() {
  try {
    containerExecutorService.shutdownNow();
    containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; set it again so the interrupt is not lost
    // for the code that invoked us. The latch check below does not block, so it is unaffected.
    Thread.currentThread().interrupt();
    LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback "
        + "decremented the shutdown latch. ");
  }

  // The interrupt is considered successful as long as the shutdown latch was decremented by the container callback.
  return containerShutdownLatch.getCount() == 0;
}

private JobCoordinatorListener createJobCoordinatorListener() {
return new JobCoordinatorListener() {

Expand All @@ -461,8 +482,23 @@ public void onJobModelExpired() {
} else {
LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
}
} else if (state == State.IN_REBALANCE) {
if (container != null) {
boolean hasContainerShutdown = interruptContainerAndShutdownExecutorService();
if (!hasContainerShutdown) {
LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. "
+ "Stopping the stream processor: {}", container, processorId);
state = State.STOPPING;
jobCoordinator.stop();
} else {
containerExecutorService = createExecutorService();
}
} else {
LOGGER.info("Ignoring Job model expired since a rebalance is already in progress");
}
} else {
LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED));
LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state,
ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE));
}
}
}
Expand All @@ -475,7 +511,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) {
container = createSamzaContainer(processorId, jobModel);
container.setContainerListener(new ContainerListener());
LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
containerExcecutorService.submit(container);
containerExecutorService.submit(container);
} else {
LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
}
Expand All @@ -490,7 +526,7 @@ public void onCoordinatorStop() {

// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
containerExcecutorService.shutdownNow();
containerExecutorService.shutdownNow();
}
state = State.STOPPED;
}
Expand All @@ -508,7 +544,7 @@ public void onCoordinatorFailure(Throwable throwable) {

// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
containerExcecutorService.shutdownNow();
containerExecutorService.shutdownNow();
}
state = State.STOPPED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private String getStartingOffset(SystemStreamPartition systemStreamPartition, Sy
* Restore each store in taskStoresToRestore sequentially
*/
@Override
public void restore() {
public void restore() throws InterruptedException {
for (String storeName : taskStoresToRestore) {
LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName());
SystemConsumer systemConsumer = storeConsumers.get(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ public void run() {

systemAdmins.start();
this.containerStorageManagers.forEach((containerName, containerStorageManager) -> {
containerStorageManager.start();
try {
containerStorageManager.start();
} catch (InterruptedException e) {
// we can ignore the exception since it's only used in the context of a command line tool and bubbling the
// exception upstream isn't needed.
LOG.warn("Received an interrupt during store restoration for container {}."
+ " Proceeding with the next container", containerName);
}
});
this.containerStorageManagers.forEach((containerName, containerStorageManager) -> {
containerStorageManager.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@ public interface TaskRestoreManager {

/**
* Restore state from checkpoints, state snapshots and changelog.
* Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of
* interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore
* thread.
*
* Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is
* waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the
* thread periodically and shut down gracefully before throwing {@link InterruptedException} upstream.
* {@code SamzaContainer} will not wait for cleanup; the interrupt signal is a best-effort attempt by the container
* to notify that it is shutting down.
*/
void restore();
void restore() throws InterruptedException;

/**
* Stop all persistent stores after restoring.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void init(Map<SystemStreamPartition, String> checkpointedChangelogOffsets
}

@Override
public void restore() {
public void restore() throws InterruptedException {
Map<String, RestoreOffsets> storesToRestore = storeActions.storesToRestore;

for (Map.Entry<String, RestoreOffsets> entry : storesToRestore.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import java.time.Duration
import java.util
import java.util.{Base64, Optional}
import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
import java.util.stream.Collectors

import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
Expand Down Expand Up @@ -769,6 +768,27 @@ class SamzaContainer(
else
Thread.sleep(Long.MaxValue)
} catch {
case e: InterruptedException =>
/*
* We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within
* our code inside stream processor is during the following two scenarios
* 1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor.
* Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier
* to agree on the new work assignment.
* 2. During shutdown signals to stream processor (external or internal), the stream processor signals the container to
* shutdown and waits for `task.shutdown.ms` before forcefully shutting down the container executor service which in
* turn interrupts the container thread.
*
* In both of these scenarios, the failure cause is either captured externally (timing out scenario) or internally
* (failed attempt to shut down the container). The act of interrupting the container thread is an explicit intent to shutdown
* the container since it is not capable of reacting to shutdown signals in all scenarios.
*
*/
if (status.equals(SamzaContainerStatus.STARTED)) {
warn("Received an interrupt in run loop.", e)
} else {
warn("Received an interrupt during initialization.", e)
}
case e: Throwable =>
if (status.equals(SamzaContainerStatus.STARTED)) {
error("Caught exception/error in run loop.", e)
Expand Down
Loading