Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public Object deserialize(byte[] bytes) {
}
}

public InternalTopicManager() {
this.zkClient = null;
this.replicationFactor = 0;
}

public InternalTopicManager(String zkConnect, int replicationFactor) {
this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
this.replicationFactor = replicationFactor;
Expand Down Expand Up @@ -125,7 +130,7 @@ private List<Integer> getBrokers() {
}

@SuppressWarnings("unchecked")
public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);

if (data == null) return null;
Expand All @@ -147,7 +152,7 @@ public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
}
}

public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
private void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);

List<Integer> brokers = getBrokers();
Expand Down Expand Up @@ -183,13 +188,13 @@ public void createTopic(String topic, int numPartitions, int replicationFactor)
}
}

public void deleteTopic(String topic) throws ZkNodeExistsException {
private void deleteTopic(String topic) throws ZkNodeExistsException {
log.debug("Deleting topic {} from ZK in partition assignor.", topic);

zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
}

public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);

List<Integer> brokers = getBrokers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription
log.debug("Starting to validate internal source topics in partition assignor.");

for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
String topic = streamThread.jobId + "-" + entry.getKey();
String topic = entry.getKey();

// should have size 1 only
int numPartitions = -1;
Expand Down Expand Up @@ -455,4 +455,8 @@ public Set<TaskId> tasksForPartition(TopicPartition partition) {
public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}

public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,70 @@ public void testOnAssignment() throws Exception {
assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}

@Test
public void testAssignWithInternalTopics() throws Exception {
StreamsConfig config = new StreamsConfig(configProps());

MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

TopologyBuilder builder = new TopologyBuilder();
builder.addInternalTopic("topicX");
builder.addSource("source1", "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
builder.addSink("sink1", "topicX", "processor1");
builder.addSource("source2", "topicX");
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
List<String> topics = Utils.mkList("topic1", "topicX");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";

StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());

StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer);
partitionAssignor.setInternalTopicManager(internalTopicManager);

Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));

Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

// check prepared internal topics
// TODO: we need to change it to 1 after fixing the prefix
assertEquals(2, internalTopicManager.readyTopics.size());
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
}

private class MockInternalTopicManager extends InternalTopicManager {

public Map<String, Integer> readyTopics = new HashMap<>();
public MockConsumer<byte[], byte[]> restoreConsumer;

public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
super();

this.restoreConsumer = restoreConsumer;
}

@Override
public void makeReady(String topic, int numPartitions) {
readyTopics.put(topic, numPartitions);

List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < numPartitions; i++) {
partitions.add(new PartitionInfo(topic, i, null, null, null));
}

restoreConsumer.updatePartitions(topic, partitions);
}
}
}