Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,41 +115,46 @@ private TopicName(String completeTopicName) {
try {
// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
if (!completeTopicName.contains("://")) {
int index = completeTopicName.indexOf("://");
if (index < 0) {
// The short topic name can be:
// - <topic>
// - <tenant>/<namespace>/<topic>
String[] parts = StringUtils.split(completeTopicName, '/');
if (parts.length == 3) {
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
} else if (parts.length == 1) {
completeTopicName = TopicDomain.persistent.name() + "://"
+ PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
List<String> parts = splitBySlash(completeTopicName, 0);
this.domain = TopicDomain.persistent;
if (parts.size() == 3) {
this.tenant = parts.get(0);
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
} else if (parts.size() == 1) {
this.tenant = PUBLIC_TENANT;
this.namespacePortion = DEFAULT_NAMESPACE;
this.localName = parts.get(0);
} else {
throw new IllegalArgumentException(
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
+ "<tenant>/<namespace>/<topic> or <topic>");
}
}

// Expected format: persistent://tenant/namespace/topic
List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
this.domain = TopicDomain.getEnum(parts.get(0));

String rest = parts.get(1);

// Scalable topic domains (topic://, segment://) only support the new format
// and local names may contain '/', so use limit(3) to keep the rest as localName.
boolean isScalableDomain = this.domain == TopicDomain.topic
|| this.domain == TopicDomain.segment;
int splitLimit = isScalableDomain ? 3 : 4;
parts = Splitter.on("/").limit(splitLimit).splitToList(rest);
if (parts.size() == 4) {
throw new IllegalArgumentException(
"V1 topic names (with cluster component) are no longer supported. "
+ "Please use the V2 format: '<domain>://tenant/namespace/topic'. Got: "
+ completeTopicName);
} else if (parts.size() == 3) {
this.segmentRange = null;
this.segmentId = -1;
this.completeTopicName = domain.name() + "://" + tenant + "/" + namespacePortion + "/" + localName;
} else {
this.domain = TopicDomain.getEnum(completeTopicName.substring(0, index));
// Scalable topic domains (topic://, segment://) only support the new format
// and local names may contain '/', so use limit(3) to keep the rest as localName.
boolean isScalableDomain = this.domain == TopicDomain.topic
|| this.domain == TopicDomain.segment;
int splitLimit = isScalableDomain ? 3 : 4;
List<String> parts = splitBySlash(completeTopicName.substring(index + "://".length()),
splitLimit);
if (parts.size() == 4) {
throw new IllegalArgumentException(
"V1 topic names (with cluster component) are no longer supported. "
+ "Please use the V2 format: '<domain>://tenant/namespace/topic'. Got: "
+ completeTopicName);
} else if (parts.size() != 3) {
throw new IllegalArgumentException("Invalid topic name " + completeTopicName);
}
this.tenant = parts.get(0);
this.namespacePortion = parts.get(1);
String rawLocalName = parts.get(2);
Expand All @@ -160,16 +165,16 @@ private TopicName(String completeTopicName) {
if (lastSlash <= 0) {
throw new IllegalArgumentException(
"Invalid segment topic name: local name must contain"
+ " '<parent-topic>/<hashStart>-<hashEnd>-<segmentId>'. Got: "
+ completeTopicName);
+ " '<parent-topic>/<hashStart>-<hashEnd>-<segmentId>'. Got: "
+ completeTopicName);
}
this.localName = rawLocalName.substring(0, lastSlash);
String descriptor = rawLocalName.substring(lastSlash + 1);
String[] descParts = descriptor.split("-");
if (descParts.length != 3) {
throw new IllegalArgumentException(
"Invalid segment descriptor: expected '<hexStart>-<hexEnd>-<segmentId>',"
+ " got: '" + descriptor + "'");
+ " got: '" + descriptor + "'");
}
this.segmentRange = HashRange.of(
Integer.parseInt(descParts[0], 16),
Expand All @@ -181,28 +186,24 @@ private TopicName(String completeTopicName) {
this.segmentId = -1;
}

this.partitionIndex = getPartitionIndex(completeTopicName);
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} else {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
if (this.domain == TopicDomain.segment) {
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
domain, tenant, namespacePortion, localName,
String.format("%04x-%04x-%d", segmentRange.start(), segmentRange.end(), segmentId));
} else {
this.completeTopicName = completeTopicName;
}
}

if (StringUtils.isBlank(localName)) {
throw new IllegalArgumentException(String.format("Invalid topic name: %s. Topic local name must not"
+ " be blank.", completeTopicName));
}

this.partitionIndex = getPartitionIndex(localName);
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} catch (NullPointerException e) {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
}
if (this.domain == TopicDomain.segment) {
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
domain, tenant, namespacePortion, localName,
String.format("%04x-%04x-%d", segmentRange.start(), segmentRange.end(), segmentId));
} else {
this.completeTopicName = String.format("%s://%s/%s/%s",
domain, tenant, namespacePortion, localName);
}
}

public boolean isPersistent() {
Expand Down
Loading