Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private static abstract class NodeFactory {
this.name = name;
}

public abstract ProcessorNode build();
public abstract ProcessorNode build(String jobId);
}

private static class ProcessorNodeFactory extends NodeFactory {
Expand All @@ -105,7 +105,7 @@ public void addStateStore(String stateStoreName) {

@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
public ProcessorNode build(String jobId) {
return new ProcessorNode(name, supplier.get(), stateStoreNames);
}
}
Expand All @@ -124,12 +124,12 @@ private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserial

@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
public ProcessorNode build(String jobId) {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
}

private static class SinkNodeFactory extends NodeFactory {
private class SinkNodeFactory extends NodeFactory {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can we no longer make this class static?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To have access to internalTopicNames

public final String[] parents;
public final String topic;
private Serializer keySerializer;
Expand All @@ -147,8 +147,13 @@ private SinkNodeFactory(String name, String[] parents, String topic, Serializer

@SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
public ProcessorNode build(String jobId) {
if (internalTopicNames.contains(topic)) {
// prefix the job id to the internal topic name
return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
} else {
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
}
}
}

Expand Down Expand Up @@ -491,7 +496,7 @@ private void connectProcessorAndStateStore(String processorName, String stateSto
*
* @return groups of topic names
*/
public Map<Integer, TopicsInfo> topicGroups() {
public Map<Integer, TopicsInfo> topicGroups(String jobId) {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();

if (nodeGroups == null)
Expand All @@ -506,27 +511,35 @@ public Map<Integer, TopicsInfo> topicGroups() {
// if the node is a source node, add to the source topics
String[] topics = nodeToSourceTopics.get(node);
if (topics != null) {
sourceTopics.addAll(Arrays.asList(topics));

// if some of the topics are internal, add them to the internal topics
for (String topic : topics) {
if (this.internalTopicNames.contains(topic))
internalSourceTopics.add(topic);
if (this.internalTopicNames.contains(topic)) {
// prefix the job id to the internal topic name
String internalTopic = jobId + "-" + topic;
internalSourceTopics.add(internalTopic);
sourceTopics.add(internalTopic);
} else {
sourceTopics.add(topic);
}
}
}

// if the node is a sink node, add to the sink topics
String topic = nodeToSinkTopic.get(node);
if (topic != null)
sinkTopics.add(topic);
if (topic != null) {
if (internalTopicNames.contains(topic)) {
// prefix the job id to the internal topic name
sinkTopics.add(jobId + "-" + topic);
} else {
sinkTopics.add(topic);
}
}

// if the node is connected to a state, add to the state topics
for (StateStoreFactory stateFactory : stateFactories.values()) {

// collect the changelog topics of internal state stores used by this node;
// the job id prefix is added here so the name matches the topic actually created
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
// prefix the job id to the change log topic name
stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
}
}
}
Expand Down Expand Up @@ -586,7 +599,7 @@ private Map<Integer, Set<String>> makeNodeGroups() {

return nodeGroups;
}

/**
* Asserts that the streams of the specified source nodes must be copartitioned.
*
Expand Down Expand Up @@ -624,19 +637,19 @@ public Collection<Set<String>> copartitionGroups() {
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
public ProcessorTopology build(Integer topicGroupId) {
public ProcessorTopology build(String jobId, Integer topicGroupId) {
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
} else {
// when nodeGroup is null, we build the full topology. this is used in some tests.
nodeGroup = null;
}
return build(nodeGroup);
return build(jobId, nodeGroup);
}

@SuppressWarnings("unchecked")
private ProcessorTopology build(Set<String> nodeGroup) {
private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
Expand All @@ -645,7 +658,7 @@ private ProcessorTopology build(Set<String> nodeGroup) {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
ProcessorNode node = factory.build();
ProcessorNode node = factory.build(jobId);
processorNodes.add(node);
processorMap.put(node.name(), node);

Expand All @@ -660,7 +673,12 @@ private ProcessorTopology build(Set<String> nodeGroup) {
}
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory) factory).topics) {
topicSourceMap.put(topic, (SourceNode) node);
if (internalTopicNames.contains(topic)) {
// prefix the job id to the internal topic name
topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
} else {
topicSourceMap.put(topic, (SourceNode) node);
}
}
} else if (factory instanceof SinkNodeFactory) {
for (String parent : ((SinkNodeFactory) factory).parents) {
Expand All @@ -679,7 +697,15 @@ private ProcessorTopology build(Set<String> nodeGroup) {
* Get the names of topics that are to be consumed by the source nodes created by this builder.
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
*/
public Set<String> sourceTopics() {
return Collections.unmodifiableSet(sourceTopicNames);
public Set<String> sourceTopics(String jobId) {
Set<String> topics = new HashSet<>();
for (String topic : sourceTopicNames) {
if (internalTopicNames.contains(topic)) {
topics.add(jobId + "-" + topic);
} else {
topics.add(topic);
}
}
return Collections.unmodifiableSet(topics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void configure(Map<String, ?> configs) {
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);

this.topicGroups = streamThread.builder.topicGroups();
this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);

if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
Expand Down Expand Up @@ -350,7 +350,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription
topicToTaskIds.putAll(internalSourceTopicToTaskIds);

for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
String topic = streamThread.jobId + "-" + entry.getKey();
String topic = entry.getKey();

// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = 0;
Expand Down Expand Up @@ -445,7 +445,7 @@ private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> inte

/* For Test Only */
public Set<TaskId> tasksForState(String stateName) {
return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
}

public Set<TaskId> tasksForPartition(TopicPartition partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public StreamThread(TopologyBuilder builder,
this.jobId = jobId;
this.config = config;
this.builder = builder;
this.sourceTopics = builder.sourceTopics();
this.sourceTopics = builder.sourceTopics(jobId);
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
Expand Down Expand Up @@ -580,7 +580,7 @@ public Set<TaskId> cachedTasks() {
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();

ProcessorTopology topology = builder.build(id.topicGroupId);
ProcessorTopology topology = builder.build(jobId, id.topicGroupId);

return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
Expand Down Expand Up @@ -650,7 +650,7 @@ private void closeOne(AbstractTask task) {
protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();

ProcessorTopology topology = builder.build(id.topicGroupId);
ProcessorTopology topology = builder.build(jobId, id.topicGroupId);

if (!topology.stateStoreSuppliers().isEmpty()) {
return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ public Integer apply(Integer value1, Integer value2) {
1 + // to
2 + // through
1, // process
builder.build(null).processors().size());
builder.build("X", null).processors().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,14 @@ public void testSourceTopics() {
builder.addSource("source-1", "topic-1");
builder.addSource("source-2", "topic-2");
builder.addSource("source-3", "topic-3");
builder.addInternalTopic("topic-3");

assertEquals(3, builder.sourceTopics().size());
Set<String> expected = new HashSet<String>();
expected.add("topic-1");
expected.add("topic-2");
expected.add("X-topic-3");

assertEquals(expected, builder.sourceTopics("X"));
}

@Test(expected = TopologyBuilderException.class)
Expand Down Expand Up @@ -184,13 +190,13 @@ public void testAddStateStore() {

StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
builder.addStateStore(supplier);
suppliers = builder.build(null).stateStoreSuppliers();
suppliers = builder.build("X", null).stateStoreSuppliers();
assertEquals(0, suppliers.size());

builder.addSource("source-1", "topic-1");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.connectProcessorAndStateStores("processor-1", "store-1");
suppliers = builder.build(null).stateStoreSuppliers();
suppliers = builder.build("X", null).stateStoreSuppliers();
assertEquals(1, suppliers.size());
assertEquals(supplier.name(), suppliers.get(0).name());
}
Expand All @@ -212,7 +218,7 @@ public void testTopicGroups() {

builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");

Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");

Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
Expand Down Expand Up @@ -250,12 +256,12 @@ public void testTopicGroupsByStateStore() {
builder.addStateStore(supplier);
builder.connectProcessorAndStateStores("processor-5", "store-3");

Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");

Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2"))));
expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3"))));

assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
Expand All @@ -275,9 +281,9 @@ public void testBuild() {
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");

ProcessorTopology topology0 = builder.build(0);
ProcessorTopology topology1 = builder.build(1);
ProcessorTopology topology2 = builder.build(2);
ProcessorTopology topology0 = builder.build("X", 0);
ProcessorTopology topology1 = builder.build("X", 1);
ProcessorTopology topology2 = builder.build("X", 2);

assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testTopologyMetadata() {
builder.addSink("sink-1", "topic-3", "processor-1");
builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");

final ProcessorTopology topology = builder.build(null);
final ProcessorTopology topology = builder.build("X", null);

assertEquals(6, topology.processors().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public void testAssignWithInternalTopics() throws Exception {
builder.addSink("sink1", "topicX", "processor1");
builder.addSource("source2", "topicX");
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
List<String> topics = Utils.mkList("topic1", "topicX");
List<String> topics = Utils.mkList("topic1", "test-topicX");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

UUID uuid1 = UUID.randomUUID();
Expand All @@ -543,9 +543,7 @@ public void testAssignWithInternalTopics() throws Exception {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

// check prepared internal topics
// TODO: we need to change it to 1 after fixing the prefix
assertEquals(2, internalTopicManager.readyTopics.size());
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
assertEquals(1, internalTopicManager.readyTopics.size());
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testPartitionAssignmentChange() throws Exception {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
ProcessorTopology topology = builder.build("X", id.topicGroupId);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
Expand Down Expand Up @@ -298,7 +298,7 @@ public void maybeClean() {

@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
ProcessorTopology topology = builder.build("X", id.topicGroupId);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
Expand Down Expand Up @@ -420,7 +420,7 @@ public void maybeCommit() {

@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
ProcessorTopology topology = builder.build("X", id.topicGroupId);
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public KStreamTestDriver(KStreamBuilder builder,
File stateDir,
Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
this.topology = builder.build(null);
this.topology = builder.build("X", null);
this.stateDir = stateDir;
this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());

Expand Down Expand Up @@ -127,7 +127,7 @@ private class MockRecordCollector extends RecordCollector {
public MockRecordCollector() {
super(null);
}

@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamPartitioner<K, V> partitioner) {
Expand Down
Loading