diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index 4e3e9cab0646..15e6e29decea 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -122,7 +122,8 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
           byte zero = 0x00;
           int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
           if (header == GZIPInputStream.GZIP_MAGIC) {
-            return Channels.newChannel(new GzipCompressorInputStream(stream));
+            // decompressConcatenated=true: read every gzip member, not only the first.
+            return Channels.newChannel(new GzipCompressorInputStream(stream, true));
           }
         }
         return Channels.newChannel(stream);
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 2dcddb4e62d1..3de0513d18f8 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -47,16 +47,19 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
 
@@ -98,6 +101,51 @@ public void testEmptyReadGzip() throws Exception {
     runReadTest(input, CompressionMode.GZIP);
   }
 
+  /** Returns the gzip-compressed bytes of {@code input}. */
+  private static byte[] compressGzip(byte[] input) throws IOException {
+    ByteArrayOutputStream res = new ByteArrayOutputStream();
+    try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
+      gzipStream.write(input);
+    }
+    return res.toByteArray();
+  }
+
+  /** Returns a new array holding {@code first} followed by {@code second}. */
+  private static byte[] concat(byte[] first, byte[] second) {
+    byte[] res = new byte[first.length + second.length];
+    System.arraycopy(first, 0, res, 0, first.length);
+    System.arraycopy(second, 0, res, first.length, second.length);
+    return res;
+  }
+
+  /**
+   * Test a concatenation of gzip files is correctly decompressed.
+   *
+   * <p>A concatenation of gzip files as one file is a valid gzip file and should decompress
+   * to be the concatenation of those individual files.
+   */
+  @Test
+  public void testReadConcatenatedGzip() throws IOException {
+    byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+    byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+    byte[] expected = concat(header, body);
+    byte[] totalGz = concat(compressGzip(header), compressGzip(body));
+    File tmpFile = tmpFolder.newFile();
+    try (FileOutputStream os = new FileOutputStream(tmpFile)) {
+      os.write(totalGz);
+    }
+
+    Pipeline p = TestPipeline.create();
+
+    CompressedSource<Byte> source =
+        CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
+            .withDecompression(CompressionMode.GZIP);
+    PCollection<Byte> output = p.apply(Read.from(source));
+
+    DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
+    p.run();
+  }
+
   /**
    * Test reading empty input with bzip2.
    */