Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,15 @@ public class KafkaStreams implements AutoCloseable {
private final Time time;
private final Logger log;
private final String clientId;
protected final String applicationId;
private final Metrics metrics;
private final StreamsConfig config;
protected final List<StreamThread> threads;
protected final StateDirectory stateDirectory;
private final StreamsMetadataState streamsMetadataState;
private final ScheduledExecutorService stateDirCleaner;
private final ScheduledExecutorService rocksDBMetricsRecordingService;
private final Admin adminClient;
protected final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;
private final long totalCacheSize;
private final StreamStateListener streamStateListener;
Expand Down Expand Up @@ -854,7 +855,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,

// The application ID is a required config and hence should always have value
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
clientId = applicationId + "-" + processId;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

Expand All @@ -51,7 +52,7 @@

class ActiveTaskCreator {
private final TopologyMetadata topologyMetadata;
private final StreamsConfig config;
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
private final ChangelogReader storeChangelogReader;
Expand All @@ -71,7 +72,7 @@ class ActiveTaskCreator {
private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
Expand All @@ -82,7 +83,7 @@ class ActiveTaskCreator {
final UUID processId,
final Logger log) {
this.topologyMetadata = topologyMetadata;
this.config = config;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
Expand All @@ -93,7 +94,7 @@ class ActiveTaskCreator {
this.log = log;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
processingMode = StreamThread.processingMode(config);
processingMode = StreamThread.processingMode(applicationConfig);

if (processingMode == EXACTLY_ONCE_ALPHA) {
threadProducer = null;
Expand All @@ -105,7 +106,7 @@ class ActiveTaskCreator {
final LogContext logContext = new LogContext(threadIdPrefix);

threadProducer = new StreamsProducer(
config,
applicationConfig,
threadId,
clientSupplier,
null,
Expand Down Expand Up @@ -169,7 +170,7 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Task.TaskType.ACTIVE,
StreamThread.eosEnabled(config),
StreamThread.eosEnabled(applicationConfig),
logContext,
stateDirectory,
storeChangelogReader,
Expand All @@ -179,7 +180,7 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,

final InternalProcessorContext context = new ProcessorContextImpl(
taskId,
config,
applicationConfig,
stateManager,
streamsMetrics,
cache
Expand Down Expand Up @@ -238,7 +239,7 @@ private StreamTask createActiveTask(final TaskId taskId,
if (processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
log.info("Creating producer client for task {}", taskId);
streamsProducer = new StreamsProducer(
config,
applicationConfig,
threadId,
clientSupplier,
taskId,
Expand All @@ -253,7 +254,7 @@ private StreamTask createActiveTask(final TaskId taskId,
logContext,
taskId,
streamsProducer,
config.defaultProductionExceptionHandler(),
applicationConfig.defaultProductionExceptionHandler(),
streamsMetrics
);

Expand All @@ -262,7 +263,8 @@ private StreamTask createActiveTask(final TaskId taskId,
inputPartitions,
topology,
consumer,
config,
new TaskConfig(applicationConfig, topologyMetadata.getConfigForTask(taskId)),
StreamThread.eosEnabled(applicationConfig),
streamsMetrics,
stateDirectory,
cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,14 @@ public synchronized List<String> fullSourceTopicNames() {
return new ArrayList<>(maybeDecorateInternalSourceTopics(sourceTopicNames));
}

/**
* @return a list of all topic partitions that have ever been consumed from, and possibly have committed offsets for
*/
public synchronized Set<TopicPartition> allSourceTopicPartitions() {
// TODO KAFKA-12648: Pt.4
return new HashSet<>();
}

/**
* @return a copy of the string representation of any pattern subscribed source nodes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.util.Collections;
Expand Down Expand Up @@ -55,7 +56,8 @@ public class StandbyTask extends AbstractTask implements Task {
StandbyTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
final ProcessorTopology topology,
final StreamsConfig config,
final TaskConfig config,
final boolean eosEnabled,
final StreamsMetricsImpl streamsMetrics,
final ProcessorStateManager stateMgr,
final StateDirectory stateDirectory,
Expand All @@ -67,7 +69,7 @@ public class StandbyTask extends AbstractTask implements Task {
stateDirectory,
stateMgr,
inputPartitions,
config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG),
config.maxTaskIdleMs,
"standby-task",
StandbyTask.class
);
Expand All @@ -76,7 +78,7 @@ public class StandbyTask extends AbstractTask implements Task {
processorContext.transitionToStandby(cache);

closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
eosEnabled = StreamThread.eosEnabled(config);
this.eosEnabled = eosEnabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

Expand All @@ -38,7 +39,7 @@

class StandbyTaskCreator {
private final TopologyMetadata topologyMetadata;
private final StreamsConfig config;
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
private final ChangelogReader storeChangelogReader;
Expand All @@ -50,14 +51,14 @@ class StandbyTaskCreator {
private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final Logger log) {
this.topologyMetadata = topologyMetadata;
this.config = config;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
Expand Down Expand Up @@ -101,7 +102,7 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Task.TaskType.STANDBY,
StreamThread.eosEnabled(config),
StreamThread.eosEnabled(applicationConfig),
getLogContext(taskId),
stateDirectory,
storeChangelogReader,
Expand All @@ -111,7 +112,7 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre

final InternalProcessorContext context = new ProcessorContextImpl(
taskId,
config,
applicationConfig,
stateManager,
streamsMetrics,
dummyCache
Expand Down Expand Up @@ -160,7 +161,8 @@ StandbyTask createStandbyTask(final TaskId taskId,
taskId,
inputPartitions,
topology,
config,
new TaskConfig(applicationConfig, topologyMetadata.getConfigForTask(taskId)),
StreamThread.eosEnabled(applicationConfig),
streamsMetrics,
stateManager,
stateDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.io.IOException;
Expand Down Expand Up @@ -117,7 +118,8 @@ public StreamTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> mainConsumer,
final StreamsConfig config,
final TaskConfig config,
final boolean eosEnabled,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ThreadCache cache,
Expand All @@ -132,7 +134,7 @@ public StreamTask(final TaskId id,
stateDirectory,
stateMgr,
inputPartitions,
config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG),
config.taskTimeoutMs,
"task",
StreamTask.class
);
Expand All @@ -143,7 +145,7 @@ public StreamTask(final TaskId id,

this.time = time;
this.recordCollector = recordCollector;
eosEnabled = StreamThread.eosEnabled(config);
this.eosEnabled = eosEnabled;

final String threadId = Thread.currentThread().getName();
this.streamsMetrics = streamsMetrics;
Expand Down Expand Up @@ -171,19 +173,19 @@ public StreamTask(final TaskId id,

streamTimePunctuationQueue = new PunctuationQueue();
systemTimePunctuationQueue = new PunctuationQueue();
maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
maxBufferedSize = config.maxBufferedSize;

// initialize the consumed and committed offset cache
consumedOffsets = new HashMap<>();
resetOffsetsForPartitions = new HashSet<>();

recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
recordQueueCreator = new RecordQueueCreator(this.logContext, config.timestampExtractor, config.deserializationExceptionHandler);

recordInfo = new PartitionGroup.RecordInfo();

final Sensor enforcedProcessingSensor;
enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
final long maxTaskIdleMs = config.maxTaskIdleMs;
partitionGroup = new PartitionGroup(
logContext,
createPartitionQueues(),
Expand Down
Loading