From b4d7387ef1b6b6ca110f55d60e3bfeb62a5fc5d5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 5 Apr 2018 22:06:02 -0700 Subject: [PATCH] CompressionUtils: Add support for decompressing xz, bz2, zip. Also switch various firehoses to the new method. Fixes #5585. --- .../StaticAzureBlobStoreFirehoseFactory.java | 2 +- .../StaticCloudFilesFirehoseFactory.java | 2 +- .../StaticGoogleBlobStoreFirehoseFactory.java | 2 +- .../lookup/namespace/UriCacheGenerator.java | 28 +++------- .../firehose/s3/StaticS3FirehoseFactory.java | 18 +++--- java-util/pom.xml | 8 +++ .../java/util/common/CompressionUtils.java | 46 +++++++++++++++- .../util/common/CompressionUtilsTest.java | 55 +++++++++++++++++-- pom.xml | 10 ++++ .../firehose/HttpFirehoseFactory.java | 2 +- .../firehose/LocalFirehoseFactory.java | 4 +- 11 files changed, 136 insertions(+), 41 deletions(-) diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index e3ad8b432aa7..345cffd512cf 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -91,7 +91,7 @@ protected InputStream openObjectStream(AzureBlob object, long start) throws IOEx @Override protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object) diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index 5f39e7e5a446..343635c4680e 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -101,7 +101,7 @@ private CloudFilesByteSource createCloudFilesByteSource(CloudFilesBlob object) @Override protected InputStream wrapObjectStream(CloudFilesBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } @Override diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 0d5d99959609..38fb83870883 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -93,7 +93,7 @@ private GoogleByteSource createGoogleByteSource(GoogleBlob object) @Override protected InputStream wrapObjectStream(GoogleBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } @Override diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java index 27fd2eacbdfd..c2ca336c2eca 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java @@ -134,28 +134,14 @@ public CacheScheduler.VersionedCache generateCache( catch (NumberFormatException ex) { log.debug(ex, "Failed to get last modified timestamp. Assuming no timestamp"); } - final ByteSource source; - if (CompressionUtils.isGz(uriPath)) { - // Simple gzip stream - log.debug("Loading gz"); - source = new ByteSource() + final ByteSource source = new ByteSource() + { + @Override + public InputStream openStream() throws IOException { - @Override - public InputStream openStream() throws IOException - { - return CompressionUtils.gzipInputStream(puller.getInputStream(uri)); - } - }; - } else { - source = new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - return puller.getInputStream(uri); - } - }; - } + return CompressionUtils.decompress(puller.getInputStream(uri), uri.getPath()); + } + }; final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, version); try { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index 8827fc9ae31d..8c41d6f1f129 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -114,14 +114,14 @@ protected Collection initObjects() throws IOException // Getting data is deferred until openObjectStream() is called for each object. if (!uris.isEmpty()) { return uris.stream() - .map( - uri -> { - final String s3Bucket = uri.getAuthority(); - final String key = S3Utils.extractS3Key(uri); - return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key); - } - ) - .collect(Collectors.toList()); + .map( + uri -> { + final String s3Bucket = uri.getAuthority(); + final String key = S3Utils.extractS3Key(uri); + return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key); + } + ) + .collect(Collectors.toList()); } else { final List objects = new ArrayList<>(); for (URI uri : prefixes) { @@ -212,7 +212,7 @@ protected InputStream openObjectStream(S3ObjectSummary object, long start) throw @Override protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException { - return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getKey()); } @Override diff --git a/java-util/pom.xml b/java-util/pom.xml index 150c332a3ca7..7f0b462d9c57 100644 --- a/java-util/pom.xml +++ b/java-util/pom.xml @@ -81,6 +81,14 @@ org.mozilla rhino + + org.apache.commons + commons-compress + + + org.tukaani + xz + com.jayway.jsonpath json-path diff --git a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java index 876f26f2f589..c076ea6e8def 100644 --- a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java @@ -28,14 +28,18 @@ import com.google.common.io.Files; import io.druid.java.util.common.io.NativeIO; import io.druid.java.util.common.logger.Logger; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -48,7 +52,9 @@ public class CompressionUtils { private static final Logger log = new Logger(CompressionUtils.class); private static final int DEFAULT_RETRY_COUNT = 3; + private static final String BZ2_SUFFIX = ".bz2"; private static final String GZ_SUFFIX = ".gz"; + private static final String XZ_SUFFIX = ".xz"; private static final String ZIP_SUFFIX = ".zip"; /** @@ -313,7 +319,7 @@ public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) thro * * @return A GZIPInputStream that can handle concatenated gzip streams in the input */ - public static GZIPInputStream gzipInputStream(final InputStream in) throws IOException + private static GZIPInputStream gzipInputStream(final InputStream in) throws IOException { return new GZIPInputStream( new FilterInputStream(in) @@ -516,4 +522,42 @@ public static String getGzBaseName(String fname) } throw new IAE("[%s] is not a valid gz file name", fname); } + + /** + * Decompress an input stream from a file, based on the filename. + */ + public static InputStream decompress(final InputStream in, final String fileName) throws IOException + { + if (fileName.endsWith(GZ_SUFFIX)) { + return gzipInputStream(in); + } else if (fileName.endsWith(BZ2_SUFFIX)) { + return new BZip2CompressorInputStream(in, true); + } else if (fileName.endsWith(XZ_SUFFIX)) { + return new XZCompressorInputStream(in, true); + } else if (fileName.endsWith(ZIP_SUFFIX)) { + // This reads the first file in the archive. + final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8); + try { + final ZipEntry nextEntry = zipIn.getNextEntry(); + if (nextEntry == null) { + zipIn.close(); + + // No files in the archive - return an empty stream. + return new ByteArrayInputStream(new byte[0]); + } + return zipIn; + } + catch (IOException e) { + try { + zipIn.close(); + } + catch (IOException e2) { + e.addSuppressed(e2); + } + throw e; + } + } else { + return in; + } + } } diff --git a/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java b/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java index c00e75b92249..d8f878b9ed70 100644 --- a/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java @@ -25,6 +25,8 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -53,6 +55,8 @@ import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; public class CompressionUtilsTest { @@ -221,7 +225,6 @@ public void testGoodZipStream() throws IOException } } - @Test public void testGoodGzipByteSource() throws IOException { @@ -230,7 +233,7 @@ public void testGoodGzipByteSource() throws IOException Assert.assertFalse(gzFile.exists()); CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.alwaysTrue()); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), gzFile.getName())) { assertGoodDataStream(inputStream); } if (!testFile.delete()) { @@ -244,6 +247,50 @@ public void testGoodGzipByteSource() throws IOException } } + @Test + public void testDecompressBzip2() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressBzip2"); + final File bzFile = new File(tmpDir, testFile.getName() + ".bz2"); + Assert.assertFalse(bzFile.exists()); + try (final OutputStream out = new BZip2CompressorOutputStream(new FileOutputStream(bzFile))) { + ByteStreams.copy(new FileInputStream(testFile), out); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(bzFile), bzFile.getName())) { + assertGoodDataStream(inputStream); + } + } + + @Test + public void testDecompressXz() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressXz"); + final File xzFile = new File(tmpDir, testFile.getName() + ".xz"); + Assert.assertFalse(xzFile.exists()); + try (final OutputStream out = new XZCompressorOutputStream(new FileOutputStream(xzFile))) { + ByteStreams.copy(new FileInputStream(testFile), out); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(xzFile), xzFile.getName())) { + assertGoodDataStream(inputStream); + } + } + + @Test + public void testDecompressZip() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressZip"); + final File zipFile = new File(tmpDir, testFile.getName() + ".zip"); + Assert.assertFalse(zipFile.exists()); + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile))) { + out.putNextEntry(new ZipEntry("cool.file")); + ByteStreams.copy(new FileInputStream(testFile), out); + out.closeEntry(); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(zipFile), zipFile.getName())) { + assertGoodDataStream(inputStream); + } + } + @Test public void testGoodGZStream() throws IOException { @@ -490,7 +537,7 @@ public void flush() throws IOException }, Predicates.alwaysTrue() ); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) { assertGoodDataStream(inputStream); } if (!testFile.delete()) { @@ -536,7 +583,7 @@ public void testStreamErrorGunzip() throws Exception Assert.assertFalse(gzFile.exists()); CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.alwaysTrue()); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) { assertGoodDataStream(inputStream); } if (testFile.exists() && !testFile.delete()) { diff --git a/pom.xml b/pom.xml index 1830c8b7c7f8..4297a117c988 100644 --- a/pom.xml +++ b/pom.xml @@ -325,6 +325,16 @@ rhino 1.7R5 + + org.apache.commons + commons-compress + 1.16 + + + org.tukaani + xz + 1.8 + com.fasterxml.jackson.core jackson-annotations diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java index aaab6f9dae55..949cb1db7e47 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -105,7 +105,7 @@ protected InputStream openObjectStream(URI object, long start) throws IOExceptio @Override protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } @Override diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 6db1e8c30941..e9e7b40bb730 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -22,10 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory; import io.druid.data.input.impl.StringInputRowParser; import io.druid.java.util.common.CompressionUtils; +import io.druid.java.util.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter; @@ -97,6 +97,6 @@ protected InputStream openObjectStream(File object) throws IOException @Override protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } }