Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected InputStream openObjectStream(AzureBlob object, long start) throws IOEx
@Override
protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}

private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private CloudFilesByteSource createCloudFilesByteSource(CloudFilesBlob object)
@Override
protected InputStream wrapObjectStream(CloudFilesBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private GoogleByteSource createGoogleByteSource(GoogleBlob object)
@Override
protected InputStream wrapObjectStream(GoogleBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,14 @@ public CacheScheduler.VersionedCache generateCache(
catch (NumberFormatException ex) {
log.debug(ex, "Failed to get last modified timestamp. Assuming no timestamp");
}
final ByteSource source;
if (CompressionUtils.isGz(uriPath)) {
// Simple gzip stream
log.debug("Loading gz");
source = new ByteSource()
final ByteSource source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
@Override
public InputStream openStream() throws IOException
{
return CompressionUtils.gzipInputStream(puller.getInputStream(uri));
}
};
} else {
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return puller.getInputStream(uri);
}
};
}
return CompressionUtils.decompress(puller.getInputStream(uri), uri.getPath());
}
};

final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, version);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ protected Collection<S3ObjectSummary> initObjects() throws IOException
// Getting data is deferred until openObjectStream() is called for each object.
if (!uris.isEmpty()) {
return uris.stream()
.map(
uri -> {
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key);
}
)
.collect(Collectors.toList());
.map(
uri -> {
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key);
}
)
.collect(Collectors.toList());
} else {
final List<S3ObjectSummary> objects = new ArrayList<>();
for (URI uri : prefixes) {
Expand Down Expand Up @@ -212,7 +212,7 @@ protected InputStream openObjectStream(S3ObjectSummary object, long start) throw
@Override
protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException
{
return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getKey());
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions java-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@
<groupId>org.mozilla</groupId>
<artifactId>rhino</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@
import com.google.common.io.Files;
import io.druid.java.util.common.io.NativeIO;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand All @@ -48,7 +52,9 @@ public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
private static final String BZ2_SUFFIX = ".bz2";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These constants wouldn't happen to be in the Apache libs anywhere, would they?

private static final String GZ_SUFFIX = ".gz";
private static final String XZ_SUFFIX = ".xz";
private static final String ZIP_SUFFIX = ".zip";

/**
Expand Down Expand Up @@ -313,7 +319,7 @@ public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) thro
*
* @return A GZIPInputStream that can handle concatenated gzip streams in the input
*/
public static GZIPInputStream gzipInputStream(final InputStream in) throws IOException
private static GZIPInputStream gzipInputStream(final InputStream in) throws IOException
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can break extensions that depend on it. Can it be deprecated for a version or two instead?

{
return new GZIPInputStream(
new FilterInputStream(in)
Expand Down Expand Up @@ -516,4 +522,42 @@ public static String getGzBaseName(String fname)
}
throw new IAE("[%s] is not a valid gz file name", fname);
}

/**
 * Wraps {@code in} with a decompressing stream selected by the suffix of {@code fileName}.
 *
 * <ul>
 * <li>{@code .gz}: delegates to {@link #gzipInputStream(InputStream)}.</li>
 * <li>{@code .bz2}: a {@link BZip2CompressorInputStream} created with its boolean flag set to {@code true}
 * (the concatenated-stream option in the commons-compress API).</li>
 * <li>{@code .xz}: an {@link XZCompressorInputStream} created with the same flag set to {@code true}.</li>
 * <li>{@code .zip}: a {@link ZipInputStream} positioned on the first entry of the archive; only that entry is
 * exposed. If the archive has no entries, the zip stream is closed and an empty stream is returned.</li>
 * <li>anything else: {@code in} is returned as-is.</li>
 * </ul>
 *
 * The suffix comparison uses {@link String#endsWith(String)} and is therefore case-sensitive.
 *
 * @param in       the raw stream to read from
 * @param fileName the name (or path) whose suffix decides which decompressor is applied
 *
 * @return a stream yielding decompressed bytes, or {@code in} itself when no known suffix matches
 *
 * @throws IOException if constructing the decompressing stream or locating the first zip entry fails
 */
public static InputStream decompress(final InputStream in, final String fileName) throws IOException
{
  if (fileName.endsWith(GZ_SUFFIX)) {
    return gzipInputStream(in);
  }
  if (fileName.endsWith(BZ2_SUFFIX)) {
    return new BZip2CompressorInputStream(in, true);
  }
  if (fileName.endsWith(XZ_SUFFIX)) {
    return new XZCompressorInputStream(in, true);
  }
  if (!fileName.endsWith(ZIP_SUFFIX)) {
    // Unknown suffix: hand the stream back untouched.
    return in;
  }

  // Zip archive: expose only the first entry.
  final ZipInputStream zipStream = new ZipInputStream(in, StandardCharsets.UTF_8);
  try {
    if (zipStream.getNextEntry() != null) {
      return zipStream;
    }

    // Archive without entries - release the zip stream and substitute an empty one.
    zipStream.close();
    return new ByteArrayInputStream(new byte[0]);
  }
  catch (IOException failure) {
    // Do not leave the zip stream open on failure; keep the first exception as the primary one.
    try {
      zipStream.close();
    }
    catch (IOException closeFailure) {
      failure.addSuppressed(closeFailure);
    }
    throw failure;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -53,6 +55,8 @@
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class CompressionUtilsTest
{
Expand Down Expand Up @@ -221,7 +225,6 @@ public void testGoodZipStream() throws IOException
}
}


@Test
public void testGoodGzipByteSource() throws IOException
{
Expand All @@ -230,7 +233,7 @@ public void testGoodGzipByteSource() throws IOException
Assert.assertFalse(gzFile.exists());
CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.<Throwable>alwaysTrue());
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), gzFile.getName())) {
assertGoodDataStream(inputStream);
}
if (!testFile.delete()) {
Expand All @@ -244,6 +247,50 @@ public void testGoodGzipByteSource() throws IOException
}
}

/**
 * Writes {@code testFile} through a bzip2 compressor into a {@code .bz2} file, then checks that
 * {@code CompressionUtils.decompress} yields the expected data when given that file's name.
 */
@Test
public void testDecompressBzip2() throws IOException
{
  final File tmpDir = temporaryFolder.newFolder("testDecompressBzip2");
  final File bzFile = new File(tmpDir, testFile.getName() + ".bz2");
  Assert.assertFalse(bzFile.exists());
  // ByteStreams.copy closes neither stream, so the source stream is managed here as well.
  // Resources close in reverse order: the compressor is finished before the source is closed.
  try (final InputStream in = new FileInputStream(testFile);
       final OutputStream out = new BZip2CompressorOutputStream(new FileOutputStream(bzFile))) {
    ByteStreams.copy(in, out);
  }
  try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(bzFile), bzFile.getName())) {
    assertGoodDataStream(inputStream);
  }
}

/**
 * Writes {@code testFile} through an XZ compressor into a {@code .xz} file, then checks that
 * {@code CompressionUtils.decompress} yields the expected data when given that file's name.
 */
@Test
public void testDecompressXz() throws IOException
{
  final File tmpDir = temporaryFolder.newFolder("testDecompressXz");
  final File xzFile = new File(tmpDir, testFile.getName() + ".xz");
  Assert.assertFalse(xzFile.exists());
  // ByteStreams.copy closes neither stream, so the source stream is managed here as well.
  // Resources close in reverse order: the compressor is finished before the source is closed.
  try (final InputStream in = new FileInputStream(testFile);
       final OutputStream out = new XZCompressorOutputStream(new FileOutputStream(xzFile))) {
    ByteStreams.copy(in, out);
  }
  try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(xzFile), xzFile.getName())) {
    assertGoodDataStream(inputStream);
  }
}

/**
 * Writes {@code testFile} as the single entry of a {@code .zip} archive, then checks that
 * {@code CompressionUtils.decompress} yields the expected data when given that archive's name.
 */
@Test
public void testDecompressZip() throws IOException
{
  final File tmpDir = temporaryFolder.newFolder("testDecompressZip");
  final File zipFile = new File(tmpDir, testFile.getName() + ".zip");
  Assert.assertFalse(zipFile.exists());
  // ByteStreams.copy closes neither stream, so the source stream is managed here as well.
  // Resources close in reverse order: the archive is finished before the source is closed.
  try (final InputStream in = new FileInputStream(testFile);
       final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile))) {
    out.putNextEntry(new ZipEntry("cool.file"));
    ByteStreams.copy(in, out);
    out.closeEntry();
  }
  try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(zipFile), zipFile.getName())) {
    assertGoodDataStream(inputStream);
  }
}

@Test
public void testGoodGZStream() throws IOException
{
Expand Down Expand Up @@ -490,7 +537,7 @@ public void flush() throws IOException
}, Predicates.<Throwable>alwaysTrue()
);
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) {
assertGoodDataStream(inputStream);
}
if (!testFile.delete()) {
Expand Down Expand Up @@ -536,7 +583,7 @@ public void testStreamErrorGunzip() throws Exception
Assert.assertFalse(gzFile.exists());
CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.<Throwable>alwaysTrue());
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) {
assertGoodDataStream(inputStream);
}
if (testFile.exists() && !testFile.delete()) {
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,16 @@
<artifactId>rhino</artifactId>
<version>1.7R5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected InputStream openObjectStream(URI object, long start) throws IOExceptio
@Override
protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
Expand Down Expand Up @@ -97,6 +97,6 @@ protected InputStream openObjectStream(File object) throws IOException
@Override
protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
}