InternalTopicManager.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.requests.MetadataResponse;
+ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +37,18 @@ public class InternalTopicManager {
public static final String RETENTION_MS = "retention.ms";
public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
private static final int MAX_TOPIC_READY_TRY = 5;

+ private final Time time;
private final long windowChangeLogAdditionalRetention;

private final int replicationFactor;
private final StreamsKafkaClient streamsKafkaClient;

- public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+ public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
+                             final long windowChangeLogAdditionalRetention, final Time time) {
this.streamsKafkaClient = streamsKafkaClient;
this.replicationFactor = replicationFactor;
this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
+ this.time = time;
}

/**
@@ -61,11 +64,19 @@ public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
+ if (metadata.brokers().size() < replicationFactor) {
+     throw new StreamsException("Found only " + metadata.brokers().size() + " brokers," +
+         " but replication factor is " + replicationFactor + "." +
+         " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" +
+         " or add more brokers to your cluster.");
+ }
streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
return;
} catch (StreamsException ex) {
log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
}
+ // backoff
+ time.sleep(100L);
}
throw new StreamsException("Could not create internal topics.");
}
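The retry loop above is the core of this change: makeReady() makes up to MAX_TOPIC_READY_TRY attempts, logs and swallows StreamsException on each failure, backs off 100 ms through the injected Time, and only throws once every attempt is exhausted. A minimal, self-contained sketch of that control flow, assuming kafka-clients on the classpath; createTopicsOnce() and AttemptFailedException are hypothetical stand-ins for the metadata-fetch/create-topics step and StreamsException:

```java
import org.apache.kafka.common.utils.Time;

public class BoundedRetryExample {
    private static final int MAX_TOPIC_READY_TRY = 5;

    // Hypothetical stand-in for StreamsException.
    static class AttemptFailedException extends RuntimeException {
        AttemptFailedException(final String msg) { super(msg); }
    }

    static void makeReady(final Time time) {
        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
            try {
                createTopicsOnce(i);   // stand-in for fetchMetadata() + createTopics()
                return;                // success: stop retrying
            } catch (final AttemptFailedException ex) {
                System.out.println("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
            }
            // backoff before the next attempt; a MockTime makes this free in tests
            time.sleep(100L);
        }
        throw new AttemptFailedException("Could not create internal topics.");
    }

    static void createTopicsOnce(final int attempt) {
        if (attempt < 2) {
            throw new AttemptFailedException("brokers not ready yet");
        }
    }

    public static void main(final String[] args) {
        makeReady(Time.SYSTEM);        // a test would pass new MockTime() instead
    }
}
```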
@@ -74,11 +85,20 @@ public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
* Get the number of partitions for the given topics
*/
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
- final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
- final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
- existingTopicPartitions.keySet().retainAll(topics);
-
- return existingTopicPartitions;
+ for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+     try {
+         final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+         final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
+         existingTopicPartitions.keySet().retainAll(topics);
+
+         return existingTopicPartitions;
+     } catch (StreamsException ex) {
+         log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i);
+     }
+     // backoff
+     time.sleep(100L);
+ }
+ throw new StreamsException("Could not get number of partitions.");
}

public void close() {
StreamPartitionAssignor.java
@@ -25,6 +25,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -58,6 +59,7 @@
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {

private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+ private Time time = Time.SYSTEM;

private final static int UNKNOWN = -1;
public final static int NOT_AVAILABLE = -2;
@@ -162,6 +164,14 @@ public int compare(TopicPartition p1, TopicPartition p2) {
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsValidator copartitionedTopicsValidator;

+ /**
+  * Package-private method to set the time. Used for tests.
+  * @param time Time to be used.
+  */
+ void time(final Time time) {
+     this.time = time;
+ }
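A sketch of how a test could use this seam, assuming it sits in org.apache.kafka.streams.processor.internals (the setter is package-private) and that the kafka-clients test utilities are on the classpath; the class name is hypothetical:

```java
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.utils.MockTime;

public class StreamPartitionAssignorTimeSeamExample {
    public static void main(final String[] args) {
        final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        // Swap out Time.SYSTEM before configure() runs, so the InternalTopicManager
        // built there backs off on a virtual clock instead of really sleeping.
        assignor.time(new MockTime());
    }
}
```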

/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible
* since the former needs later's cached metadata while sending subscriptions,
@@ -211,7 +221,7 @@ public void configure(Map<String, ?> configs) {
configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
(Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
-     : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+     : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);

this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName());
}
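configure() resolves both InternalTopicManager arguments with a contains-key-or-default ternary, as the hunk above shows. A runnable sketch of that defaulting idiom; the literal keys stand in for the StreamsConfig constants and are assumed to match their values in StreamsConfig:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigDefaultsExample {
    static final long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = 24 * 60 * 60 * 1000L;

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();   // user-supplied config, possibly sparse
        final int replicationFactor = configs.containsKey("replication.factor")
            ? (Integer) configs.get("replication.factor") : 1;
        final long retention = configs.containsKey("windowstore.changelog.additional.retention.ms")
            ? (Long) configs.get("windowstore.changelog.additional.retention.ms")
            : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
        System.out.println(replicationFactor + " / " + retention);   // 1 / 86400000
    }
}
```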
StreamThread.java
@@ -436,17 +436,21 @@ private RuntimeException unAssignChangeLogPartitions() {

@SuppressWarnings("ThrowableNotThrown")
private void shutdownTasksAndState() {
log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
activeTasks.keySet(), standbyTasks.keySet());
log.debug("{} shutdownTasksAndState: shutting down" +
"active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
suspendedTasks.keySet(), suspendedStandbyTasks.keySet());

final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all processors in topology order
- firstException.compareAndSet(null, closeAllTasks());
+ firstException.compareAndSet(null, closeTasks(activeAndStandbytasks()));
+ firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks()));
// flush state
firstException.compareAndSet(null, flushAllState());
// Close all task state managers. Don't need to set exception as all
// state would have been flushed above
- closeAllStateManagers(firstException.get() == null);
+ closeStateManagers(activeAndStandbytasks(), firstException.get() == null);
+ closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null);
// only commit under clean exit
if (cleanRun && firstException.get() == null) {
firstException.set(commitOffsets());
@@ -465,7 +469,7 @@ private void suspendTasksAndState() {
activeTasks.keySet(), standbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all topology nodes
- firstException.compareAndSet(null, closeAllTasksTopologies());
+ firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies());
// flush state
firstException.compareAndSet(null, flushAllState());
// only commit after all state has been flushed and there hasn't been an exception
@@ -488,21 +492,20 @@ interface AbstractTaskAction {
void apply(final AbstractTask task);
}

- private RuntimeException performOnAllTasks(final AbstractTaskAction action,
-                                            final String exceptionMessage) {
+ private RuntimeException performOnTasks(final List<AbstractTask> tasks,
+                                         final AbstractTaskAction action,
+                                         final String exceptionMessage) {
RuntimeException firstException = null;
- final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
- allTasks.addAll(standbyTasks.values());
- for (final AbstractTask task : allTasks) {
+ for (final AbstractTask task : tasks) {
try {
action.apply(task);
} catch (RuntimeException t) {
log.error("{} Failed while executing {} {} due to {}: ",
-     StreamThread.this.logPrefix,
-     task.getClass().getSimpleName(),
-     task.id(),
-     exceptionMessage,
-     t);
+         StreamThread.this.logPrefix,
+         task.getClass().getSimpleName(),
+         task.id(),
+         exceptionMessage,
+         t);
if (firstException == null) {
firstException = t;
}
@@ -511,8 +514,20 @@ private RuntimeException performOnAllTasks(final AbstractTaskAction action,
return firstException;
}
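The refactor from performOnAllTasks() to performOnTasks() keeps the method's contract: every task receives the action, each failure is logged, and only the first exception is handed back once the loop has visited all tasks. A self-contained sketch of that first-exception-wins aggregation; every name here is hypothetical:

```java
import java.util.Arrays;
import java.util.List;

public class FirstExceptionWinsExample {
    interface Action {
        void apply(String task);
    }

    static RuntimeException performOnTasks(final List<String> tasks, final Action action) {
        RuntimeException firstException = null;
        for (final String task : tasks) {
            try {
                action.apply(task);
            } catch (final RuntimeException t) {
                if (firstException == null) {
                    firstException = t;   // remember the first failure, but keep going
                }
            }
        }
        return firstException;            // null means every task succeeded
    }

    public static void main(final String[] args) {
        final RuntimeException e = performOnTasks(Arrays.asList("0_1", "0_2"), task -> {
            if (task.equals("0_1")) {
                throw new RuntimeException("close failed for " + task);
            }
        });
        System.out.println(e);            // the first failure, after all tasks were attempted
    }
}
```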

- private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
-     return performOnAllTasks(new AbstractTaskAction() {
+ private List<AbstractTask> activeAndStandbytasks() {
+     final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
+     tasks.addAll(standbyTasks.values());
+     return tasks;
+ }
+
+ private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
+     final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values());
+     tasks.addAll(suspendedStandbyTasks.values());
+     return tasks;
+ }
+
+ private Throwable closeStateManagers(final List<AbstractTask> tasks, final boolean writeCheckpoint) {
+     return performOnTasks(tasks, new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id());
Expand All @@ -523,7 +538,7 @@ public void apply(final AbstractTask task) {

private RuntimeException commitOffsets() {
// Exceptions should not prevent this call from going through all shutdown steps
- return performOnAllTasks(new AbstractTaskAction() {
+ return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id());
Expand All @@ -533,7 +548,7 @@ public void apply(final AbstractTask task) {
}

private RuntimeException flushAllState() {
- return performOnAllTasks(new AbstractTaskAction() {
+ return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id());
@@ -1033,22 +1048,22 @@ private void removeStandbyTasks() {
standbyRecords.clear();
}

- private RuntimeException closeAllTasks() {
-     return performOnAllTasks(new AbstractTaskAction() {
+ private RuntimeException closeTasks(final List<AbstractTask> tasks) {
+     return performOnTasks(tasks, new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
task.close();
streamsMetrics.tasksClosedSensor.record();
}
}, "close");
}

- private RuntimeException closeAllTasksTopologies() {
-     return performOnAllTasks(new AbstractTaskAction() {
+ private RuntimeException closeActiveAndStandbyTasksTopologies() {
+     return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id());
log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
task.closeTopology();
streamsMetrics.tasksClosedSensor.record();
}
StreamsKafkaClient.java
@@ -198,7 +198,11 @@ private String ensureOneNodeIsReady(final List<Node> nodes) {
break;
}
}
- kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ try {
+     kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ } catch (final Exception e) {
+     throw new StreamsException("Could not poll.", e);
+ }
}
if (brokerId == null) {
throw new StreamsException("Could not find any available broker.");
@@ -230,11 +234,20 @@ private String getAnyReadyBrokerId() {
}

private ClientResponse sendRequest(final ClientRequest clientRequest) {
- kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+ try {
+     kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+ } catch (final Exception e) {
+     throw new StreamsException("Could not send request.", e);
+ }
final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
// Poll for the response.
while (Time.SYSTEM.milliseconds() < responseTimeout) {
- List<ClientResponse> responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ final List<ClientResponse> responseList;
+ try {
+     responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ } catch (final IllegalStateException e) {
+     throw new StreamsException("Could not poll.", e);
+ }
if (!responseList.isEmpty()) {
if (responseList.size() > 1) {
throw new StreamsException("Sent one request but received multiple or no responses.");
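The hunks above wrap client-level failures from send() and poll() into StreamsException instead of letting them escape raw. A self-contained sketch of sendRequest()'s send-then-poll-until-deadline loop with that wrapping; FakeClient is a hypothetical stand-in for the underlying NetworkClient, and plain RuntimeException stands in for StreamsException:

```java
import java.util.Collections;
import java.util.List;

public class SendAndPollExample {
    static class FakeClient {
        private int polls = 0;

        List<String> poll(final long timeoutMs, final long nowMs) {
            polls++;
            if (polls > 5) {
                throw new IllegalStateException("client closed");
            }
            return polls == 3 ? Collections.singletonList("response") : Collections.<String>emptyList();
        }
    }

    static String sendRequest(final FakeClient client, final long requestTimeoutMs, final long pollMs) {
        final long deadline = System.currentTimeMillis() + requestTimeoutMs;
        // Poll for the response until the request timeout elapses.
        while (System.currentTimeMillis() < deadline) {
            final List<String> responses;
            try {
                responses = client.poll(pollMs, System.currentTimeMillis());
            } catch (final IllegalStateException e) {
                // surface the client failure as a domain-level exception, as the patch does
                throw new RuntimeException("Could not poll.", e);
            }
            if (!responses.isEmpty()) {
                if (responses.size() > 1) {
                    throw new RuntimeException("Sent one request but received multiple or no responses.");
                }
                return responses.get(0);
            }
        }
        throw new RuntimeException("Response timed out.");
    }

    public static void main(final String[] args) {
        System.out.println(sendRequest(new FakeClient(), 5000L, 100L));   // prints "response"
    }
}
```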
InternalTopicManagerTest.java
@@ -20,6 +20,8 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+ import org.apache.kafka.common.utils.MockTime;
+ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -41,7 +43,7 @@ public class InternalTopicManagerTest {
private final String topic = "test_topic";
private final String userEndPoint = "localhost:2171";
private MockStreamKafkaClient streamsKafkaClient;

+ private final Time time = new MockTime();
@Before
public void init() {
final StreamsConfig config = new StreamsConfig(configProps());
@@ -55,19 +57,22 @@ public void shutdown() throws IOException {

@Test
public void shouldReturnCorrectPartitionCounts() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+     WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
}

@Test
public void shouldCreateRequiredTopics() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+     WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
}

@Test
public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+     WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
boolean exceptionWasThrown = false;
try {
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
@@ -105,9 +110,9 @@ public MetadataResponse fetchMetadata() {
Node node = new Node(1, "host1", 1001);
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
- MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
-     Collections.singletonList(topicMetadata), 0);
+ MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
+     Collections.singletonList(topicMetadata));
return response;
}
}
}
}
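The point of threading MockTime through the constructor: MockTime.sleep() returns immediately and only advances a virtual clock, so the manager's five 100 ms backoffs cost these tests nothing. A small demo of that semantic, assuming the kafka-clients test utilities on the classpath:

```java
import org.apache.kafka.common.utils.MockTime;

public class MockTimeDemo {
    public static void main(final String[] args) {
        final MockTime time = new MockTime();
        final long before = time.milliseconds();
        for (int i = 0; i < 5; i++) {
            time.sleep(100L);   // no real delay; bumps the virtual clock by 100 ms
        }
        System.out.println(time.milliseconds() - before);   // prints 500
    }
}
```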