From 456f5d231dffb3389012dfd96526d317855fb4d8 Mon Sep 17 00:00:00 2001 From: wangyufan Date: Thu, 17 Dec 2020 15:20:02 +0800 Subject: [PATCH 1/5] support tiered-storage provider by aliyun OSS (#8887) --- .../common/policies/data/OffloadPolicies.java | 4 +- .../provider/JCloudBlobStoreProvider.java | 47 ++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 422bf24875e36..4c5058b88cf5e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -60,7 +60,9 @@ public class OffloadPolicies implements Serializable { public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2; public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; - public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"}; + public final static String[] DRIVER_NAMES = { + "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss" + }; public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null; public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index 9d0871e1d9d66..c6ebbfd27c451 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Properties; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -57,6 +58,8 @@ import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata; import org.jclouds.providers.AnonymousProviderMetadata; import org.jclouds.providers.ProviderMetadata; +import org.jclouds.s3.S3ApiMetadata; +import org.jclouds.s3.reference.S3Constants; /** * Enumeration of the supported JCloud Blob Store Providers. @@ -162,6 +165,48 @@ public void buildCredentials(TieredStorageConfiguration config) { } }, + + /** + * Aliyun OSS is compatible with the S3 API + * https://www.alibabacloud.com/help/doc-detail/64919.htm + */ + ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) { + @Override + public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { + VALIDATION.validate(config); + } + + @Override + public BlobStore getBlobStore(TieredStorageConfiguration config) { + ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata()); + Properties overrides = config.getOverrides(); + // For security reasons, OSS supports only virtual hosted style access. + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true"); + contextBuilder.overrides(overrides); + + if (StringUtils.isNotEmpty(config.getServiceEndpoint())) { + contextBuilder.endpoint(config.getServiceEndpoint()); + } + + if (config.getProviderCredentials() != null) { + return contextBuilder + .credentialsSupplier(config.getCredentials()) + .buildView(BlobStoreContext.class) + .getBlobStore(); + } else { + log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket()); + return contextBuilder + .buildView(BlobStoreContext.class) + .getBlobStore(); + } + } + + @Override + public void buildCredentials(TieredStorageConfiguration config) { + AWS_CREDENTIAL_BUILDER.buildCredentials(config); + } + }, + TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) { @Override public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { @@ -177,7 +222,7 @@ public BlobStore getBlobStore(TieredStorageConfiguration config) { ContextBuilder builder = ContextBuilder.newBuilder("transient"); BlobStoreContext ctx = builder .buildView(BlobStoreContext.class); - + BlobStore bs = ctx.getBlobStore(); if (!bs.containerExists(config.getBucket())) { From 5f7000433ad9ae0401e646be64f05ab67032640d Mon Sep 17 00:00:00 2001 From: wangyufan Date: Fri, 18 Dec 2020 16:44:23 +0800 Subject: [PATCH 2/5] support tiered-storage provider by aliyun OSS (#8887) --- .../provider/JCloudBlobStoreProvider.java | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index c6ebbfd27c451..6fc64f35169e6 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -178,27 +178,7 @@ public void validate(TieredStorageConfiguration config) throws IllegalArgumentEx @Override public BlobStore getBlobStore(TieredStorageConfiguration config) { - ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata()); - Properties overrides = config.getOverrides(); - // For security reasons, OSS supports only virtual hosted style access. - overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true"); - contextBuilder.overrides(overrides); - - if (StringUtils.isNotEmpty(config.getServiceEndpoint())) { - contextBuilder.endpoint(config.getServiceEndpoint()); - } - - if (config.getProviderCredentials() != null) { - return contextBuilder - .credentialsSupplier(config.getCredentials()) - .buildView(BlobStoreContext.class) - .getBlobStore(); - } else { - log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket()); - return contextBuilder - .buildView(BlobStoreContext.class) - .getBlobStore(); - } + return OSS_BLOB_STORE_BUILDER.getBlobStore(config); } @Override @@ -357,4 +337,29 @@ public ProviderMetadata getProviderMetadata() { } } }; + + static final BlobStoreBuilder OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> { + ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata()); + Properties overrides = config.getOverrides(); + // For security reasons, OSS supports only virtual hosted style access. + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true"); + contextBuilder.overrides(overrides); + + if (StringUtils.isNotEmpty(config.getServiceEndpoint())) { + contextBuilder.endpoint(config.getServiceEndpoint()); + } + + if (config.getProviderCredentials() != null) { + return contextBuilder + .credentialsSupplier(config.getCredentials()) + .buildView(BlobStoreContext.class) + .getBlobStore(); + } else { + log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket()); + return contextBuilder + .buildView(BlobStoreContext.class) + .getBlobStore(); + } + }; + } \ No newline at end of file From 2aa5a0083079e97aec2fa18b3eac4a30b1953e7c Mon Sep 17 00:00:00 2001 From: wangyufan Date: Sun, 20 Dec 2020 21:30:33 +0800 Subject: [PATCH 3/5] add new validator by aliyun-oss --- .../provider/JCloudBlobStoreProvider.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index 6fc64f35169e6..5a7c1f23dc552 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -173,7 +173,21 @@ public void buildCredentials(TieredStorageConfiguration config) { ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) { @Override public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { - VALIDATION.validate(config); + if (Strings.isNullOrEmpty(config.getServiceEndpoint())) { + throw new IllegalArgumentException( + "ServiceEndpoint must specified for " + config.getDriver() + " offload"); + } + + if (Strings.isNullOrEmpty(config.getBucket())) { + throw new IllegalArgumentException( + "Bucket cannot be empty for " + config.getDriver() + " offload"); + } + + if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) { + throw new IllegalArgumentException( + "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for " + + config.getDriver() + " offload"); + } } @Override @@ -344,10 +358,7 @@ public ProviderMetadata getProviderMetadata() { // For security reasons, OSS supports only virtual hosted style access. overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true"); contextBuilder.overrides(overrides); - - if (StringUtils.isNotEmpty(config.getServiceEndpoint())) { - contextBuilder.endpoint(config.getServiceEndpoint()); - } + contextBuilder.endpoint(config.getServiceEndpoint()); if (config.getProviderCredentials() != null) { return contextBuilder From cae7456e45c41fb8eabb4934e7e11499f9e43ead Mon Sep 17 00:00:00 2001 From: wangyufan Date: Mon, 21 Dec 2020 12:10:11 +0800 Subject: [PATCH 4/5] add environment variable by aliyun-oss --- .../jcloud/provider/JCloudBlobStoreProvider.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index 5a7c1f23dc552..a8820061acfde 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -197,7 +197,7 @@ public BlobStore getBlobStore(TieredStorageConfiguration config) { @Override public void buildCredentials(TieredStorageConfiguration config) { - AWS_CREDENTIAL_BUILDER.buildCredentials(config); + ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config); } }, @@ -373,4 +373,18 @@ public ProviderMetadata getProviderMetadata() { } }; + static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> { + String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID"); + if (StringUtils.isEmpty(accountName)) { + throw new IllegalArgumentException("Couldn't get the aliyun oss access key id."); + } + String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET"); + if (StringUtils.isEmpty(accountKey)) { + throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret."); + } + Credentials credentials = new Credentials( + accountName, accountKey); + config.setProviderCredentials(() -> credentials); + }; + } \ No newline at end of file From f7528903fb5b39ca2d921c42d5f3fbd652a5d938 Mon Sep 17 00:00:00 2001 From: wangyufan Date: Mon, 21 Dec 2020 12:11:11 +0800 Subject: [PATCH 5/5] extract constant --- .../provider/JCloudBlobStoreProvider.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index a8820061acfde..ba7065e5da390 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -173,26 +173,12 @@ public void buildCredentials(TieredStorageConfiguration config) { ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) { @Override public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { - if (Strings.isNullOrEmpty(config.getServiceEndpoint())) { - throw new IllegalArgumentException( - "ServiceEndpoint must specified for " + config.getDriver() + " offload"); - } - - if (Strings.isNullOrEmpty(config.getBucket())) { - throw new IllegalArgumentException( - "Bucket cannot be empty for " + config.getDriver() + " offload"); - } - - if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) { - throw new IllegalArgumentException( - "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for " - + config.getDriver() + " offload"); - } + ALIYUN_OSS_VALIDATION.validate(config); } @Override public BlobStore getBlobStore(TieredStorageConfiguration config) { - return OSS_BLOB_STORE_BUILDER.getBlobStore(config); + return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config); } @Override @@ -352,7 +338,7 @@ public ProviderMetadata getProviderMetadata() { } }; - static final BlobStoreBuilder OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> { + static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> { ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata()); Properties overrides = config.getOverrides(); // For security reasons, OSS supports only virtual hosted style access. @@ -373,6 +359,24 @@ public ProviderMetadata getProviderMetadata() { } }; + static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> { + if (Strings.isNullOrEmpty(config.getServiceEndpoint())) { + throw new IllegalArgumentException( + "ServiceEndpoint must specified for " + config.getDriver() + " offload"); + } + + if (Strings.isNullOrEmpty(config.getBucket())) { + throw new IllegalArgumentException( + "Bucket cannot be empty for " + config.getDriver() + " offload"); + } + + if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) { + throw new IllegalArgumentException( + "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for " + + config.getDriver() + " offload"); + } + }; + static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> { String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID"); if (StringUtils.isEmpty(accountName)) {