From 88602c9ca09538f22557292e19e1dd38f8731744 Mon Sep 17 00:00:00 2001 From: kirpichov Date: Mon, 4 Apr 2016 13:31:23 -0700 Subject: [PATCH 1/2] [BEAM-167] Fix custom source gzip input to read concatenated gzip files This applies patch from kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174 --- .../dataflow/sdk/io/CompressedSource.java | 2 +- .../dataflow/sdk/io/CompressedSourceTest.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index 4e3e9cab0646..15e6e29decea 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -122,7 +122,7 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe byte zero = 0x00; int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream)); + return Channels.newChannel(new GzipCompressorInputStream(stream, true)); } } return Channels.newChannel(stream); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 2dcddb4e62d1..f63a128f4a1d 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -47,16 +47,19 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -98,6 +101,43 @@ public void testEmptyReadGzip() throws Exception { runReadTest(input, CompressionMode.GZIP); } + private static byte[] compressGzip(byte[] input) throws IOException { + ByteArrayOutputStream res = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) { + gzipStream.write(input); + } + return res.toByteArray(); + } + + private static byte[] concat(byte[] first, byte[] second) { + byte[] res = new byte[first.length + second.length]; + System.arraycopy(first, 0, res, 0, first.length); + System.arraycopy(second, 0, res, first.length, second.length); + return res; + } + + @Test + public void testReadConcatenatedGzip() throws IOException { + byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); + byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); + byte[] expected = concat(header, body); + byte[] totalGz = concat(compressGzip(header), compressGzip(body)); + File tmpFile = tmpFolder.newFile(); + try (FileOutputStream os = new FileOutputStream(tmpFile)) { + os.write(totalGz); + } + + Pipeline p = TestPipeline.create(); + + CompressedSource source = + CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) + .withDecompression(CompressionMode.GZIP); + PCollection output = p.apply(Read.from(source)); + + DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); + p.run(); + } + /** * Test reading empty input with bzip2. */ From d397b543017f9f9108d5bcf202542e2c24cfeabb Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 4 Apr 2016 13:35:25 -0700 Subject: [PATCH 2/2] [BEAM-167] Add comment for the test explaining why concatenation of gzip files is valid --- .../google/cloud/dataflow/sdk/io/CompressedSourceTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index f63a128f4a1d..3de0513d18f8 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -116,6 +116,12 @@ private static byte[] concat(byte[] first, byte[] second) { return res; } + /** + * Test a concatenation of gzip files is correctly decompressed. + * + *

A concatenation of gzip files as one file is a valid gzip file and should decompress + * to be the concatenation of those individual files. + */ @Test public void testReadConcatenatedGzip() throws IOException { byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);