Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
files="(KStreamImpl|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="(StreamsPartitionAssignor|StreamThread|TaskManager).java"/>
files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

<suppress checks="StaticVariableName"
files="StreamsMetricsImpl.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,16 @@
@SuppressWarnings("deprecation")
public class StreamsConfig extends AbstractConfig {

private final static Logger log = LoggerFactory.getLogger(StreamsConfig.class);
private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);

private static final ConfigDef CONFIG;

private final boolean eosEnabled;
private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;

public final static int DUMMY_THREAD_INDEX = 1;
public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

/**
* Prefix used to provide default topic configs to be applied when creating internal topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ private StreamTask createActiveTask(final TaskId taskId,
time,
stateManager,
recordCollector,
context
context,
logContext
);

log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.Iterator;
import java.util.HashSet;
import java.util.function.Function;

/**
Expand All @@ -53,14 +58,18 @@
*/
public class PartitionGroup {

private final Logger logger;
private final Map<TopicPartition, RecordQueue> partitionQueues;
private final Sensor enforcedProcessingSensor;
private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;

private long streamTime;
private int totalBuffered;
private boolean allBuffered;

private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();

static class RecordInfo {
RecordQueue queue;
Expand All @@ -78,15 +87,144 @@ RecordQueue queue() {
}
}

PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
PartitionGroup(final LogContext logContext,
final Map<TopicPartition, RecordQueue> partitionQueues,
final Sensor recordLatenessSensor,
final Sensor enforcedProcessingSensor,
final long maxTaskIdleMs) {
this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
this.partitionQueues = partitionQueues;
this.enforcedProcessingSensor = enforcedProcessingSensor;
this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
}

public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
final Long lag = metadata.lag();
if (lag != null) {
logger.trace("added fetched lag {}: {}", partition, lag);
fetchedLags.put(partition, lag);
}
}

public boolean readyToProcess(final long wallClockTime) {
if (logger.isTraceEnabled()) {
for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
logger.trace(
"buffered/lag {}: {}/{}",
entry.getKey(),
entry.getValue().size(),
fetchedLags.get(entry.getKey())
);
}
}
// Log-level strategy:
// TRACE for messages that don't wait for fetches
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1 on demoting these log entries! :)

// TRACE when we waited for a fetch and decided to wait some more, as configured
// TRACE when we are ready for processing and didn't have to enforce processing
// INFO when we enforce processing, since this has to wait for fetches AND may result in disorder

if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
final Set<TopicPartition> bufferedPartitions = new HashSet<>();
final Set<TopicPartition> emptyPartitions = new HashSet<>();
for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
if (entry.getValue().isEmpty()) {
emptyPartitions.add(entry.getKey());
} else {
bufferedPartitions.add(entry.getKey());
}
}
logger.trace("Ready for processing because max.task.idle.ms is disabled." +
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tBuffered partitions: {}" +
"\n\tNon-buffered partitions: {}",
bufferedPartitions,
emptyPartitions);
}
return true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log INFO if we are indeed enforcing processing? I.e. there are some empty partitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually on a second thought.. if users configures -1 it means they probably do not care about enforced processing, while on the other side the INFO entry may flood the logs here. So NVM.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad we agree ;)

}

final Set<TopicPartition> queued = new HashSet<>();
Map<TopicPartition, Long> enforced = null;

for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
final TopicPartition partition = entry.getKey();
final RecordQueue queue = entry.getValue();

final Long nullableFetchedLag = fetchedLags.get(partition);

if (!queue.isEmpty()) {
// this partition is ready for processing
idlePartitionDeadlines.remove(partition);
queued.add(partition);
} else if (nullableFetchedLag == null) {
// must wait to fetch metadata for the partition
idlePartitionDeadlines.remove(partition);
logger.trace("Waiting to fetch data for {}", partition);
return false;
} else if (nullableFetchedLag > 0L) {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
logger.trace(
"Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
partition,
nullableFetchedLag
);
return false;
} else {
// p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
// One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
// instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
// lag instead of when we happen to run this method, but realistically it's probably a small difference
// and using wall clock time seems more intuitive for users,
// since the log message will be as of wallClockTime.
idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
final long deadline = idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
logger.trace(
"Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
partition,
wallClockTime,
maxTaskIdleMs,
deadline
);
return false;
} else {
// this partition is ready for processing due to the task idling deadline passing
if (enforced == null) {
enforced = new HashMap<>();
}
enforced.put(partition, deadline);
}
}
}
if (enforced == null) {
logger.trace("All partitions were buffered locally, so this task is ready for processing.");
return true;
} else if (queued.isEmpty()) {
logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
return false;
} else {
enforcedProcessingSensor.record(1.0d, wallClockTime);
logger.info("Continuing to process although some partition timestamps were not buffered locally." +
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tPartitions with local data: {}." +
"\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
"\n\tConfigured max.task.idle.ms: {}." +
"\n\tCurrent wall-clock time: {}.",
queued,
enforced,
maxTaskIdleMs,
wallClockTime);
return true;
}
}

// visible for testing
long partitionTimestamp(final TopicPartition partition) {
final RecordQueue queue = partitionQueues.get(partition);
Expand Down Expand Up @@ -239,7 +377,7 @@ int numBuffered() {
return totalBuffered;
}

boolean allPartitionsBuffered() {
boolean allPartitionsBufferedLocally() {
return allBuffered;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -286,6 +287,11 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
}

@Override
public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
throw new IllegalStateException("Attempted to update metadata for standby task " + id());
}

InternalProcessorContext processorContext() {
return processorContext;
}
Expand Down
Loading