From f6cec35fc4772849a0b84dcaf57d1ee5af8f807e Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 19 Mar 2021 14:02:43 +0530 Subject: [PATCH 1/5] GCS lookup support --- .../extensions-core/lookups-cached-global.md | 4 +- .../google/GoogleDataSegmentPuller.java | 2 +- .../google/GoogleStorageDruidModule.java | 7 ++ .../GoogleTimestampVersionedDataFinder.java | 92 +++++++++++++++++++ ...oogleTimestampVersionedDataFinderTest.java | 87 ++++++++++++++++++ .../google/ObjectStorageIteratorTest.java | 4 +- 6 files changed, 191 insertions(+), 5 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java diff --git a/docs/development/extensions-core/lookups-cached-global.md b/docs/development/extensions-core/lookups-cached-global.md index 4872a0977c4a..014a5d26a205 100644 --- a/docs/development/extensions-core/lookups-cached-global.md +++ b/docs/development/extensions-core/lookups-cached-global.md @@ -214,12 +214,12 @@ The remapping values for each globally cached lookup can be specified by a JSON |Property|Description|Required|Default| |--------|-----------|--------|-------| |`pollPeriod`|Period between polling for updates|No|0 (only once)| -|`uri`|URI for the file of interest, specified as a file, hdfs, or s3 path|No|Use `uriPrefix`| +|`uri`|URI for the file of interest, specified as a file, hdfs, s3 or gs path|No|Use `uriPrefix`| |`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`| |`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`| |`namespaceParseSpec`|How to interpret the data at the URI|Yes|| -One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), or S3 (s3://) location. HTTP location is not currently supported. +One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GS (gs://) location. HTTP location is not currently supported. The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java index dbc22bc5d872..7ed270a4152d 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java @@ -37,7 +37,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller { private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class); - private final GoogleStorage storage; + protected final GoogleStorage storage; @Inject public GoogleDataSegmentPuller(final GoogleStorage storage) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index c30ce073579c..966f5349d08e 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -30,6 +30,8 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; +import com.google.inject.multibindings.MapBinder; +import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory; import org.apache.druid.guice.Binders; @@ -43,6 +45,7 @@ public class GoogleStorageDruidModule implements DruidModule { static final String SCHEME = "google"; + static final String SCHEME_GS = "gs"; private static final Logger LOG = new Logger(GoogleStorageDruidModule.class); private static final String APPLICATION_NAME = "druid-google-extensions"; @@ -94,6 +97,10 @@ public void configure(Binder binder) Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class); binder.bind(GoogleTaskLogs.class).in(LazySingleton.class); + MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) + .addBinding(SCHEME_GS) + .to(GoogleTimestampVersionedDataFinder.class) + .in(LazySingleton.class); } @Provides diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java new file mode 100644 index 000000000000..d3e539882bb4 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.inject.Inject; +import org.apache.druid.data.SearchableVersionedDataFinder; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; +import java.util.regex.Pattern; + +public class GoogleTimestampVersionedDataFinder extends GoogleDataSegmentPuller + implements SearchableVersionedDataFinder +{ + private static final long MAX_LISTING_KEYS = 1000; + + @Inject + public GoogleTimestampVersionedDataFinder(final GoogleStorage storage) + { + super(storage); + } + + @Override + public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) + { + try { + long mostRecent = Long.MIN_VALUE; + URI latest = null; + final String bucket = descriptorBase.getAuthority(); + final String prefix = StringUtils.maybeRemoveLeadingSlash(descriptorBase.getPath()); + final Objects objects = storage.list(bucket).setPrefix(prefix).setMaxResults(MAX_LISTING_KEYS).execute(); + for (StorageObject storageObject : objects.getItems()) { + if (isDirectoryPlaceholder(storageObject)) { + continue; + } + // remove path prefix from file name + final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(), + storageObject.getName() + ); + final String keyString = StringUtils + .maybeRemoveLeadingSlash(storageObject.getName().substring(prefix.length())); + if (pattern != null && !pattern.matcher(keyString).matches()) { + continue; + } + final long latestModified = storageObject.getUpdated().getValue(); + if (latestModified >= mostRecent) { + mostRecent = latestModified; + latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS); + } + } + return latest; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} + * Copied to avoid creating dependency on s3 extensions + */ + private static boolean isDirectoryPlaceholder(final StorageObject storageObject) + { + // Recognize "standard" directory place-holder indications + if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { + return true; + } + // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. + return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java new file mode 100644 index 000000000000..8b6a34c4b3c6 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.client.util.DateTime; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.util.regex.Pattern; + +public class GoogleTimestampVersionedDataFinderTest +{ + @Test + public void getLatestVersion() throws InterruptedException + { + String bucket = "bucket"; + String keyPrefix = "prefix/dir/0"; + + // object for directory prefix/dir/0/ + final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); + storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); + storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); + // sleep before setting update time on next object + Thread.sleep(100); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); + Thread.sleep(100); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis())); + final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); + + final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); + Pattern pattern = Pattern.compile("v.*"); + URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern); + URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName())); + Assert.assertEquals(expected, latest); + } + + @Test + public void getLatestVersionTrailingSlashKeyPrefix() throws InterruptedException + { + String bucket = "bucket"; + String keyPrefix = "prefix/dir/0/"; + + // object for directory prefix/dir/0/ + final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); + storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); + storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); + // sleep before setting update time on next object + Thread.sleep(100); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); + Thread.sleep(100); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis())); + final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); + + final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); + Pattern pattern = Pattern.compile("v.*"); + URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern); + URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName())); + Assert.assertEquals(expected, latest); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index 4d1504f842c3..a0b2bd8f2973 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -194,7 +194,7 @@ private static void test( * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the * {@link ObjectStorageIterator} class. */ - private static GoogleStorage makeMockClient(final List storageObjects) + static GoogleStorage makeMockClient(final List storageObjects) { return new GoogleStorage(null) { @@ -286,7 +286,7 @@ private static MockList mockList(String bucket, List storageObjec return new MockStorage().mockList(bucket, storageObjects); } - private static StorageObject makeStorageObject(final String bucket, final String key, final long size) + static StorageObject makeStorageObject(final String bucket, final String key, final long size) { final StorageObject summary = new StorageObject(); summary.setBucket(bucket); From 67f4b6ccd41d7a91abf6a4cf6a9086a141cdcd6b Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Wed, 24 Mar 2021 21:09:23 +0530 Subject: [PATCH 2/5] checkstyle fix --- .../storage/google/GoogleTimestampVersionedDataFinder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java index d3e539882bb4..fdb75fea5496 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -71,7 +71,8 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) } } return latest; - } catch (IOException e) { + } + catch (IOException e) { throw new RuntimeException(e); } } From ca45a63354395b85237a011eb81cdef7ece7e7dd Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 26 Mar 2021 20:29:30 +0530 Subject: [PATCH 3/5] review comments --- .../extensions-core/lookups-cached-global.md | 2 +- .../input/google/GoogleCloudStorageEntity.java | 3 ++- .../google/GoogleCloudStorageInputSource.java | 5 ++--- .../google/GoogleStorageDruidModule.java | 4 ++-- .../GoogleTimestampVersionedDataFinder.java | 18 ++---------------- .../druid/storage/google/GoogleUtils.java | 18 ++++++++++++++++-- ...GoogleTimestampVersionedDataFinderTest.java | 18 ++++++------------ 7 files changed, 31 insertions(+), 37 deletions(-) diff --git a/docs/development/extensions-core/lookups-cached-global.md b/docs/development/extensions-core/lookups-cached-global.md index 014a5d26a205..a97419806ef4 100644 --- a/docs/development/extensions-core/lookups-cached-global.md +++ b/docs/development/extensions-core/lookups-cached-global.md @@ -219,7 +219,7 @@ The remapping values for each globally cached lookup can be specified by a JSON |`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`| |`namespaceParseSpec`|How to interpret the data at the URI|Yes|| -One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GS (gs://) location. HTTP location is not currently supported. +One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported. The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 4c1b53abc2b9..13583a12f9c7 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; @@ -47,7 +48,7 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity @Override public URI getUri() { - return location.toUri(GoogleCloudStorageInputSource.SCHEME); + return location.toUri(GoogleStorageDruidModule.SCHEME_GS); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 9d37b3f7f503..47ba4b7c6a01 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.utils.Streams; @@ -47,8 +48,6 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - public static final String SCHEME = "gs"; - private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); private final GoogleStorage storage; @@ -63,7 +62,7 @@ public GoogleCloudStorageInputSource( @JsonProperty("objects") @Nullable List objects ) { - super(SCHEME, uris, prefixes, objects); + super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects); this.storage = storage; this.inputDataConfig = inputDataConfig; } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index 966f5349d08e..d6ed002b5bc1 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -44,8 +44,8 @@ public class GoogleStorageDruidModule implements DruidModule { - static final String SCHEME = "google"; - static final String SCHEME_GS = "gs"; + public static final String SCHEME = "google"; + public static final String SCHEME_GS = "gs"; private static final Logger LOG = new Logger(GoogleStorageDruidModule.class); private static final String APPLICATION_NAME = "druid-google-extensions"; diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java index fdb75fea5496..52c1e130c8bc 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -48,11 +48,11 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) try { long mostRecent = Long.MIN_VALUE; URI latest = null; - final String bucket = descriptorBase.getAuthority(); + final String bucket = descriptorBase.getHost(); final String prefix = StringUtils.maybeRemoveLeadingSlash(descriptorBase.getPath()); final Objects objects = storage.list(bucket).setPrefix(prefix).setMaxResults(MAX_LISTING_KEYS).execute(); for (StorageObject storageObject : objects.getItems()) { - if (isDirectoryPlaceholder(storageObject)) { + if (GoogleUtils.isDirectoryPlaceholder(storageObject)) { continue; } // remove path prefix from file name @@ -76,18 +76,4 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) throw new RuntimeException(e); } } - - /** - * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} - * Copied to avoid creating dependency on s3 extensions - */ - private static boolean isDirectoryPlaceholder(final StorageObject storageObject) - { - // Recognize "standard" directory place-holder indications - if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { - return true; - } - // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. - return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; - } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 2ed80fedb274..435d227af1ee 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -53,7 +53,7 @@ static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Excep public static URI objectToUri(StorageObject object) { - return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME); + return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS); } public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) @@ -91,7 +91,7 @@ public static void deleteObjectsInPath( { final Iterator iterator = lazyFetchingStorageObjectsIterator( storage, - ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator(), + ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), config.getMaxListingLength() ); @@ -105,4 +105,18 @@ public static void deleteObjectsInPath( } } } + + /** + * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} + * Copied to avoid creating dependency on s3 extensions + */ + public static boolean isDirectoryPlaceholder(final StorageObject storageObject) + { + // Recognize "standard" directory place-holder indications + if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { + return true; + } + // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. + return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java index 8b6a34c4b3c6..408033db053e 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java @@ -32,7 +32,7 @@ public class GoogleTimestampVersionedDataFinderTest { @Test - public void getLatestVersion() throws InterruptedException + public void getLatestVersion() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; @@ -43,12 +43,9 @@ public void getLatestVersion() throws InterruptedException final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); - // sleep before setting update time on next object - Thread.sleep(100); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis())); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); - Thread.sleep(100); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis())); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); @@ -59,7 +56,7 @@ public void getLatestVersion() throws InterruptedException } @Test - public void getLatestVersionTrailingSlashKeyPrefix() throws InterruptedException + public void getLatestVersionTrailingSlashKeyPrefix() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0/"; @@ -70,12 +67,9 @@ public void getLatestVersionTrailingSlashKeyPrefix() throws InterruptedException final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); - // sleep before setting update time on next object - Thread.sleep(100); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis())); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); - Thread.sleep(100); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis())); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); From 1842c78de66583e6e61b2411e8b97306bfeccaca Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 26 Mar 2021 20:34:41 +0530 Subject: [PATCH 4/5] review comments --- .../storage/google/GoogleTimestampVersionedDataFinder.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java index 52c1e130c8bc..d1ed8a7ef6a2 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -48,9 +48,8 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) try { long mostRecent = Long.MIN_VALUE; URI latest = null; - final String bucket = descriptorBase.getHost(); - final String prefix = StringUtils.maybeRemoveLeadingSlash(descriptorBase.getPath()); - final Objects objects = storage.list(bucket).setPrefix(prefix).setMaxResults(MAX_LISTING_KEYS).execute(); + final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase); + final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute(); for (StorageObject storageObject : objects.getItems()) { if (GoogleUtils.isDirectoryPlaceholder(storageObject)) { continue; @@ -60,7 +59,7 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) storageObject.getName() ); final String keyString = StringUtils - .maybeRemoveLeadingSlash(storageObject.getName().substring(prefix.length())); + .maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length())); if (pattern != null && !pattern.matcher(keyString).matches()) { continue; } From 0eaf92e51107ac0f02ea735bacfb32d1b1c2102c Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sat, 27 Mar 2021 12:16:21 +0530 Subject: [PATCH 5/5] remove unused import --- .../main/java/org/apache/druid/storage/google/GoogleUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 435d227af1ee..c9e84582ec12 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -23,7 +23,6 @@ import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger;