diff --git a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java index db300741222a..d1480924c9e2 100644 --- a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java +++ b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java @@ -22,9 +22,6 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.google.inject.Injector; import com.google.inject.Key; -import com.metamx.common.IAE; - -import java.lang.reflect.Type; /** */ @@ -39,13 +36,6 @@ public Object findInjectableValue( Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance ) { - // From the docs: "Object that identifies value to inject; may be a simple name or more complex identifier object, - // whatever provider needs" - // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with - // great care - if(valueId instanceof Key){ - return injector.getInstance((Key) valueId); - } - throw new IAE("Unknown class type [%s] for valueId [%s]", valueId.getClass().getCanonicalName(), valueId.toString()); + return injector.getInstance((Key) valueId); } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java index 3b33cea6b108..b643ba4f5904 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java @@ -17,105 +17,79 @@ package io.druid.storage.cassandra; -import com.google.common.base.Predicates; +import com.google.common.io.Files; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; import com.metamx.common.ISE; -import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ObjectMetadata; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import org.apache.commons.io.FileUtils; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Callable; /** * Cassandra Segment Puller + * + * @author boneill42 */ public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller { - private static final Logger log = new Logger(CassandraDataSegmentPuller.class); - private static final int CONCURRENCY = 10; - private static final int BATCH_SIZE = 10; + private static final Logger log = new Logger(CassandraDataSegmentPuller.class); + private static final int CONCURRENCY = 10; + private static final int BATCH_SIZE = 10; @Inject - public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) - { - super(config); - } + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + { + super(config); + } - @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException - { - String key = (String) segment.getLoadSpec().get("key"); - getSegmentFiles(key, outDir); - } - public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{ - log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); - if (!outDir.exists()) { - outDir.mkdirs(); - } + @Override + public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + { + String key = (String) segment.getLoadSpec().get("key"); + log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); - if (!outDir.isDirectory()) { - throw new ISE("outDir[%s] must be a directory.", outDir); - } + if (!outDir.exists()) + { + outDir.mkdirs(); + } - long startTime = System.currentTimeMillis(); - final File tmpFile = new File(outDir, "index.zip"); - log.info("Pulling to temporary local cache [%s]", tmpFile.getAbsolutePath()); + if (!outDir.isDirectory()) + { + throw new ISE("outDir[%s] must be a directory.", outDir); + } - final com.metamx.common.FileUtils.FileCopyResult localResult; - try { - localResult = RetryUtils.retry( - new Callable() - { - @Override - public com.metamx.common.FileUtils.FileCopyResult call() throws Exception - { - try (OutputStream os = new FileOutputStream(tmpFile)) { - final ObjectMetadata meta = ChunkedStorage - .newReader(indexStorage, key, os) - .withBatchSize(BATCH_SIZE) - .withConcurrencyLevel(CONCURRENCY) - .call(); - } - return new com.metamx.common.FileUtils.FileCopyResult(tmpFile); - } - }, - Predicates.alwaysTrue(), - 10 - ); - }catch (Exception e){ - throw new SegmentLoadingException(e, "Unable to copy key [%s] to file [%s]", key, tmpFile.getAbsolutePath()); - } - try{ - final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(tmpFile, outDir); - log.info( - "Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, - result.size() - ); - return result; - } - catch (Exception e) { - try { - FileUtils.deleteDirectory(outDir); - } - catch (IOException e1) { - log.error(e1, "Error clearing segment directory [%s]", outDir.getAbsolutePath()); - e.addSuppressed(e1); - } - throw new SegmentLoadingException(e, e.getMessage()); - } finally { - if(!tmpFile.delete()){ - log.warn("Could not delete cache file at [%s]", tmpFile.getAbsolutePath()); - } - } - } + long startTime = System.currentTimeMillis(); + ObjectMetadata meta = null; + final File outFile = new File(outDir, "index.zip"); + try + { + try + { + log.info("Writing to [%s]", outFile.getAbsolutePath()); + OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); + meta = ChunkedStorage + .newReader(indexStorage, key, os) + .withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); + os.close(); + CompressionUtils.unzip(outFile, outDir); + } catch (Exception e) + { + FileUtils.deleteDirectory(outDir); + } + } catch (Exception e) + { + throw new SegmentLoadingException(e, e.getMessage()); + } + log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, + meta.getObjectSize()); + } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 06a7dbedffe4..a60749850359 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -21,7 +21,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.recipes.storage.ChunkedStorage; @@ -29,6 +28,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import java.io.File; import java.io.FileInputStream; diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java index 3d3852cf55d0..0630b645dc25 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java @@ -17,7 +17,7 @@ package io.druid.storage.cassandra; -import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; @@ -34,47 +34,24 @@ */ public class CassandraDruidModule implements DruidModule { - public static final String SCHEME = "c*"; + @Override + public List getJacksonModules() + { + return ImmutableList.of(); + } @Override public void configure(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding(SCHEME) - .to(CassandraDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding("c*") + .to(CassandraDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) - .addBinding(SCHEME) + .addBinding("c*") .to(CassandraDataSegmentPusher.class) .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", CassandraDataSegmentConfig.class); } - - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new com.fasterxml.jackson.databind.Module() - { - @Override - public String getModuleName() - { - return "DruidCassandraStorage-" + System.identityHashCode(this); - } - - @Override - public Version version() - { - return Version.unknownVersion(); - } - - @Override - public void setupModule(SetupContext context) - { - context.registerSubtypes(CassandraLoadSpec.class); - } - } - ); - } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java deleted file mode 100644 index 5159dae172ab..000000000000 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.storage.cassandra; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import io.druid.segment.loading.LoadSpec; -import io.druid.segment.loading.SegmentLoadingException; - -import java.io.File; - -/** - * - */ -@JsonTypeName(CassandraDruidModule.SCHEME) -public class CassandraLoadSpec implements LoadSpec -{ - @JsonProperty - private final String key; - private final CassandraDataSegmentPuller puller; - - @JsonCreator - public CassandraLoadSpec( - @JacksonInject CassandraDataSegmentPuller puller, - @JsonProperty("key") String key - ) - { - this.puller = puller; - this.key = key; - } - - @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException - { - return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size()); - } -} diff --git a/extensions/hdfs-storage/pom.xml b/extensions/hdfs-storage/pom.xml index 0d487e6e1690..fde319f01962 100644 --- a/extensions/hdfs-storage/pom.xml +++ b/extensions/hdfs-storage/pom.xml @@ -63,39 +63,13 @@ commons-io commons-io - + - junit - junit - test - - - io.druid - druid-server - ${parent.version} - test - - - org.apache.hadoop - hadoop-hdfs - 2.3.0 - tests - test - - - org.apache.hadoop - hadoop-common - 2.3.0 - tests - test - - - org.apache.hadoop - hadoop-hdfs - 2.3.0 - test + junit + junit + test - + diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index a03a3372f478..2927cbdafe53 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -17,144 +17,23 @@ package io.druid.storage.hdfs; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; -import com.google.common.io.ByteSource; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; -import com.metamx.common.FileUtils; -import com.metamx.common.IAE; -import com.metamx.common.RetryUtils; -import com.metamx.common.UOE; -import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; -import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import javax.tools.FileObject; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; -import java.io.Writer; -import java.net.URI; -import java.util.ArrayList; -import java.util.concurrent.Callable; /** */ -public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller +public class HdfsDataSegmentPuller implements DataSegmentPuller { - /** - * FileObject.getLastModified and FileObject.delete don't throw IOException. This allows us to wrap those calls - */ - public static class HdfsIOException extends RuntimeException - { - private final IOException cause; - - public HdfsIOException(IOException ex) - { - super(ex); - this.cause = ex; - } - - protected IOException getIOException() - { - return cause; - } - } - - - public static FileObject buildFileObject(final URI uri, final Configuration config) - { - return buildFileObject(uri, config, false); - } - - public static FileObject buildFileObject(final URI uri, final Configuration config, final Boolean overwrite) - { - return new FileObject() - { - final Path path = new Path(uri); - - @Override - public URI toUri() - { - return uri; - } - - @Override - public String getName() - { - return path.getName(); - } - - @Override - public InputStream openInputStream() throws IOException - { - final FileSystem fs = path.getFileSystem(config); - return fs.open(path); - } - - @Override - public OutputStream openOutputStream() throws IOException - { - final FileSystem fs = path.getFileSystem(config); - return fs.create(path, overwrite); - } - - @Override - public Reader openReader(boolean ignoreEncodingErrors) throws IOException - { - throw new UOE("HDFS Reader not supported"); - } - - @Override - public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException - { - throw new UOE("HDFS CharSequence not supported"); - } - - @Override - public Writer openWriter() throws IOException - { - throw new UOE("HDFS Writer not supported"); - } - - @Override - public long getLastModified() - { - try { - final FileSystem fs = path.getFileSystem(config); - return fs.getFileStatus(path).getModificationTime(); - } - catch (IOException ex) { - throw new HdfsIOException(ex); - } - } - - @Override - public boolean delete() - { - try { - final FileSystem fs = path.getFileSystem(config); - return fs.delete(path, false); - } - catch (IOException ex) { - throw new HdfsIOException(ex); - } - } - }; - } - - private static final Logger log = new Logger(HdfsDataSegmentPuller.class); private final Configuration config; @Inject @@ -163,190 +42,46 @@ public HdfsDataSegmentPuller(final Configuration config) this.config = config; } - @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - getSegmentFiles(getPath(segment), dir); - } - - public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException - { - final LocalFileSystem localFileSystem = new LocalFileSystem(); - try { - final FileSystem fs = path.getFileSystem(config); - if (fs.isDirectory(path)) { + final Path path = getPath(segment); - // -------- directory --------- + final FileSystem fs = checkPathAndGetFilesystem(path); - try { - return RetryUtils.retry( - new Callable() - { - @Override - public FileUtils.FileCopyResult call() throws Exception - { - if (!fs.exists(path)) { - throw new SegmentLoadingException("No files found at [%s]", path.toString()); - } - - final RemoteIterator children = fs.listFiles(path, false); - final ArrayList localChildren = new ArrayList<>(); - final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); - while (children.hasNext()) { - final LocatedFileStatus child = children.next(); - final Path childPath = child.getPath(); - final String fname = childPath.getName(); - if (fs.isDirectory(childPath)) { - log.warn("[%s] is a child directory, skipping", childPath.toString()); - } else { - final File outFile = new File(outDir, fname); - - // Actual copy - fs.copyToLocalFile(childPath, new Path(outFile.toURI())); - result.addFile(outFile); - } - } - log.info( - "Copied %d bytes from [%s] to [%s]", - result.size(), - path.toString(), - outDir.getAbsolutePath() - ); - return result; - } - - }, - shouldRetryPredicate(), - 10 - ); + if (path.getName().endsWith(".zip")) { + try { + try (FSDataInputStream in = fs.open(path)) { + CompressionUtils.unzip(in, dir); } - catch (Exception e) { - throw Throwables.propagate(e); - } - } else if (CompressionUtils.isZip(path.getName())) { - - // -------- zip --------- - - final FileUtils.FileCopyResult result = CompressionUtils.unzip( - new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - return getInputStream(path); - } - }, outDir, shouldRetryPredicate(), false - ); - - log.info( - "Unzipped %d bytes from [%s] to [%s]", - result.size(), - path.toString(), - outDir.getAbsolutePath() - ); - - return result; - } else if (CompressionUtils.isGz(path.getName())) { - - // -------- gzip --------- - - final String fname = path.getName(); - final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); - final FileUtils.FileCopyResult result = CompressionUtils.gunzip( - new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - return getInputStream(path); - } - }, - outFile - ); - - log.info( - "Gunzipped %d bytes from [%s] to [%s]", - result.size(), - path.toString(), - outFile.getAbsolutePath() - ); - return result; - } else { - throw new SegmentLoadingException("Do not know how to handle file type at [%s]", path.toString()); } + catch (IOException e) { + throw new SegmentLoadingException(e, "Some IOException"); + } + } else { + throw new SegmentLoadingException("Unknown file type[%s]", path); } - catch (IOException e) { - throw new SegmentLoadingException(e, "Error loading [%s]", path.toString()); - } - } - - public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException - { - if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { - throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString()); - } - return getSegmentFiles(new Path(uri), outDir); - } - - public InputStream getInputStream(Path path) throws IOException - { - return buildFileObject(path.toUri(), config).openInputStream(); } - @Override - public InputStream getInputStream(URI uri) throws IOException + private Path getPath(DataSegment segment) { - if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { - throw new IAE("Don't know how to load SCHEME [%s] for URI [%s]", uri.getScheme(), uri.toString()); - } - return buildFileObject(uri, config).openInputStream(); + return new Path(String.valueOf(segment.getLoadSpec().get("path"))); } - /** - * Return the "version" (aka last modified timestamp) of the URI - * - * @param uri The URI of interest - * - * @return The last modified timestamp of the uri in String format - * - * @throws IOException - */ - @Override - public String getVersion(URI uri) throws IOException + private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException { + FileSystem fs; try { - return String.format("%d", buildFileObject(uri, config).getLastModified()); - } - catch (HdfsIOException ex) { - throw ex.getIOException(); - } - } + fs = path.getFileSystem(config); - @Override - public Predicate shouldRetryPredicate() - { - return new Predicate() - { - @Override - public boolean apply(Throwable input) - { - if (input == null) { - return false; - } - if (input instanceof HdfsIOException) { - return true; - } - if (input instanceof IOException) { - return true; - } - return apply(input.getCause()); + if (!fs.exists(path)) { + throw new SegmentLoadingException("Path[%s] doesn't exist.", path); } - }; - } - private Path getPath(DataSegment segment) - { - return new Path(String.valueOf(segment.getLoadSpec().get("path"))); + return fs; + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path); + } } } diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 6180b65f83f1..b1168ccfe705 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -22,12 +22,12 @@ import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java deleted file mode 100644 index 9c86846314e4..000000000000 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.storage.hdfs; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.metamx.common.ISE; -import io.druid.segment.loading.DataSegmentPuller; -import io.druid.segment.loading.LoadSpec; -import io.druid.segment.loading.SegmentLoadingException; -import org.apache.hadoop.fs.Path; - -import java.io.File; -import java.util.Map; - -/** - * - */ -@JsonTypeName(HdfsStorageDruidModule.SCHEME) -public class HdfsLoadSpec implements LoadSpec -{ - private final Path path; - final HdfsDataSegmentPuller puller; - @JsonCreator - public HdfsLoadSpec( - @JacksonInject HdfsDataSegmentPuller puller, - @JsonProperty(value = "path", required = true) String path - ){ - Preconditions.checkNotNull(path); - this.path = new Path(path); - this.puller = puller; - } - @JsonProperty("path") - public final String getPathString(){ - return path.toString(); - } - - @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException - { - return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size()); - } -} diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java index 6707cdf97f5e..fca37d71358c 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -17,7 +17,6 @@ package io.druid.storage.hdfs; -import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -37,7 +36,6 @@ */ public class HdfsStorageDruidModule implements DruidModule { - public static final String SCHEME = "hdfs"; private Properties props = null; @Inject @@ -49,36 +47,15 @@ public void setProperties(Properties props) @Override public List getJacksonModules() { - return ImmutableList.of( - new Module() - { - @Override - public String getModuleName() - { - return "DruidHDFSStorage-" + System.identityHashCode(this); - } - - @Override - public Version version() - { - return Version.unknownVersion(); - } - - @Override - public void setupModule(SetupContext context) - { - context.registerSubtypes(HdfsLoadSpec.class); - } - } - ); + return ImmutableList.of(); } @Override public void configure(Binder binder) { - Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class); final Configuration conf = new Configuration(); if (props != null) { diff --git a/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java deleted file mode 100644 index 400270efa2ab..000000000000 --- a/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.segment.loading; - -import com.google.common.io.ByteStreams; -import com.metamx.common.CompressionUtils; -import com.metamx.common.StringUtils; -import io.druid.storage.hdfs.HdfsDataSegmentPuller; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.nio.file.Files; -import java.util.zip.GZIPOutputStream; - -/** - * - */ -public class HdfsDataSegmentPullerTest -{ - private static MiniDFSCluster miniCluster; - private static File hdfsTmpDir; - private static URI uriBase; - private static Path filePath = new Path("/tmp/foo"); - private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum"; - private static byte[] pathByteContents = StringUtils.toUtf8(pathContents); - private static Configuration conf; - - @BeforeClass - public static void setupStatic() throws IOException, ClassNotFoundException - { - hdfsTmpDir = File.createTempFile("hdfsHandlerTest", "dir"); - hdfsTmpDir.deleteOnExit(); - if (!hdfsTmpDir.delete()) { - throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath())); - } - conf = new Configuration(true); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); - miniCluster = new MiniDFSCluster.Builder(conf).build(); - uriBase = miniCluster.getURI(0); - - final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data"); - tmpFile.delete(); - try { - tmpFile.deleteOnExit(); - Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath()); - try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) { - Files.copy(tmpFile.toPath(), stream); - } - } - finally { - tmpFile.delete(); - } - } - - @AfterClass - public static void tearDownStatic() throws IOException - { - if (miniCluster != null) { - miniCluster.shutdown(true); - } - } - - - private HdfsDataSegmentPuller puller; - - @Before - public void setUp() - { - puller = new HdfsDataSegmentPuller(conf); - } - - @Test - public void testZip() throws IOException, SegmentLoadingException - { - final File tmpDir = com.google.common.io.Files.createTempDir(); - tmpDir.deleteOnExit(); - final File tmpFile = File.createTempFile("zipContents", ".txt", tmpDir); - tmpFile.deleteOnExit(); - - final Path zipPath = new Path("/tmp/testZip.zip"); - - final File outTmpDir = com.google.common.io.Files.createTempDir(); - outTmpDir.deleteOnExit(); - - final URI uri = URI.create(uriBase.toString() + zipPath.toString()); - - tmpFile.deleteOnExit(); - try (final OutputStream stream = new FileOutputStream(tmpFile)) { - ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream); - } - Assert.assertTrue(tmpFile.exists()); - - final File outFile = new File(outTmpDir, tmpFile.getName()); - outFile.delete(); - - try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) { - CompressionUtils.zip(tmpDir, stream); - } - try { - Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); - Assert.assertTrue(outFile.exists()); - - Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); - } - finally { - if (tmpFile.exists()) { - tmpFile.delete(); - } - if (outFile.exists()) { - outFile.delete(); - } - if (outTmpDir.exists()) { - outTmpDir.delete(); - } - if (tmpDir.exists()) { - tmpDir.delete(); - } - } - } - - @Test - public void testGZ() throws IOException, SegmentLoadingException - { - final Path zipPath = new Path("/tmp/testZip.gz"); - - final File outTmpDir = com.google.common.io.Files.createTempDir(); - outTmpDir.deleteOnExit(); - final File outFile = new File(outTmpDir, "testZip"); - outFile.delete(); - - final URI uri = URI.create(uriBase.toString() + zipPath.toString()); - - try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { - try (final OutputStream gzStream = new GZIPOutputStream(outputStream)) { - try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { - ByteStreams.copy(inputStream, gzStream); - } - } - } - try { - Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); - Assert.assertTrue(outFile.exists()); - - Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); - } - finally { - if (outFile.exists()) { - outFile.delete(); - } - if (outTmpDir.exists()) { - outTmpDir.delete(); - } - } - } - - @Test - public void testDir() throws IOException, SegmentLoadingException - { - - final Path zipPath = new Path("/tmp/tmp2/test.txt"); - - final File outTmpDir = com.google.common.io.Files.createTempDir(); - outTmpDir.deleteOnExit(); - final File outFile = new File(outTmpDir, "test.txt"); - outFile.delete(); - - final URI uri = URI.create(uriBase.toString() + "/tmp/tmp2"); - - try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { - try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { - ByteStreams.copy(inputStream, outputStream); - } - } - try { - Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); - Assert.assertTrue(outFile.exists()); - - Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); - } - finally { - if (outFile.exists()) { - outFile.delete(); - } - if (outTmpDir.exists()) { - outTmpDir.delete(); - } - } - } -} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index 01cf5742c8ac..e36bdb9e4ab3 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -17,125 +17,37 @@ package io.druid.storage.s3; -import com.amazonaws.services.s3.AmazonS3URI; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; -import com.metamx.common.FileUtils; -import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.MapUtils; -import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; -import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; +import org.apache.commons.io.FileUtils; import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; -import javax.tools.FileObject; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; -import java.io.Writer; -import java.net.URI; -import java.nio.file.Paths; import java.util.Map; import java.util.concurrent.Callable; +import java.util.zip.GZIPInputStream; /** - * A data segment puller that also hanldes URI data pulls. */ -public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller +public class S3DataSegmentPuller implements DataSegmentPuller { - public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException - { - final URI checkedUri = checkURI(uri); - final AmazonS3URI s3URI = new AmazonS3URI(checkedUri); - final String key = s3URI.getKey(); - final String bucket = s3URI.getBucket(); - final S3Object s3Obj = s3Client.getObject(bucket, key); - final String path = uri.getPath(); - - return new FileObject() - { - @Override - public URI toUri() - { - return uri; - } - - @Override - public String getName() - { - final String ext = Files.getFileExtension(path); - return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." + ext)); - } - - @Override - public InputStream openInputStream() throws IOException - { - try { - return s3Obj.getDataInputStream(); - } - catch (ServiceException e) { - throw new IOException(String.format("Could not load S3 URI [%s]", checkedUri.toString()), e); - } - } - - @Override - public OutputStream openOutputStream() throws IOException - { - throw new UOE("Cannot stream S3 output"); - } - - @Override - public Reader openReader(boolean ignoreEncodingErrors) throws IOException - { - throw new UOE("Cannot open reader"); - } - - @Override - public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException - { - throw new UOE("Cannot open character sequence"); - } - - @Override - public Writer openWriter() throws IOException - { - throw new UOE("Cannot open writer"); - } - - @Override - public long getLastModified() - { - return s3Obj.getLastModifiedDate().getTime(); - } - - @Override - public boolean delete() - { - throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily."); - } - }; - } - - public static final String scheme = S3StorageDruidModule.SCHEME; - private static final Logger log = new Logger(S3DataSegmentPuller.class); - protected static final String BUCKET = "bucket"; - protected static final String KEY = "key"; + private static final String BUCKET = "bucket"; + private static final String KEY = "key"; private final RestS3Service s3Client; @@ -150,12 +62,7 @@ public S3DataSegmentPuller( @Override public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException { - getSegmentFiles(new S3Coords(segment), outDir); - } - - public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) - throws SegmentLoadingException - { + final S3Coords s3Coords = new S3Coords(segment); log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -172,134 +79,63 @@ public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final F } try { - final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path)); - final ByteSource byteSource = new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - try { - return buildFileObject(uri, s3Client).openInputStream(); - } - catch (ServiceException e) { - if (e.getCause() != null) { - if (S3Utils.S3RETRY.apply(e)) { - throw new IOException("Recoverable exception", e); + S3Utils.retryS3Operation( + new Callable() + { + @Override + public Void call() throws Exception + { + long startTime = System.currentTimeMillis(); + S3Object s3Obj = null; + + try { + s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); + + try (InputStream in = s3Obj.getDataInputStream()) { + final String key = s3Obj.getKey(); + if (key.endsWith(".zip")) { + CompressionUtils.unzip(in, outDir); + } else if (key.endsWith(".gz")) { + final File outFile = new File(outDir, toFilename(key, ".gz")); + ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); + } else { + ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); + } + log.info( + "Pull of file[%s/%s] completed in %,d millis", + s3Obj.getBucketName(), + s3Obj.getKey(), + System.currentTimeMillis() - startTime + ); + return null; + } + catch (IOException e) { + throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e); + } + } + finally { + S3Utils.closeStreamsQuietly(s3Obj); } } - throw Throwables.propagate(e); } - } - }; - if (CompressionUtils.isZip(s3Coords.path)) { - final FileUtils.FileCopyResult result = CompressionUtils.unzip( - byteSource, - outDir, - S3Utils.S3RETRY, - true - ); - log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); - return result; - } - if (CompressionUtils.isGz(s3Coords.path)) { - final String fname = Paths.get(uri).getFileName().toString(); - final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); - - final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile); - log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); - return result; - } - throw new IAE("Do not know how to load file type at [%s]", uri.toString()); + ); } catch (Exception e) { try { - org.apache.commons.io.FileUtils.deleteDirectory(outDir); + FileUtils.deleteDirectory(outDir); } catch (IOException ioe) { log.warn( ioe, - "Failed to remove output directory [%s] for segment pulled from [%s]", - outDir.getAbsolutePath(), - s3Coords.toString() + "Failed to remove output directory for segment[%s] after exception: %s", + segment.getIdentifier(), + outDir ); } throw new SegmentLoadingException(e, e.getMessage()); } } - public static URI checkURI(URI uri) - { - if (uri.getScheme().equalsIgnoreCase(scheme)) { - uri = URI.create("s3" + uri.toString().substring(scheme.length())); - } else if (!uri.getScheme().equalsIgnoreCase("s3")) { - throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString()); - } - return uri; - } - - @Override - public InputStream getInputStream(URI uri) throws IOException - { - try { - return buildFileObject(uri, s3Client).openInputStream(); - } - catch (ServiceException e) { - throw new IOException(String.format("Could not load URI [%s]", uri.toString()), e); - } - } - - @Override - public Predicate shouldRetryPredicate() - { - // Yay! smart retries! - return new Predicate() - { - @Override - public boolean apply(Throwable e) - { - if (e == null) { - return false; - } - if (e instanceof ServiceException) { - return S3Utils.isServiceExceptionRecoverable((ServiceException) e); - } - if (S3Utils.S3RETRY.apply(e)) { - return true; - } - // Look all the way down the cause chain, just in case something wraps it deep. - return apply(e.getCause()); - } - }; - } - - /** - * Returns the "version" (aka last modified timestamp) of the URI - * - * @param uri The URI to check the last timestamp - * - * @return The time in ms of the last modification of the URI in String format - * - * @throws IOException - */ - @Override - public String getVersion(URI uri) throws IOException - { - try { - return String.format("%d", buildFileObject(uri, s3Client).getLastModified()); - } - catch (S3ServiceException e) { - if (S3Utils.isServiceExceptionRecoverable(e)) { - // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable - throw new IOException( - String.format("Could not fetch last modified timestamp from URI [%s]", uri.toString()), - e - ); - } else { - throw Throwables.propagate(e); - } - } - } - private String toFilename(String key, final String suffix) { String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' @@ -329,7 +165,7 @@ public Boolean call() throws Exception } } - protected static class S3Coords + private static class S3Coords { String bucket; String path; @@ -344,12 +180,6 @@ public S3Coords(DataSegment segment) } } - public S3Coords(String bucket, String key) - { - this.bucket = bucket; - this.path = key; - } - public String toString() { return String.format("s3://%s/%s", bucket, path); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 29de26d1b45a..78b02e42bbb0 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -23,11 +23,11 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; import com.metamx.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import org.jets3t.service.ServiceException; import org.jets3t.service.acl.gs.GSAccessControlList; import org.jets3t.service.impl.rest.httpclient.RestS3Service; diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java deleted file mode 100644 index ae260dbda0cd..000000000000 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.storage.s3; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.elasticbeanstalk.model.S3LocationNotInServiceRegionException; -import com.amazonaws.services.s3.AmazonS3URI; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.metamx.common.CompressionUtils; -import com.metamx.common.FileUtils; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.RetryUtils; -import com.metamx.common.StreamUtils; -import com.metamx.common.logger.Logger; -import io.druid.segment.loading.DataSegmentPuller; -import io.druid.segment.loading.LoadSpec; -import io.druid.segment.loading.SegmentLoadingException; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; - -import javax.swing.text.Segment; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -/** - * - */ -@JsonTypeName(S3StorageDruidModule.SCHEME) -public class S3LoadSpec implements LoadSpec -{ - @JsonProperty(S3DataSegmentPuller.BUCKET) - private final String bucket; - @JsonProperty(S3DataSegmentPuller.KEY) - private final String key; - - private final S3DataSegmentPuller puller; - - @JsonCreator - public S3LoadSpec( - @JacksonInject S3DataSegmentPuller puller, - @JsonProperty(S3DataSegmentPuller.BUCKET) String bucket, - @JsonProperty(S3DataSegmentPuller.KEY) String key - ) - { - Preconditions.checkNotNull(bucket); - Preconditions.checkNotNull(key); - this.bucket = bucket; - this.key = key; - this.puller = puller; - } - - @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException - { - return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); - } -} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index bab2114ff6d3..c1e7dae641f2 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -18,7 +18,6 @@ package io.druid.storage.s3; import com.amazonaws.auth.AWSCredentialsProvider; -import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -38,32 +37,10 @@ */ public class S3StorageDruidModule implements DruidModule { - public static final String SCHEME = "s3_zip"; @Override public List getJacksonModules() { - return ImmutableList.of( - new Module() - { - @Override - public String getModuleName() - { - return "DruidS3-" + System.identityHashCode(this); - } - - @Override - public Version version() - { - return Version.unknownVersion(); - } - - @Override - public void setupModule(SetupContext context) - { - context.registerSubtypes(S3LoadSpec.class); - } - } - ); + return ImmutableList.of(); } @Override @@ -71,10 +48,10 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); - Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(S3DataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class); - Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class); - Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index fe66e838dab8..b9fc5441e4fd 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,7 +19,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; -import com.metamx.common.FileUtils; import com.metamx.common.RetryUtils; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; @@ -52,38 +51,30 @@ public static void closeStreamsQuietly(S3Object s3Obj) } } - public static boolean isServiceExceptionRecoverable(ServiceException ex) - { - final boolean isIOException = ex.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode()); - return isIOException || isTimeout; - } - - public static final Predicate S3RETRY = new Predicate() - { - @Override - public boolean apply(Throwable e) - { - if (e == null) { - return false; - } else if (e instanceof IOException) { - return true; - } else if (e instanceof ServiceException) { - return isServiceExceptionRecoverable((ServiceException) e); - } else { - return apply(e.getCause()); - } - } - }; - /** * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. */ public static T retryS3Operation(Callable f) throws Exception { + final Predicate shouldRetry = new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e instanceof IOException) { + return true; + } else if (e instanceof ServiceException) { + final boolean isIOException = e.getCause() instanceof IOException; + final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode()); + return isIOException || isTimeout; + } else { + return false; + } + } + }; final int maxTries = 10; - return RetryUtils.retry(f, S3RETRY, maxTries); + return RetryUtils.retry(f, shouldRetry, maxTries); } public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java index 8b60effdfb68..d14b5548545d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java @@ -18,7 +18,7 @@ package io.druid.indexing.common; import com.google.inject.Inject; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; @@ -30,11 +30,11 @@ */ public class SegmentLoaderFactory { - private final SegmentLoaderLocalCacheManager loader; + private final OmniSegmentLoader loader; @Inject public SegmentLoaderFactory( - SegmentLoaderLocalCacheManager loader + OmniSegmentLoader loader ) { this.loader = loader; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 261d888334e5..f869e56749c0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -31,7 +31,7 @@ import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoaderConfig; @@ -68,7 +68,7 @@ public class TaskToolboxTest private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); private ObjectMapper ObjectMapper = new ObjectMapper(); - private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); + private OmniSegmentLoader mockOmniSegmentLoader = EasyMock.createMock(OmniSegmentLoader.class); private Task task = EasyMock.createMock(Task.class); @Rule @@ -93,7 +93,7 @@ public void setUp() throws IOException mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, - new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), + new SegmentLoaderFactory(mockOmniSegmentLoader), ObjectMapper ); } @@ -144,11 +144,11 @@ public void testGetObjectMapper() public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); - EasyMock.expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles((DataSegment)EasyMock.anyObject())) + EasyMock.expect(mockOmniSegmentLoader.getSegmentFiles((DataSegment)EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - EasyMock.expect(mockSegmentLoaderLocalCacheManager.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) - .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); - EasyMock.replay(mockSegmentLoaderLocalCacheManager); + EasyMock.expect(mockOmniSegmentLoader.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) + .andReturn(mockOmniSegmentLoader).anyTimes(); + EasyMock.replay(mockOmniSegmentLoader); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(new Interval("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 2946345115fc..1650929b1564 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -17,11 +17,6 @@ package io.druid.indexing.firehose; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; -import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; -import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -45,7 +40,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; -import io.druid.guice.GuiceInjectors; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; @@ -66,11 +60,11 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.LocalLoadSpec; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.timeline.DataSegment; @@ -175,37 +169,6 @@ public void deleteSegments(Set segments) ts, new TaskActionToolbox(tl, mdc, newMockEmitter()) ); - - final ObjectMapper objectMapper = new DefaultObjectMapper(); - objectMapper.registerModule( - new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class) - ); - - final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); - objectMapper.setAnnotationIntrospectors( - new AnnotationIntrospectorPair( - guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector() - ), - new AnnotationIntrospectorPair( - guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector() - ) - ); - objectMapper.setInjectableValues( - new GuiceInjectableValues( - GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bind(LocalDataSegmentPuller.class); - } - } - ) - ) - ) - ); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null), tac, @@ -261,7 +224,11 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager( + new OmniSegmentLoader( + ImmutableMap.of( + "local", + new LocalDataSegmentPuller() + ), null, new SegmentLoaderConfig() { @@ -270,10 +237,10 @@ public List getLocations() { return Lists.newArrayList(); } - }, objectMapper + } ) ), - objectMapper + new DefaultObjectMapper() ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index aaa2e3c878ae..4a9a64045b54 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -68,8 +68,10 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; @@ -312,7 +314,11 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager( + new OmniSegmentLoader( + ImmutableMap.of( + "local", + new LocalDataSegmentPuller() + ), null, new SegmentLoaderConfig() { @@ -321,7 +327,7 @@ public List getLocations() { return Lists.newArrayList(); } - }, new DefaultObjectMapper() + } ) ), new DefaultObjectMapper() diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index ddb38fd1d73c..478ccbaf3283 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Files; import io.druid.curator.PotentiallyGzippedCompressionProvider; @@ -35,7 +36,9 @@ import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -121,7 +124,11 @@ public String getBase() new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0, null), null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( - new SegmentLoaderLocalCacheManager( + new OmniSegmentLoader( + ImmutableMap.of( + "local", + new LocalDataSegmentPuller() + ), null, new SegmentLoaderConfig() { @@ -131,7 +138,7 @@ public List getLocations() return Lists.newArrayList(); } } - , jsonMapper) + ) ), jsonMapper ), null diff --git a/pom.xml b/pom.xml index e09d7f80051f..33adea93305f 100644 --- a/pom.xml +++ b/pom.xml @@ -65,10 +65,10 @@ - 0.27.0 + 0.26.15 2.7.0 9.2.5.v20141112 - 0.3.6 + 0.3.5 2.4.4 2.2 1.7.10 diff --git a/processing/src/main/java/io/druid/segment/SegmentMissingException.java b/processing/src/main/java/io/druid/segment/SegmentMissingException.java index f66a8c506170..df1dc269a3b3 100644 --- a/processing/src/main/java/io/druid/segment/SegmentMissingException.java +++ b/processing/src/main/java/io/druid/segment/SegmentMissingException.java @@ -24,8 +24,4 @@ public class SegmentMissingException extends ISE public SegmentMissingException(String formatText, Object... arguments) { super(String.format(formatText, arguments)); } - - public SegmentMissingException(Throwable cause, String formatText, Object... arguments){ - super(cause, formatText, arguments); - } } diff --git a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index 295ef0cdc215..0015ccdf325e 100644 --- a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -17,34 +17,26 @@ package io.druid.guice; -import com.fasterxml.jackson.core.Version; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; -import io.druid.initialization.DruidModule; +import com.google.inject.Module; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusherConfig; -import io.druid.segment.loading.LocalLoadSpec; -import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoader; -import java.util.List; - /** */ -public class LocalDataStorageDruidModule implements DruidModule +public class LocalDataStorageDruidModule implements Module { - public static final String SCHEME = "local"; - @Override public void configure(Binder binder) { - binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class); + binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class); bindDeepStorageLocal(binder); @@ -56,14 +48,14 @@ public void configure(Binder binder) private static void bindDeepStorageLocal(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding(SCHEME) - .to(LocalDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding("local") + .to(LocalDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class)) - .addBinding(SCHEME) - .to(LocalDataSegmentKiller.class) - .in(LazySingleton.class); + .addBinding("local") + .to(LocalDataSegmentKiller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) .addBinding("local") @@ -71,31 +63,4 @@ private static void bindDeepStorageLocal(Binder binder) .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class); } - - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new com.fasterxml.jackson.databind.Module() - { - @Override - public String getModuleName() - { - return "DruidLocalStorage-" + System.identityHashCode(this); - } - - @Override - public Version version() - { - return Version.unknownVersion(); - } - - @Override - public void setupModule(SetupContext context) - { - context.registerSubtypes(LocalLoadSpec.class); - } - } - ); - } } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java index b56cd9db8f24..58873844e003 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java @@ -17,207 +17,58 @@ package io.druid.segment.loading; -import com.google.common.base.Predicate; import com.google.common.io.Files; -import com.metamx.common.CompressionUtils; -import com.metamx.common.FileUtils; import com.metamx.common.MapUtils; -import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; -import javax.tools.FileObject; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; -import java.io.Writer; -import java.net.URI; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Map; /** */ -public class LocalDataSegmentPuller implements DataSegmentPuller, URIDataPuller +public class LocalDataSegmentPuller implements DataSegmentPuller { - public static FileObject buildFileObject(final URI uri) - { - final Path path = Paths.get(uri); - final File file = path.toFile(); - return new FileObject() - { - @Override - public URI toUri() - { - return uri; - } - - @Override - public String getName() - { - return path.getFileName().toString(); - } - - @Override - public InputStream openInputStream() throws IOException - { - return new FileInputStream(file); - } - - @Override - public OutputStream openOutputStream() throws IOException - { - return new FileOutputStream(file); - } - - @Override - public Reader openReader(boolean ignoreEncodingErrors) throws IOException - { - return new FileReader(file); - } - - @Override - public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException - { - throw new UOE("CharSequence not supported"); - } - - @Override - public Writer openWriter() throws IOException - { - return new FileWriter(file); - } - - @Override - public long getLastModified() - { - return file.lastModified(); - } - - @Override - public boolean delete() - { - return file.delete(); - } - }; - } - private static final Logger log = new Logger(LocalDataSegmentPuller.class); @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - getSegmentFiles(getFile(segment), dir); - } + final File path = getFile(segment); - public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException - { - if (sourceFile.isDirectory()) { - if (sourceFile.equals(dir)) { + if (path.isDirectory()) { + if (path.equals(dir)) { log.info("Asked to load [%s] into itself, done!", dir); - return new FileUtils.FileCopyResult(sourceFile); + return; } - final File[] files = sourceFile.listFiles(); - if (files == null) { - throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath()); - } - final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile); - for (final File oldFile : files) { - if (oldFile.isDirectory()) { - log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath()); - continue; + log.info("Copying files from [%s] to [%s]", path, dir); + File file = null; + try { + final File[] files = path.listFiles(); + for (int i = 0; i < files.length; ++i) { + file = files[i]; + Files.copy(file, new File(dir, file.getName())); } - - result.addFiles( - FileUtils.retryCopy( - Files.asByteSource(oldFile), - new File(dir, oldFile.getName()), - shouldRetryPredicate(), - 10 - ).getFiles() - ); } - log.info( - "Coppied %d bytes from [%s] to [%s]", - result.size(), - sourceFile.getAbsolutePath(), - dir.getAbsolutePath() - ); - return result; - } - if (CompressionUtils.isZip(sourceFile.getName())) { + catch (IOException e) { + throw new SegmentLoadingException(e, "Unable to copy file[%s].", file); + } + } else { + if (!path.getName().endsWith(".zip")) { + throw new SegmentLoadingException("File is not a zip file[%s]", path); + } + + log.info("Unzipping local file[%s] to [%s]", path, dir); try { - final FileUtils.FileCopyResult result = CompressionUtils.unzip( - Files.asByteSource(sourceFile), - dir, - shouldRetryPredicate(), - false - ); - log.info( - "Unzipped %d bytes from [%s] to [%s]", - result.size(), - sourceFile.getAbsolutePath(), - dir.getAbsolutePath() - ); - return result; + CompressionUtils.unzip(path, dir); } catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath()); + throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path); } } - if (CompressionUtils.isGz(sourceFile.getName())) { - final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName())); - final FileUtils.FileCopyResult result = CompressionUtils.gunzip( - Files.asByteSource(sourceFile), - outFile, - shouldRetryPredicate() - ); - log.info( - "Gunzipped %d bytes from [%s] to [%s]", - result.size(), - sourceFile.getAbsolutePath(), - outFile.getAbsolutePath() - ); - return result; - } - throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath()); - } - - - @Override - public InputStream getInputStream(URI uri) throws IOException - { - return buildFileObject(uri).openInputStream(); - } - - /** - * Returns the "version" (aka last modified timestamp) of the URI of interest - * - * @param uri The URI to check the last modified timestamp - * - * @return The last modified timestamp in ms of the URI in String format - */ - @Override - public String getVersion(URI uri) - { - return String.format("%d", buildFileObject(uri).getLastModified()); - } - - @Override - public Predicate shouldRetryPredicate() - { - // It would be nice if there were better logic for smarter retries. For example: If the error is that the file is - // not found, there's only so much that retries would do (unless the file was temporarily absent for some reason). - // Since this is not a commonly used puller in production, and in general is more useful in testing/debugging, - // I do not have a good sense of what kind of Exceptions people would expect to encounter in the wild - return FileUtils.IS_EXCEPTION; } private File getFile(DataSegment segment) throws SegmentLoadingException diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index 17b5db0ebf3f..b55ac34fcc02 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -22,10 +22,10 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; -import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; +import io.druid.utils.CompressionUtils; import java.io.File; import java.io.IOException; diff --git a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java deleted file mode 100644 index b0ab3a713293..000000000000 --- a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.segment.loading; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.api.client.util.Preconditions; -import io.druid.guice.LocalDataStorageDruidModule; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - -/** - * - */ -@JsonTypeName(LocalDataStorageDruidModule.SCHEME) -public class LocalLoadSpec implements LoadSpec -{ - private final Path path; - private final LocalDataSegmentPuller puller; - - @JsonCreator - public LocalLoadSpec( - @JacksonInject LocalDataSegmentPuller puller, - @JsonProperty(value = "path", required = true) final String path - ) - { - Preconditions.checkNotNull(path); - this.path = Paths.get(path); - Preconditions.checkArgument(Files.exists(Paths.get(path)), "[%s] does not exist", path); - this.puller = puller; - } - - @JsonProperty - public String getPath() - { - return path.toString(); - } - - @Override - public LoadSpecResult loadSegment(final File outDir) throws SegmentLoadingException - { - return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir).size()); - } -} diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java similarity index 79% rename from server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java rename to server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java index 570a50f6cc58..aecd38202345 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java @@ -17,12 +17,11 @@ package io.druid.segment.loading; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.common.ISE; +import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; -import io.druid.guice.annotations.Json; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; @@ -33,31 +32,32 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; /** */ -public class SegmentLoaderLocalCacheManager implements SegmentLoader +public class OmniSegmentLoader implements SegmentLoader { - private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class); + private static final Logger log = new Logger(OmniSegmentLoader.class); + private final Map pullers; private final QueryableIndexFactory factory; private final SegmentLoaderConfig config; - private final ObjectMapper jsonMapper; private final List locations; private final Object lock = new Object(); @Inject - public SegmentLoaderLocalCacheManager( + public OmniSegmentLoader( + Map pullers, QueryableIndexFactory factory, - SegmentLoaderConfig config, - @Json ObjectMapper mapper + SegmentLoaderConfig config ) { + this.pullers = pullers; this.factory = factory; this.config = config; - this.jsonMapper = mapper; this.locations = Lists.newArrayList(); for (StorageLocationConfig locationConfig : config.getLocations()) { @@ -65,9 +65,9 @@ public SegmentLoaderLocalCacheManager( } } - public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config) + public OmniSegmentLoader withConfig(SegmentLoaderConfig config) { - return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper); + return new OmniSegmentLoader(pullers, factory, config); } @Override @@ -127,26 +127,22 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException log.debug("Unable to make parent file[%s]", storageDir); } try { - if (!downloadStartMarker.createNewFile()) { - throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir); - } + downloadStartMarker.createNewFile(); } catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir); + throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir); } } - // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies. - final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); - final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); - if(result.getSize() != segment.getSize()){ - log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize()); - } + getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir); if (!downloadStartMarker.delete()) { throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); } + + loc.addSegment(segment); + retVal = storageDir; } else { retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); @@ -183,6 +179,18 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException } } + private DataSegmentPuller getPuller(Map loadSpec) throws SegmentLoadingException + { + String type = MapUtils.getString(loadSpec, "type"); + DataSegmentPuller loader = pullers.get(type); + + if (loader == null) { + throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet()); + } + + return loader; + } + public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException { if (cacheFile.equals(baseFile)) { diff --git a/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java b/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java deleted file mode 100644 index 8a8f8703070e..000000000000 --- a/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.segment.loading; - -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.Version; -import com.fasterxml.jackson.databind.BeanProperty; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; -import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; -import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.Module; -import com.metamx.common.IAE; -import io.druid.guice.GuiceInjectors; -import io.druid.jackson.DefaultObjectMapper; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Collection; - -/** - * - */ -@RunWith(Parameterized.class) -public class LoadSpecTest -{ - @Parameterized.Parameters - public static Collection getParameters() - { - return ImmutableList.of( - new Object[]{"{\"path\":\"/\",\"type\":\"local\"}", "local"} - ); - } - - private final String value; - private final String expectedId; - - public LoadSpecTest(String value, String expectedId) - { - this.value = value; - this.expectedId = expectedId; - } - - private static ObjectMapper mapper; - - @BeforeClass - public static void setUp() - { - final Injector injector = GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bind(LocalDataSegmentPuller.class); - } - } - ) - ); - mapper = new DefaultObjectMapper(); - mapper.registerModule( new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class)); - mapper.setInjectableValues(new GuiceInjectableValues(injector)); - - final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); - mapper.setAnnotationIntrospectors( - new AnnotationIntrospectorPair( - guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector() - ), - new AnnotationIntrospectorPair( - guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector() - ) - ); - } - - @Test - public void testStringResolve() throws IOException - { - LoadSpec loadSpec = mapper.readValue(value, LoadSpec.class); - Assert.assertEquals(expectedId, loadSpec.getClass().getAnnotation(JsonTypeName.class).value()); - } -} diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java deleted file mode 100644 index 55a2742c91ea..000000000000 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.segment.loading; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.io.Files; -import com.metamx.common.CompressionUtils; -import io.druid.jackson.DefaultObjectMapper; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.zip.GZIPOutputStream; - -/** - * - */ -public class LocalDataSegmentPullerTest -{ - private File tmpDir; - private LocalDataSegmentPuller puller; - - @Before - public void setup() - { - tmpDir = Files.createTempDir(); - tmpDir.deleteOnExit(); - puller = new LocalDataSegmentPuller(); - } - - @After - public void tearDown() throws IOException - { - deleteFiles(tmpDir); - } - - public static void deleteFiles(File... files) throws IOException - { - IOException ex = null; - for (File file : files) { - if (file == null || !file.exists()) { - continue; - } - if (!file.delete()) { - IOException e = new IOException("Could not delete " + file.getAbsolutePath()); - if (ex == null) { - ex = e; - } else { - ex.addSuppressed(e); - } - } - } - if (ex != null) { - throw ex; - } - } - - @Test - public void simpleZipTest() throws IOException, SegmentLoadingException - { - File file = new File(tmpDir, "test1data"); - File zipFile = File.createTempFile("ziptest", ".zip"); - file.deleteOnExit(); - zipFile.deleteOnExit(); - zipFile.delete(); - try { - try (OutputStream outputStream = new FileOutputStream(file)) { - outputStream.write(new byte[0]); - outputStream.flush(); - } - CompressionUtils.zip(tmpDir, zipFile); - file.delete(); - - Assert.assertFalse(file.exists()); - Assert.assertTrue(zipFile.exists()); - puller.getSegmentFiles(zipFile, tmpDir); - Assert.assertTrue(file.exists()); - } - finally { - deleteFiles(file, zipFile); - } - } - - @Test - public void simpleGZTest() throws IOException, SegmentLoadingException - { - File zipFile = File.createTempFile("gztest", ".gz"); - File unZipFile = new File( - tmpDir, - Files.getNameWithoutExtension( - zipFile.getAbsolutePath() - ) - ); - unZipFile.delete(); - zipFile.deleteOnExit(); - zipFile.delete(); - try { - try (OutputStream fOutStream = new FileOutputStream(zipFile)) { - try (OutputStream outputStream = new GZIPOutputStream(fOutStream)) { - outputStream.write(new byte[0]); - outputStream.flush(); - } - } - - Assert.assertTrue(zipFile.exists()); - Assert.assertFalse(unZipFile.exists()); - puller.getSegmentFiles(zipFile, tmpDir); - Assert.assertTrue(unZipFile.exists()); - }finally{ - deleteFiles(zipFile, unZipFile); - } - } - - @Test - public void simpleDirectoryTest() throws IOException, SegmentLoadingException - { - File srcDir = Files.createTempDir(); - File tmpFile = File.createTempFile("test", "file", srcDir); - File expectedOutput = new File(tmpDir, Files.getNameWithoutExtension(tmpFile.getAbsolutePath())); - expectedOutput.delete(); - try{ - Assert.assertFalse(expectedOutput.exists()); - puller.getSegmentFiles(srcDir, tmpDir); - Assert.assertTrue(expectedOutput.exists()); - }finally{ - deleteFiles(expectedOutput, tmpFile, srcDir); - } - } -} -