@@ -24,6 +24,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
* Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage.
@@ -41,17 +42,19 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
boolean isDriverSupported(String driverName);

/**
* Create a ledger offloader with the provided configuration, user-metadata and scheduler.
* Create a ledger offloader with the provided configuration, offload policies, user-metadata and scheduler.
*
* @param properties service configuration
* @param offloadPolicies the namespace offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @return the offloader instance
* @throws IOException when failing to create an offloader
*/
T create(Properties properties,
OffloadPolicies offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;
throws IOException;

}
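
With the extra OffloadPolicies argument, every offloader factory implementation now receives the namespace-level policies alongside the broker-wide properties. A minimal sketch against the new signature (the class name, driver name, and thrown placeholder are illustrative assumptions, not part of this change):

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

// Illustrative only: a skeleton driver showing where the new parameter arrives.
public class ExampleLedgerOffloaderFactory implements LedgerOffloaderFactory<LedgerOffloader> {

    @Override
    public boolean isDriverSupported(String driverName) {
        return "example".equalsIgnoreCase(driverName);
    }

    @Override
    public LedgerOffloader create(Properties properties,
                                  OffloadPolicies offloadPolicies,
                                  Map<String, String> userMetadata,
                                  OrderedScheduler scheduler) throws IOException {
        // A real driver would build its storage client here, preferring values
        // from offloadPolicies (when present) over the broker-wide properties.
        throw new IOException("sketch only: no storage backend behind driver 'example'");
    }
}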
@@ -1352,7 +1352,31 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Driver to use to offload old data to long term storage"
)
private String managedLedgerOffloadDriver = null;
private String defaultOffloadDriver = null;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Endpoint to use to offload old data to long term storage"
)
private String defaultOffloadEndpoint = null;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Bucket to use to offload old data to long term storage"
)
private String defaultOffloadBucket = null;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Max block size in bytes to use to offload old data to long term storage"
)
private Long defaultOffloadMaxBlockSizeInBytes = null;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Read buffer size in bytes to use to offload old data to long term storage"
)
private Long defaultOffloadReadBufferSizeInBytes = null;
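
These broker-level default* fields act as the fallback when a namespace does not define its own offload policies. A hypothetical helper sketching that fallback (the OffloadPolicies constructor and accessors, and the Lombok-generated ServiceConfiguration getters used below, are assumptions rather than APIs confirmed by this diff):

// Hypothetical, illustration only: fill missing namespace-level values from the
// broker-wide defaults declared above. All accessor names are assumed.
static OffloadPolicies applyBrokerDefaults(OffloadPolicies nsPolicies, ServiceConfiguration conf) {
    OffloadPolicies merged = nsPolicies != null ? nsPolicies : new OffloadPolicies();
    if (merged.getDriver() == null) {
        merged.setDriver(conf.getDefaultOffloadDriver());
    }
    if (merged.getBucket() == null) {
        merged.setBucket(conf.getDefaultOffloadBucket());
    }
    if (merged.getMaxBlockSizeInBytes() == null) {
        merged.setMaxBlockSizeInBytes(conf.getDefaultOffloadMaxBlockSizeInBytes());
    }
    if (merged.getReadBufferSizeInBytes() == null) {
        merged.setReadBufferSizeInBytes(conf.getDefaultOffloadReadBufferSizeInBytes());
    }
    return merged;
}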

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
@@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -85,6 +86,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -95,9 +97,13 @@
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -130,6 +136,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;

/**
* Main class for Pulsar broker service
*/
@@ -152,6 +160,8 @@ public class PulsarService implements AutoCloseable {
private GlobalZooKeeperCache globalZkCache;
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private Compactor compactor;
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> namespacesOffloaders;

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar"));
@@ -162,7 +172,6 @@ public class PulsarService implements AutoCloseable {
private ScheduledExecutorService compactorExecutor;
private OrderedScheduler offloaderScheduler;
private Offloaders offloaderManager = new Offloaders();
private LedgerOffloader offloader;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
@@ -398,7 +407,11 @@ public void start() throws PulsarServerException {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));

this.offloader = createManagedLedgerOffloader(this.getConfiguration());
// Start the leader election service
startLeaderElectionService();

// needs load management service
this.startNamespaceService();

brokerService.start();

@@ -560,7 +573,7 @@ protected void acquireSLANamespace() {
// Namespace not created hence no need to unload it
String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
if (!this.globalZkCache.exists(
AdminResource.path(POLICIES) + "/" + nsName)) {
path(POLICIES) + "/" + nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
}
@@ -761,7 +774,11 @@ public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
}

public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
return defaultOffloader;
}

public LedgerOffloader getManagedLedgerOffloadForNamespace(String namespace) {
return namespacesOffloaders.get(NamespaceName.get(namespace));
}

public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
@@ -777,6 +794,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur
try {
return offloaderFactory.create(
conf.getProperties(),
null,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
@@ -794,6 +812,61 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur
}
}

public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf, OffloadPolicies offloadPolicies)
throws PulsarServerException {
try {
if (StringUtils.isNotBlank(offloadPolicies.getDriver())) {
checkNotNull(conf.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getDriver());
this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(offloadPolicies.getDriver());
try {
return offloaderFactory.create(
conf.getProperties(),
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
} else {
LOG.info("No ledger offloader configured, using NULL instance");
return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
throw new PulsarServerException(t);
}
}

public Offloaders getOffloaderManager() {
return offloaderManager;
}

protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
final String policyPath = path(POLICIES, namespace);
Policies policies = this.configurationCacheService.policiesCache().get(policyPath)
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = this.getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;

return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
LOG.error("Failed to get namespace policies {}", namespaceName, e);
throw new RestException(e);
}
}
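
Taken together, getNamespacePolicies(...) and the policies-aware createManagedLedgerOffloader(...) overload provide everything needed to build a namespace-specific offloader. A sketch of how the namespacesOffloaders cache could be populated lazily, falling back to the broker-wide default offloader (this wiring is an assumption, not code from this patch, and it presumes namespacesOffloaders is initialised to a concurrent map at startup):

// Sketch only: resolve and cache a per-namespace offloader on first use.
public LedgerOffloader getOrCreateOffloaderForNamespace(NamespaceName ns) {
    return namespacesOffloaders.computeIfAbsent(ns, key -> {
        try {
            Policies policies = getNamespacePolicies(key);
            if (policies.offloadPolicies == null
                    || StringUtils.isBlank(policies.offloadPolicies.getDriver())) {
                // No namespace-level policy: reuse the broker-wide offloader.
                return defaultOffloader;
            }
            return createManagedLedgerOffloader(getConfiguration(), policies.offloadPolicies);
        } catch (PulsarServerException e) {
            LOG.warn("Failed to create offloader for namespace {}, using the default", key, e);
            return defaultOffloader;
        }
    });
}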

public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
@@ -858,7 +931,7 @@ public synchronized Compactor getCompactor() throws PulsarServerException {
return this.compactor;
}

protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) {
public synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
@@ -1028,7 +1101,7 @@ private void startWorkerService(AuthenticationService authenticationService,
try {
NamedEntity.checkName(property);
this.getGlobalZkCache().getZooKeeper().create(
AdminResource.path(POLICIES, property),
path(POLICIES, property),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
new TenantInfo(
Sets.newHashSet(config.getSuperUserRoles()),
@@ -1051,7 +1124,7 @@ private void startWorkerService(AuthenticationService authenticationService,
ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null /* serviceUrlTls */,
brokerServiceUrl, null /* brokerServiceUrlTls */);
this.getGlobalZkCache().getZooKeeper().create(
AdminResource.path("clusters", cluster),
path("clusters", cluster),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("Created cluster {} for function worker", cluster);
@@ -1073,9 +1146,9 @@ private void startWorkerService(AuthenticationService authenticationService,
int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles();
policies.bundles = getBundles(defaultNumberOfBundles);

this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, namespace));
this.getConfigurationCache().policiesCache().invalidate(path(POLICIES, namespace));
ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(),
AdminResource.path(POLICIES, namespace),
path(POLICIES, namespace),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -84,6 +84,7 @@
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
@@ -2142,5 +2143,36 @@ private <T> void mutatePolicy(Function<Policies, Policies> policyTransformation,
}
}

protected void internalSetOffloadPolicies(OffloadPolicies offload) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.offloadPolicies = offload;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(policies.offloadPolicies));
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update offload configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update offload configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
}
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
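
internalSetOffloadPolicies(...) is the base-class half of the change; an admin REST endpoint still has to expose it. A hypothetical JAX-RS wiring in a Namespaces resource might look like the following (the path, HTTP verb, and reuse of validateNamespaceName are assumptions about how the rest of the patch ties in):

// Hypothetical endpoint, not shown in this diff.
@POST
@Path("/{tenant}/{namespace}/offloadPolicies")
public void setOffloadPolicies(@PathParam("tenant") String tenant,
                               @PathParam("namespace") String namespace,
                               OffloadPolicies offload) {
    validateNamespaceName(tenant, namespace);
    internalSetOffloadPolicies(offload);
}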