KafkaProducerTest.java
@@ -42,6 +42,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -749,7 +750,7 @@ public void testPartitionsForWithNullTopic() {
public void testInitTransactionTimeout() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

Time time = new MockTime(1);
@@ -761,7 +762,18 @@ public void testInitTransactionTimeout() {

try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));

assertThrows(TimeoutException.class, producer::initTransactions);

client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

// retry initialization should work
producer.initTransactions();
@guozhangwang (Contributor, Author) on Feb 14, 2020:
This is just to verify that initTransactions can indeed be retried. cc @hachikuji

}
}
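
A minimal application-side sketch of the retry pattern this test verifies. The helper name and retry policy are assumptions for illustration; initTransactions() and its TimeoutException behavior are the real producer API, and each attempt blocks for at most max.block.ms:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;

public final class InitTransactionsRetry {

    // Hypothetical helper: with this change, a TimeoutException thrown by
    // initTransactions() no longer leaves the producer unusable, so the
    // call can simply be repeated until the coordinator responds.
    public static void initTransactionsWithRetry(final Producer<?, ?> producer, final int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                producer.initTransactions();
                return;
            } catch (final TimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up after maxAttempts consecutive timeouts
                }
                // optionally back off here before the next attempt
            }
        }
    }
}
```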

RecordCollector.java
@@ -47,6 +47,13 @@ <K, V> void send(final String topic,

void commit(final Map<TopicPartition, OffsetAndMetadata> offsets);

/**
* Initialize the internal {@link Producer}; note that this function should be idempotent
*
* @throws org.apache.kafka.common.errors.TimeoutException if the producer times out while initializing the transactional id
*/
void initialize();

/**
* Flush the internal {@link Producer}.
*/
RecordCollectorImpl.java
@@ -20,6 +20,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +71,7 @@ public class RecordCollectorImpl implements RecordCollector {

// used when eosEnabled is true only
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private Producer<byte[], byte[]> producer;
private volatile KafkaException sendException;

@@ -95,24 +97,30 @@ public RecordCollectorImpl(final TaskId taskId,
this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), streamsMetrics);

producer = producerSupplier.get(taskId);
}

@Override
public void initialize() {
maybeInitTxns();
}

private void maybeInitTxns() {
if (eosEnabled) {
if (eosEnabled && !transactionInitialized) {
// initialize transactions if eos is turned on, which will block if the previous transaction has not
// completed yet; do not start the first transaction until the topology has been initialized later
try {
producer.initTransactions();

transactionInitialized = true;
} catch (final TimeoutException exception) {
final String errorMessage = "Timeout exception caught when initializing transactions for task " + taskId + ". " +
log.warn("Timeout exception caught when initializing transactions for task {}. " +
"\nThe broker is either slow or in bad state (like not having enough replicas) in responding to the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " +
"\n Consider overwriting `max.block.ms` to a larger value to avoid timeout errors";
"Would retry initializing the task in the next loop." +
"\nConsider overwriting producer config {} to a larger value to avoid timeout errors",
taskId, ProducerConfig.MAX_BLOCK_MS_CONFIG);

// TODO K9113: we do NOT try to handle timeout exception here but throw it as a fatal error
throw new StreamsException(errorMessage, exception);
throw exception;
} catch (final KafkaException exception) {
throw new StreamsException("Error encountered while initializing transactions for task " + taskId, exception);
}
@@ -163,7 +171,7 @@ public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
} catch (final ProducerFencedException error) {
throw new TaskMigratedException(taskId, "Producer got fenced trying to commit a transaction", error);
} catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error, should discuss whether we want to handle it
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error);
} catch (final KafkaException error) {
throw new StreamsException("Error encountered sending offsets and committing transaction " +
@@ -176,7 +184,7 @@ public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " +
"indicating the corresponding thread is no longer part of the group.", error);
} catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing offsets via consumer for task " + taskId, error);
} catch (final KafkaException error) {
throw new StreamsException("Error encountered committing offsets via consumer for task " + taskId, error);
@@ -244,9 +252,16 @@ public <K, V> void send(final String topic,
final StreamPartitioner<? super K, ? super V> partitioner) {
final Integer partition;

// TODO K9113: we need to decide how to handle exceptions from partitionsFor
if (partitioner != null) {
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
final List<PartitionInfo> partitions;
try {
partitions = producer.partitionsFor(topic);
} catch (final KafkaException e) {
// here we cannot drop the message on the floor even if it is a transient timeout exception,
// so we treat everything the same as a fatal exception
Reviewer (Member): Well -- we could also "buffer" the record and try to send it later? In the meantime we would need to pause the corresponding task though to not process more input records (of course, we would need to let the task finish processing the current input record, which might lead to more output records that we would need to buffer, too). -- This is just a wild thought and we could also handle this case later if required.

@guozhangwang (Contributor, Author): I thought about the buffering mechanism here, but decided it may not be worth it since we've not seen timeouts from partitionsFor -- it should be quite rare because in most cases the producer already has the partition metadata cached locally. If this call timing out becomes an issue we can revisit the buffering, wdyt?
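
For illustration only, a rough sketch of the buffering alternative discussed above. Nothing here is part of the PR; the class, fields, and method names are invented, and the real RecordCollectorImpl instead treats the timeout as fatal:

```java
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical buffering approach: park records that hit a transient
// metadata timeout and pause the task until the buffer can be drained.
class BufferingSender {
    private final Deque<ProducerRecord<byte[], byte[]>> pending = new ArrayDeque<>();
    private final Producer<byte[], byte[]> producer;
    private boolean paused = false; // the task would stop processing input while true

    BufferingSender(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        if (paused) {
            pending.addLast(record);                 // keep ordering behind parked records
            return;
        }
        try {
            producer.partitionsFor(record.topic()); // may throw a transient TimeoutException
            producer.send(record);
        } catch (final TimeoutException e) {
            pending.addLast(record);                 // park instead of failing the task
            paused = true;
        }
    }

    void maybeDrain() {                              // called once per processing loop
        while (!pending.isEmpty()) {
            try {
                producer.partitionsFor(pending.peekFirst().topic());
                producer.send(pending.pollFirst());
            } catch (final TimeoutException e) {
                return;                              // metadata still unavailable, stay paused
            }
        }
        paused = false;                              // buffer empty, resume the task
    }
}
```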

throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
"' for task " + taskId + " due to " + e.toString());
}
if (partitions.size() > 0) {
partition = partitioner.partition(topic, key, value, partitions.size());
} else {
StreamTask.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
@@ -186,11 +187,14 @@ public boolean isActive() {

/**
* @throws LockException if another thread within the same instance still holds the task directory lock; the caller may retry
* @throws TimeoutException if initializing the record collector timed out
* @throws StreamsException fatal error, the thread should be closed
*/
@Override
public void initializeIfNeeded() {
if (state() == State.CREATED) {
recordCollector.initialize();

StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);

transitionTo(State.RESTORING);
@@ -199,6 +203,9 @@ public void initializeIfNeeded() {
}
}

/**
* @throws TimeoutException if fetching committed offsets timed out
*/
@Override
public void completeRestoration() {
if (state() == State.RESTORING) {
@@ -612,6 +619,12 @@ private void initializeMetadata() {
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
initializeTaskTime(offsetsAndMetadata);
} catch (final TimeoutException e) {
log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
"\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
e.toString(), ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

throw e;
Reviewer (Member): Why do we throw TimeoutException directly but not wrap it?

@guozhangwang (Contributor, Author): Because the caller (TaskManager) would swallow the TimeoutException anyways.
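
Condensed from the TaskManager hunk later in this diff, this is the caller-side behavior the reply refers to; Task is the internal Streams task interface, and the wrapper method is purely illustrative:

```java
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.LockException;

// A bare TimeoutException falls into the same catch clause as LockException
// and is swallowed, so initialization is retried on the next loop; wrapping
// it in a StreamsException would bypass this clause and surface as fatal.
static boolean tryInitialize(final Task task) {
    try {
        task.initializeIfNeeded();
        return true;
    } catch (final LockException | TimeoutException e) {
        return false; // not fatal: retry on the next iteration
    }
}
```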

} catch (final KafkaException e) {
throw new StreamsException(format("task [%s] Failed to initialize offsets for %s", id, partitions), e);
}
TaskManager.java
@@ -22,6 +22,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -214,7 +215,7 @@ boolean checkForCompletedRestoration() {
if (task.state() == CREATED) {
try {
task.initializeIfNeeded();
} catch (final LockException e) {
} catch (final LockException | TimeoutException e) {
// it is possible, when there are multiple threads within a single instance, that one thread
// is trying to grab the task from another that has not yet released the lock, since
// it did not participate in the rebalance; in this case we can just retry in the next iteration
@@ -232,7 +233,13 @@ boolean checkForCompletedRestoration() {
final Set<TopicPartition> restored = changelogReader.completedChangelogs();
for (final Task task : restoringTasks) {
if (restored.containsAll(task.changelogPartitions())) {
task.completeRestoration();
try {
task.completeRestoration();
} catch (final TimeoutException e) {
log.debug("Cloud complete restoration for {} due to {}; will retry", task.id(), e.toString());

allRunning = false;
}
} else {
// we found a restoring task that isn't done restoring, which is evidence that
// not all tasks are running
RecordCollectorTest.java
@@ -454,27 +454,27 @@ public synchronized Future<RecordMetadata> send(final ProducerRecord record, fin
}

@Test
public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() {
public void shouldRethrowOnEOSInitializeTimeout() {
final KafkaException exception = new TimeoutException("KABOOM!");
final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

final StreamsException thrown = assertThrows(StreamsException.class, () ->
new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
final RecordCollector recordCollector = new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
)
}
);
assertEquals(exception, thrown.getCause());

final TimeoutException thrown = assertThrows(TimeoutException.class, recordCollector::initialize);
assertEquals(exception, thrown);
}

@Test
@@ -483,21 +483,21 @@ public void shouldThrowStreamsExceptionOnEOSInitializeError() {
final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

final StreamsException thrown = assertThrows(StreamsException.class, () ->
new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
final RecordCollector recordCollector = new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
)
}
);

final StreamsException thrown = assertThrows(StreamsException.class, recordCollector::initialize);
assertEquals(exception, thrown.getCause());
}

@@ -625,6 +625,7 @@ public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata
}
}
);
collector.initialize();

assertThrows(TaskMigratedException.class, () -> collector.commit(null));
}
@@ -646,6 +647,7 @@ public void commitTransaction() {
}
}
);
collector.initialize();

assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap()));
}
@@ -688,6 +690,7 @@ public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata
}
}
);
collector.initialize();

assertThrows(StreamsException.class, () -> collector.commit(null));
}
@@ -709,6 +712,7 @@ public void commitTransaction() {
}
}
);
collector.initialize();

assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap()));
}
@@ -780,7 +784,7 @@ public void abortTransaction() {
}
}
);

collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.commit(Collections.emptyMap());

@@ -807,7 +811,7 @@ public void abortTransaction() {
}
}
);

collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

final StreamsException thrown = assertThrows(StreamsException.class, collector::close);
@@ -831,6 +835,7 @@ public void abortTransaction() {
}
}
);
collector.initialize();

// this call is to begin an inflight txn
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
StreamThreadTest.java
@@ -712,10 +712,6 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE
thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);

for (final Task task : thread.activeTasks()) {
assertTrue(((MockProducer) ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()).transactionInitialized());
}

thread.shutdown();
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
MockRecordCollector.java
@@ -76,6 +76,9 @@ public <K, V> void send(final String topic,
headers));
}

@Override
public void initialize() {}

@Override
public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
committed.add(offsets);