Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ void reinitializeStateStoresForPartitions(final Collection<TopicPartition> parti
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
* @throws ProcessorStateException if there is an error while closing the state manager
*/
// visible for testing
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
ProcessorStateException exception = null;
log.trace("Closing state manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
private final Map<TaskId, StreamTask> restoring = new HashMap<>();
Expand All @@ -46,6 +45,52 @@ public StreamTask restoringTaskFor(final TopicPartition partition) {
return restoringByPartition.get(partition);
}

@Override
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following three functions have no logical changes; this just groups all the functions that override AssignedTasks together here.

// Returns the parent's task list with the tasks currently held in `restoring`
// appended to it. The list handed back by super.allTasks() is extended in place.
List<StreamTask> allTasks() {
    final List<StreamTask> combined = super.allTasks();
    combined.addAll(restoring.values());
    return combined;
}

/**
 * Returns the ids reported by the parent class plus the ids of all tasks in
 * {@code restoring}. The set handed back by {@code super.allAssignedTaskIds()}
 * is extended in place.
 *
 * @return the combined set of task ids
 */
@Override
Set<TaskId> allAssignedTaskIds() {
    final Set<TaskId> ids = super.allAssignedTaskIds();
    ids.addAll(restoring.keySet());
    return ids;
}

/**
 * Reports whether every task is running: the parent class must agree and no
 * task may be left in {@code restoring}.
 *
 * @return {@code true} only if the parent check passes and {@code restoring} is empty
 */
@Override
boolean allTasksRunning() {
    if (!super.allTasksRunning()) {
        return false;
    }
    return restoring.isEmpty();
}

/**
 * Closes the state manager of every task in {@code restoring} (passing
 * {@code true} for the write-checkpoint flag) and forgets all restoring
 * bookkeeping afterwards.
 *
 * A failure on one task does not stop the loop; the remaining tasks are still
 * closed and only the first failure is reported.
 *
 * @return the first {@link RuntimeException} thrown while closing a task,
 *         or {@code null} if every task closed cleanly
 */
RuntimeException closeAllRestoringTasks() {
    log.trace("Closing all restoring stream tasks {}", restoring.keySet());

    RuntimeException firstFailure = null;
    for (final Iterator<StreamTask> it = restoring.values().iterator(); it.hasNext(); ) {
        final StreamTask restoringTask = it.next();
        log.debug("Closing restoring task {}", restoringTask.id());
        try {
            restoringTask.closeStateManager(true);
        } catch (final RuntimeException closeFailure) {
            log.error("Failed to remove restoring task {} due to the following error:", restoringTask.id(), closeFailure);
            // remember only the first failure; later ones are logged above
            firstFailure = firstFailure == null ? closeFailure : firstFailure;
        } finally {
            // drop the task from the map whether or not closing succeeded
            it.remove();
        }
    }

    restoring.clear();
    restoredPartitions.clear();
    restoringByPartition.clear();

    return firstFailure;
}

void updateRestored(final Collection<TopicPartition> restored) {
if (restored.isEmpty()) {
return;
Expand Down Expand Up @@ -86,20 +131,6 @@ void addToRestoring(final StreamTask task) {
}
}

/**
 * Returns {@code true} when the parent class reports all tasks running and
 * {@code restoring} holds no tasks.
 */
@Override
boolean allTasksRunning() {
    final boolean parentAllRunning = super.allTasksRunning();
    return parentAllRunning && restoring.isEmpty();
}

/**
 * Suspends via the parent class, then closes every task in {@code restoring}
 * through {@code closeNonRunningTasks} and drops the restoring bookkeeping.
 *
 * @return the exception from {@code super.suspend()} if there was one,
 *         otherwise the result of closing the restoring tasks (may be {@code null})
 */
RuntimeException suspend() {
    final RuntimeException suspendFailure = super.suspend();
    log.trace("Close restoring stream task {}", restoring.keySet());
    // always attempt the close, even when suspending already failed
    final RuntimeException closeFailure = closeNonRunningTasks(restoring.values());
    restoring.clear();
    restoringByPartition.clear();
    return suspendFailure != null ? suspendFailure : closeFailure;
}

/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
Expand Down Expand Up @@ -218,34 +249,20 @@ int punctuate() {
return punctuated;
}

/**
 * Builds a description made of the parent's {@code toString(indent)} output
 * followed by a "Restoring:" section produced by {@code describe}.
 *
 * @param indent indentation string forwarded to the parent and to {@code describe}
 * @return the combined description
 */
public String toString(final String indent) {
    final StringBuilder description = new StringBuilder().append(super.toString(indent));
    describe(description, restoring.values(), indent, "Restoring:");
    return description.toString();
}

/**
 * Returns the parent's task list with the tasks currently held in
 * {@code restoring} appended to it.
 *
 * @return the list obtained from {@code super.allTasks()}, extended in place
 */
@Override
List<StreamTask> allTasks() {
    final List<StreamTask> everyTask = super.allTasks();
    everyTask.addAll(restoring.values());
    return everyTask;
}

/**
 * Returns the parent's assigned task ids together with the ids of the tasks
 * held in {@code restoring}.
 *
 * @return the set obtained from {@code super.allAssignedTaskIds()}, extended in place
 */
@Override
Set<TaskId> allAssignedTaskIds() {
    final Set<TaskId> assignedIds = super.allAssignedTaskIds();
    assignedIds.addAll(restoring.keySet());
    return assignedIds;
}

/**
 * Clears the parent's state and then empties all restoring bookkeeping:
 * {@code restoredPartitions}, {@code restoringByPartition} and {@code restoring}.
 */
void clear() {
    super.clear();
    restoredPartitions.clear();
    restoringByPartition.clear();
    restoring.clear();
}

/**
 * Renders the parent's description followed by a "Restoring:" section that
 * {@code describe} appends for the tasks in {@code restoring}.
 *
 * @param indent indentation string passed to the parent and to {@code describe}
 * @return the assembled text
 */
public String toString(final String indent) {
    final StringBuilder out = new StringBuilder().append(super.toString(indent));
    describe(out, restoring.values(), indent, "Restoring:");
    return out.toString();
}

// for testing only

Collection<StreamTask> restoringTasks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ void initializeNewTasks() {
}

boolean allTasksRunning() {
return created.isEmpty()
&& suspended.isEmpty();
return created.isEmpty() && suspended.isEmpty();
}

Collection<T> running() {
Expand All @@ -106,7 +105,7 @@ RuntimeException suspend() {
return firstException.get();
}

RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
for (final T task : tasks) {
try {
Expand Down Expand Up @@ -167,7 +166,7 @@ boolean hasRunningTasks() {
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final T task = suspended.get(taskId);
log.trace("found suspended {} {}", taskTypeName, taskId);
log.trace("Found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
task.resume();
Expand All @@ -185,10 +184,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
}
throw e;
}
log.trace("resuming suspended {} {}", taskTypeName, task.id());
log.trace("Resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
}
}
return false;
Expand All @@ -198,7 +197,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
log.debug("Transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (final TopicPartition topicPartition : task.partitions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void reset() {
needsRestoring.clear();
endOffsets.clear();
needsInitializing.clear();
completedRestorers.clear();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is @linyli001 's fix.

Copy link
Copy Markdown

@suiyuan2009 suiyuan2009 Feb 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some hack code to the listeners and the transformer init function as a workaround until the next version is released.

}

private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ void createTasks(final Collection<TopicPartition> assignment) {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
}

changelogReader.reset();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not reset changelogReader since we still need its restored offset when closing the restoring tasks.

// do this first as we may have suspended standby tasks that
// will become active or vice versa
standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
active.closeNonAssignedSuspendedTasks(assignedActiveTasks);

addStreamTasks(assignment);
addStandbyTasks();
// Pause all the partitions until the underlying state store is ready for all the active tasks.
Expand Down Expand Up @@ -240,7 +240,14 @@ void suspendTasksAndState() {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

firstException.compareAndSet(null, active.suspend());
// close all restoring tasks as well and then reset changelog reader;
// for those restoring and still assigned tasks, they will be re-created
// in addStreamTasks.
firstException.compareAndSet(null, active.closeAllRestoringTasks());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we close those tasks instead of just suspending them? Could we close them only if they are not re-assigned?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lifetime of a task is:

created -> [initializeStateStores] -> restoring (writes to the initialized state stores) -> [initializeTopology] -> running -> [closeTopology] -> suspended -> [closeStateManager] -> dead

I.e., restoring tasks do not have their topology initialized at all, whereas the suspend call only tries to closeTopology.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, but why can't we keep restoring tasks open then, and hope they get reassigned so we can continue restoring them?

Copy link
Copy Markdown
Contributor Author

@guozhangwang guozhangwang Feb 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can; the issue is that today we clear all the store restorers, which is what causes the problem.

We can, of course, do some optimizations, such as not closing restoring tasks and not clearing their corresponding restorers, but this is out of the scope of this PR and I want to address it separately.

cc @vvcephei

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Can you create a Jira to track this cleanup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changelogReader.reset();

firstException.compareAndSet(null, standby.suspend());

// remove the changelog partitions from restore consumer
restoreConsumer.unsubscribe();

Expand Down Expand Up @@ -368,7 +375,7 @@ public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> pa
}

public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ public void shouldCloseRestoringTasks() {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
t1.close(false, false);
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);

assertThat(suspendTask(), nullValue());
addAndInitTask();
assertThat(assignedTasks.closeAllRestoringTasks(), nullValue());

EasyMock.verify(t1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,18 +260,6 @@ public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
verify(active);
}

// Checks that TaskManager.createTasks() calls reset() on the changelog reader mock.
@Test
public void shouldResetChangeLogReaderOnCreateTasks() {
// presumably sets up the mocks for a single active task; helper is defined outside this hunk
mockSingleActiveTask();
// record phase: register the expectation that reset() will be invoked on the mock
changeLogReader.reset();
EasyMock.expectLastCall();
// switch the mocks from record mode to replay mode
replay();

// assign taskId0 as the only active task, with no standby tasks
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.createTasks(taskId0Partitions);
// fails if the recorded reset() call did not happen
verify(changeLogReader);
}

@Test
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();
Expand Down