Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -132,7 +132,6 @@
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -1649,11 +1648,16 @@ public void shutdownNow() {
}


private static boolean isTransactionSystemTopic(TopicName topicName) {
public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
|| topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -161,7 +162,9 @@ protected List<String> internalGetList() {
}

try {
return topicResources().listPersistentTopicsAsync(namespaceName).join();
return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics ->
topics.stream().filter(topic ->
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join();
} catch (Exception e) {
log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
Expand Down Expand Up @@ -244,6 +247,13 @@ protected void validateAdminAndClientPermission() {
}
}

protected void validateCreateTopic(TopicName topicName) {
if (isTransactionInternalName(topicName)) {
log.warn("Try to create a topic in the system topic format! {}", topicName);
throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
}
}

public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
Expand Down Expand Up @@ -3683,7 +3693,10 @@ private Topic getTopicReference(TopicName topicName) {
} catch (RestException e) {
throw e;
} catch (Exception e) {
throw new RestException(e);
if (e.getCause() instanceof NotAllowedException) {
throw new RestException(Status.CONFLICT, e.getCause());
}
throw new RestException(e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
Expand Down Expand Up @@ -267,6 +268,7 @@ public void createNonPartitionedTopic(
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1284,6 +1285,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
return;
}

if (isTransactionSystemTopic(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}

CompletableFuture<Void> maxTopicsCheck = createIfMissing
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -101,6 +108,63 @@ protected void setup() throws Exception {
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}

@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

try {
// init pending ack
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
assertEquals(list.size(), 4);
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));

try {
// can't create transaction system topic
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
fail();
} catch (PulsarClientException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Can not create transaction system topic"));
}

// can't create transaction system topic
try {
admin.topics().getSubscriptions(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
}

// can't create transaction system topic
try {
admin.topics().createPartitionedTopic(topicName, 3);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}

// can't create transaction system topic
try {
admin.topics().createNonPartitionedTopic(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}
}

@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static boolean checkTopicIsEventsNames(TopicName topicName) {
}

public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
return topicName != null && topicName.toString()
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");

public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down