diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index f0a6890aeb272..c6956659e2818 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage. @@ -41,17 +42,19 @@ public interface LedgerOffloaderFactory { boolean isDriverSupported(String driverName); /** - * Create a ledger offloader with the provided configuration, user-metadata and scheduler. + * Create a ledger offloader with the provided configuration, offload policies, user-metadata and scheduler. * * @param properties service configuration + * @param offloadPolicies the namespace offload policies * @param userMetadata user metadata * @param scheduler scheduler * @return the offloader instance * @throws IOException when fail to create an offloader */ T create(Properties properties, + OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) - throws IOException; + throws IOException; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 79d4da53bd7d8..2f5d9316760a6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1352,7 +1352,31 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_STORAGE_OFFLOADING, doc = "Driver to use to offload old data to long term storage" ) - private String managedLedgerOffloadDriver = null; + private String defaultOffloadDriver = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Endpoint to use to offload old data to long term storage" + ) + private String defaultOffloadEndpoint = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Bucket to use to offload old data to long term storage" + ) + private String defaultOffloadBucket = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Max block size in bytes to use to offload old data to long term storage" + ) + private Long defaultOffloadMaxBlockSizeInBytes = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Read buffer size in bytes to use to offload old data to long term storage" + ) + private Long defaultOffloadReadBufferSizeInBytes = null; @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fd169fe1d1d59..e750ec09a3f1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -85,6 +86,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; +import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.WebService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -95,9 +97,13 @@ import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -130,6 +136,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.Response; + /** * Main class for Pulsar broker service */ @@ -152,6 +160,8 @@ public class PulsarService implements AutoCloseable { private GlobalZooKeeperCache globalZkCache; private LocalZooKeeperConnectionService localZooKeeperConnectionProvider; private Compactor compactor; + private LedgerOffloader defaultOffloader; + private Map namespacesOffloaders; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, new DefaultThreadFactory("pulsar")); @@ -162,7 +172,6 @@ public class PulsarService implements AutoCloseable { private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; private Offloaders offloaderManager = new Offloaders(); - private LedgerOffloader offloader; private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; private ScheduledFuture loadResourceQuotaTask = null; @@ -398,7 +407,11 @@ public void start() throws PulsarServerException { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); - this.offloader = createManagedLedgerOffloader(this.getConfiguration()); + // Start the leader election service + startLeaderElectionService(); + + // needs load management service + this.startNamespaceService(); brokerService.start(); @@ -560,7 +573,7 @@ protected void acquireSLANamespace() { // Namespace not created hence no need to unload it String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config); if (!this.globalZkCache.exists( - AdminResource.path(POLICIES) + "/" + nsName)) { + path(POLICIES) + "/" + nsName)) { LOG.info("SLA Namespace = {} doesn't exist.", nsName); return; } @@ -761,7 +774,11 @@ public ManagedLedgerClientFactory getManagedLedgerClientFactory() { } public LedgerOffloader getManagedLedgerOffloader() { - return offloader; + return defaultOffloader; + } + + public LedgerOffloader getManagedLedgerOffloadForNamespace(String namespace) { + return namespacesOffloaders.get(namespace); } public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) @@ -777,6 +794,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur try { return offloaderFactory.create( conf.getProperties(), + null, ImmutableMap.of( LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() @@ -794,6 +812,61 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur } } + public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf, OffloadPolicies offloadPolicies) + throws PulsarServerException { + try { + if (StringUtils.isNotBlank(offloadPolicies.getDriver())) { + checkNotNull(conf.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + conf.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()); + LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(offloadPolicies.getDriver()); + try { + return offloaderFactory.create( + conf.getProperties(), + offloadPolicies, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(conf)); + } catch (IOException ioe) { + throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); + } + } else { + LOG.info("No ledger offloader configured, using NULL instance"); + return NullLedgerOffloader.INSTANCE; + } + } catch (Throwable t) { + throw new PulsarServerException(t); + } + } + + public Offloaders getOffloaderManager() { + return offloaderManager; + } + + protected Policies getNamespacePolicies(NamespaceName namespaceName) { + try { + final String namespace = namespaceName.toString(); + final String policyPath = path(POLICIES, namespace); + Policies policies = this.configurationCacheService.policiesCache ().get(policyPath) + .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + // fetch bundles from LocalZK-policies + NamespaceBundles bundles = this.getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles); + policies.bundles = bundleData != null ? bundleData : policies.bundles; + + return policies; + } catch (RestException re) { + throw re; + } catch (Exception e) { + LOG.error("Failed to get namespace policies {}", namespaceName, e); + throw new RestException(e); + } + } + public ZooKeeperCache getLocalZkCache() { return localZkCache; } @@ -858,7 +931,7 @@ public synchronized Compactor getCompactor() throws PulsarServerException { return this.compactor; } - protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) { + public synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() .numThreads(conf.getManagedLedgerOffloadMaxThreads()) @@ -1028,7 +1101,7 @@ private void startWorkerService(AuthenticationService authenticationService, try { NamedEntity.checkName(property); this.getGlobalZkCache().getZooKeeper().create( - AdminResource.path(POLICIES, property), + path(POLICIES, property), ObjectMapperFactory.getThreadLocal().writeValueAsBytes( new TenantInfo( Sets.newHashSet(config.getSuperUserRoles()), @@ -1051,7 +1124,7 @@ private void startWorkerService(AuthenticationService authenticationService, ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null /* serviceUrlTls */, brokerServiceUrl, null /* brokerServiceUrlTls */); this.getGlobalZkCache().getZooKeeper().create( - AdminResource.path("clusters", cluster), + path("clusters", cluster), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); LOG.info("Created cluster {} for function worker", cluster); @@ -1073,9 +1146,9 @@ private void startWorkerService(AuthenticationService authenticationService, int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles(); policies.bundles = getBundles(defaultNumberOfBundles); - this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, namespace)); + this.getConfigurationCache().policiesCache().invalidate(path(POLICIES, namespace)); ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(), - AdminResource.path(POLICIES, namespace), + path(POLICIES, namespace), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index adc7bd4d502b6..92f6d5b9fb5ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -84,6 +84,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.KeeperException; @@ -2142,5 +2143,36 @@ private void mutatePolicy(Function policyTransformation, } } + protected void internalSetOffloadPolicies(OffloadPolicies offload) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, namespaceName.toString()); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + policies.offloadPolicies = offload; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, namespaceName.toString())); + log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), + namespaceName, jsonMapper().writeValueAsString(policies.offloadPolicies)); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update offload configuration for namespace {}: does not exist", clientAppId(), + namespaceName); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to update offload configuration for namespace {}: concurrent modification", + clientAppId(), namespaceName); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName, + e); + throw new RestException(e); + } + } + private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index eb8a05fd60ad6..29ac79096b445 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -27,6 +27,7 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -69,15 +70,15 @@ import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.*; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -121,6 +122,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies;; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.Policies; @@ -939,6 +941,14 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getDefaultRetentionSizeInMB()) ); + OffloadPolicies offloadPolicies = policies.map(p -> p.offloadPolicies).orElseGet( + () -> new OffloadPolicies(serviceConfig.getDefaultOffloadDriver(), + serviceConfig.getDefaultOffloadEndpoint(), + serviceConfig.getDefaultOffloadBucket(), + serviceConfig.getDefaultOffloadMaxBlockSizeInBytes(), + serviceConfig.getDefaultOffloadReadBufferSizeInBytes()) + ); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); @@ -981,7 +991,6 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); policies.ifPresent(p -> { long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs(); if (p.offload_deletion_lag_ms != null) { @@ -995,12 +1004,48 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes); }); + + try { + managedLedgerConfig.setLedgerOffloader(createManagedLedgerOffloader(serviceConfig, offloadPolicies)); + } catch (PulsarServerException e) { + log.error("Can't create managed ledger offloader for {} due to {}.", namespace, e); + future.completeExceptionally(e); + } future.complete(managedLedgerConfig); }, (exception) -> future.completeExceptionally(exception))); return future; } + private synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf, OffloadPolicies offloadPolicies) + throws PulsarServerException { + try { + if (StringUtils.isNotBlank(offloadPolicies.getDriver())) { + checkNotNull(conf.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + conf.getDefaultOffloadDriver()); + LedgerOffloaderFactory offloaderFactory = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()).getOffloaderFactory(offloadPolicies.getDriver()); + try { + return offloaderFactory.create( + conf.getProperties(), + offloadPolicies, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + pulsar.getOffloaderScheduler(conf)); + } catch (IOException ioe) { + throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); + } + } else { + log.info("No ledger offloader configured, using NULL instance"); + return NullLedgerOffloader.INSTANCE; + } + } catch (Throwable t) { + throw new PulsarServerException(t); + } + } + private void addTopicToStatsMaps(TopicName topicName, Topic topic) { try { NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getBundle(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 7f56a3fc02e9b..589640b5e3972 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -71,8 +71,6 @@ private void testOffload(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); when(offloader.getOffloadDriverName()).thenReturn("mock"); - doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); - CompletableFuture promise = new CompletableFuture<>(); doReturn(promise).when(offloader).offload(any(), any(), any()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 8b858ba53f93f..2a9766f001cbf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -33,6 +33,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1550,10 +1551,6 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace) * * @param namespace The namespace in whose policy should be set * @param strategy The schema compatibility strategy - * @throws NotAuthorizedException - * Don't have admin permission - * @throws NotFoundException - * Namespace does not exist * @throws PulsarAdminException * Unexpected error */ @@ -1591,4 +1588,67 @@ boolean getIsAllowAutoUpdateSchema(String namespace) */ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException; + + /* + * Set the offload configuration for all the topics on a namespace. + *

+ * Set the offload configuration on a namespace. This operation requires Pulsar super-user access. + *

+ * Request parameter example: + *

+ * + *

+     * 
+     * {
+     *  "driver": "S3",                            // offload driver type
+     *  "endpoint": "https//endpoint",             // endpoint hostname
+     *  "bucket": "pulsar-storage-",    // bucket name
+     *  "maxBlockSizeInBytes": 67108864,
+     *  "readBufferSizeInBytes": 1048576
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + */ + void setOffloadPolicies(String namespace, OffloadPolicies offload) throws PulsarAdminException; + + /** + * Get the offload configuration for a namespace. + *

+ * Get the offload configuration for a namespace. + *

+ * Response example: + *

+ * + *

+     * 
+     * {
+     *  "driver": "S3",                            // offload driver type
+     *  "endpoint": "https//endpoint",             // endpoint hostname
+     *  "bucket": "pulsar-storage-",    // bucket name
+     *  "maxBlockSizeInBytes": 67108864,
+     *  "readBufferSizeInBytes": 1048576
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws ConflictException + * Concurrent modification + * @throws PulsarAdminException + * Unexpected error + */ + OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7ab7762c7612d..d50378adb9c5c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -38,11 +38,12 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1012,4 +1013,27 @@ private WebTarget namespacePath(NamespaceName namespace, String... parts) { namespacePath = WebTargets.addParts(namespacePath, parts); return namespacePath; } + + @Override + public void setOffloadPolicies(String namespace, OffloadPolicies offload) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offload"); + request(path).post(Entity.entity(offload, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + + } + + @Override + public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offload"); + return request(path).get(OffloadPolicies.class); + } catch (Exception e) { + throw getApiException(e); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java new file mode 100644 index 0000000000000..f7b38580a1e61 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import java.util.Objects; + +/** + */ +public class OffloadPolicies { + private String driver; + private String endpoint; + private String bucket; + private long maxBlockSizeInBytes; + private long readBufferSizeInBytes; + + public OffloadPolicies(String driver, String endpoint, String bucket, + long maxBlockSizeInBytes, long readBufferSizeInBytes) { + this.driver = driver; + this.endpoint = endpoint; + this.bucket = bucket; + this.maxBlockSizeInBytes = maxBlockSizeInBytes; + this.readBufferSizeInBytes = readBufferSizeInBytes; + } + + public String getDriver() { + return driver; + } + + public String getEndpoint() { + return endpoint; + } + + public String getBucket() { + return bucket; + } + + public long getMaxBlockSizeInBytes() { + return maxBlockSizeInBytes; + } + + public long getReadBufferSizeInBytes() { + return readBufferSizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OffloadPolicies that = (OffloadPolicies) o; + return maxBlockSizeInBytes == that.maxBlockSizeInBytes + && readBufferSizeInBytes == that.readBufferSizeInBytes + && Objects.equals(driver, that.driver) + && Objects.equals(endpoint, that.endpoint) + && Objects.equals(bucket, that.bucket); + } + + @Override + public int hashCode() { + return Objects.hash(driver, endpoint, bucket, maxBlockSizeInBytes, readBufferSizeInBytes); + } + + @Override + public String toString() { + return "OffloadPolicies{" + + "driver=" + driver + + ", endpoint='" + endpoint + '\'' + + ", bucket='" + bucket + '\'' + + ", maxBlockSizeInBytes=" + maxBlockSizeInBytes + + ", readBufferSizeInBytes=" + readBufferSizeInBytes + + '}'; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 8e55280fc8de9..bb70b285d08b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -79,6 +79,7 @@ public class Policies { public long offload_threshold = -1; @SuppressWarnings("checkstyle:MemberName") public Long offload_deletion_lag_ms = null; + public OffloadPolicies offloadPolicies = null; @SuppressWarnings("checkstyle:MemberName") @Deprecated @@ -107,6 +108,7 @@ public int hashCode() { max_consumers_per_topic, max_consumers_per_subscription, compaction_threshold, offload_threshold, offload_deletion_lag_ms, + offloadPolicies, schema_auto_update_compatibility_strategy, schema_validation_enforced, schema_compatibility_strategy, @@ -140,6 +142,7 @@ public boolean equals(Object obj) { && compaction_threshold == other.compaction_threshold && offload_threshold == other.offload_threshold && offload_deletion_lag_ms == other.offload_deletion_lag_ms + && offloadPolicies == other.offloadPolicies && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced && schema_compatibility_strategy == other.schema_compatibility_strategy @@ -188,6 +191,7 @@ public String toString() { .add("compaction_threshold", compaction_threshold) .add("offload_threshold", offload_threshold) .add("offload_deletion_lag_ms", offload_deletion_lag_ms) + .add("offloadPolicies", offloadPolicies) .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy) .add("schema_validation_enforced", schema_validation_enforced) .add("schema_compatibility_Strategy", schema_compatibility_strategy) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 1af86a2225a06..abc2cee50f890 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -142,12 +142,13 @@ private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) { try { return offloaderFactory.create( - PulsarConnectorUtils.getProperties(offloaderProperties), - ImmutableMap.of( - LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), - LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() - ), - getOffloaderScheduler(conf)); + PulsarConnectorUtils.getProperties(offloaderProperties), + null, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(conf)); } catch (IOException ioe) { log.error("Failed to create offloader: ", ioe); throw new RuntimeException(ioe.getMessage(), ioe.getCause()); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java index 899887b6f8fbf..e1dbb8ab06fb0 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem; import lombok.Data; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import java.io.Serializable; import java.lang.reflect.Field; @@ -65,4 +66,29 @@ public static FileSystemConfigurationData create(Properties properties) { }); return data; } + + /** + * Create a tiered storage configuration from the provided properties and offloadPolicies. + * + * @param properties the configuration properties + * @param offloadPolicies the offload policies + * @return tiered storage configuration + */ + public static FileSystemConfigurationData create(Properties properties, OffloadPolicies offloadPolicies) { + // TODO: replace default conf by policies one + FileSystemConfigurationData data = new FileSystemConfigurationData(); + Field[] fields = FileSystemConfigurationData.class.getDeclaredFields(); + Arrays.stream(fields).forEach(f -> { + if (properties.containsKey(f.getName())) { + try { + f.setAccessible(true); + f.set(data, value((String) properties.get(f.getName()), f)); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", + f.getName(), properties.get(f.getName())), e); + } + } + }); + return data; + } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java index cd52197a59bb5..ebd0dbde50c97 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java @@ -21,6 +21,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import java.io.IOException; import java.util.Map; @@ -33,8 +34,8 @@ public boolean isDriverSupported(String driverName) { } @Override - public FileSystemManagedLedgerOffloader create(Properties properties, Map userMetadata, OrderedScheduler scheduler) throws IOException { - FileSystemConfigurationData data = FileSystemConfigurationData.create(properties); + public FileSystemManagedLedgerOffloader create(Properties properties, OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) throws IOException { + FileSystemConfigurationData data = FileSystemConfigurationData.create(properties, offloadPolicies); return FileSystemManagedLedgerOffloader.create(data, scheduler); } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 309076c4c3605..1fbc68947b489 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index dffe253f94a98..7f561a42517f8 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * A jcloud based offloader factory. @@ -43,9 +44,10 @@ public boolean isDriverSupported(String driverName) { @Override public BlobStoreManagedLedgerOffloader create(Properties properties, + OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) throws IOException { - TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties); + TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties, offloadPolicies); return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java index a4c5cf4fa8b2e..b3e8ee70f7147 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Properties; import lombok.Data; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Configuration for tiered storage. @@ -118,4 +119,29 @@ public static TieredStorageConfigurationData create(Properties properties) { return data; } + /** + * Create a tiered storage configuration from the provided properties and offloadPolicies. + * + * @param properties the configuration properties + * @param offloadPolicies the offload policies of the current namespace + * @return tiered storage configuration + */ + public static TieredStorageConfigurationData create(Properties properties, OffloadPolicies offloadPolicies) { + // TODO: replace default conf by policies one + TieredStorageConfigurationData data = new TieredStorageConfigurationData(); + Field[] fields = TieredStorageConfigurationData.class.getDeclaredFields(); + Arrays.stream(fields).forEach(f -> { + if (properties.containsKey(f.getName())) { + try { + f.setAccessible(true); + f.set(data, value((String) properties.get(f.getName()), f)); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", + f.getName(), properties.get(f.getName())), e); + } + } + }); + return data; + } + }