Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
private long idleStartTime;
private Producer<byte[], byte[]> producer;
private boolean commitRequested = false;
private boolean transactionInFlight = false;

private final String threadId;

Expand Down Expand Up @@ -294,7 +293,6 @@ public void initializeTopology() {
} catch (final ProducerFencedException | UnknownProducerIdException e) {
throw new TaskMigratedException(this, e);
}
transactionInFlight = true;
}

processorContext.initialize();
Expand Down Expand Up @@ -522,10 +520,8 @@ void commit(final boolean startNewTransaction, final Map<TopicPartition, Long> p
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
producer.commitTransaction();
transactionInFlight = false;
if (startNewTransaction) {
producer.beginTransaction();
transactionInFlight = true;
}
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
Expand Down Expand Up @@ -602,7 +598,7 @@ private void initTopology() {
*/
public void suspend() {
log.debug("Suspending");
suspend(true, false);
suspend(true);
}

/**
Expand All @@ -618,8 +614,7 @@ public void suspend() {
* or if the task producer got fenced (EOS)
*/
// visible for testing
void suspend(final boolean clean,
final boolean isZombie) {
void suspend(final boolean clean) {
// this is necessary because all partition times are reset to -1 during close
// we need to preserve the original partitions times before calling commit
final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
Expand All @@ -640,14 +635,7 @@ void suspend(final boolean clean,

if (eosEnabled) {
stateMgr.checkpoint(activeTaskCheckpointableOffsets());

try {
recordCollector.close();
} catch (final RecoverableClientException e) {
taskMigratedException = new TaskMigratedException(this, e);
} finally {
producer = null;
}
taskMigratedException = closeRecordCollector();
}
}
if (taskMigratedException != null) {
Expand All @@ -662,37 +650,26 @@ void suspend(final boolean clean,
}

if (eosEnabled) {
maybeAbortTransactionAndCloseRecordCollector(isZombie);
// Ignore any exceptions whilee closing the record collector, i.e task producer.
closeRecordCollector();
}
}
}

private void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie) {
if (!isZombie) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are some callers of suspend that would set clean to false while isZombie to false as well, e.g. suspendRunningTasks in this case should we still call abortTxn?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That call is inside the catch block of task.suspend. For simplicity, I think calling abortTxn is not very tempting at this point as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After another thought, I'm pretty sure the purpose of this PR is not to maintain the caller of abortTxn in either mode to simplify our error handling. The other txn mechanism like InitPid will make sure to cleanup the pending transactions or through the txn timeout. However the risk of calling abortTxn during close is much higher than a normal processing loop, at least for today's producer. If the producer is in FATAL_ERROR, it becomes a bomb for any caller touching on its APIs except close().

We could think of doing some safe abort operations for producer internally instead of externally, which could be done by either getting a new API or just change the default behavior of producer.close to do the transaction abortion when time allowed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool I think I am convinced, let's just ignore the isZombie flag then.

try {
if (transactionInFlight) {
producer.abortTransaction();
}
transactionInFlight = false;
} catch (final ProducerFencedException ignore) {
/* TODO
* this should actually never happen atm as we guard the call to #abortTransaction
* -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException
* instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got
* fixed and fall-back to this catch-and-swallow code
*/

// can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
}
}
private TaskMigratedException closeRecordCollector() {
TaskMigratedException taskMigratedException = null;

try {
recordCollector.close();
} catch (final RecoverableClientException e) {
taskMigratedException = new TaskMigratedException(this, e);
} catch (final Throwable e) {
log.error("Failed to close producer due to the following error:", e);
} finally {
producer = null;
}

return taskMigratedException;
}

private void closeTopology() {
Expand Down Expand Up @@ -742,7 +719,7 @@ void closeSuspended(final boolean clean, RuntimeException firstException) {

/**
* <pre>
* - {@link #suspend(boolean, boolean) suspend(clean)}
* - {@link #suspend(boolean) suspend(clean)}
* - close topology
* - if (clean) {@link #commit()}
* - flush state and producer
Expand All @@ -765,7 +742,7 @@ public void close(boolean clean,

RuntimeException firstException = null;
try {
suspend(clean, isZombie);
suspend(clean);
} catch (final RuntimeException e) {
clean = false;
firstException = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,26 +1317,25 @@ public void shouldNotCloseProducerIfFencedOnCloseDuringCleanCloseWithEosEnabled(
}

@Test
public void shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
task.initializeTopology();

task.close(false, false);
task = null;

assertTrue(producer.transactionAborted());
assertFalse(producer.transactionInFlight());
// Make sure no method call on the producer during an unclean close (such as abort).
assertTrue(producer.transactionInFlight());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one in particular might be worth a comment that we're specifically checking because we don't want to call methods on the producer during an unclean close.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

assertTrue(producer.closed());
}

@Test
public void shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
task = createTaskThatThrowsException(true);
task.initializeTopology();

task.close(false, false);

assertTrue(producer.transactionAborted());
assertTrue(producer.closed());
}

Expand Down Expand Up @@ -1553,15 +1552,14 @@ public void shouldOnlyCloseFencedProducerOnUncleanClosedWithEosEnabled() {
}

@Test
public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
public void shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
task.initializeTopology();
producer.fenceProducerOnClose();

task.close(false, false);
task = null;

assertTrue(producer.transactionAborted());
assertFalse(producer.closed());
}

Expand Down