From 26f5bfb90ae5ccdbb36bf3d20ad37c2a6fd84afd Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 5 Jul 2023 10:41:44 +0530 Subject: [PATCH 1/7] initial commit --- .../storage/s3/output/S3StorageConnector.java | 15 +- .../druid/storage/StorageConnectorModule.java | 6 +- .../remote/ChunkingStorageConnector.java | 185 ++++++++++++++++++ 3 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java index 81dfb5747621..a8cbc7ec095a 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.remote.ChunkingStorageConnector; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -60,7 +61,7 @@ /** * In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times. */ -public class S3StorageConnector implements StorageConnector +public class S3StorageConnector extends ChunkingStorageConnector { private static final Logger log = new Logger(S3StorageConnector.class); @@ -127,6 +128,18 @@ public InputStream readRange(String path, long from, long size) ); } + @Override + public BuildInputStreamParams buildInputParams(String path) + { + return null; + } + + @Override + public BuildInputStreamParams buildInputParams(String path, long from, long size) + { + return null; + } + private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path) { // fetch the size of the whole object to make chunks diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java index 9682ac01d88e..a0bf3a91f0cc 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java @@ -33,8 +33,10 @@ public class StorageConnectorModule implements DruidModule @Override public List getJacksonModules() { - return ImmutableList.of(new SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes( - LocalFileStorageConnectorProvider.class)); + return ImmutableList.of( + new SimpleModule(StorageConnector.class.getSimpleName()) + .registerSubtypes(LocalFileStorageConnectorProvider.class) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java new file mode 100644 index 000000000000..f166139949ac --- /dev/null +++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java @@ -0,0 +1,185 @@ +package org.apache.druid.storage.remote; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import org.apache.commons.io.input.NullInputStream; +import org.apache.druid.data.input.impl.RetryingInputStream; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.storage.StorageConnector; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Enumeration; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public abstract class ChunkingStorageConnector implements StorageConnector +{ + private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000; + + public ChunkingStorageConnector() + { + + } + + @Override + public InputStream read(String path) throws IOException + { + return buildInputStream(buildInputParams(path)); + } + + @Override + public InputStream readRange(String path, long from, long size) throws IOException + { + return buildInputStream(buildInputParams(path, from, size)); + } + + public abstract BuildInputStreamParams buildInputParams(String path); + + public abstract BuildInputStreamParams buildInputParams(String path, long from, long size); + + private InputStream buildInputStream(BuildInputStreamParams params) + { + AtomicLong currentReadStartPosition = new AtomicLong(params.start); + long readEnd = params.end; + + AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false); + + return new SequenceInputStream( + + new Enumeration() + { + boolean initStream = false; + + @Override + public boolean hasMoreElements() + { + // Checking if the stream was already closed. If it was, then don't iterate over the remaining chunks + // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them + // lazily, we don't need to close them. + if (isSequenceStreamClosed.get()) { + return false; + } + // Don't stop until the whole object is downloaded + return currentReadStartPosition.get() < readEnd; + } + + @Override + public InputStream nextElement() + { + if (!initStream) { + initStream = true; + return new NullInputStream(); + } + + File outFile = new File( + params.tempDirSupplier.get().getAbsolutePath(), + UUID.randomUUID().toString() + ); + + // exclusive + long currentReadEndPosition = Math.min( + currentReadStartPosition.get() + DOWNLOAD_MAX_CHUNK_SIZE, + readEnd + ); + + try { + if (!outFile.createNewFile()) { + throw new IOE( + StringUtils.format( + "Could not create temporary file [%s] for copying [%s]", + outFile.getAbsolutePath(), + params.cloudStoragePath + ) + ); + } + + FileUtils.copyLarge( + () -> new RetryingInputStream( + params.objectSupplier.getObject(currentReadStartPosition.get(), currentReadEndPosition), + params.objectOpenFunction, + params.retryCondition, + params.maxRetry + ), + outFile, + new byte[8 * 1024], + Predicates.alwaysFalse(), + 1, + StringUtils.format( + "Retrying copying of [%s] to [%s]", + params.cloudStoragePath, + outFile.getAbsolutePath() + ) + ); + } + catch (IOException e) { + throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", params.cloudStoragePath, outFile)); + } + + try { + AtomicBoolean fileInputStreamClosed = new AtomicBoolean(false); + return new FileInputStream(outFile) + { + @Override + public void close() throws IOException + { + // close should be idempotent + if (fileInputStreamClosed.get()) { + return; + } + fileInputStreamClosed.set(true); + super.close(); + // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1 + currentReadStartPosition.set(currentReadEndPosition); + if (!outFile.delete()) { + throw new RE("Cannot delete temp file [%s]", outFile); + } + } + + }; + } + catch (FileNotFoundException e) { + throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile)); + } + } + } + ) + { + @Override + public void close() throws IOException + { + isSequenceStreamClosed.set(true); + super.close(); + } + }; + } + + // Start inclusive, end exclusive + public static class BuildInputStreamParams + { + public long start; + public long end; + public String cloudStoragePath; + public GetObjectFromRangeFunction objectSupplier; + public ObjectOpenFunction objectOpenFunction; + public Predicate retryCondition; + public int maxRetry; + public Supplier tempDirSupplier; + } + + public interface GetObjectFromRangeFunction + { + T getObject(long start, long end); + } +} From 2b9787a340466531d0d653c7a4e3e14426d6c1e0 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 7 Jul 2023 00:32:49 +0530 Subject: [PATCH 2/7] classes n code --- .../druid/storage/google/output/GoogleOutputConfig.java | 5 +++++ .../druid/storage/google/output/GoogleStorageConnector.java | 5 +++++ .../storage/google/output/GoogleStorageConnectorModule.java | 5 +++++ .../google/output/GoogleStorageConnectorProvider.java | 5 +++++ 4 files changed, 20 insertions(+) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java new file mode 100644 index 000000000000..ab093ff054e0 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -0,0 +1,5 @@ +package org.apache.druid.storage.google.output; + +public class GoogleOutputConfig +{ +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java new file mode 100644 index 000000000000..6ae9b06a1438 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -0,0 +1,5 @@ +package org.apache.druid.storage.google.output; + +public class GoogleStorageConnector +{ +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java new file mode 100644 index 000000000000..de9f7e2b785e --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java @@ -0,0 +1,5 @@ +package org.apache.druid.storage.google.output; + +public class GoogleStorageConnectorModule +{ +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java new file mode 100644 index 000000000000..bfe81a9339a6 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -0,0 +1,5 @@ +package org.apache.druid.storage.google.output; + +public class GoogleStorageConnectorProvider +{ +} From d41e967617ec35fbc611ff4a89ffc71950e5cb57 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 7 Jul 2023 22:54:54 +0530 Subject: [PATCH 3/7] add stubs for other classes --- .../google/output/GoogleOutputConfig.java | 38 ++++ .../google/output/GoogleStorageConnector.java | 76 +++++++- .../output/GoogleStorageConnectorModule.java | 19 ++ .../GoogleStorageConnectorProvider.java | 19 ++ .../storage/s3/output/S3StorageConnector.java | 8 +- .../remote/ChunkingStorageConnector.java | 47 ++--- .../ChunkingStorageConnectorParameters.java | 179 ++++++++++++++++++ 7 files changed, 349 insertions(+), 37 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index ab093ff054e0..d18e31273119 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -1,5 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.storage.google.output; +import org.apache.druid.java.util.common.HumanReadableBytes; + +import java.io.File; + public class GoogleOutputConfig { + + String bucket; + String prefix; + File tempDir; + HumanReadableBytes chunkSize; + Integer maxRetry; + + public GoogleOutputConfig( + String bucket, + String prefix, + File tempDir, + HumanReadableBytes chunkSize, + Integer maxRetry + ) { + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java index 6ae9b06a1438..b819cf090daf 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -1,5 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.storage.google.output; -public class GoogleStorageConnector +import com.google.api.services.storage.model.StorageObject; +import org.apache.druid.storage.remote.ChunkingStorageConnector; +import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Iterator; + +public class GoogleStorageConnector extends ChunkingStorageConnector { + @Override + public boolean pathExists(String path) + { + return false; + } + + @Override + public OutputStream write(String path) throws IOException + { + return null; + } + + @Override + public void deleteFile(String path) throws IOException + { + + } + + @Override + public void deleteFiles(Iterable paths) throws IOException + { + + } + + @Override + public void deleteRecursively(String path) throws IOException + { + + } + + @Override + public Iterator listDir(String dirName) throws IOException + { + return null; + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path) + { + return null; + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) + { + return null; + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java index de9f7e2b785e..b3594730c2bd 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.storage.google.output; public class GoogleStorageConnectorModule diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index bfe81a9339a6..1bb91b77e541 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.storage.google.output; public class GoogleStorageConnectorProvider diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java index a8cbc7ec095a..e238c12b1ddc 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java @@ -37,8 +37,8 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.remote.ChunkingStorageConnector; +import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -61,7 +61,7 @@ /** * In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times. */ -public class S3StorageConnector extends ChunkingStorageConnector +public class S3StorageConnector extends ChunkingStorageConnector { private static final Logger log = new Logger(S3StorageConnector.class); @@ -129,13 +129,13 @@ public InputStream readRange(String path, long from, long size) } @Override - public BuildInputStreamParams buildInputParams(String path) + public ChunkingStorageConnectorParameters buildInputParams(String path) { return null; } @Override - public BuildInputStreamParams buildInputParams(String path, long from, long size) + public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) { return null; } diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java index f166139949ac..d01a8eee3eb6 100644 --- a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java +++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java @@ -1,12 +1,9 @@ package org.apache.druid.storage.remote; -import com.google.common.base.Predicate; import com.google.common.base.Predicates; import org.apache.commons.io.input.NullInputStream; import org.apache.druid.data.input.impl.RetryingInputStream; -import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -22,7 +19,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; public abstract class ChunkingStorageConnector implements StorageConnector { @@ -40,19 +36,19 @@ public InputStream read(String path) throws IOException } @Override - public InputStream readRange(String path, long from, long size) throws IOException + public InputStream readRange(String path, long from, long size) { return buildInputStream(buildInputParams(path, from, size)); } - public abstract BuildInputStreamParams buildInputParams(String path); + public abstract ChunkingStorageConnectorParameters buildInputParams(String path); - public abstract BuildInputStreamParams buildInputParams(String path, long from, long size); + public abstract ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size); - private InputStream buildInputStream(BuildInputStreamParams params) + private InputStream buildInputStream(ChunkingStorageConnectorParameters params) { - AtomicLong currentReadStartPosition = new AtomicLong(params.start); - long readEnd = params.end; + AtomicLong currentReadStartPosition = new AtomicLong(params.getStart()); + long readEnd = params.getEnd(); AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false); @@ -84,7 +80,7 @@ public InputStream nextElement() } File outFile = new File( - params.tempDirSupplier.get().getAbsolutePath(), + params.getTempDirSupplier().get().getAbsolutePath(), UUID.randomUUID().toString() ); @@ -100,17 +96,17 @@ public InputStream nextElement() StringUtils.format( "Could not create temporary file [%s] for copying [%s]", outFile.getAbsolutePath(), - params.cloudStoragePath + params.getCloudStoragePath() ) ); } FileUtils.copyLarge( - () -> new RetryingInputStream( - params.objectSupplier.getObject(currentReadStartPosition.get(), currentReadEndPosition), - params.objectOpenFunction, - params.retryCondition, - params.maxRetry + () -> new RetryingInputStream<>( + params.getObjectSupplier().getObject(currentReadStartPosition.get(), currentReadEndPosition), + params.getObjectOpenFunction(), + params.getRetryCondition(), + params.getMaxRetry() ), outFile, new byte[8 * 1024], @@ -118,13 +114,13 @@ public InputStream nextElement() 1, StringUtils.format( "Retrying copying of [%s] to [%s]", - params.cloudStoragePath, + params.getCloudStoragePath(), outFile.getAbsolutePath() ) ); } catch (IOException e) { - throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", params.cloudStoragePath, outFile)); + throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", params.getCloudStoragePath(), outFile)); } try { @@ -165,19 +161,6 @@ public void close() throws IOException }; } - // Start inclusive, end exclusive - public static class BuildInputStreamParams - { - public long start; - public long end; - public String cloudStoragePath; - public GetObjectFromRangeFunction objectSupplier; - public ObjectOpenFunction objectOpenFunction; - public Predicate retryCondition; - public int maxRetry; - public Supplier tempDirSupplier; - } - public interface GetObjectFromRangeFunction { T getObject(long start, long end); diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java new file mode 100644 index 000000000000..ec46a3f21d84 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.remote; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; + +import java.io.File; +import java.util.function.Supplier; + +// Start inclusive, end exclusive +public class ChunkingStorageConnectorParameters +{ + private final long start; + private final long end; + private final String cloudStoragePath; + private final ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier; + private final ObjectOpenFunction objectOpenFunction; + private final Predicate retryCondition; + private final int maxRetry; + private final Supplier tempDirSupplier; + + public ChunkingStorageConnectorParameters( + long start, + long end, + String cloudStoragePath, + ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier, + ObjectOpenFunction objectOpenFunction, + Predicate retryCondition, + int maxRetry, + Supplier tempDirSupplier + ) + { + this.start = start; + this.end = end; + this.cloudStoragePath = cloudStoragePath; + this.objectSupplier = objectSupplier; + this.objectOpenFunction = objectOpenFunction; + this.retryCondition = retryCondition; + this.maxRetry = maxRetry; + this.tempDirSupplier = tempDirSupplier; + } + + public long getStart() + { + return start; + } + + public long getEnd() + { + return end; + } + + public String getCloudStoragePath() + { + return cloudStoragePath; + } + + public ChunkingStorageConnector.GetObjectFromRangeFunction getObjectSupplier() + { + return objectSupplier; + } + + public ObjectOpenFunction getObjectOpenFunction() + { + return objectOpenFunction; + } + + public Predicate getRetryCondition() + { + return retryCondition; + } + + public int getMaxRetry() + { + return maxRetry; + } + + public Supplier getTempDirSupplier() + { + return tempDirSupplier; + } + + private static class Builder + { + private long start; + private long end; + private String cloudStoragePath; + private ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier; + private ObjectOpenFunction objectOpenFunction; + private Predicate retryCondition; + private int maxRetry; + private Supplier tempDirSupplier; + + + public Builder start(long start) + { + this.start = start; + return this; + } + + public Builder end(long end) + { + this.end = end; + return this; + } + + public Builder cloudStoragePath(String cloudStoragePath) + { + this.cloudStoragePath = cloudStoragePath; + return this; + } + + public Builder objectSupplier(ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier) + { + this.objectSupplier = objectSupplier; + return this; + } + + public Builder objectOpenFunction(ObjectOpenFunction objectOpenFunction) + { + this.objectOpenFunction = objectOpenFunction; + return this; + } + + public Builder retryCondition(Predicate retryCondition) + { + this.retryCondition = retryCondition; + return this; + } + + public Builder maxRetry(int maxRetry) + { + this.maxRetry = maxRetry; + return this; + } + + public Builder tempDirSupplier(Supplier tempDirSupplier) + { + this.tempDirSupplier = tempDirSupplier; + return this; + } + + ChunkingStorageConnectorParameters build() + { + Preconditions.checkArgument(start >= 0, "'start' not provided or an incorrect value [%s] passed", start); + Preconditions.checkArgument(end >= 0, "'end' not provided or an incorrect value [%s] passed", end); + Preconditions.checkArgument(maxRetry >= 0, "'maxRetry' not provided or an incorrect value [%s] passed", maxRetry); + return new ChunkingStorageConnectorParameters( + start, + end, + Preconditions.checkNotNull(cloudStoragePath, "'cloudStoragePath' not supplied"), + Preconditions.checkNotNull(objectSupplier, "'objectSupplier' not supplied"), + Preconditions.checkNotNull(objectOpenFunction, "'objectOpenFunction' not supplied"), + Preconditions.checkNotNull(retryCondition, "'retryCondition' not supplied"), + maxRetry, + Preconditions.checkNotNull(tempDirSupplier, "'tempDirSupplier' not supplied") + ); + } + } +} From 71c50fabb43cbf524cc04bbf3acfba38ec685971 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jul 2023 15:26:37 +0530 Subject: [PATCH 4/7] goc changes stash --- .../google/output/GoogleOutputConfig.java | 87 ++++++++++++++++--- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index d18e31273119..9df02d8876fb 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -19,25 +19,90 @@ package org.apache.druid.storage.google.output; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.HumanReadableBytes; import java.io.File; +import java.util.Objects; public class GoogleOutputConfig { - String bucket; - String prefix; - File tempDir; - HumanReadableBytes chunkSize; - Integer maxRetry; + @JsonProperty + private final String bucket; + + @JsonProperty + private final String prefix; + + @JsonProperty + private final File tempDir; + + @JsonProperty + private final HumanReadableBytes chunkSize; + + @JsonProperty + private final Integer maxRetry; public GoogleOutputConfig( - String bucket, - String prefix, - File tempDir, - HumanReadableBytes chunkSize, - Integer maxRetry - ) { + final String bucket, + final String prefix, + final File tempDir, + final HumanReadableBytes chunkSize, + final Integer maxRetry + ) + { + this.bucket = bucket; + this.prefix = prefix; + this.tempDir = tempDir; + this.chunkSize = chunkSize; + this.maxRetry = maxRetry; + } + + public String getBucket() + { + return bucket; + } + + public String getPrefix() + { + return prefix; + } + + public File getTempDir() + { + return tempDir; + } + + public HumanReadableBytes getChunkSize() + { + return chunkSize; + } + + public Integer getMaxRetry() + { + return maxRetry; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleOutputConfig that = (GoogleOutputConfig) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(prefix, that.prefix) + && Objects.equals(tempDir, that.tempDir) + && Objects.equals(chunkSize, that.chunkSize) + && Objects.equals(maxRetry, that.maxRetry); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry); } } From d07b740fae425c006210b792ccb974645a0fe0f6 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 14 Jul 2023 11:54:52 +0530 Subject: [PATCH 5/7] version before batch delete --- .../druid/storage/google/GoogleUtils.java | 2 +- .../google/output/GoogleStorageConnector.java | 53 ++++++++++++++++++- .../output/GoogleStorageConnectorModule.java | 24 ++++++++- .../GoogleStorageConnectorProvider.java | 42 ++++++++++++++- 4 files changed, 117 insertions(+), 4 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 25b4f3286ea7..5b2727abb61b 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -45,7 +45,7 @@ public static boolean isRetryable(Throwable t) return t instanceof IOException; } - static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + public static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception { return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java index b819cf090daf..6bb28682b85e 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -20,6 +20,11 @@ package org.apache.druid.storage.google.output; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.Joiner; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.storage.remote.ChunkingStorageConnector; import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; @@ -29,10 +34,31 @@ public class GoogleStorageConnector extends ChunkingStorageConnector { + + private static final String DELIM = "/"; + private static final Joiner JOINER = Joiner.on(DELIM).skipNulls(); + private static final Logger log = new Logger(GoogleStorageConnector.class); + + private final GoogleStorage storage; + private final GoogleOutputConfig config; + private final GoogleInputDataConfig inputDataConfig; + + public GoogleStorageConnector( + GoogleStorage storage, + GoogleOutputConfig config, + GoogleInputDataConfig inputDataConfig + ) + { + this.storage = storage; + this.config = config; + this.inputDataConfig = inputDataConfig; + } + + @Override public boolean pathExists(String path) { - return false; + return storage.exists(config.getBucket(), objectPath(path)); } @Override @@ -44,13 +70,33 @@ public OutputStream write(String path) throws IOException @Override public void deleteFile(String path) throws IOException { + try { + final String fullPath = objectPath(path); + log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath); + GoogleUtils.retryGoogleCloudStorageOperation( + () -> { + storage.delete(config.getBucket(), fullPath); + return null; + } + ); + } + catch (Exception e) { + log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage()); + throw new IOException(e); + } } @Override public void deleteFiles(Iterable paths) throws IOException { + int currentItemSize = 0; + + for (String path : paths) { + + + } } @Override @@ -76,4 +122,9 @@ public ChunkingStorageConnectorParameters buildInputParams(String { return null; } + + private String objectPath(String path) + { + return JOINER.join(config.getPrefix(), path); + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java index b3594730c2bd..875e9fde46e3 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java @@ -19,6 +19,28 @@ package org.apache.druid.storage.google.output; -public class GoogleStorageConnectorModule +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class GoogleStorageConnectorModule implements DruidModule { + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule(GoogleStorageConnectorModule.class.getSimpleName()) + .registerSubtypes(GoogleStorageConnectorProvider.class) + ); + } + + @Override + public void configure(Binder binder) + { + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 1bb91b77e541..0746914fe861 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -19,6 +19,46 @@ package org.apache.druid.storage.google.output; -public class GoogleStorageConnectorProvider +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; + +import java.io.File; + +@JsonTypeName(GoogleStorageDruidModule.SCHEME) +public class GoogleStorageConnectorProvider extends GoogleOutputConfig implements StorageConnectorProvider { + + private final GoogleStorage storage; + private final GoogleInputDataConfig inputDataConfig; + + @JsonCreator + public GoogleStorageConnectorProvider( + @JacksonInject GoogleStorage storage, + @JacksonInject GoogleInputDataConfig inputDataConfig, + @JsonProperty(value = "bucket", required = true) final String bucket, + @JsonProperty(value = "prefix", required = true) final String prefix, + @JsonProperty(value = "tempDir", required = true) final File tempDir, + @JsonProperty(value = "chunkSize") final HumanReadableBytes chunkSize, + @JsonProperty(value = "maxRetry") final Integer maxRetry + ) + { + super(bucket, prefix, tempDir, chunkSize, maxRetry); + this.storage = Preconditions.checkNotNull(storage, "google client must be provided"); + this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "google input data config must be provided"); + } + + @Override + public StorageConnector get() + { + return new GoogleStorageConnector(storage, this, inputDataConfig); + } } From 7c482c55b111baedae2eeb0ff818468ed29d1ab4 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 19 Jul 2023 08:59:56 +0530 Subject: [PATCH 6/7] storage connector final --- .../druid/storage/google/GoogleStorage.java | 27 ++- .../google/output/GoogleInputRange.java | 81 +++++++++ .../google/output/GoogleOutputConfig.java | 35 +++- .../google/output/GoogleStorageConnector.java | 172 ++++++++++++++++-- .../GoogleStorageConnectorProvider.java | 3 +- .../remote/ChunkingStorageConnector.java | 23 ++- .../ChunkingStorageConnectorParameters.java | 4 +- 7 files changed, 318 insertions(+), 27 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java index f181d08f443c..168bdb7069da 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java @@ -20,10 +20,13 @@ package org.apache.druid.storage.google; import com.google.api.client.http.AbstractInputStreamContent; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponse; import com.google.api.services.storage.Storage; import com.google.api.services.storage.Storage.Objects.Get; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Supplier; +import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; import java.io.InputStream; @@ -36,7 +39,7 @@ public class GoogleStorage * if we have a Storage instead of a supplier of it, it can cause unnecessary config validation * against Google storage even when it's not used at all. To perform the config validation * only when it is actually used, we use a supplier. - * + *

* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. */ private final Supplier storage; @@ -59,6 +62,26 @@ public InputStream get(final String bucket, final String path) throws IOExceptio return get(bucket, path, 0); } + public InputStream getUsingRangeHeaders(final String bucket, final String path, long start, long end) + throws IOException + { + final Get get = storage.get().objects().get(bucket, path); + HttpHeaders httpHeaders = new HttpHeaders(); + + String rangeString = StringUtils.format("bytes %d-%d/*", start, end); + + httpHeaders.setRange(rangeString); + get.setRequestHeaders(httpHeaders); + + HttpResponse httpResponse = get.executeUsingHead(); + String responseRangeString = httpResponse.getHeaders().getRange(); + if (!responseRangeString.equals(rangeString)) { + throw new IOException("Unable to fetch the correct range"); + } + + return get.executeMediaAsInputStream(); + } + public InputStream get(final String bucket, final String path, long start) throws IOException { final Get get = storage.get().objects().get(bucket, path); @@ -86,7 +109,7 @@ public boolean exists(final String bucket, final String path) return false; } } - + public long size(final String bucket, final String path) throws IOException { return storage.get().objects().get(bucket, path).execute().getSize().longValue(); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java new file mode 100644 index 000000000000..ee0d9a58f880 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import java.util.Objects; + +public class GoogleInputRange +{ + private final long start; + private final long size; + private final String bucket; + private final String path; + + public GoogleInputRange(long start, long size, String bucket, String path) + { + this.start = start; + this.size = size; + this.bucket = bucket; + this.path = path; + } + + public long getStart() + { + return start; + } + + public long getSize() + { + return size; + } + + public String getBucket() + { + return bucket; + } + + public String getPath() + { + return path; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleInputRange that = (GoogleInputRange) o; + return start == that.start + && size == that.size + && Objects.equals(bucket, that.bucket) + && Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(start, size, bucket, path); + } +} + diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index 9df02d8876fb..4fa91c1c911a 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.RetryUtils; +import javax.annotation.Nullable; import java.io.File; import java.util.Objects; @@ -38,24 +40,35 @@ public class GoogleOutputConfig private final File tempDir; @JsonProperty - private final HumanReadableBytes chunkSize; + private boolean chunkedDownloads = false; @JsonProperty - private final Integer maxRetry; + private HumanReadableBytes chunkSize = new HumanReadableBytes("100MiB"); + + @JsonProperty + private int maxRetry = RetryUtils.DEFAULT_MAX_TRIES; public GoogleOutputConfig( final String bucket, final String prefix, final File tempDir, - final HumanReadableBytes chunkSize, - final Integer maxRetry + @Nullable final Boolean chunkedDownloads, + @Nullable final HumanReadableBytes chunkSize, + @Nullable final Integer maxRetry ) { this.bucket = bucket; this.prefix = prefix; this.tempDir = tempDir; - this.chunkSize = chunkSize; - this.maxRetry = maxRetry; + if (chunkedDownloads != null) { + this.chunkedDownloads = chunkedDownloads; + } + if (chunkSize != null) { + this.chunkSize = chunkSize; + } + if (maxRetry != null) { + this.maxRetry = maxRetry; + } } public String getBucket() @@ -73,6 +86,11 @@ public File getTempDir() return tempDir; } + public Boolean isChunkedDownloads() + { + return chunkedDownloads; + } + public HumanReadableBytes getChunkSize() { return chunkSize; @@ -96,6 +114,7 @@ public boolean equals(Object o) return Objects.equals(bucket, that.bucket) && Objects.equals(prefix, that.prefix) && Objects.equals(tempDir, that.tempDir) + && Objects.equals(chunkedDownloads, that.chunkedDownloads) && Objects.equals(chunkSize, that.chunkSize) && Objects.equals(maxRetry, that.maxRetry); } @@ -103,6 +122,8 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry); + return Objects.hash(bucket, prefix, tempDir, chunkedDownloads, chunkSize, maxRetry); } + + } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java index 6bb28682b85e..3f0bba093b6a 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -19,20 +19,33 @@ package org.apache.druid.storage.google.output; +import com.google.api.client.http.InputStreamContent; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.io.ByteStreams; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.storage.remote.ChunkingStorageConnector; import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Files; import java.util.Iterator; +import java.util.UUID; -public class GoogleStorageConnector extends ChunkingStorageConnector +public class GoogleStorageConnector extends ChunkingStorageConnector { private static final String DELIM = "/"; @@ -64,7 +77,58 @@ public boolean pathExists(String path) @Override public OutputStream write(String path) throws IOException { - return null; + File tempFile = new File(JOINER.join(config.getTempDir(), UUID.randomUUID().toString())); + + return new OutputStream() + { + + final OutputStream delegate = new FileOutputStream(tempFile); + @Override + public void write(int b) throws IOException + { + delegate.write(b); + } + + @Override + public void write(byte[] b) throws IOException + { + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + delegate.write(b, off, len); + } + + @Override + public void flush() throws IOException + { + delegate.flush(); + } + + @Override + public void close() throws IOException + { + delegate.close(); + + try (final InputStream fileInputStream = Files.newInputStream(tempFile.toPath())) { + InputStreamContent mediaContent = new InputStreamContent("application/octet-stream", fileInputStream); + GoogleUtils.retryGoogleCloudStorageOperation( + () -> { + storage.insert(config.getBucket(), objectPath(path), mediaContent); + return null; + } + ); + } + catch (Exception e) { + throw new IOException(e); + } + if (!tempFile.delete()) { + throw new RE("Unable to delete the temp file [%s]", tempFile); + } + } + }; } @Override @@ -90,37 +154,119 @@ public void deleteFile(String path) throws IOException @Override public void deleteFiles(Iterable paths) throws IOException { - int currentItemSize = 0; - - for (String path : paths) { - - + deleteFile(objectPath(path)); } } @Override public void deleteRecursively(String path) throws IOException { + try { + GoogleUtils.deleteObjectsInPath( + storage, + inputDataConfig, + config.getBucket(), + objectPath(config.getPrefix()), + p -> true + ); + } + catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public Iterator listDir(String dirName) + { + final String fullPath = objectPath(dirName); + Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath) + .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), + inputDataConfig.getMaxListingLength() + ); + + return Iterators.transform( + storageObjects, + storageObject -> { + String[] split = storageObject.getName().split(fullPath, 2); + if (split.length > 1) { + return split[1]; + } else { + return ""; + } + } + ); + } + + @Override + public InputStream read(String path) throws IOException + { + if (config.isChunkedDownloads()) { + return super.read(path); + } + return storage.get(config.getBucket(), objectPath(path)); } @Override - public Iterator listDir(String dirName) throws IOException + public InputStream readRange(String path, long from, long size) throws IOException { - return null; + if (config.isChunkedDownloads()) { + return super.readRange(path, from, size); + } + + return ByteStreams.limit(storage.get(config.getBucket(), objectPath(path), from), size); } @Override - public ChunkingStorageConnectorParameters buildInputParams(String path) + public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException { - return null; + long size = storage.size(config.getBucket(), objectPath(path)); + return buildInputParams(path, 0, size); } @Override - public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) + public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) { - return null; + ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>(); + builder.start(from); + builder.end(from + size); + builder.cloudStoragePath(objectPath(path)); + builder.tempDirSupplier(config::getTempDir); + builder.maxRetry(config.getMaxRetry()); + builder.retryCondition(GoogleUtils.GOOGLE_RETRY); + builder.objectSupplier(((start, end) -> new GoogleInputRange(start, end - start, config.getBucket(), objectPath(path)))); + builder.objectOpenFunction(new ObjectOpenFunction() + { + @Override + public InputStream open(GoogleInputRange googleInputRange) throws IOException + { + long rangeEnd = googleInputRange.getStart() + googleInputRange.getSize() - 1; + return storage.getUsingRangeHeaders( + googleInputRange.getBucket(), + googleInputRange.getPath(), + googleInputRange.getStart(), + rangeEnd + ); + } + + @Override + public InputStream open(GoogleInputRange googleInputRange, long offset) throws IOException + { + long rangeStart = googleInputRange.getStart() + offset; + long rangeEnd = googleInputRange.getStart() + googleInputRange.getSize() - 1; + return storage.getUsingRangeHeaders( + googleInputRange.getBucket(), + googleInputRange.getPath(), + rangeStart, + rangeEnd + ); + } + }); + + return builder.build(); } private String objectPath(String path) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 0746914fe861..37dd1c1234f9 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -47,11 +47,12 @@ public GoogleStorageConnectorProvider( @JsonProperty(value = "bucket", required = true) final String bucket, @JsonProperty(value = "prefix", required = true) final String prefix, @JsonProperty(value = "tempDir", required = true) final File tempDir, + @JsonProperty(value = "chunkedDownloads") final Boolean chunkedDownloads, @JsonProperty(value = "chunkSize") final HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") final Integer maxRetry ) { - super(bucket, prefix, tempDir, chunkSize, maxRetry); + super(bucket, prefix, tempDir, chunkedDownloads, chunkSize, maxRetry); this.storage = Preconditions.checkNotNull(storage, "google client must be provided"); this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "google input data config must be provided"); } diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java index d01a8eee3eb6..d628ee87f4b8 100644 --- a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java +++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.storage.remote; import com.google.common.base.Predicates; @@ -36,12 +55,12 @@ public InputStream read(String path) throws IOException } @Override - public InputStream readRange(String path, long from, long size) + public InputStream readRange(String path, long from, long size) throws IOException { return buildInputStream(buildInputParams(path, from, size)); } - public abstract ChunkingStorageConnectorParameters buildInputParams(String path); + public abstract ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException; public abstract ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size); diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java index ec46a3f21d84..1e2440d11132 100644 --- a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java +++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java @@ -99,7 +99,7 @@ public Supplier getTempDirSupplier() return tempDirSupplier; } - private static class Builder + public static class Builder { private long start; private long end; @@ -159,7 +159,7 @@ public Builder tempDirSupplier(Supplier tempDirSupplier) return this; } - ChunkingStorageConnectorParameters build() + public ChunkingStorageConnectorParameters build() { Preconditions.checkArgument(start >= 0, "'start' not provided or an incorrect value [%s] passed", start); Preconditions.checkArgument(end >= 0, "'end' not provided or an incorrect value [%s] passed", end); From 5730c354fea66e62576432de481404cb9d36cc3a Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 19 Jul 2023 10:45:32 +0530 Subject: [PATCH 7/7] cleanup S3 storage connector --- .../storage/s3/output/S3StorageConnector.java | 205 +++--------------- 1 file changed, 36 insertions(+), 169 deletions(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java index e238c12b1ddc..a68ed9c1c00c 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java @@ -27,13 +27,9 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; -import org.apache.commons.io.input.NullInputStream; import org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.data.input.impl.RetryingInputStream; import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -43,20 +39,12 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import javax.annotation.Nonnull; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.SequenceInputStream; import java.util.ArrayList; -import java.util.Enumeration; import java.util.Iterator; import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times. @@ -70,7 +58,6 @@ public class S3StorageConnector extends ChunkingStorageConnector buildInputParams(String path) { - if (from < 0 || size < 0) { - throw new IAE( - "Invalid arguments for reading %s. from = %d, readSize = %d", - objectPath(path), - from, - size + long size; + try { + size = S3Utils.retryS3Operation( + () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(), + config.getMaxRetry() ); } - return buildInputStream( - new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1), - path - ); - } - - @Override - public ChunkingStorageConnectorParameters buildInputParams(String path) - { - return null; + catch (Exception e) { + throw new RuntimeException(e); + } + return buildInputParams(path, 0, size); } @Override public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) { - return null; - } - - private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path) - { - // fetch the size of the whole object to make chunks - long readEnd; - AtomicLong currReadStart = new AtomicLong(0); - if (getObjectRequest.getRange() != null) { - currReadStart.set(getObjectRequest.getRange()[0]); - readEnd = getObjectRequest.getRange()[1] + 1; - } else { - try { - readEnd = S3Utils.retryS3Operation( - () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(), - config.getMaxRetry() - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false); - - // build a sequence input stream from chunks - return new SequenceInputStream(new Enumeration() + ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>(); + builder.start(from); + builder.end(from + size); + builder.cloudStoragePath(objectPath(path)); + builder.tempDirSupplier(config::getTempDir); + builder.maxRetry(config.getMaxRetry()); + builder.retryCondition(S3Utils.S3RETRY); + builder.objectSupplier((start, end) -> new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(start, end - 1)); + builder.objectOpenFunction(new ObjectOpenFunction() { - boolean initStream = false; @Override - public boolean hasMoreElements() + public InputStream open(GetObjectRequest object) { - // checking if the stream was already closed. If it was, then don't iterate over the remaining chunks - // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them - // lazily, we don't need to close them. - if (isSequenceStreamClosed.get()) { - return false; - } - // don't stop until the whole object is downloaded - return currReadStart.get() < readEnd; - } - - @Override - public InputStream nextElement() - { - // since Sequence input stream calls nextElement in the constructor, we start chunking as soon as we call read. - // to avoid that we pass a nullInputStream for the first iteration. - if (!initStream) { - initStream = true; - return new NullInputStream(); - } - File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString()); - // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE - long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1; try { - if (!outFile.createNewFile()) { - throw new IOE( - StringUtils.format( - "Could not create temporary file [%s] for copying [%s]", - outFile.getAbsolutePath(), - objectPath(path) - ) - ); - } - FileUtils.copyLarge( - () -> new RetryingInputStream<>( - new GetObjectRequest( - config.getBucket(), - objectPath(path) - ).withRange(currReadStart.get(), endPoint), - new ObjectOpenFunction() - { - @Override - public InputStream open(GetObjectRequest object) - { - try { - return S3Utils.retryS3Operation( - () -> s3Client.getObject(object).getObjectContent(), - config.getMaxRetry() - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public InputStream open(GetObjectRequest object, long offset) - { - if (object.getRange() != null) { - long[] oldRange = object.getRange(); - object.setRange(oldRange[0] + offset, oldRange[1]); - } else { - object.setRange(offset); - } - return open(object); - } - }, - S3Utils.S3RETRY, - config.getMaxRetry() - ), - outFile, - new byte[8 * 1024], - Predicates.alwaysFalse(), - 1, - StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath()) + return S3Utils.retryS3Operation( + () -> s3Client.getObject(object).getObjectContent(), + config.getMaxRetry() ); } - catch (IOException e) { - throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile)); - } - try { - AtomicBoolean isClosed = new AtomicBoolean(false); - return new FileInputStream(outFile) - { - @Override - public void close() throws IOException - { - // close should be idempotent - if (isClosed.get()) { - return; - } - isClosed.set(true); - super.close(); - // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1 - currReadStart.set(endPoint + 1); - if (!outFile.delete()) { - throw new RE("Cannot delete temp file [%s]", outFile); - } - } - }; - } - catch (FileNotFoundException e) { - throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile)); + catch (Exception e) { + throw new RuntimeException(e); } } - }) - { + @Override - public void close() throws IOException + public InputStream open(GetObjectRequest object, long offset) { - isSequenceStreamClosed.set(true); - super.close(); + if (object.getRange() != null) { + long[] oldRange = object.getRange(); + object.setRange(oldRange[0] + offset, oldRange[1]); + } else { + object.setRange(offset); + } + return open(object); } - }; + }); + return builder.build(); } @Override