From 9743e76855b850b1f4ed2910ee77503755c11ef8 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Tue, 16 Jul 2019 17:14:43 +0200 Subject: [PATCH 01/11] add offload policies --- .../common/policies/data/OffloadPolicies.java | 89 +++++++++++++++++++ .../pulsar/common/policies/data/Policies.java | 4 + .../common/tieredStorage/OffloadType.java | 61 +++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java new file mode 100644 index 0000000000000..4014100c6e937 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import org.apache.pulsar.common.tieredStorage.OffloadType; + +import java.util.Objects; + +/** + */ +public class OffloadPolicies { + private OffloadType driver; + private String endpoint; + private String bucket; + private long maxBlockSizeInBytes; + private long readBufferSizeInBytes; + + public OffloadPolicies(OffloadType driver, String endpoint, String bucket, long maxBlockSizeInBytes, long readBufferSizeInBytes) { + this.driver = driver; + this.endpoint = endpoint; + this.bucket = bucket; + this.maxBlockSizeInBytes = maxBlockSizeInBytes; + this.readBufferSizeInBytes = readBufferSizeInBytes; + } + + public OffloadType getDriver() { + return driver; + } + + public String getEndpoint() { + return endpoint; + } + + public String getBucket() { + return bucket; + } + + public long getMaxBlockSizeInBytes() { + return maxBlockSizeInBytes; + } + + public long getReadBufferSizeInBytes() { + return readBufferSizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OffloadPolicies that = (OffloadPolicies) o; + return maxBlockSizeInBytes == that.maxBlockSizeInBytes && + readBufferSizeInBytes == that.readBufferSizeInBytes && + driver == that.driver && + Objects.equals(endpoint, that.endpoint) && + Objects.equals(bucket, that.bucket); + } + + @Override + public int hashCode() { + return Objects.hash(driver, endpoint, bucket, maxBlockSizeInBytes, readBufferSizeInBytes); + } + + @Override + public String toString() { + return "OffloadPolicies{" + + "driver=" + driver + + ", endpoint='" + endpoint + '\'' + + ", bucket='" + bucket + '\'' + + ", maxBlockSizeInBytes=" + maxBlockSizeInBytes + + ", readBufferSizeInBytes=" + readBufferSizeInBytes + + '}'; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 8e55280fc8de9..56c729370b5d8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -79,6 +79,7 @@ public class Policies { public long offload_threshold = -1; @SuppressWarnings("checkstyle:MemberName") public Long offload_deletion_lag_ms = null; + public OffloadPolicies offload_policies = null; @SuppressWarnings("checkstyle:MemberName") @Deprecated @@ -107,6 +108,7 @@ public int hashCode() { max_consumers_per_topic, max_consumers_per_subscription, compaction_threshold, offload_threshold, offload_deletion_lag_ms, + offload_policies, schema_auto_update_compatibility_strategy, schema_validation_enforced, schema_compatibility_strategy, @@ -140,6 +142,7 @@ public boolean equals(Object obj) { && compaction_threshold == other.compaction_threshold && offload_threshold == other.offload_threshold && offload_deletion_lag_ms == other.offload_deletion_lag_ms + && offload_policies == other.offload_policies && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced && schema_compatibility_strategy == other.schema_compatibility_strategy @@ -188,6 +191,7 @@ public String toString() { .add("compaction_threshold", compaction_threshold) .add("offload_threshold", offload_threshold) .add("offload_deletion_lag_ms", offload_deletion_lag_ms) + .add("offload_policies", offload_policies) .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy) .add("schema_validation_enforced", schema_validation_enforced) .add("schema_compatibility_Strategy", schema_compatibility_strategy) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java new file mode 100644 index 0000000000000..ab13435bfc493 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.tieredStorage; + +/** + * Types of supported schema for Pulsar messages + * + * + *

Ideally we should have just one single set of enum definitions + * for schema type. but we have 3 locations of defining schema types. + * + *

when you are adding a new schema type that whose + * schema info is required to be recorded in schema registry, + * add corresponding schema type into `pulsar-common/src/main/proto/PulsarApi.proto` + * and `pulsar-broker/src/main/proto/SchemaRegistryFormat.proto`. + */ +public enum OffloadType { + /** + * No offload type defined + */ + NONE(0), + + /** + * S3 type + */ + S3(1); + + int value; + + OffloadType(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static OffloadType valueOf(int value) { + switch (value) { + case 0: return NONE; + case 1: return S3; + default: return NONE; + } + } +} From d3fa073dadd8676049945964d54593557cb6ca55 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Tue, 16 Jul 2019 17:21:06 +0200 Subject: [PATCH 02/11] add internal offload policies setter for namespace --- .../broker/admin/impl/NamespacesBase.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index adc7bd4d502b6..f4755f063e9d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -68,8 +68,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; @@ -84,6 +82,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.KeeperException; @@ -2142,5 +2141,36 @@ private void mutatePolicy(Function policyTransformation, } } + protected void internalSetOffload(OffloadPolicies offload) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, namespaceName.toString()); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + policies.offload_policies = offload; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, namespaceName.toString())); + log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), + namespaceName, jsonMapper().writeValueAsString(policies.offload_policies)); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update offload configuration for namespace {}: does not exist", clientAppId(), + namespaceName); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to update offload configuration for namespace {}: concurrent modification", + clientAppId(), namespaceName); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName, + e); + throw new RestException(e); + } + } + private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } From 6a677c9bf8e5cef7d9b6233273a4e22086c525f7 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Tue, 16 Jul 2019 17:29:37 +0200 Subject: [PATCH 03/11] add offload per namespace method to pulsar-client-admin --- .../pulsar/client/admin/Namespaces.java | 68 +++++++++++++++++++ .../client/admin/internal/NamespacesImpl.java | 27 +++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 8b858ba53f93f..c18cf127268b3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -33,6 +33,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1529,6 +1530,7 @@ void setSchemaValidationEnforced(String namespace, boolean schemaValidationEnfor throws PulsarAdminException; /** +<<<<<<< HEAD * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema * before it is installed. * @@ -1550,10 +1552,35 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace) * * @param namespace The namespace in whose policy should be set * @param strategy The schema compatibility strategy +======= + * Set the offload configuration for all the topics on a namespace. + *

+ * Set the offload configuration on a namespace. This operation requires Pulsar super-user access. + *

+ * Request parameter example: + *

+ * + *

+     * 
+     * {
+     *  "driver": "S3",                            // offload driver type
+     *  "endpoint": "https//endpoint",             // endpoint hostname
+     *  "bucket": "pulsar-storage-",    // bucket name
+     *  "maxBlockSizeInBytes": 67108864,
+     *  "readBufferSizeInBytes": 1048576
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * +>>>>>>> add offload per namespace method to pulsar-client-admin * @throws NotAuthorizedException * Don't have admin permission * @throws NotFoundException * Namespace does not exist +<<<<<<< HEAD * @throws PulsarAdminException * Unexpected error */ @@ -1591,4 +1618,45 @@ boolean getIsAllowAutoUpdateSchema(String namespace) */ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException; +======= + * @throws ConflictException + * Concurrent modification + * @throws PulsarAdminException + * Unexpected error + */ + void setOffload(String namespace, OffloadPolicies offload) throws PulsarAdminException; + + /** + * Get the offload configuration for a namespace. + *

+ * Get the offload configuration for a namespace. + *

+ * Response example: + *

+ * + *

+     * 
+     * {
+     *  "driver": "S3",                            // offload driver type
+     *  "endpoint": "https//endpoint",             // endpoint hostname
+     *  "bucket": "pulsar-storage-",    // bucket name
+     *  "maxBlockSizeInBytes": 67108864,
+     *  "readBufferSizeInBytes": 1048576
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws ConflictException + * Concurrent modification + * @throws PulsarAdminException + * Unexpected error + */ + OffloadPolicies getOffload(String namespace) throws PulsarAdminException; +>>>>>>> add offload per namespace method to pulsar-client-admin } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7ab7762c7612d..380404597737e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -36,13 +36,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1012,4 +1010,27 @@ private WebTarget namespacePath(NamespaceName namespace, String... parts) { namespacePath = WebTargets.addParts(namespacePath, parts); return namespacePath; } + + @Override + public void setOffload(String namespace, OffloadPolicies offload) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offload"); + request(path).post(Entity.entity(offload, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + + } + + @Override + public OffloadPolicies getOffload(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offload"); + return request(path).get(OffloadPolicies.class); + } catch (Exception e) { + throw getApiException(e); + } + } } From d565433a75e09a0cb3d8f78b2e8fa21196c3401a Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Wed, 17 Jul 2019 12:28:50 +0200 Subject: [PATCH 04/11] set method naming more relevant --- .../broker/admin/impl/NamespacesBase.java | 2 +- .../pulsar/client/admin/Namespaces.java | 68 ++++++++----------- .../client/admin/internal/NamespacesImpl.java | 4 +- 3 files changed, 33 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f4755f063e9d9..dee849754571a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2141,7 +2141,7 @@ private void mutatePolicy(Function policyTransformation, } } - protected void internalSetOffload(OffloadPolicies offload) { + protected void internalSetOffloadPolicies(OffloadPolicies offload) { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index c18cf127268b3..2a9766f001cbf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1530,7 +1530,6 @@ void setSchemaValidationEnforced(String namespace, boolean schemaValidationEnfor throws PulsarAdminException; /** -<<<<<<< HEAD * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema * before it is installed. * @@ -1552,35 +1551,6 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace) * * @param namespace The namespace in whose policy should be set * @param strategy The schema compatibility strategy -======= - * Set the offload configuration for all the topics on a namespace. - *

- * Set the offload configuration on a namespace. This operation requires Pulsar super-user access. - *

- * Request parameter example: - *

- * - *

-     * 
-     * {
-     *  "driver": "S3",                            // offload driver type
-     *  "endpoint": "https//endpoint",             // endpoint hostname
-     *  "bucket": "pulsar-storage-",    // bucket name
-     *  "maxBlockSizeInBytes": 67108864,
-     *  "readBufferSizeInBytes": 1048576
-     * }
-     * 
-     * 
- * - * @param namespace - * Namespace name - * ->>>>>>> add offload per namespace method to pulsar-client-admin - * @throws NotAuthorizedException - * Don't have admin permission - * @throws NotFoundException - * Namespace does not exist -<<<<<<< HEAD * @throws PulsarAdminException * Unexpected error */ @@ -1618,13 +1588,36 @@ boolean getIsAllowAutoUpdateSchema(String namespace) */ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException; -======= - * @throws ConflictException - * Concurrent modification - * @throws PulsarAdminException - * Unexpected error + + /* + * Set the offload configuration for all the topics on a namespace. + *

+ * Set the offload configuration on a namespace. This operation requires Pulsar super-user access. + *

+ * Request parameter example: + *

+ * + *

+     * 
+     * {
+     *  "driver": "S3",                            // offload driver type
+     *  "endpoint": "https//endpoint",             // endpoint hostname
+     *  "bucket": "pulsar-storage-",    // bucket name
+     *  "maxBlockSizeInBytes": 67108864,
+     *  "readBufferSizeInBytes": 1048576
+     * }
+     * 
+     * 
+ * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist */ - void setOffload(String namespace, OffloadPolicies offload) throws PulsarAdminException; + void setOffloadPolicies(String namespace, OffloadPolicies offload) throws PulsarAdminException; /** * Get the offload configuration for a namespace. @@ -1657,6 +1650,5 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem * @throws PulsarAdminException * Unexpected error */ - OffloadPolicies getOffload(String namespace) throws PulsarAdminException; ->>>>>>> add offload per namespace method to pulsar-client-admin + OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 380404597737e..d5ca5a2bcfd67 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1012,7 +1012,7 @@ private WebTarget namespacePath(NamespaceName namespace, String... parts) { } @Override - public void setOffload(String namespace, OffloadPolicies offload) throws PulsarAdminException { + public void setOffloadPolicies(String namespace, OffloadPolicies offload) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "offload"); @@ -1024,7 +1024,7 @@ public void setOffload(String namespace, OffloadPolicies offload) throws PulsarA } @Override - public OffloadPolicies getOffload(String namespace) throws PulsarAdminException { + public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "offload"); From 9896db3a064378b3c674088fe1e7719b24125697 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Wed, 17 Jul 2019 12:29:11 +0200 Subject: [PATCH 05/11] offloadtype is now string --- .../common/policies/data/OffloadPolicies.java | 10 ++- .../common/tieredStorage/OffloadType.java | 61 ------------------- 2 files changed, 4 insertions(+), 67 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 4014100c6e937..2a6d350cfa003 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -18,20 +18,18 @@ */ package org.apache.pulsar.common.policies.data; -import org.apache.pulsar.common.tieredStorage.OffloadType; - import java.util.Objects; /** */ public class OffloadPolicies { - private OffloadType driver; + private String driver; private String endpoint; private String bucket; private long maxBlockSizeInBytes; private long readBufferSizeInBytes; - public OffloadPolicies(OffloadType driver, String endpoint, String bucket, long maxBlockSizeInBytes, long readBufferSizeInBytes) { + public OffloadPolicies(String driver, String endpoint, String bucket, long maxBlockSizeInBytes, long readBufferSizeInBytes) { this.driver = driver; this.endpoint = endpoint; this.bucket = bucket; @@ -39,7 +37,7 @@ public OffloadPolicies(OffloadType driver, String endpoint, String bucket, long this.readBufferSizeInBytes = readBufferSizeInBytes; } - public OffloadType getDriver() { + public String getDriver() { return driver; } @@ -66,7 +64,7 @@ public boolean equals(Object o) { OffloadPolicies that = (OffloadPolicies) o; return maxBlockSizeInBytes == that.maxBlockSizeInBytes && readBufferSizeInBytes == that.readBufferSizeInBytes && - driver == that.driver && + Objects.equals(driver, that.driver) && Objects.equals(endpoint, that.endpoint) && Objects.equals(bucket, that.bucket); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java deleted file mode 100644 index ab13435bfc493..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/tieredStorage/OffloadType.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.tieredStorage; - -/** - * Types of supported schema for Pulsar messages - * - * - *

Ideally we should have just one single set of enum definitions - * for schema type. but we have 3 locations of defining schema types. - * - *

when you are adding a new schema type that whose - * schema info is required to be recorded in schema registry, - * add corresponding schema type into `pulsar-common/src/main/proto/PulsarApi.proto` - * and `pulsar-broker/src/main/proto/SchemaRegistryFormat.proto`. - */ -public enum OffloadType { - /** - * No offload type defined - */ - NONE(0), - - /** - * S3 type - */ - S3(1); - - int value; - - OffloadType(int value) { - this.value = value; - } - - public int getValue() { - return this.value; - } - - public static OffloadType valueOf(int value) { - switch (value) { - case 0: return NONE; - case 1: return S3; - default: return NONE; - } - } -} From 84556fb2980ea3cbcbbb5a81a23ab2e7a036245f Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Wed, 17 Jul 2019 13:49:13 +0200 Subject: [PATCH 06/11] create offloaders per namespaceName on PulsarService --- .../mledger/LedgerOffloaderFactory.java | 17 +++ .../apache/pulsar/broker/PulsarService.java | 133 +++++++++++++++--- .../FileSystemConfigurationData.java | 26 ++++ .../FileSystemLedgerOffloaderFactory.java | 7 + .../FileSystemManagedLedgerOffloader.java | 1 + .../jcloud/JCloudLedgerOffloaderFactory.java | 10 ++ .../TieredStorageConfigurationData.java | 26 ++++ 7 files changed, 204 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index f0a6890aeb272..22250e9f98f0d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage. @@ -54,4 +55,20 @@ T create(Properties properties, OrderedScheduler scheduler) throws IOException; + /** + * Create a ledger offloader with the provided configuration, offload policies, user-metadata and scheduler. + * + * @param properties service configuration + * @param offloadPolicies the namespace offload policies + * @param userMetadata user metadata + * @param scheduler scheduler + * @return the offloader instance + * @throws IOException when fail to create an offloader + */ + T create(Properties properties, + OffloadPolicies offloadPolicies, + Map userMetadata, + OrderedScheduler scheduler) + throws IOException; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fd169fe1d1d59..23c9df102e4d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -85,6 +86,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; +import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.WebService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -93,14 +95,8 @@ import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; -import org.apache.pulsar.common.naming.NamedEntity; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.naming.*; +import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; @@ -130,6 +126,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.Response; + /** * Main class for Pulsar broker service */ @@ -162,7 +160,8 @@ public class PulsarService implements AutoCloseable { private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; private Offloaders offloaderManager = new Offloaders(); - private LedgerOffloader offloader; + private LedgerOffloader defaultOffloader; + private Map namespacesOffloaders; private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; private ScheduledFuture loadResourceQuotaTask = null; @@ -398,7 +397,14 @@ public void start() throws PulsarServerException { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); - this.offloader = createManagedLedgerOffloader(this.getConfiguration()); + // Start the leader election service + startLeaderElectionService(); + + // needs load management service + this.startNamespaceService(); + + this.defaultOffloader = createManagedLedgerOffloader(this.getConfiguration()); + this.namespacesOffloaders = createNamespacesOffloaders(); brokerService.start(); @@ -560,7 +566,7 @@ protected void acquireSLANamespace() { // Namespace not created hence no need to unload it String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config); if (!this.globalZkCache.exists( - AdminResource.path(POLICIES) + "/" + nsName)) { + path(POLICIES) + "/" + nsName)) { LOG.info("SLA Namespace = {} doesn't exist.", nsName); return; } @@ -761,7 +767,11 @@ public ManagedLedgerClientFactory getManagedLedgerClientFactory() { } public LedgerOffloader getManagedLedgerOffloader() { - return offloader; + return defaultOffloader; + } + + public LedgerOffloader getManagedLedgerOffloadForNamespace(String namespace) { + return namespacesOffloaders.get(namespace); } public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) @@ -794,6 +804,97 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur } } + public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf, OffloadPolicies offloadPolicies) + throws PulsarServerException { + try { + if (StringUtils.isNotBlank(offloadPolicies.getDriver())) { + checkNotNull(conf.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + conf.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()); + LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(offloadPolicies.getDriver()); + try { + return offloaderFactory.create( + conf.getProperties(), + offloadPolicies, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(conf)); + } catch (IOException ioe) { + throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); + } + } else { + LOG.info("No ledger offloader configured, using NULL instance"); + return NullLedgerOffloader.INSTANCE; + } + } catch (Throwable t) { + throw new PulsarServerException(t); + } + } + + public Map createNamespacesOffloaders() { + try { + List tenants = this.getZkClient().getChildren(path(POLICIES), false); + tenants.sort(null); + Map namespacesOffloaders = Maps.newHashMap(); + for (String tenantName: tenants) { + // this will return a cluster in v1 and a namespace in v2 + for (String clusterOrNamespace : this.getZkClient().getChildren(path(POLICIES, tenantName), false)) { + // Then get the list of namespaces + try { + final List children = this.getZkClient().getChildren(path(POLICIES, tenantName, clusterOrNamespace), false); + if (children == null || children.isEmpty()) { + NamespaceName namespaceName = NamespaceName.get(tenantName, clusterOrNamespace); + // if the length is 0 then this is probably a leftover cluster from namespace created + // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list + if (this.getZkClient().getData(path(POLICIES, namespaceName.toString()), false, null).length != 0) { + namespacesOffloaders.put(namespaceName, createManagedLedgerOffloader(this.config, getNamespacePolicies(namespaceName).offload_policies)); + } + } else { + children.forEach(ns -> { + NamespaceName namespaceName = NamespaceName.get(tenantName, clusterOrNamespace, ns); + try { + namespacesOffloaders.put(namespaceName, createManagedLedgerOffloader(this.config, getNamespacePolicies(namespaceName).offload_policies)); + } catch (PulsarServerException e) { + LOG.error("Error during create managedLedgerOffloader for {}", namespaceName, e); + } + }); + } + } catch (KeeperException.NoNodeException e) { + // A cluster was deleted between the 2 getChildren() calls, ignoring + } + } + } + return namespacesOffloaders; + } catch (Exception e) { + LOG.error("Failed to get tenants list", e); + throw new RestException(e); + } + } + + protected Policies getNamespacePolicies(NamespaceName namespaceName) { + try { + final String namespace = namespaceName.toString(); + final String policyPath = path(POLICIES, namespace); + Policies policies = this.configurationCacheService.policiesCache ().get(policyPath) + .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + // fetch bundles from LocalZK-policies + NamespaceBundles bundles = this.getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles); + policies.bundles = bundleData != null ? bundleData : policies.bundles; + + return policies; + } catch (RestException re) { + throw re; + } catch (Exception e) { + LOG.error("Failed to get namespace policies {}", namespaceName, e); + throw new RestException(e); + } + } + public ZooKeeperCache getLocalZkCache() { return localZkCache; } @@ -1028,7 +1129,7 @@ private void startWorkerService(AuthenticationService authenticationService, try { NamedEntity.checkName(property); this.getGlobalZkCache().getZooKeeper().create( - AdminResource.path(POLICIES, property), + path(POLICIES, property), ObjectMapperFactory.getThreadLocal().writeValueAsBytes( new TenantInfo( Sets.newHashSet(config.getSuperUserRoles()), @@ -1051,7 +1152,7 @@ private void startWorkerService(AuthenticationService authenticationService, ClusterData clusterData = new ClusterData(this.getSafeWebServiceAddress(), null /* serviceUrlTls */, brokerServiceUrl, null /* brokerServiceUrlTls */); this.getGlobalZkCache().getZooKeeper().create( - AdminResource.path("clusters", cluster), + path("clusters", cluster), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); LOG.info("Created cluster {} for function worker", cluster); @@ -1073,9 +1174,9 @@ private void startWorkerService(AuthenticationService authenticationService, int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles(); policies.bundles = getBundles(defaultNumberOfBundles); - this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, namespace)); + this.getConfigurationCache().policiesCache().invalidate(path(POLICIES, namespace)); ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(), - AdminResource.path(POLICIES, namespace), + path(POLICIES, namespace), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java index 899887b6f8fbf..e1dbb8ab06fb0 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem; import lombok.Data; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import java.io.Serializable; import java.lang.reflect.Field; @@ -65,4 +66,29 @@ public static FileSystemConfigurationData create(Properties properties) { }); return data; } + + /** + * Create a tiered storage configuration from the provided properties and offloadPolicies. + * + * @param properties the configuration properties + * @param offloadPolicies the offload policies + * @return tiered storage configuration + */ + public static FileSystemConfigurationData create(Properties properties, OffloadPolicies offloadPolicies) { + // TODO: replace default conf by policies one + FileSystemConfigurationData data = new FileSystemConfigurationData(); + Field[] fields = FileSystemConfigurationData.class.getDeclaredFields(); + Arrays.stream(fields).forEach(f -> { + if (properties.containsKey(f.getName())) { + try { + f.setAccessible(true); + f.set(data, value((String) properties.get(f.getName()), f)); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", + f.getName(), properties.get(f.getName())), e); + } + } + }); + return data; + } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java index cd52197a59bb5..d6043255f5641 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java @@ -21,6 +21,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import java.io.IOException; import java.util.Map; @@ -37,4 +38,10 @@ public FileSystemManagedLedgerOffloader create(Properties properties, Map userMetadata, OrderedScheduler scheduler) throws IOException { + FileSystemConfigurationData data = FileSystemConfigurationData.create(properties, offloadPolicies); + return FileSystemManagedLedgerOffloader.create(data, scheduler); + } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 309076c4c3605..1fbc68947b489 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index dffe253f94a98..7da2ec8c7c0e4 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * A jcloud based offloader factory. @@ -48,4 +49,13 @@ public BlobStoreManagedLedgerOffloader create(Properties properties, TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties); return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler); } + + @Override + public BlobStoreManagedLedgerOffloader create(Properties properties, + OffloadPolicies offloadPolicies, + Map userMetadata, + OrderedScheduler scheduler) throws IOException { + TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties, offloadPolicies); + return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler); + } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java index a4c5cf4fa8b2e..b3e8ee70f7147 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Properties; import lombok.Data; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Configuration for tiered storage. @@ -118,4 +119,29 @@ public static TieredStorageConfigurationData create(Properties properties) { return data; } + /** + * Create a tiered storage configuration from the provided properties and offloadPolicies. + * + * @param properties the configuration properties + * @param offloadPolicies the offload policies of the current namespace + * @return tiered storage configuration + */ + public static TieredStorageConfigurationData create(Properties properties, OffloadPolicies offloadPolicies) { + // TODO: replace default conf by policies one + TieredStorageConfigurationData data = new TieredStorageConfigurationData(); + Field[] fields = TieredStorageConfigurationData.class.getDeclaredFields(); + Arrays.stream(fields).forEach(f -> { + if (properties.containsKey(f.getName())) { + try { + f.setAccessible(true); + f.set(data, value((String) properties.get(f.getName()), f)); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", + f.getName(), properties.get(f.getName())), e); + } + } + }); + return data; + } + } From 6671cce9442c01a67f356af69f2a4c6d8aee478f Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Mon, 29 Jul 2019 19:27:06 +0200 Subject: [PATCH 07/11] make it generic: replace old offloaderfactory method by new one with offloadPolicies parameter --- .../bookkeeper/mledger/LedgerOffloaderFactory.java | 14 -------------- .../org/apache/pulsar/broker/PulsarService.java | 1 + .../pulsar/sql/presto/PulsarConnectorCache.java | 13 +++++++------ .../FileSystemLedgerOffloaderFactory.java | 6 ------ .../jcloud/JCloudLedgerOffloaderFactory.java | 8 -------- 5 files changed, 8 insertions(+), 34 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index 22250e9f98f0d..c6956659e2818 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -41,20 +41,6 @@ public interface LedgerOffloaderFactory { */ boolean isDriverSupported(String driverName); - /** - * Create a ledger offloader with the provided configuration, user-metadata and scheduler. - * - * @param properties service configuration - * @param userMetadata user metadata - * @param scheduler scheduler - * @return the offloader instance - * @throws IOException when fail to create an offloader - */ - T create(Properties properties, - Map userMetadata, - OrderedScheduler scheduler) - throws IOException; - /** * Create a ledger offloader with the provided configuration, offload policies, user-metadata and scheduler. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 23c9df102e4d5..7edb3bb94879d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -787,6 +787,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur try { return offloaderFactory.create( conf.getProperties(), + null, ImmutableMap.of( LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 1af86a2225a06..abc2cee50f890 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -142,12 +142,13 @@ private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) { try { return offloaderFactory.create( - PulsarConnectorUtils.getProperties(offloaderProperties), - ImmutableMap.of( - LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), - LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() - ), - getOffloaderScheduler(conf)); + PulsarConnectorUtils.getProperties(offloaderProperties), + null, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(conf)); } catch (IOException ioe) { log.error("Failed to create offloader: ", ioe); throw new RuntimeException(ioe.getMessage(), ioe.getCause()); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java index d6043255f5641..ebd0dbde50c97 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java @@ -33,12 +33,6 @@ public boolean isDriverSupported(String driverName) { return FileSystemManagedLedgerOffloader.driverSupported(driverName); } - @Override - public FileSystemManagedLedgerOffloader create(Properties properties, Map userMetadata, OrderedScheduler scheduler) throws IOException { - FileSystemConfigurationData data = FileSystemConfigurationData.create(properties); - return FileSystemManagedLedgerOffloader.create(data, scheduler); - } - @Override public FileSystemManagedLedgerOffloader create(Properties properties, OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) throws IOException { FileSystemConfigurationData data = FileSystemConfigurationData.create(properties, offloadPolicies); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index 7da2ec8c7c0e4..7f561a42517f8 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -42,14 +42,6 @@ public boolean isDriverSupported(String driverName) { return BlobStoreManagedLedgerOffloader.driverSupported(driverName); } - @Override - public BlobStoreManagedLedgerOffloader create(Properties properties, - Map userMetadata, - OrderedScheduler scheduler) throws IOException { - TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties); - return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler); - } - @Override public BlobStoreManagedLedgerOffloader create(Properties properties, OffloadPolicies offloadPolicies, From 3250579ccd6857457b8640c2536ca9c1d68fe7db Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Mon, 29 Jul 2019 19:33:01 +0200 Subject: [PATCH 08/11] remove api v1 management --- .../apache/pulsar/broker/PulsarService.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 7edb3bb94879d..0b70d3d385034 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -841,28 +841,17 @@ public Map createNamespacesOffloaders() { tenants.sort(null); Map namespacesOffloaders = Maps.newHashMap(); for (String tenantName: tenants) { - // this will return a cluster in v1 and a namespace in v2 - for (String clusterOrNamespace : this.getZkClient().getChildren(path(POLICIES, tenantName), false)) { - // Then get the list of namespaces + for (String namespace : this.getZkClient().getChildren(path(POLICIES, tenantName), false)) { try { - final List children = this.getZkClient().getChildren(path(POLICIES, tenantName, clusterOrNamespace), false); - if (children == null || children.isEmpty()) { - NamespaceName namespaceName = NamespaceName.get(tenantName, clusterOrNamespace); - // if the length is 0 then this is probably a leftover cluster from namespace created - // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list - if (this.getZkClient().getData(path(POLICIES, namespaceName.toString()), false, null).length != 0) { + final List children = this.getZkClient().getChildren(path(POLICIES, tenantName, namespace), false); + children.forEach(ns -> { + NamespaceName namespaceName = NamespaceName.get(tenantName, namespace, ns); + try { namespacesOffloaders.put(namespaceName, createManagedLedgerOffloader(this.config, getNamespacePolicies(namespaceName).offload_policies)); + } catch (PulsarServerException e) { + LOG.error("Error during create managedLedgerOffloader for {}", namespaceName, e); } - } else { - children.forEach(ns -> { - NamespaceName namespaceName = NamespaceName.get(tenantName, clusterOrNamespace, ns); - try { - namespacesOffloaders.put(namespaceName, createManagedLedgerOffloader(this.config, getNamespacePolicies(namespaceName).offload_policies)); - } catch (PulsarServerException e) { - LOG.error("Error during create managedLedgerOffloader for {}", namespaceName, e); - } - }); - } + }); } catch (KeeperException.NoNodeException e) { // A cluster was deleted between the 2 getChildren() calls, ignoring } From 0aa1a37800fe03fb85b17a4a7ecdbf46ae982028 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Tue, 30 Jul 2019 09:25:19 +0200 Subject: [PATCH 09/11] lazy load ledgeroffloaders --- .../pulsar/broker/ServiceConfiguration.java | 26 ++++++++- .../apache/pulsar/broker/PulsarService.java | 36 +----------- .../pulsar/broker/service/BrokerService.java | 55 +++++++++++++++++-- .../broker/admin/AdminApiOffloadTest.java | 2 - .../common/policies/data/OffloadPolicies.java | 8 +++ 5 files changed, 86 insertions(+), 41 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 79d4da53bd7d8..2f5d9316760a6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1352,7 +1352,31 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_STORAGE_OFFLOADING, doc = "Driver to use to offload old data to long term storage" ) - private String managedLedgerOffloadDriver = null; + private String defaultOffloadDriver = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Endpoint to use to offload old data to long term storage" + ) + private String defaultOffloadEndpoint = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Bucket to use to offload old data to long term storage" + ) + private String defaultOffloadBucket = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Max block size in bytes to use to offload old data to long term storage" + ) + private Long defaultOffloadMaxBlockSizeInBytes = null; + + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Read buffer size in bytes to use to offload old data to long term storage" + ) + private Long defaultOffloadReadBufferSizeInBytes = null; @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0b70d3d385034..96391761e0012 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -160,8 +160,6 @@ public class PulsarService implements AutoCloseable { private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; private Offloaders offloaderManager = new Offloaders(); - private LedgerOffloader defaultOffloader; - private Map namespacesOffloaders; private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; private ScheduledFuture loadResourceQuotaTask = null; @@ -403,9 +401,6 @@ public void start() throws PulsarServerException { // needs load management service this.startNamespaceService(); - this.defaultOffloader = createManagedLedgerOffloader(this.getConfiguration()); - this.namespacesOffloaders = createNamespacesOffloaders(); - brokerService.start(); this.webService = new WebService(this); @@ -835,33 +830,8 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfigur } } - public Map createNamespacesOffloaders() { - try { - List tenants = this.getZkClient().getChildren(path(POLICIES), false); - tenants.sort(null); - Map namespacesOffloaders = Maps.newHashMap(); - for (String tenantName: tenants) { - for (String namespace : this.getZkClient().getChildren(path(POLICIES, tenantName), false)) { - try { - final List children = this.getZkClient().getChildren(path(POLICIES, tenantName, namespace), false); - children.forEach(ns -> { - NamespaceName namespaceName = NamespaceName.get(tenantName, namespace, ns); - try { - namespacesOffloaders.put(namespaceName, createManagedLedgerOffloader(this.config, getNamespacePolicies(namespaceName).offload_policies)); - } catch (PulsarServerException e) { - LOG.error("Error during create managedLedgerOffloader for {}", namespaceName, e); - } - }); - } catch (KeeperException.NoNodeException e) { - // A cluster was deleted between the 2 getChildren() calls, ignoring - } - } - } - return namespacesOffloaders; - } catch (Exception e) { - LOG.error("Failed to get tenants list", e); - throw new RestException(e); - } + public Offloaders getOffloaderManager() { + return offloaderManager; } protected Policies getNamespacePolicies(NamespaceName namespaceName) { @@ -949,7 +919,7 @@ public synchronized Compactor getCompactor() throws PulsarServerException { return this.compactor; } - protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) { + public synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() .numThreads(conf.getManagedLedgerOffloadMaxThreads()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index eb8a05fd60ad6..166447225b38b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -27,6 +27,7 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -69,15 +70,15 @@ import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.*; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -121,6 +122,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies;; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.Policies; @@ -939,6 +941,14 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getDefaultRetentionSizeInMB()) ); + OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElseGet( + () -> new OffloadPolicies(serviceConfig.getDefaultOffloadDriver(), + serviceConfig.getDefaultOffloadEndpoint(), + serviceConfig.getDefaultOffloadBucket(), + serviceConfig.getDefaultOffloadMaxBlockSizeInBytes(), + serviceConfig.getDefaultOffloadReadBufferSizeInBytes()) + ); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); @@ -981,7 +991,6 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); policies.ifPresent(p -> { long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs(); if (p.offload_deletion_lag_ms != null) { @@ -995,12 +1004,48 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes); }); + + try { + managedLedgerConfig.setLedgerOffloader(createManagedLedgerOffloader(serviceConfig, offloadPolicies)); + } catch (PulsarServerException e) { + log.error("Can't create managed ledger offloader for {} due to {}.", namespace, e); + future.completeExceptionally(e); + } future.complete(managedLedgerConfig); }, (exception) -> future.completeExceptionally(exception))); return future; } + private synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf, OffloadPolicies offloadPolicies) + throws PulsarServerException { + try { + if (StringUtils.isNotBlank(offloadPolicies.getDriver())) { + checkNotNull(conf.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + conf.getDefaultOffloadDriver()); + LedgerOffloaderFactory offloaderFactory = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()).getOffloaderFactory(offloadPolicies.getDriver()); + try { + return offloaderFactory.create( + conf.getProperties(), + offloadPolicies, + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + pulsar.getOffloaderScheduler(conf)); + } catch (IOException ioe) { + throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); + } + } else { + log.info("No ledger offloader configured, using NULL instance"); + return NullLedgerOffloader.INSTANCE; + } + } catch (Throwable t) { + throw new PulsarServerException(t); + } + } + private void addTopicToStatsMaps(TopicName topicName, Topic topic) { try { NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getBundle(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 7f56a3fc02e9b..589640b5e3972 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -71,8 +71,6 @@ private void testOffload(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); when(offloader.getOffloadDriverName()).thenReturn("mock"); - doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); - CompletableFuture promise = new CompletableFuture<>(); doReturn(promise).when(offloader).offload(any(), any(), any()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 2a6d350cfa003..f12f6cd634714 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -18,8 +18,16 @@ */ package org.apache.pulsar.common.policies.data; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.common.naming.NamespaceName; + +import java.io.IOException; import java.util.Objects; +import static com.google.common.base.Preconditions.checkNotNull; + /** */ public class OffloadPolicies { From 06e45849efaa9bd85c519a7e76246444eb97e7d8 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Mon, 12 Aug 2019 14:15:16 +0200 Subject: [PATCH 10/11] remove star imports --- .../org/apache/pulsar/broker/PulsarService.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 96391761e0012..bd9ccc6f8b7ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -95,8 +95,18 @@ import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; -import org.apache.pulsar.common.naming.*; -import org.apache.pulsar.common.policies.data.*; +import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; From 0f8e0571acca0ee0c42d5ca32ccde7ad37711028 Mon Sep 17 00:00:00 2001 From: Alexandre DUVAL Date: Fri, 10 Jan 2020 18:29:57 +0100 Subject: [PATCH 11/11] fix --- .../apache/pulsar/broker/PulsarService.java | 2 + .../broker/admin/impl/NamespacesBase.java | 6 ++- .../pulsar/broker/service/BrokerService.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 3 ++ .../common/policies/data/OffloadPolicies.java | 43 +++++++++---------- .../pulsar/common/policies/data/Policies.java | 8 ++-- 6 files changed, 34 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index bd9ccc6f8b7ae..e750ec09a3f1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -160,6 +160,8 @@ public class PulsarService implements AutoCloseable { private GlobalZooKeeperCache globalZkCache; private LocalZooKeeperConnectionService localZooKeeperConnectionProvider; private Compactor compactor; + private LedgerOffloader defaultOffloader; + private Map namespacesOffloaders; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, new DefaultThreadFactory("pulsar")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index dee849754571a..92f6d5b9fb5ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -68,6 +68,8 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; @@ -2150,11 +2152,11 @@ protected void internalSetOffloadPolicies(OffloadPolicies offload) { final String path = path(POLICIES, namespaceName.toString()); byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); - policies.offload_policies = offload; + policies.offloadPolicies = offload; globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), - namespaceName, jsonMapper().writeValueAsString(policies.offload_policies)); + namespaceName, jsonMapper().writeValueAsString(policies.offloadPolicies)); } catch (KeeperException.NoNodeException e) { log.warn("[{}] Failed to update offload configuration for namespace {}: does not exist", clientAppId(), namespaceName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 166447225b38b..29ac79096b445 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -941,7 +941,7 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.getDefaultRetentionSizeInMB()) ); - OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElseGet( + OffloadPolicies offloadPolicies = policies.map(p -> p.offloadPolicies).orElseGet( () -> new OffloadPolicies(serviceConfig.getDefaultOffloadDriver(), serviceConfig.getDefaultOffloadEndpoint(), serviceConfig.getDefaultOffloadBucket(), diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index d5ca5a2bcfd67..d50378adb9c5c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -36,7 +36,10 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.ErrorData; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index f12f6cd634714..f7b38580a1e61 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -18,16 +18,8 @@ */ package org.apache.pulsar.common.policies.data; -import com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.common.naming.NamespaceName; - -import java.io.IOException; import java.util.Objects; -import static com.google.common.base.Preconditions.checkNotNull; - /** */ public class OffloadPolicies { @@ -37,7 +29,8 @@ public class OffloadPolicies { private long maxBlockSizeInBytes; private long readBufferSizeInBytes; - public OffloadPolicies(String driver, String endpoint, String bucket, long maxBlockSizeInBytes, long readBufferSizeInBytes) { + public OffloadPolicies(String driver, String endpoint, String bucket, + long maxBlockSizeInBytes, long readBufferSizeInBytes) { this.driver = driver; this.endpoint = endpoint; this.bucket = bucket; @@ -67,14 +60,18 @@ public long getReadBufferSizeInBytes() { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } OffloadPolicies that = (OffloadPolicies) o; - return maxBlockSizeInBytes == that.maxBlockSizeInBytes && - readBufferSizeInBytes == that.readBufferSizeInBytes && - Objects.equals(driver, that.driver) && - Objects.equals(endpoint, that.endpoint) && - Objects.equals(bucket, that.bucket); + return maxBlockSizeInBytes == that.maxBlockSizeInBytes + && readBufferSizeInBytes == that.readBufferSizeInBytes + && Objects.equals(driver, that.driver) + && Objects.equals(endpoint, that.endpoint) + && Objects.equals(bucket, that.bucket); } @Override @@ -84,12 +81,12 @@ public int hashCode() { @Override public String toString() { - return "OffloadPolicies{" + - "driver=" + driver + - ", endpoint='" + endpoint + '\'' + - ", bucket='" + bucket + '\'' + - ", maxBlockSizeInBytes=" + maxBlockSizeInBytes + - ", readBufferSizeInBytes=" + readBufferSizeInBytes + - '}'; + return "OffloadPolicies{" + + "driver=" + driver + + ", endpoint='" + endpoint + '\'' + + ", bucket='" + bucket + '\'' + + ", maxBlockSizeInBytes=" + maxBlockSizeInBytes + + ", readBufferSizeInBytes=" + readBufferSizeInBytes + + '}'; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 56c729370b5d8..bb70b285d08b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -79,7 +79,7 @@ public class Policies { public long offload_threshold = -1; @SuppressWarnings("checkstyle:MemberName") public Long offload_deletion_lag_ms = null; - public OffloadPolicies offload_policies = null; + public OffloadPolicies offloadPolicies = null; @SuppressWarnings("checkstyle:MemberName") @Deprecated @@ -108,7 +108,7 @@ public int hashCode() { max_consumers_per_topic, max_consumers_per_subscription, compaction_threshold, offload_threshold, offload_deletion_lag_ms, - offload_policies, + offloadPolicies, schema_auto_update_compatibility_strategy, schema_validation_enforced, schema_compatibility_strategy, @@ -142,7 +142,7 @@ public boolean equals(Object obj) { && compaction_threshold == other.compaction_threshold && offload_threshold == other.offload_threshold && offload_deletion_lag_ms == other.offload_deletion_lag_ms - && offload_policies == other.offload_policies + && offloadPolicies == other.offloadPolicies && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced && schema_compatibility_strategy == other.schema_compatibility_strategy @@ -191,7 +191,7 @@ public String toString() { .add("compaction_threshold", compaction_threshold) .add("offload_threshold", offload_threshold) .add("offload_deletion_lag_ms", offload_deletion_lag_ms) - .add("offload_policies", offload_policies) + .add("offloadPolicies", offloadPolicies) .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy) .add("schema_validation_enforced", schema_validation_enforced) .add("schema_compatibility_Strategy", schema_compatibility_strategy)