From 31e1b06a29ac49965af18f8b7d026f23e455cb32 Mon Sep 17 00:00:00 2001 From: electicsam <6004777+electricsam@users.noreply.github.com> Date: Thu, 21 Jan 2021 11:00:36 -0700 Subject: [PATCH 1/2] The BloscCompressor used InputStream.available() to determine if bytes were available. InputStream.available() is not guaranteed to be accurate. Some implementations rely on the implementation in InputStream, which always returns 0. An EOFException was thrown in this case. The BloscCompressor was updated to just attempt to read from the stream and see if bytes were returned. --- .../java/com/bc/zarr/CompressorFactory.java | 9 +-- src/test/java/com/bc/zarr/CompressorTest.java | 71 +++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/bc/zarr/CompressorFactory.java b/src/main/java/com/bc/zarr/CompressorFactory.java index d7a692b..aa40d67 100644 --- a/src/main/java/com/bc/zarr/CompressorFactory.java +++ b/src/main/java/com/bc/zarr/CompressorFactory.java @@ -207,6 +207,9 @@ public void uncompress(InputStream is, OutputStream os) throws IOException { } private static class BloscCompressor extends Compressor { + + private static final int HEADER_SIZE = 16; + private final int clevel; private final int blocksize; private final int shuffle; @@ -312,10 +315,8 @@ public void compress(InputStream is, OutputStream os) throws IOException { @Override public void uncompress(InputStream is, OutputStream os) throws IOException { - while (is.available() >= 16) { - byte[] header = new byte[16]; - is.read(header); - + byte[] header = new byte[HEADER_SIZE]; + while (is.read(header) == HEADER_SIZE) { BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header)); int chunkSize = (int) bs.getCbytes(); diff --git a/src/test/java/com/bc/zarr/CompressorTest.java b/src/test/java/com/bc/zarr/CompressorTest.java index 8c9817c..a282d85 100644 --- a/src/test/java/com/bc/zarr/CompressorTest.java +++ b/src/test/java/com/bc/zarr/CompressorTest.java @@ -36,6 +36,7 @@ 
import javax.imageio.stream.MemoryCacheImageOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.IOException; import java.nio.ByteOrder; import java.util.Arrays; @@ -151,4 +152,74 @@ public void writeRead_BloscCompressor() throws IOException { resultIis.readFully(uncompressed, 0, uncompressed.length); assertThat(input, is(equalTo(uncompressed))); } + + @Test + public void read_BloscCompressor_DefaultAvailable() throws IOException { + final Compressor compressor = CompressorFactory.create("blosc"); + final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + final int[] input = { + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100 + }; + final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new ByteArrayOutputStream()); + iis.setByteOrder(byteOrder); + iis.writeInts(input, 0, input.length); + iis.seek(0); + + ByteArrayOutputStream os; + InputStream is; + + final byte[] intermediate = {2, 1, 33, 1, -36, 0, 0, 0, -36, 0, 0, 0, 73, 0, 0, 0, 20, 0, 0, 0, 49, 0, 0, 0, -5, 17, 0, 0, 0, 100, 0, 0, 0, 22, 0, 0, 0, 100, 0, 0, 0, 22, 0, 0, 0, 22, 0, 0, 0, 22, 0, 0, 0, 100, 0, 0, 0, 100, 32, 0, 0, 20, 0, 15, 44, 0, -111, 80, 22, 0, 0, 0, 100}; + + //write + os = new ByteArrayOutputStream(); + compressor.compress(new ZarrInputStreamAdapter(iis), os); + final byte[] compressed = os.toByteArray(); + assertThat(compressed, is(equalTo(intermediate))); + + //read + is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed)); + os = new ByteArrayOutputStream(); + compressor.uncompress(is, os); + final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray()); + final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais); + 
resultIis.setByteOrder(byteOrder); + final int[] uncompressed = new int[input.length]; + resultIis.readFully(uncompressed, 0, uncompressed.length); + assertThat(input, is(equalTo(uncompressed))); + } + + // Simulates a software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream which + // does not provide it's own implementation of available() and always returns 0 + private static class MockAWSChecksumValidatingInputStream extends InputStream { + + private final InputStream in; + + public MockAWSChecksumValidatingInputStream(InputStream in) { + this.in = in; + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + return in.read(buf, off, len); + } + + @Override + public synchronized void reset() throws IOException { + in.reset(); + } + + @Override + public void close() throws IOException { + in.close(); + } + } } \ No newline at end of file From c954da0408c3b662842e133a95924db9df81ac14 Mon Sep 17 00:00:00 2001 From: electicsam <6004777+electricsam@users.noreply.github.com> Date: Thu, 21 Jan 2021 22:51:39 -0700 Subject: [PATCH 2/2] Blosc compressor now checks how many bytes were read when decompressing. Some InputStream implementations may not always return the requested number of bytes from read(). This is the case when reading from an S3 bucket, where the stream is chunked. 
--- .../java/com/bc/zarr/CompressorFactory.java | 38 ++++++--- src/test/java/com/bc/zarr/CompressorTest.java | 77 +++++++++++++++++-- 2 files changed, 98 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/bc/zarr/CompressorFactory.java b/src/main/java/com/bc/zarr/CompressorFactory.java index aa40d67..9959f8b 100644 --- a/src/main/java/com/bc/zarr/CompressorFactory.java +++ b/src/main/java/com/bc/zarr/CompressorFactory.java @@ -208,8 +208,6 @@ public void uncompress(InputStream is, OutputStream os) throws IOException { private static class BloscCompressor extends Compressor { - private static final int HEADER_SIZE = 16; - private final int clevel; private final int blocksize; private final int shuffle; @@ -315,19 +313,37 @@ public void compress(InputStream is, OutputStream os) throws IOException { @Override public void uncompress(InputStream is, OutputStream os) throws IOException { - byte[] header = new byte[HEADER_SIZE]; - while (is.read(header) == HEADER_SIZE) { - BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header)); - int chunkSize = (int) bs.getCbytes(); - byte[] inBytes = Arrays.copyOf(header, header.length + chunkSize); - is.read(inBytes, header.length, chunkSize); + byte[] header = new byte[JBlosc.OVERHEAD]; + int lastHeaderRead = 0; + int headerSize = 0; + while(lastHeaderRead >= 0 && headerSize < header.length) { + lastHeaderRead = is.read(header, headerSize, header.length - headerSize); + headerSize += lastHeaderRead; + } + + if(headerSize == header.length) { - ByteBuffer outBuffer = ByteBuffer.allocate((int) bs.getNbytes()); - JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), outBuffer, outBuffer.limit(), 1); + BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header)); + int compressedSize = (int) bs.getCbytes(); + int uncompressedSize = (int) bs.getNbytes(); + + int lastRead = 0; + int totalRead = header.length; + byte[] inBytes = Arrays.copyOf(header, compressedSize); + while(lastRead >= 0 && totalRead < compressedSize) { + lastRead = 
is.read(inBytes, totalRead, compressedSize - totalRead); + totalRead += lastRead; + } + + if(totalRead == compressedSize) { + ByteBuffer outBuffer = ByteBuffer.allocate(uncompressedSize); + JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), outBuffer, outBuffer.limit(), 1); + os.write(outBuffer.array()); + } - os.write(outBuffer.array()); } + } private BufferSizes cbufferSizes(ByteBuffer cbuffer) { diff --git a/src/test/java/com/bc/zarr/CompressorTest.java b/src/test/java/com/bc/zarr/CompressorTest.java index a282d85..628c0c6 100644 --- a/src/test/java/com/bc/zarr/CompressorTest.java +++ b/src/test/java/com/bc/zarr/CompressorTest.java @@ -30,6 +30,7 @@ import static org.hamcrest.MatcherAssert.*; import com.bc.zarr.chunk.ZarrInputStreamAdapter; +import java.util.function.Function; import org.junit.*; import javax.imageio.stream.MemoryCacheImageInputStream; @@ -158,6 +159,16 @@ public void read_BloscCompressor_DefaultAvailable() throws IOException { final Compressor compressor = CompressorFactory.create("blosc"); final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; final int[] input = { + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, @@ -172,16 +183,59 @@ public void read_BloscCompressor_DefaultAvailable() throws IOException { ByteArrayOutputStream os; InputStream is; - final byte[] intermediate = {2, 1, 33, 1, -36, 0, 0, 0, -36, 0, 0, 0, 73, 0, 0, 0, 20, 0, 0, 0, 
49, 0, 0, 0, -5, 17, 0, 0, 0, 100, 0, 0, 0, 22, 0, 0, 0, 100, 0, 0, 0, 22, 0, 0, 0, 22, 0, 0, 0, 22, 0, 0, 0, 100, 0, 0, 0, 100, 32, 0, 0, 20, 0, 15, 44, 0, -111, 80, 22, 0, 0, 0, 100}; + //write + os = new ByteArrayOutputStream(); + compressor.compress(new ZarrInputStreamAdapter(iis), os); + final byte[] compressed = os.toByteArray(); + + //read + is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 16 ? 7 : len); + os = new ByteArrayOutputStream(); + compressor.uncompress(is, os); + final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray()); + final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais); + resultIis.setByteOrder(byteOrder); + final int[] uncompressed = new int[input.length]; + resultIis.readFully(uncompressed, 0, uncompressed.length); + assertThat(input, is(equalTo(uncompressed))); + } + + @Test + public void read_BloscCompressor_ReducedBytesRead() throws IOException { + final Compressor compressor = CompressorFactory.create("blosc"); + final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + final int[] input = { + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100 + }; + final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new 
ByteArrayOutputStream()); + iis.setByteOrder(byteOrder); + iis.writeInts(input, 0, input.length); + iis.seek(0); + + ByteArrayOutputStream os; + InputStream is; //write os = new ByteArrayOutputStream(); compressor.compress(new ZarrInputStreamAdapter(iis), os); final byte[] compressed = os.toByteArray(); - assertThat(compressed, is(equalTo(intermediate))); //read - is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed)); + is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 1 ? len - 1 : len); os = new ByteArrayOutputStream(); compressor.uncompress(is, os); final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray()); @@ -193,13 +247,17 @@ public void read_BloscCompressor_DefaultAvailable() throws IOException { } // Simulates a software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream which - // does not provide it's own implementation of available() and always returns 0 + // does not provide it's own implementation of available() and always returns 0. + // Additionally, this class may return less bytes than requested from read(), this is allowed per + // the documentation private static class MockAWSChecksumValidatingInputStream extends InputStream { private final InputStream in; + private final Function getBytesRead; - public MockAWSChecksumValidatingInputStream(InputStream in) { + public MockAWSChecksumValidatingInputStream(InputStream in, Function getBytesRead) { this.in = in; + this.getBytesRead = getBytesRead; } @Override @@ -207,9 +265,16 @@ public int read() throws IOException { return in.read(); } + /* + * From InputStream: + * + * Reads up to len bytes of data from the input stream into + * an array of bytes. An attempt is made to read as many as + * len bytes, but a smaller number may be read. 
+ */ @Override public int read(byte[] buf, int off, int len) throws IOException { - return in.read(buf, off, len); + return in.read(buf, off, getBytesRead.apply(len)); } @Override