@@ -218,11 +218,20 @@ public void onCompletion(final RecordMetadata metadata,
}
});
} catch (final TimeoutException e) {
log.error("Timeout exception caught when sending record to topic {}. " +
"This might happen if the producer cannot send data to the Kafka cluster and thus, " +
"its internal buffer fills up. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
log.error(
"Timeout exception caught when sending record to topic {}. " +
"This might happen if the producer cannot send data to the Kafka cluster and thus, " +
Member:

nit: why one more indent?
@vvcephei (Contributor Author), Mar 7, 2019:
It's just how IDEA seems to format multiline expressions, e.g.,

1 +
    2

instead of

1 + 
2

Is it undesirable?

@mjsax (Member), Mar 7, 2019:

Not important -- it just looks funky to me. We concatenate multiple strings, and thus all should have the same indent -- why would we indent the first one differently? I would format as follows (even though I know you don't like the plus going onto the next line):

"string1"
+ "string2"
+ "string3"

This avoids the ambiguity between multiple string parameters and one concatenated parameter:

method(
    "param1",
    "param2",
    "param3",
    "param4");

// vs

method(
    "param-part-1" +
    "param-part-2" +
    "param-part-3",
    "new-param");

// vs

method(
    "param-part-1"
    + "param-part-2"
    + "param-part-3",
    "new-param");

The first and second are hard to distinguish (where do parameters start/stop?), but the third makes it clear that there are two parameters, not one or four, which is hard to tell in the middle case. Of course, a double indent also fixes this, but it looks weird to me:

method(
    "param-part-1" +
        "param-part-2" +
        "param-part-3",
    "new-param");

Contributor Author:

I remember a while back someone on the internet trying to get everyone to always put operators (and commas) at the beginning of the line for this reason. I think it makes sense. I don't think it caught on in general because it creates syntactic ambiguity in JavaScript, but since Java requires semicolons to end statements, it should be fine.

Do you have your IDE set up to produce this formatting? Maybe it sounds lazy, but the reason I've formatted it this way is that it's what IDEA does by default. I don't want to spend time curating the number of indent spaces by hand on every code change. I couldn't figure out how to get rid of the extra indent in multi-line string concatenation. E.g., it even does this:

        final String s =
            "asdf"
                + "qwer"
                + "qwer";

which is like the worst outcome.

Member:

I don't know the IDE setting -- this case is rare enough that I "fix" it manually when it happens.

Member:

Let me know if you plan to address or ignore this -- I am fine either way.

Contributor Author:

Ok, I agree with you in principle, but I think I'll just leave it as-is until I can figure out a way to get the IDE to do it for me.

"its internal buffer fills up. " +
"This can also happen if the broker is slow to respond, if the network connection to " +
"the broker was interrupted, or if similar circumstances arise. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.",
topic,
e
);
throw new StreamsException(
String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
e
);
Contributor Author:

I added the TimeoutException to the error message and as the cause of the StreamsException. This made the lines too long, so I reformatted them.

I also added some more failure modes to the log message; I felt the existing message could be misleading if the problem was actually just a network interruption.
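The cause-chaining described here can be sketched in isolation. This is a minimal, self-contained illustration of the pattern -- the exception classes below are stand-ins for Kafka's TimeoutException and StreamsException, not the real types, and the message strings are illustrative:

```java
// Demonstrates wrapping a low-level exception as the cause of a higher-level
// one, so the original failure and its stack trace are preserved.
public class CauseChainingDemo {

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class SendTimeout extends RuntimeException {
        SendTimeout(final String msg) { super(msg); }
    }

    // Stand-in for org.apache.kafka.streams.errors.StreamsException.
    static class StreamsFailure extends RuntimeException {
        StreamsFailure(final String msg, final Throwable cause) { super(msg, cause); }
    }

    static void send() {
        try {
            throw new SendTimeout("producer buffer full");
        } catch (final SendTimeout e) {
            // Pass the caught exception as the cause instead of dropping it.
            throw new StreamsFailure("Failed to send record to topic test-topic due to timeout.", e);
        }
    }

    public static void main(final String[] args) {
        try {
            send();
        } catch (final StreamsFailure wrapped) {
            System.out.println(wrapped.getMessage());
            // The cause survives, so callers (and logs) can see the root failure.
            System.out.println(wrapped.getCause().getMessage());
        }
    }
}
```

Without the second constructor argument, `getCause()` would return null and the timeout's origin would be lost to anyone catching the wrapper.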

} catch (final Exception uncaughtException) {
if (uncaughtException instanceof KafkaException &&
uncaughtException.getCause() instanceof ProducerFencedException) {
@@ -25,6 +25,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
@@ -246,7 +247,7 @@ public StreamTask(final TaskId id,
// initialize transactions if eos is turned on, which will block if the previous transaction has not
// completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
this.producer.initTransactions();
initializeTransactions();
}
}

@@ -298,7 +299,7 @@ public void resume() {
throw new IllegalStateException("Task producer should be null.");
}
producer = producerSupplier.get();
producer.initTransactions();
initializeTransactions();
recordCollector.init(producer);

if (stateMgr.checkpoint != null) {
@@ -872,4 +873,23 @@ RecordCollector recordCollector() {
Producer<byte[], byte[]> getProducer() {
return producer;
}

private void initializeTransactions() {
try {
producer.initTransactions();
} catch (final TimeoutException retriable) {
log.error(
"Timeout exception caught when initializing transactions for task {}. " +
"This might happen if the broker is slow to respond, if the network connection to " +
Member:

nit: as above
"the broker was interrupted, or if similar circumstances arise. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.",
id,
retriable
);
throw new StreamsException(
format("%sFailed to initialize task %s due to timeout.", logPrefix, id),
retriable
);
}
}
}
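The remedy suggested in both log messages -- raising `max.block.ms` -- is a plain producer configuration change. A minimal sketch (the broker address and the chosen timeout value are illustrative, not from the PR):

```java
import java.util.Properties;

public class ProducerTimeoutConfig {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Illustrative broker address.
        props.put("bootstrap.servers", "localhost:9092");
        // max.block.ms bounds how long blocking producer calls such as send()
        // and initTransactions() may wait; the producer default is 60000 ms.
        // Raising it gives a slow or briefly unreachable broker more time
        // before a TimeoutException is thrown.
        props.put("max.block.ms", "120000");
        System.out.println(props.getProperty("max.block.ms"));
    }
}
```

In Kafka Streams, this would be set via the application's producer-level configs; the `Properties` object here only demonstrates the key and value.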
@@ -24,6 +24,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -47,11 +48,12 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -64,16 +66,19 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -186,6 +191,134 @@ public void cleanup() throws IOException {
}
}

@Test
public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

final ProcessorTopology topology = ProcessorTopology.withSources(
asList(source1, source2, processorStreamTime, processorSystemTime),
mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
);

source1.addChild(processorStreamTime);
source2.addChild(processorStreamTime);
source1.addChild(processorSystemTime);
source2.addChild(processorSystemTime);

try {
new StreamTask(
taskId00,
partitions,
topology,
consumer,
changelogReader,
createConfig(true),
streamsMetrics,
stateDirectory,
null,
time,
() -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
@Override
public void initTransactions() {
throw new TimeoutException("test");
}
},
null,
null
);
fail("Expected an exception");
} catch (final StreamsException expected) {
// make sure we log the explanation as an ERROR
assertTimeoutErrorLog(appender);

// make sure we report the correct message
assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));

// make sure we preserve the cause
assertEquals(expected.getCause().getClass(), TimeoutException.class);
assertThat(expected.getCause().getMessage(), is("test"));
}
LogCaptureAppender.unregister(appender);
}

@Test
public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

final ProcessorTopology topology = ProcessorTopology.withSources(
asList(source1, source2, processorStreamTime, processorSystemTime),
mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
);

source1.addChild(processorStreamTime);
source2.addChild(processorStreamTime);
source1.addChild(processorSystemTime);
source2.addChild(processorSystemTime);

final AtomicBoolean timeOut = new AtomicBoolean(false);

final StreamTask testTask = new StreamTask(
taskId00,
partitions,
topology,
consumer,
changelogReader,
createConfig(true),
streamsMetrics,
stateDirectory,
null,
time,
() -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
@Override
public void initTransactions() {
if (timeOut.get()) {
throw new TimeoutException("test");
} else {
super.initTransactions();
}
}
},
null,
null
);
testTask.initializeTopology();
testTask.suspend();
timeOut.set(true);
try {
testTask.resume();
fail("Expected an exception");
} catch (final StreamsException expected) {
// make sure we log the explanation as an ERROR
assertTimeoutErrorLog(appender);

// make sure we report the correct message
assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));

// make sure we preserve the cause
assertEquals(expected.getCause().getClass(), TimeoutException.class);
assertThat(expected.getCause().getMessage(), is("test"));
}
LogCaptureAppender.unregister(appender);
}

private void assertTimeoutErrorLog(final LogCaptureAppender appender) {

final String expectedErrorLogMessage =
"task [0_0] Timeout exception caught when initializing transactions for task 0_0. " +
"This might happen if the broker is slow to respond, if the network " +
"connection to the broker was interrupted, or if similar circumstances arise. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.";

final List<String> expectedError =
appender
.getEvents()
.stream()
.filter(event -> event.getMessage().equals(expectedErrorLogMessage))
.map(LogCaptureAppender.Event::getLevel)
.collect(Collectors.toList());
assertThat(expectedError, is(singletonList("ERROR")));
}

@SuppressWarnings("unchecked")
@Test
public void testProcessOrder() {