From 6d407e86772a39ad47a3b5abae9b0fabc1cc638c Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 Feb 2015 16:37:28 -0800 Subject: [PATCH] Add URI handling to SegmentPullers * Requires https://github.com/druid-io/druid-api/pull/37 * Requires https://github.com/metamx/java-util/pull/22 * Moves the puller logic to use a more standard workflow going through java-util helpers instead of re-writing the handlers for each impl * General workflow goes like this: 1) LoadSpec makes sure the correct Puller is called with the correct parameters. 2) The Puller sets up general information like how to make an InputStream, how to find a file name (for .gz files for example), and when to retry. 3) CompressionUtils does most of the heavy lifting when it can --- .../introspect/GuiceInjectableValues.java | 12 +- .../cassandra/CassandraDataSegmentPuller.java | 128 ++++--- .../cassandra/CassandraDataSegmentPusher.java | 2 +- .../cassandra/CassandraDruidModule.java | 43 ++- .../storage/cassandra/CassandraLoadSpec.java | 54 +++ extensions/hdfs-storage/pom.xml | 36 +- .../storage/hdfs/HdfsDataSegmentPuller.java | 317 ++++++++++++++++-- .../storage/hdfs/HdfsDataSegmentPusher.java | 2 +- .../io/druid/storage/hdfs/HdfsLoadSpec.java | 61 ++++ .../storage/hdfs/HdfsStorageDruidModule.java | 31 +- .../loading/HdfsDataSegmentPullerTest.java | 220 ++++++++++++ .../druid/storage/s3/S3DataSegmentPuller.java | 270 ++++++++++++--- .../druid/storage/s3/S3DataSegmentPusher.java | 2 +- .../java/io/druid/storage/s3/S3LoadSpec.java | 90 +++++ .../storage/s3/S3StorageDruidModule.java | 33 +- .../java/io/druid/storage/s3/S3Utils.java | 43 ++- .../indexing/common/SegmentLoaderFactory.java | 6 +- .../indexing/common/TaskToolboxTest.java | 14 +- .../IngestSegmentFirehoseFactoryTest.java | 51 ++- .../indexing/overlord/TaskLifecycleTest.java | 12 +- .../worker/WorkerTaskMonitorTest.java | 13 +- pom.xml | 4 +- .../segment/SegmentMissingException.java | 4 + .../guice/LocalDataStorageDruidModule.java | 55 ++- .../loading/LocalDataSegmentPuller.java | 197 +++++++++-- .../loading/LocalDataSegmentPusher.java | 2 +- .../druid/segment/loading/LocalLoadSpec.java | 64 ++++ ...va => SegmentLoaderLocalCacheManager.java} | 50 ++- .../druid/segment/loading/LoadSpecTest.java | 108 ++++++ .../loading/LocalDataSegmentPullerTest.java | 152 +++++++++ 30 files changed, 1800 insertions(+), 276 deletions(-) create mode 100644 extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java create mode 100644 extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java create mode 100644 extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java create mode 100644 extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java create mode 100644 server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java rename server/src/main/java/io/druid/segment/loading/{OmniSegmentLoader.java => SegmentLoaderLocalCacheManager.java} (79%) create mode 100644 server/src/test/java/io/druid/segment/loading/LoadSpecTest.java create mode 100644 server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java diff --git a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java index d1480924c9e2..db300741222a 100644 --- a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java +++ b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.google.inject.Injector; import com.google.inject.Key; +import com.metamx.common.IAE; + +import java.lang.reflect.Type; /** */ @@ -36,6 +39,13 @@ public Object findInjectableValue( Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance ) { - return injector.getInstance((Key) valueId); + // From the docs: "Object that identifies value to inject; may be a simple name or more complex identifier object, + // whatever provider needs" + // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with + // great care + if(valueId instanceof Key){ + return injector.getInstance((Key) valueId); + } + throw new IAE("Unknown class type [%s] for valueId [%s]", valueId.getClass().getCanonicalName(), valueId.toString()); } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java index b643ba4f5904..3b33cea6b108 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java @@ -17,79 +17,105 @@ package io.druid.storage.cassandra; -import com.google.common.io.Files; +import com.google.common.base.Predicates; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.ISE; +import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ObjectMetadata; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.commons.io.FileUtils; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.Callable; /** * Cassandra Segment Puller - * - * @author boneill42 */ public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller { - private static final Logger log = new Logger(CassandraDataSegmentPuller.class); - private static final int CONCURRENCY = 10; - private static final int BATCH_SIZE = 10; + private static final Logger log = new Logger(CassandraDataSegmentPuller.class); + private static final int CONCURRENCY = 10; + private static final int BATCH_SIZE = 10; @Inject - public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) - { - super(config); - } + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + { + super(config); + } - @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException - { - String key = (String) segment.getLoadSpec().get("key"); - log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); + @Override + public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + { + String key = (String) segment.getLoadSpec().get("key"); + getSegmentFiles(key, outDir); + } + public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{ + log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); + if (!outDir.exists()) { + outDir.mkdirs(); + } - if (!outDir.exists()) - { - outDir.mkdirs(); - } + if (!outDir.isDirectory()) { + throw new ISE("outDir[%s] must be a directory.", outDir); + } - if (!outDir.isDirectory()) - { - throw new ISE("outDir[%s] must be a directory.", outDir); - } + long startTime = System.currentTimeMillis(); + final File tmpFile = new File(outDir, "index.zip"); + log.info("Pulling to temporary local cache [%s]", tmpFile.getAbsolutePath()); - long startTime = System.currentTimeMillis(); - ObjectMetadata meta = null; - final File outFile = new File(outDir, "index.zip"); - try - { - try - { - log.info("Writing to [%s]", outFile.getAbsolutePath()); - OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); - meta = ChunkedStorage - .newReader(indexStorage, key, os) - .withBatchSize(BATCH_SIZE) - .withConcurrencyLevel(CONCURRENCY) - .call(); - os.close(); - CompressionUtils.unzip(outFile, outDir); - } catch (Exception e) - { - FileUtils.deleteDirectory(outDir); - } - } catch (Exception e) - { - throw new SegmentLoadingException(e, e.getMessage()); - } - log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, - meta.getObjectSize()); - } + final com.metamx.common.FileUtils.FileCopyResult localResult; + try { + localResult = RetryUtils.retry( + new Callable() + { + @Override + public com.metamx.common.FileUtils.FileCopyResult call() throws Exception + { + try (OutputStream os = new FileOutputStream(tmpFile)) { + final ObjectMetadata meta = ChunkedStorage + .newReader(indexStorage, key, os) + .withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); + } + return new com.metamx.common.FileUtils.FileCopyResult(tmpFile); + } + }, + Predicates.alwaysTrue(), + 10 + ); + }catch (Exception e){ + throw new SegmentLoadingException(e, "Unable to copy key [%s] to file [%s]", key, tmpFile.getAbsolutePath()); + } + try{ + final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(tmpFile, outDir); + log.info( + "Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, + result.size() + ); + return result; + } + catch (Exception e) { + try { + FileUtils.deleteDirectory(outDir); + } + catch (IOException e1) { + log.error(e1, "Error clearing segment directory [%s]", outDir.getAbsolutePath()); + e.addSuppressed(e1); + } + throw new SegmentLoadingException(e, e.getMessage()); + } finally { + if(!tmpFile.delete()){ + log.warn("Could not delete cache file at [%s]", tmpFile.getAbsolutePath()); + } + } + } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index a60749850359..06a7dbedffe4 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -21,6 +21,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.recipes.storage.ChunkedStorage; @@ -28,7 +29,6 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import java.io.File; import java.io.FileInputStream; diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java index 0630b645dc25..3d3852cf55d0 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java @@ -17,7 +17,7 @@ package io.druid.storage.cassandra; -import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.core.Version; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; @@ -34,24 +34,47 @@ */ public class CassandraDruidModule implements DruidModule { - @Override - public List getJacksonModules() - { - return ImmutableList.of(); - } + public static final String SCHEME = "c*"; @Override public void configure(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding("c*") - .to(CassandraDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(CassandraDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) - .addBinding("c*") + .addBinding(SCHEME) .to(CassandraDataSegmentPusher.class) .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", CassandraDataSegmentConfig.class); } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new com.fasterxml.jackson.databind.Module() + { + @Override + public String getModuleName() + { + return "DruidCassandraStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(CassandraLoadSpec.class); + } + } + ); + } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java new file mode 100644 index 000000000000..5159dae172ab --- /dev/null +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cassandra; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; + +import java.io.File; + +/** + * + */ +@JsonTypeName(CassandraDruidModule.SCHEME) +public class CassandraLoadSpec implements LoadSpec +{ + @JsonProperty + private final String key; + private final CassandraDataSegmentPuller puller; + + @JsonCreator + public CassandraLoadSpec( + @JacksonInject CassandraDataSegmentPuller puller, + @JsonProperty("key") String key + ) + { + this.puller = puller; + this.key = key; + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size()); + } +} diff --git a/extensions/hdfs-storage/pom.xml b/extensions/hdfs-storage/pom.xml index fde319f01962..0d487e6e1690 100644 --- a/extensions/hdfs-storage/pom.xml +++ b/extensions/hdfs-storage/pom.xml @@ -63,13 +63,39 @@ commons-io commons-io - + - junit - junit - test + junit + junit + test + + + io.druid + druid-server + ${parent.version} + test + + + org.apache.hadoop + hadoop-hdfs + 2.3.0 + tests + test + + + org.apache.hadoop + hadoop-common + 2.3.0 + tests + test + + + org.apache.hadoop + hadoop-hdfs + 2.3.0 + test - + diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index 2927cbdafe53..a03a3372f478 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -17,23 +17,144 @@ package io.druid.storage.hdfs; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; +import com.metamx.common.RetryUtils; +import com.metamx.common.UOE; +import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import javax.tools.FileObject; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.Callable; /** */ -public class HdfsDataSegmentPuller implements DataSegmentPuller +public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller { + /** + * FileObject.getLastModified and FileObject.delete don't throw IOException. This allows us to wrap those calls + */ + public static class HdfsIOException extends RuntimeException + { + private final IOException cause; + + public HdfsIOException(IOException ex) + { + super(ex); + this.cause = ex; + } + + protected IOException getIOException() + { + return cause; + } + } + + + public static FileObject buildFileObject(final URI uri, final Configuration config) + { + return buildFileObject(uri, config, false); + } + + public static FileObject buildFileObject(final URI uri, final Configuration config, final Boolean overwrite) + { + return new FileObject() + { + final Path path = new Path(uri); + + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + return path.getName(); + } + + @Override + public InputStream openInputStream() throws IOException + { + final FileSystem fs = path.getFileSystem(config); + return fs.open(path); + } + + @Override + public OutputStream openOutputStream() throws IOException + { + final FileSystem fs = path.getFileSystem(config); + return fs.create(path, overwrite); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("HDFS Reader not supported"); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("HDFS CharSequence not supported"); + } + + @Override + public Writer openWriter() throws IOException + { + throw new UOE("HDFS Writer not supported"); + } + + @Override + public long getLastModified() + { + try { + final FileSystem fs = path.getFileSystem(config); + return fs.getFileStatus(path).getModificationTime(); + } + catch (IOException ex) { + throw new HdfsIOException(ex); + } + } + + @Override + public boolean delete() + { + try { + final FileSystem fs = path.getFileSystem(config); + return fs.delete(path, false); + } + catch (IOException ex) { + throw new HdfsIOException(ex); + } + } + }; + } + + private static final Logger log = new Logger(HdfsDataSegmentPuller.class); private final Configuration config; @Inject @@ -42,46 +163,190 @@ public HdfsDataSegmentPuller(final Configuration config) this.config = config; } + @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - final Path path = getPath(segment); + getSegmentFiles(getPath(segment), dir); + } + + public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException + { + final LocalFileSystem localFileSystem = new LocalFileSystem(); + try { + final FileSystem fs = path.getFileSystem(config); + if (fs.isDirectory(path)) { - final FileSystem fs = checkPathAndGetFilesystem(path); + // -------- directory --------- - if (path.getName().endsWith(".zip")) { - try { - try (FSDataInputStream in = fs.open(path)) { - CompressionUtils.unzip(in, dir); + try { + return RetryUtils.retry( + new Callable() + { + @Override + public FileUtils.FileCopyResult call() throws Exception + { + if (!fs.exists(path)) { + throw new SegmentLoadingException("No files found at [%s]", path.toString()); + } + + final RemoteIterator children = fs.listFiles(path, false); + final ArrayList localChildren = new ArrayList<>(); + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); + while (children.hasNext()) { + final LocatedFileStatus child = children.next(); + final Path childPath = child.getPath(); + final String fname = childPath.getName(); + if (fs.isDirectory(childPath)) { + log.warn("[%s] is a child directory, skipping", childPath.toString()); + } else { + final File outFile = new File(outDir, fname); + + // Actual copy + fs.copyToLocalFile(childPath, new Path(outFile.toURI())); + result.addFile(outFile); + } + } + log.info( + "Copied %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outDir.getAbsolutePath() + ); + return result; + } + + }, + shouldRetryPredicate(), + 10 + ); } + catch (Exception e) { + throw Throwables.propagate(e); + } + } else if (CompressionUtils.isZip(path.getName())) { + + // -------- zip --------- + + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return getInputStream(path); + } + }, outDir, shouldRetryPredicate(), false + ); + + log.info( + "Unzipped %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outDir.getAbsolutePath() + ); + + return result; + } else if (CompressionUtils.isGz(path.getName())) { + + // -------- gzip --------- + + final String fname = path.getName(); + final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); + final FileUtils.FileCopyResult result = CompressionUtils.gunzip( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return getInputStream(path); + } + }, + outFile + ); + + log.info( + "Gunzipped %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outFile.getAbsolutePath() + ); + return result; + } else { + throw new SegmentLoadingException("Do not know how to handle file type at [%s]", path.toString()); } - catch (IOException e) { - throw new SegmentLoadingException(e, "Some IOException"); - } - } else { - throw new SegmentLoadingException("Unknown file type[%s]", path); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Error loading [%s]", path.toString()); } } - private Path getPath(DataSegment segment) + public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException { - return new Path(String.valueOf(segment.getLoadSpec().get("path"))); + if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { + throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString()); + } + return getSegmentFiles(new Path(uri), outDir); } - private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException + public InputStream getInputStream(Path path) throws IOException { - FileSystem fs; - try { - fs = path.getFileSystem(config); + return buildFileObject(path.toUri(), config).openInputStream(); + } - if (!fs.exists(path)) { - throw new SegmentLoadingException("Path[%s] doesn't exist.", path); - } + @Override + public InputStream getInputStream(URI uri) throws IOException + { + if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { + throw new IAE("Don't know how to load SCHEME [%s] for URI [%s]", uri.getScheme(), uri.toString()); + } + return buildFileObject(uri, config).openInputStream(); + } - return fs; + /** + * Return the "version" (aka last modified timestamp) of the URI + * + * @param uri The URI of interest + * + * @return The last modified timestamp of the uri in String format + * + * @throws IOException + */ + @Override + public String getVersion(URI uri) throws IOException + { + try { + return String.format("%d", buildFileObject(uri, config).getLastModified()); } - catch (IOException e) { - throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path); + catch (HdfsIOException ex) { + throw ex.getIOException(); } } + + @Override + public Predicate shouldRetryPredicate() + { + return new Predicate() + { + @Override + public boolean apply(Throwable input) + { + if (input == null) { + return false; + } + if (input instanceof HdfsIOException) { + return true; + } + if (input instanceof IOException) { + return true; + } + return apply(input.getCause()); + } + }; + } + + private Path getPath(DataSegment segment) + { + return new Path(String.valueOf(segment.getLoadSpec().get("path"))); + } } diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index b1168ccfe705..6180b65f83f1 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -22,12 +22,12 @@ import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java new file mode 100644 index 000000000000..9c86846314e4 --- /dev/null +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java @@ -0,0 +1,61 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.hdfs; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.metamx.common.ISE; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.util.Map; + +/** + * + */ +@JsonTypeName(HdfsStorageDruidModule.SCHEME) +public class HdfsLoadSpec implements LoadSpec +{ + private final Path path; + final HdfsDataSegmentPuller puller; + @JsonCreator + public HdfsLoadSpec( + @JacksonInject HdfsDataSegmentPuller puller, + @JsonProperty(value = "path", required = true) String path + ){ + Preconditions.checkNotNull(path); + this.path = new Path(path); + this.puller = puller; + } + @JsonProperty("path") + public final String getPathString(){ + return path.toString(); + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size()); + } +} diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java index fca37d71358c..6707cdf97f5e 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -17,6 +17,7 @@ package io.druid.storage.hdfs; +import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -36,6 +37,7 @@ */ public class HdfsStorageDruidModule implements DruidModule { + public static final String SCHEME = "hdfs"; private Properties props = null; @Inject @@ -47,15 +49,36 @@ public void setProperties(Properties props) @Override public List getJacksonModules() { - return ImmutableList.of(); + return ImmutableList.of( + new Module() + { + @Override + public String getModuleName() + { + return "DruidHDFSStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(HdfsLoadSpec.class); + } + } + ); } @Override public void configure(Binder binder) { - Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class); final Configuration conf = new Configuration(); if (props != null) { diff --git a/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java new file mode 100644 index 000000000000..400270efa2ab --- /dev/null +++ b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java @@ -0,0 +1,220 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.google.common.io.ByteStreams; +import com.metamx.common.CompressionUtils; +import com.metamx.common.StringUtils; +import io.druid.storage.hdfs.HdfsDataSegmentPuller; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.Files; +import java.util.zip.GZIPOutputStream; + +/** + * + */ +public class HdfsDataSegmentPullerTest +{ + private static MiniDFSCluster miniCluster; + private static File hdfsTmpDir; + private static URI uriBase; + private static Path filePath = new Path("/tmp/foo"); + private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum"; + private static byte[] pathByteContents = StringUtils.toUtf8(pathContents); + private static Configuration conf; + + @BeforeClass + public static void setupStatic() throws IOException, ClassNotFoundException + { + hdfsTmpDir = File.createTempFile("hdfsHandlerTest", "dir"); + hdfsTmpDir.deleteOnExit(); + if (!hdfsTmpDir.delete()) { + throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath())); + } + conf = new Configuration(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); + miniCluster = new MiniDFSCluster.Builder(conf).build(); + uriBase = miniCluster.getURI(0); + + final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data"); + tmpFile.delete(); + try { + tmpFile.deleteOnExit(); + Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath()); + try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) { + Files.copy(tmpFile.toPath(), stream); + } + } + finally { + tmpFile.delete(); + } + } + + @AfterClass + public static void tearDownStatic() throws IOException + { + if (miniCluster != null) { + miniCluster.shutdown(true); + } + } + + + private HdfsDataSegmentPuller puller; + + @Before + public void setUp() + { + puller = new HdfsDataSegmentPuller(conf); + } + + @Test + public void testZip() throws IOException, SegmentLoadingException + { + final File tmpDir = com.google.common.io.Files.createTempDir(); + tmpDir.deleteOnExit(); + final File tmpFile = File.createTempFile("zipContents", ".txt", tmpDir); + tmpFile.deleteOnExit(); + + final Path zipPath = new Path("/tmp/testZip.zip"); + + final File outTmpDir = com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + + final URI uri = URI.create(uriBase.toString() + zipPath.toString()); + + tmpFile.deleteOnExit(); + try (final OutputStream stream = new FileOutputStream(tmpFile)) { + ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream); + } + Assert.assertTrue(tmpFile.exists()); + + final File outFile = new File(outTmpDir, tmpFile.getName()); + outFile.delete(); + + try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) { + CompressionUtils.zip(tmpDir, stream); + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (tmpFile.exists()) { + tmpFile.delete(); + } + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + if (tmpDir.exists()) { + tmpDir.delete(); + } + } + } + + @Test + public void testGZ() throws IOException, SegmentLoadingException + { + final Path zipPath = new Path("/tmp/testZip.gz"); + + final File outTmpDir = com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + final File outFile = new File(outTmpDir, "testZip"); + outFile.delete(); + + final URI uri = URI.create(uriBase.toString() + zipPath.toString()); + + try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { + try (final OutputStream gzStream = new GZIPOutputStream(outputStream)) { + try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { + ByteStreams.copy(inputStream, gzStream); + } + } + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + } + } + + @Test + public void testDir() throws IOException, SegmentLoadingException + { + + final Path zipPath = new Path("/tmp/tmp2/test.txt"); + + final File outTmpDir = com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + final File outFile = new File(outTmpDir, "test.txt"); + outFile.delete(); + + final URI uri = URI.create(uriBase.toString() + "/tmp/tmp2"); + + try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { + try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { + ByteStreams.copy(inputStream, outputStream); + } + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + } + } +} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index e36bdb9e4ab3..01cf5742c8ac 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -17,37 +17,125 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3URI; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.io.ByteStreams; +import com.google.common.io.ByteSource; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.MapUtils; +import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; -import org.apache.commons.io.FileUtils; import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; +import javax.tools.FileObject; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.nio.file.Paths; import java.util.Map; import java.util.concurrent.Callable; -import java.util.zip.GZIPInputStream; /** + * A data segment puller that also hanldes URI data pulls. */ -public class S3DataSegmentPuller implements DataSegmentPuller +public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { + public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException + { + final URI checkedUri = checkURI(uri); + final AmazonS3URI s3URI = new AmazonS3URI(checkedUri); + final String key = s3URI.getKey(); + final String bucket = s3URI.getBucket(); + final S3Object s3Obj = s3Client.getObject(bucket, key); + final String path = uri.getPath(); + + return new FileObject() + { + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + final String ext = Files.getFileExtension(path); + return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." + ext)); + } + + @Override + public InputStream openInputStream() throws IOException + { + try { + return s3Obj.getDataInputStream(); + } + catch (ServiceException e) { + throw new IOException(String.format("Could not load S3 URI [%s]", checkedUri.toString()), e); + } + } + + @Override + public OutputStream openOutputStream() throws IOException + { + throw new UOE("Cannot stream S3 output"); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("Cannot open reader"); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("Cannot open character sequence"); + } + + @Override + public Writer openWriter() throws IOException + { + throw new UOE("Cannot open writer"); + } + + @Override + public long getLastModified() + { + return s3Obj.getLastModifiedDate().getTime(); + } + + @Override + public boolean delete() + { + throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily."); + } + }; + } + + public static final String scheme = S3StorageDruidModule.SCHEME; + private static final Logger log = new Logger(S3DataSegmentPuller.class); - private static final String BUCKET = "bucket"; - private static final String KEY = "key"; + protected static final String BUCKET = "bucket"; + protected static final String KEY = "key"; private final RestS3Service s3Client; @@ -62,7 +150,12 @@ public S3DataSegmentPuller( @Override public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException { - final S3Coords s3Coords = new S3Coords(segment); + getSegmentFiles(new S3Coords(segment), outDir); + } + + public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) + throws SegmentLoadingException + { log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -79,63 +172,134 @@ public void getSegmentFiles(final DataSegment segment, final File outDir) throws } try { - S3Utils.retryS3Operation( - new Callable() - { - @Override - public Void call() throws Exception - { - long startTime = System.currentTimeMillis(); - S3Object s3Obj = null; - - try { - s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); - - try (InputStream in = s3Obj.getDataInputStream()) { - final String key = s3Obj.getKey(); - if (key.endsWith(".zip")) { - CompressionUtils.unzip(in, outDir); - } else if (key.endsWith(".gz")) { - final File outFile = new File(outDir, toFilename(key, ".gz")); - ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); - } else { - ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); - } - log.info( - "Pull of file[%s/%s] completed in %,d millis", - s3Obj.getBucketName(), - s3Obj.getKey(), - System.currentTimeMillis() - startTime - ); - return null; - } - catch (IOException e) { - throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e); - } - } - finally { - S3Utils.closeStreamsQuietly(s3Obj); + final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path)); + final ByteSource byteSource = new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + try { + return buildFileObject(uri, s3Client).openInputStream(); + } + catch (ServiceException e) { + if (e.getCause() != null) { + if (S3Utils.S3RETRY.apply(e)) { + throw new IOException("Recoverable exception", e); } } + throw Throwables.propagate(e); } - ); + } + }; + if (CompressionUtils.isZip(s3Coords.path)) { + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + byteSource, + outDir, + S3Utils.S3RETRY, + true + ); + log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); + return result; + } + if (CompressionUtils.isGz(s3Coords.path)) { + final String fname = Paths.get(uri).getFileName().toString(); + final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); + + final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile); + log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); + return result; + } + throw new IAE("Do not know how to load file type at [%s]", uri.toString()); } catch (Exception e) { try { - FileUtils.deleteDirectory(outDir); + org.apache.commons.io.FileUtils.deleteDirectory(outDir); } catch (IOException ioe) { log.warn( ioe, - "Failed to remove output directory for segment[%s] after exception: %s", - segment.getIdentifier(), - outDir + "Failed to remove output directory [%s] for segment pulled from [%s]", + outDir.getAbsolutePath(), + s3Coords.toString() ); } throw new SegmentLoadingException(e, e.getMessage()); } } + public static URI checkURI(URI uri) + { + if (uri.getScheme().equalsIgnoreCase(scheme)) { + uri = URI.create("s3" + uri.toString().substring(scheme.length())); + } else if (!uri.getScheme().equalsIgnoreCase("s3")) { + throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString()); + } + return uri; + } + + @Override + public InputStream getInputStream(URI uri) throws IOException + { + try { + return buildFileObject(uri, s3Client).openInputStream(); + } + catch (ServiceException e) { + throw new IOException(String.format("Could not load URI [%s]", uri.toString()), e); + } + } + + @Override + public Predicate shouldRetryPredicate() + { + // Yay! smart retries! + return new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e == null) { + return false; + } + if (e instanceof ServiceException) { + return S3Utils.isServiceExceptionRecoverable((ServiceException) e); + } + if (S3Utils.S3RETRY.apply(e)) { + return true; + } + // Look all the way down the cause chain, just in case something wraps it deep. + return apply(e.getCause()); + } + }; + } + + /** + * Returns the "version" (aka last modified timestamp) of the URI + * + * @param uri The URI to check the last timestamp + * + * @return The time in ms of the last modification of the URI in String format + * + * @throws IOException + */ + @Override + public String getVersion(URI uri) throws IOException + { + try { + return String.format("%d", buildFileObject(uri, s3Client).getLastModified()); + } + catch (S3ServiceException e) { + if (S3Utils.isServiceExceptionRecoverable(e)) { + // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable + throw new IOException( + String.format("Could not fetch last modified timestamp from URI [%s]", uri.toString()), + e + ); + } else { + throw Throwables.propagate(e); + } + } + } + private String toFilename(String key, final String suffix) { String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' @@ -165,7 +329,7 @@ public Boolean call() throws Exception } } - private static class S3Coords + protected static class S3Coords { String bucket; String path; @@ -180,6 +344,12 @@ public S3Coords(DataSegment segment) } } + public S3Coords(String bucket, String key) + { + this.bucket = bucket; + this.path = key; + } + public String toString() { return String.format("s3://%s/%s", bucket, path); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 78b02e42bbb0..29de26d1b45a 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -23,11 +23,11 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.jets3t.service.ServiceException; import org.jets3t.service.acl.gs.GSAccessControlList; import org.jets3t.service.impl.rest.httpclient.RestS3Service; diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java new file mode 100644 index 000000000000..ae260dbda0cd --- /dev/null +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java @@ -0,0 +1,90 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.elasticbeanstalk.model.S3LocationNotInServiceRegionException; +import com.amazonaws.services.s3.AmazonS3URI; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.RetryUtils; +import com.metamx.common.StreamUtils; +import com.metamx.common.logger.Logger; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; + +import javax.swing.text.Segment; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * + */ +@JsonTypeName(S3StorageDruidModule.SCHEME) +public class S3LoadSpec implements LoadSpec +{ + @JsonProperty(S3DataSegmentPuller.BUCKET) + private final String bucket; + @JsonProperty(S3DataSegmentPuller.KEY) + private final String key; + + private final S3DataSegmentPuller puller; + + @JsonCreator + public S3LoadSpec( + @JacksonInject S3DataSegmentPuller puller, + @JsonProperty(S3DataSegmentPuller.BUCKET) String bucket, + @JsonProperty(S3DataSegmentPuller.KEY) String key + ) + { + Preconditions.checkNotNull(bucket); + Preconditions.checkNotNull(key); + this.bucket = bucket; + this.key = key; + this.puller = puller; + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); + } +} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index c1e7dae641f2..bab2114ff6d3 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -18,6 +18,7 @@ package io.druid.storage.s3; import com.amazonaws.auth.AWSCredentialsProvider; +import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -37,10 +38,32 @@ */ public class S3StorageDruidModule implements DruidModule { + public static final String SCHEME = "s3_zip"; @Override public List getJacksonModules() { - return ImmutableList.of(); + return ImmutableList.of( + new Module() + { + @Override + public String getModuleName() + { + return "DruidS3-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(S3LoadSpec.class); + } + } + ); } @Override @@ -48,10 +71,10 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); - Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class); - Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class); - Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(S3DataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index b9fc5441e4fd..fe66e838dab8 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,6 +19,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import com.metamx.common.FileUtils; import com.metamx.common.RetryUtils; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; @@ -51,30 +52,38 @@ public static void closeStreamsQuietly(S3Object s3Obj) } } + public static boolean isServiceExceptionRecoverable(ServiceException ex) + { + final boolean isIOException = ex.getCause() instanceof IOException; + final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode()); + return isIOException || isTimeout; + } + + public static final Predicate S3RETRY = new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e == null) { + return false; + } else if (e instanceof IOException) { + return true; + } else if (e instanceof ServiceException) { + return isServiceExceptionRecoverable((ServiceException) e); + } else { + return apply(e.getCause()); + } + } + }; + /** * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. */ public static T retryS3Operation(Callable f) throws Exception { - final Predicate shouldRetry = new Predicate() - { - @Override - public boolean apply(Throwable e) - { - if (e instanceof IOException) { - return true; - } else if (e instanceof ServiceException) { - final boolean isIOException = e.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode()); - return isIOException || isTimeout; - } else { - return false; - } - } - }; final int maxTries = 10; - return RetryUtils.retry(f, shouldRetry, maxTries); + return RetryUtils.retry(f, S3RETRY, maxTries); } public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java index d14b5548545d..8b60effdfb68 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java @@ -18,7 +18,7 @@ package io.druid.indexing.common; import com.google.inject.Inject; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; @@ -30,11 +30,11 @@ */ public class SegmentLoaderFactory { - private final OmniSegmentLoader loader; + private final SegmentLoaderLocalCacheManager loader; @Inject public SegmentLoaderFactory( - OmniSegmentLoader loader + SegmentLoaderLocalCacheManager loader ) { this.loader = loader; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index f869e56749c0..261d888334e5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -31,7 +31,7 @@ import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoaderConfig; @@ -68,7 +68,7 @@ public class TaskToolboxTest private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); private ObjectMapper ObjectMapper = new ObjectMapper(); - private OmniSegmentLoader mockOmniSegmentLoader = EasyMock.createMock(OmniSegmentLoader.class); + private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); private Task task = EasyMock.createMock(Task.class); @Rule @@ -93,7 +93,7 @@ public void setUp() throws IOException mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, - new SegmentLoaderFactory(mockOmniSegmentLoader), + new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), ObjectMapper ); } @@ -144,11 +144,11 @@ public void testGetObjectMapper() public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); - EasyMock.expect(mockOmniSegmentLoader.getSegmentFiles((DataSegment)EasyMock.anyObject())) + EasyMock.expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles((DataSegment)EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - EasyMock.expect(mockOmniSegmentLoader.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) - .andReturn(mockOmniSegmentLoader).anyTimes(); - EasyMock.replay(mockOmniSegmentLoader); + EasyMock.expect(mockSegmentLoaderLocalCacheManager.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) + .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); + EasyMock.replay(mockSegmentLoaderLocalCacheManager); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(new Interval("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 1650929b1564..2946345115fc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -17,6 +17,11 @@ package io.druid.indexing.firehose; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; +import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -40,6 +45,7 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; @@ -60,11 +66,11 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; -import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.LocalLoadSpec; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.timeline.DataSegment; @@ -169,6 +175,37 @@ public void deleteSegments(Set segments) ts, new TaskActionToolbox(tl, mdc, newMockEmitter()) ); + + final ObjectMapper objectMapper = new DefaultObjectMapper(); + objectMapper.registerModule( + new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class) + ); + + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + objectMapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair( + guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector() + ), + new AnnotationIntrospectorPair( + guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector() + ) + ); + objectMapper.setInjectableValues( + new GuiceInjectableValues( + GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(LocalDataSegmentPuller.class); + } + } + ) + ) + ) + ); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null), tac, @@ -224,11 +261,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -237,10 +270,10 @@ public List getLocations() { return Lists.newArrayList(); } - } + }, objectMapper ) ), - new DefaultObjectMapper() + objectMapper ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 4a9a64045b54..aaa2e3c878ae 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -68,10 +68,8 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; -import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; @@ -314,11 +312,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -327,7 +321,7 @@ public List getLocations() { return Lists.newArrayList(); } - } + }, new DefaultObjectMapper() ) ), new DefaultObjectMapper() diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 478ccbaf3283..ddb38fd1d73c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Files; import io.druid.curator.PotentiallyGzippedCompressionProvider; @@ -36,9 +35,7 @@ import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPuller; -import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -124,11 +121,7 @@ public String getBase() new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0, null), null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -138,7 +131,7 @@ public List getLocations() return Lists.newArrayList(); } } - ) + , jsonMapper) ), jsonMapper ), null diff --git a/pom.xml b/pom.xml index 33adea93305f..e09d7f80051f 100644 --- a/pom.xml +++ b/pom.xml @@ -65,10 +65,10 @@ - 0.26.15 + 0.27.0 2.7.0 9.2.5.v20141112 - 0.3.5 + 0.3.6 2.4.4 2.2 1.7.10 diff --git a/processing/src/main/java/io/druid/segment/SegmentMissingException.java b/processing/src/main/java/io/druid/segment/SegmentMissingException.java index df1dc269a3b3..f66a8c506170 100644 --- a/processing/src/main/java/io/druid/segment/SegmentMissingException.java +++ b/processing/src/main/java/io/druid/segment/SegmentMissingException.java @@ -24,4 +24,8 @@ public class SegmentMissingException extends ISE public SegmentMissingException(String formatText, Object... arguments) { super(String.format(formatText, arguments)); } + + public SegmentMissingException(Throwable cause, String formatText, Object... arguments){ + super(cause, formatText, arguments); + } } diff --git a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index 0015ccdf325e..295ef0cdc215 100644 --- a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -17,26 +17,34 @@ package io.druid.guice; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; -import com.google.inject.Module; +import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusherConfig; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.LocalLoadSpec; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoader; +import java.util.List; + /** */ -public class LocalDataStorageDruidModule implements Module +public class LocalDataStorageDruidModule implements DruidModule { + public static final String SCHEME = "local"; + @Override public void configure(Binder binder) { - binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class); + binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class); bindDeepStorageLocal(binder); @@ -48,14 +56,14 @@ public void configure(Binder binder) private static void bindDeepStorageLocal(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding("local") - .to(LocalDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(LocalDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class)) - .addBinding("local") - .to(LocalDataSegmentKiller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(LocalDataSegmentKiller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) .addBinding("local") @@ -63,4 +71,31 @@ private static void bindDeepStorageLocal(Binder binder) .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class); } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new com.fasterxml.jackson.databind.Module() + { + @Override + public String getModuleName() + { + return "DruidLocalStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(LocalLoadSpec.class); + } + } + ); + } } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java index 58873844e003..b56cd9db8f24 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java @@ -17,58 +17,207 @@ package io.druid.segment.loading; +import com.google.common.base.Predicate; import com.google.common.io.Files; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; import com.metamx.common.MapUtils; +import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; +import javax.tools.FileObject; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Map; /** */ -public class LocalDataSegmentPuller implements DataSegmentPuller +public class LocalDataSegmentPuller implements DataSegmentPuller, URIDataPuller { + public static FileObject buildFileObject(final URI uri) + { + final Path path = Paths.get(uri); + final File file = path.toFile(); + return new FileObject() + { + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + return path.getFileName().toString(); + } + + @Override + public InputStream openInputStream() throws IOException + { + return new FileInputStream(file); + } + + @Override + public OutputStream openOutputStream() throws IOException + { + return new FileOutputStream(file); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + return new FileReader(file); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("CharSequence not supported"); + } + + @Override + public Writer openWriter() throws IOException + { + return new FileWriter(file); + } + + @Override + public long getLastModified() + { + return file.lastModified(); + } + + @Override + public boolean delete() + { + return file.delete(); + } + }; + } + private static final Logger log = new Logger(LocalDataSegmentPuller.class); @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - final File path = getFile(segment); + getSegmentFiles(getFile(segment), dir); + } - if (path.isDirectory()) { - if (path.equals(dir)) { + public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException + { + if (sourceFile.isDirectory()) { + if (sourceFile.equals(dir)) { log.info("Asked to load [%s] into itself, done!", dir); - return; + return new FileUtils.FileCopyResult(sourceFile); } - log.info("Copying files from [%s] to [%s]", path, dir); - File file = null; - try { - final File[] files = path.listFiles(); - for (int i = 0; i < files.length; ++i) { - file = files[i]; - Files.copy(file, new File(dir, file.getName())); - } - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to copy file[%s].", file); - } - } else { - if (!path.getName().endsWith(".zip")) { - throw new SegmentLoadingException("File is not a zip file[%s]", path); + final File[] files = sourceFile.listFiles(); + if (files == null) { + throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath()); } + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile); + for (final File oldFile : files) { + if (oldFile.isDirectory()) { + log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath()); + continue; + } - log.info("Unzipping local file[%s] to [%s]", path, dir); + result.addFiles( + FileUtils.retryCopy( + Files.asByteSource(oldFile), + new File(dir, oldFile.getName()), + shouldRetryPredicate(), + 10 + ).getFiles() + ); + } + log.info( + "Coppied %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + dir.getAbsolutePath() + ); + return result; + } + if (CompressionUtils.isZip(sourceFile.getName())) { try { - CompressionUtils.unzip(path, dir); + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + Files.asByteSource(sourceFile), + dir, + shouldRetryPredicate(), + false + ); + log.info( + "Unzipped %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + dir.getAbsolutePath() + ); + return result; } catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path); + throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath()); } } + if (CompressionUtils.isGz(sourceFile.getName())) { + final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName())); + final FileUtils.FileCopyResult result = CompressionUtils.gunzip( + Files.asByteSource(sourceFile), + outFile, + shouldRetryPredicate() + ); + log.info( + "Gunzipped %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + outFile.getAbsolutePath() + ); + return result; + } + throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath()); + } + + + @Override + public InputStream getInputStream(URI uri) throws IOException + { + return buildFileObject(uri).openInputStream(); + } + + /** + * Returns the "version" (aka last modified timestamp) of the URI of interest + * + * @param uri The URI to check the last modified timestamp + * + * @return The last modified timestamp in ms of the URI in String format + */ + @Override + public String getVersion(URI uri) + { + return String.format("%d", buildFileObject(uri).getLastModified()); + } + + @Override + public Predicate shouldRetryPredicate() + { + // It would be nice if there were better logic for smarter retries. For example: If the error is that the file is + // not found, there's only so much that retries would do (unless the file was temporarily absent for some reason). + // Since this is not a commonly used puller in production, and in general is more useful in testing/debugging, + // I do not have a good sense of what kind of Exceptions people would expect to encounter in the wild + return FileUtils.IS_EXCEPTION; } private File getFile(DataSegment segment) throws SegmentLoadingException diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index b55ac34fcc02..17b5db0ebf3f 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -22,10 +22,10 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import java.io.File; import java.io.IOException; diff --git a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java new file mode 100644 index 000000000000..b0ab3a713293 --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java @@ -0,0 +1,64 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.api.client.util.Preconditions; +import io.druid.guice.LocalDataStorageDruidModule; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * + */ +@JsonTypeName(LocalDataStorageDruidModule.SCHEME) +public class LocalLoadSpec implements LoadSpec +{ + private final Path path; + private final LocalDataSegmentPuller puller; + + @JsonCreator + public LocalLoadSpec( + @JacksonInject LocalDataSegmentPuller puller, + @JsonProperty(value = "path", required = true) final String path + ) + { + Preconditions.checkNotNull(path); + this.path = Paths.get(path); + Preconditions.checkArgument(Files.exists(Paths.get(path)), "[%s] does not exist", path); + this.puller = puller; + } + + @JsonProperty + public String getPath() + { + return path.toString(); + } + + @Override + public LoadSpecResult loadSegment(final File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir).size()); + } +} diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java similarity index 79% rename from server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java rename to server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index aecd38202345..570a50f6cc58 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -17,11 +17,12 @@ package io.druid.segment.loading; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.common.ISE; -import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; +import io.druid.guice.annotations.Json; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; @@ -32,32 +33,31 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; -import java.util.Map; /** */ -public class OmniSegmentLoader implements SegmentLoader +public class SegmentLoaderLocalCacheManager implements SegmentLoader { - private static final Logger log = new Logger(OmniSegmentLoader.class); + private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class); - private final Map pullers; private final QueryableIndexFactory factory; private final SegmentLoaderConfig config; + private final ObjectMapper jsonMapper; private final List locations; private final Object lock = new Object(); @Inject - public OmniSegmentLoader( - Map pullers, + public SegmentLoaderLocalCacheManager( QueryableIndexFactory factory, - SegmentLoaderConfig config + SegmentLoaderConfig config, + @Json ObjectMapper mapper ) { - this.pullers = pullers; this.factory = factory; this.config = config; + this.jsonMapper = mapper; this.locations = Lists.newArrayList(); for (StorageLocationConfig locationConfig : config.getLocations()) { @@ -65,9 +65,9 @@ public OmniSegmentLoader( } } - public OmniSegmentLoader withConfig(SegmentLoaderConfig config) + public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config) { - return new OmniSegmentLoader(pullers, factory, config); + return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper); } @Override @@ -127,22 +127,26 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException log.debug("Unable to make parent file[%s]", storageDir); } try { - downloadStartMarker.createNewFile(); + if (!downloadStartMarker.createNewFile()) { + throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir); + } } catch (IOException e) { - throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir); + throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir); } } - getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir); + // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies. + final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); + final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); + if(result.getSize() != segment.getSize()){ + log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize()); + } if (!downloadStartMarker.delete()) { throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); } - - loc.addSegment(segment); - retVal = storageDir; } else { retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); @@ -179,18 +183,6 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException } } - private DataSegmentPuller getPuller(Map loadSpec) throws SegmentLoadingException - { - String type = MapUtils.getString(loadSpec, "type"); - DataSegmentPuller loader = pullers.get(type); - - if (loader == null) { - throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet()); - } - - return loader; - } - public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException { if (cacheFile.equals(baseFile)) { diff --git a/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java b/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java new file mode 100644 index 000000000000..8a8f8703070e --- /dev/null +++ b/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java @@ -0,0 +1,108 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; +import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.metamx.common.IAE; +import io.druid.guice.GuiceInjectors; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; + +/** + * + */ +@RunWith(Parameterized.class) +public class LoadSpecTest +{ + @Parameterized.Parameters + public static Collection getParameters() + { + return ImmutableList.of( + new Object[]{"{\"path\":\"/\",\"type\":\"local\"}", "local"} + ); + } + + private final String value; + private final String expectedId; + + public LoadSpecTest(String value, String expectedId) + { + this.value = value; + this.expectedId = expectedId; + } + + private static ObjectMapper mapper; + + @BeforeClass + public static void setUp() + { + final Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(LocalDataSegmentPuller.class); + } + } + ) + ); + mapper = new DefaultObjectMapper(); + mapper.registerModule( new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class)); + mapper.setInjectableValues(new GuiceInjectableValues(injector)); + + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + mapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair( + guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector() + ), + new AnnotationIntrospectorPair( + guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector() + ) + ); + } + + @Test + public void testStringResolve() throws IOException + { + LoadSpec loadSpec = mapper.readValue(value, LoadSpec.class); + Assert.assertEquals(expectedId, loadSpec.getClass().getAnnotation(JsonTypeName.class).value()); + } +} diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java new file mode 100644 index 000000000000..55a2742c91ea --- /dev/null +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java @@ -0,0 +1,152 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.metamx.common.CompressionUtils; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.zip.GZIPOutputStream; + +/** + * + */ +public class LocalDataSegmentPullerTest +{ + private File tmpDir; + private LocalDataSegmentPuller puller; + + @Before + public void setup() + { + tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + puller = new LocalDataSegmentPuller(); + } + + @After + public void tearDown() throws IOException + { + deleteFiles(tmpDir); + } + + public static void deleteFiles(File... files) throws IOException + { + IOException ex = null; + for (File file : files) { + if (file == null || !file.exists()) { + continue; + } + if (!file.delete()) { + IOException e = new IOException("Could not delete " + file.getAbsolutePath()); + if (ex == null) { + ex = e; + } else { + ex.addSuppressed(e); + } + } + } + if (ex != null) { + throw ex; + } + } + + @Test + public void simpleZipTest() throws IOException, SegmentLoadingException + { + File file = new File(tmpDir, "test1data"); + File zipFile = File.createTempFile("ziptest", ".zip"); + file.deleteOnExit(); + zipFile.deleteOnExit(); + zipFile.delete(); + try { + try (OutputStream outputStream = new FileOutputStream(file)) { + outputStream.write(new byte[0]); + outputStream.flush(); + } + CompressionUtils.zip(tmpDir, zipFile); + file.delete(); + + Assert.assertFalse(file.exists()); + Assert.assertTrue(zipFile.exists()); + puller.getSegmentFiles(zipFile, tmpDir); + Assert.assertTrue(file.exists()); + } + finally { + deleteFiles(file, zipFile); + } + } + + @Test + public void simpleGZTest() throws IOException, SegmentLoadingException + { + File zipFile = File.createTempFile("gztest", ".gz"); + File unZipFile = new File( + tmpDir, + Files.getNameWithoutExtension( + zipFile.getAbsolutePath() + ) + ); + unZipFile.delete(); + zipFile.deleteOnExit(); + zipFile.delete(); + try { + try (OutputStream fOutStream = new FileOutputStream(zipFile)) { + try (OutputStream outputStream = new GZIPOutputStream(fOutStream)) { + outputStream.write(new byte[0]); + outputStream.flush(); + } + } + + Assert.assertTrue(zipFile.exists()); + Assert.assertFalse(unZipFile.exists()); + puller.getSegmentFiles(zipFile, tmpDir); + Assert.assertTrue(unZipFile.exists()); + }finally{ + deleteFiles(zipFile, unZipFile); + } + } + + @Test + public void simpleDirectoryTest() throws IOException, SegmentLoadingException + { + File srcDir = Files.createTempDir(); + File tmpFile = File.createTempFile("test", "file", srcDir); + File expectedOutput = new File(tmpDir, Files.getNameWithoutExtension(tmpFile.getAbsolutePath())); + expectedOutput.delete(); + try{ + Assert.assertFalse(expectedOutput.exists()); + puller.getSegmentFiles(srcDir, tmpDir); + Assert.assertTrue(expectedOutput.exists()); + }finally{ + deleteFiles(expectedOutput, tmpFile, srcDir); + } + } +} +