From 034e080bf823ca62319e7316b50b3a958bc6bbf0 Mon Sep 17 00:00:00 2001 From: kirpichov Date: Mon, 4 Apr 2016 13:31:23 -0700 Subject: [PATCH 1/2] [BEAM-167] Fix custom source gzip input to read concatenated gzip files This applies patch from kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174 --- .../dataflow/sdk/io/CompressedSource.java | 2 +- .../dataflow/sdk/io/CompressedSourceTest.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index e3dca91680..b0636ea35f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -121,7 +121,7 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe byte zero = 0x00; int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream)); + return Channels.newChannel(new GzipCompressorInputStream(stream, true)); } } return Channels.newChannel(stream); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 14c8fe9aca..86c39a47f1 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -46,16 +46,19 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import 
java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -97,6 +100,43 @@ public void testEmptyReadGzip() throws Exception { runReadTest(input, CompressionMode.GZIP); } + private static byte[] compressGzip(byte[] input) throws IOException { + ByteArrayOutputStream res = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) { + gzipStream.write(input); + } + return res.toByteArray(); + } + + private static byte[] concat(byte[] first, byte[] second) { + byte[] res = new byte[first.length + second.length]; + System.arraycopy(first, 0, res, 0, first.length); + System.arraycopy(second, 0, res, first.length, second.length); + return res; + } + + @Test + public void testReadConcatenatedGzip() throws IOException { + byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); + byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); + byte[] expected = concat(header, body); + byte[] totalGz = concat(compressGzip(header), compressGzip(body)); + File tmpFile = tmpFolder.newFile(); + try (FileOutputStream os = new FileOutputStream(tmpFile)) { + os.write(totalGz); + } + + Pipeline p = TestPipeline.create(); + + CompressedSource source = + CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) + .withDecompression(CompressionMode.GZIP); + PCollection output = p.apply(Read.from(source)); + + DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); + p.run(); + } + /** * Test reading empty input with bzip2. 
*/ From b2b4a2f4ab88174d38cb29338e113b0d4dd34085 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 4 Apr 2016 13:35:25 -0700 Subject: [PATCH 2/2] [BEAM-167] Add comment for the test explaining why concatenation of gzip files is valid --- .../google/cloud/dataflow/sdk/io/CompressedSourceTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 86c39a47f1..fe855101c6 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -115,6 +115,12 @@ private static byte[] concat(byte[] first, byte[] second) { return res; } + /** + * Test a concatenation of gzip files is correctly decompressed. + * + *

<p>A concatenation of gzip files as one file is a valid gzip file and should decompress + * to be the concatenation of those individual files. + */ @Test public void testReadConcatenatedGzip() throws IOException { byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);