Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 36 additions & 35 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class KafkaStreams implements AutoCloseable {
private final Logger log;
private final String clientId;
private final Metrics metrics;
private final StreamsConfig config;
protected final StreamsConfig applicationConfigs;
protected final List<StreamThread> threads;
protected final StateDirectory stateDirectory;
private final StreamsMetadataState streamsMetadataState;
Expand Down Expand Up @@ -795,12 +795,12 @@ public KafkaStreams(final Topology topology,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param config configs for Kafka Streams
* @param applicationConfigs configs for Kafka Streams
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
this(topology, config, new DefaultKafkaClientSupplier());
final StreamsConfig applicationConfigs) {
this(topology, applicationConfigs, new DefaultKafkaClientSupplier());
}

/**
Expand All @@ -810,15 +810,15 @@ public KafkaStreams(final Topology topology,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param config configs for Kafka Streams
* @param applicationConfigs configs for Kafka Streams
* @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
* for the new {@code KafkaStreams} instance
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier) {
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier);
}

/**
Expand All @@ -828,34 +828,34 @@ public KafkaStreams(final Topology topology,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param config configs for Kafka Streams
* @param applicationConfigs configs for Kafka Streams
* @param time {@code Time} implementation; cannot be null
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final StreamsConfig applicationConfigs,
final Time time) {
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, new DefaultKafkaClientSupplier(), time);
}

private KafkaStreams(final Topology topology,
final StreamsConfig config,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time);
this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, time);
}

protected KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(topologyMetadata, config, clientSupplier, Time.SYSTEM);
this(topologyMetadata, applicationConfigs, clientSupplier, Time.SYSTEM);
}

private KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this.config = config;
this.applicationConfigs = applicationConfigs;
this.time = time;

this.topologyMetadata = topologyMetadata;
Expand All @@ -864,15 +864,15 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology();

try {
stateDirectory = new StateDirectory(config, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
processId = stateDirectory.initializeProcessId();
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}

// The application ID is a required config and hence should always have value
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
clientId = applicationId + "-" + processId;
} else {
Expand All @@ -883,46 +883,46 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,

// use client id instead of thread client id since this admin client may be shared among threads
this.clientSupplier = clientSupplier;
adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));

log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());

metrics = getMetrics(config, time, clientId);
metrics = getMetrics(applicationConfigs, time, clientId);
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
applicationConfigs.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
time
);

ClientMetrics.addVersionMetric(streamsMetrics);
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());

streamsMetadataState = new StreamsMetadataState(
this.topologyMetadata,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
parseHostInfo(applicationConfigs.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

oldHandler = false;
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();

totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(config);
totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);

GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(
topologyMetadata.globalTaskTopology(),
config,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
applicationConfigs,
clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
streamsMetrics,
Expand All @@ -949,13 +949,13 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
}

stateDirCleaner = setupStateDirCleaner();
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
}

private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
topologyMetadata,
config,
applicationConfigs,
clientSupplier,
adminClient,
processId,
Expand All @@ -964,6 +964,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
time,
streamsMetadataState,
cacheSizePerThread,

stateDirectory,
delegatingStateRestoreListener,
threadIdx,
Expand Down Expand Up @@ -1117,24 +1118,24 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
adminClient.removeMembersFromConsumerGroup(
config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
try {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
} catch (final java.util.concurrent.TimeoutException e) {
log.error("Could not remove static member {} from consumer group {} due to a timeout: {}",
groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
groupInstanceID.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
throw new TimeoutException(e.getMessage(), e);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
log.error("Could not remove static member {} from consumer group {} due to: {}",
groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
groupInstanceID.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
throw new StreamsException(
"Could not remove static member " + groupInstanceID.get()
+ " from consumer group " + config.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ " for the following reason: ",
e.getCause()
);
Expand Down Expand Up @@ -1286,7 +1287,7 @@ public synchronized void start() throws IllegalStateException, StreamsException

processStreamThread(StreamThread::start);

final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
final Long cleanupDelay = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
stateDirCleaner.scheduleAtFixedRate(() -> {
// we do not use lock here since we only read on the value and act on it
if (state == State.RUNNING) {
Expand Down
17 changes: 14 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
Expand Down Expand Up @@ -75,12 +76,18 @@ public class StreamsBuilder {
protected final InternalStreamsBuilder internalStreamsBuilder;

public StreamsBuilder() {
topology = getNewTopology();
topology = new Topology();
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
}

protected Topology getNewTopology() {
protected StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = getNewTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
}

protected Topology getNewTopology(final TopologyConfig topologyConfigs) {
return new Topology();
}

Expand Down Expand Up @@ -610,7 +617,11 @@ public synchronized Topology build() {
* @return the {@link Topology} that represents the specified processing logic
*/
public synchronized Topology build(final Properties props) {
internalStreamsBuilder.buildAndOptimizeTopology(props);
final boolean optimizeTopology =
props != null &&
StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));

internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology);
return topology;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code buffered.records.per.partition} */
@SuppressWarnings("WeakerAccess")
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";

/** {@code built.in.metrics.version} */
public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
Expand All @@ -358,7 +358,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code cache.max.bytes.buffering} */
@SuppressWarnings("WeakerAccess")
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";

/** {@code client.id} */
@SuppressWarnings("WeakerAccess")
Expand All @@ -380,7 +380,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess")
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";

/** {@code default.production.exception.handler} */
@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -423,11 +423,11 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.timestamp.extractor} */
@SuppressWarnings("WeakerAccess")
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";

/** {@code max.task.idle.ms} */
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
private static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges"
public static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges"
+ " may produce out-of-order results."
+ " The config value is the maximum amount of time in milliseconds a stream task will stay idle"
+ " when it is fully caught up on some (but not all) input partitions"
Expand Down
10 changes: 9 additions & 1 deletion streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,15 @@
*/
public class Topology {

protected final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
protected final InternalTopologyBuilder internalTopologyBuilder;

public Topology() {
this(new InternalTopologyBuilder());
}

protected Topology(final InternalTopologyBuilder internalTopologyBuilder) {
this.internalTopologyBuilder = internalTopologyBuilder;
}

/**
* Sets the {@code auto.offset.reset} configuration when
Expand Down
Loading