diff --git a/src/main/java/com/bc/zarr/CompressorFactory.java b/src/main/java/com/bc/zarr/CompressorFactory.java index d7a692b..9959f8b 100644 --- a/src/main/java/com/bc/zarr/CompressorFactory.java +++ b/src/main/java/com/bc/zarr/CompressorFactory.java @@ -207,6 +207,7 @@ public void uncompress(InputStream is, OutputStream os) throws IOException { } private static class BloscCompressor extends Compressor { + private final int clevel; private final int blocksize; private final int shuffle; @@ -312,21 +313,37 @@ public void compress(InputStream is, OutputStream os) throws IOException { @Override public void uncompress(InputStream is, OutputStream os) throws IOException { - while (is.available() >= 16) { - byte[] header = new byte[16]; - is.read(header); - BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header)); - int chunkSize = (int) bs.getCbytes(); + byte[] header = new byte[JBlosc.OVERHEAD]; + int lastHeaderRead = 0; + int headerSize = 0; + while(lastHeaderRead >= 0 && headerSize < header.length) { + lastHeaderRead = is.read(header, headerSize, header.length - headerSize); + headerSize += lastHeaderRead; + } - byte[] inBytes = Arrays.copyOf(header, header.length + chunkSize); - is.read(inBytes, header.length, chunkSize); + if(headerSize == header.length) { - ByteBuffer outBuffer = ByteBuffer.allocate((int) bs.getNbytes()); - JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), outBuffer, outBuffer.limit(), 1); + BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header)); + int compressedSize = (int) bs.getCbytes(); + int uncompressedSize = (int) bs.getNbytes(); + + int lastRead = 0; + int totalRead = header.length; + byte[] inBytes = Arrays.copyOf(header, compressedSize); + while(lastRead >= 0 && totalRead < compressedSize) { + lastRead = is.read(inBytes, totalRead, compressedSize - totalRead); + totalRead += lastRead; + } + + if(totalRead == compressedSize) { + ByteBuffer outBuffer = ByteBuffer.allocate(uncompressedSize); + JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), 
outBuffer, outBuffer.limit(), 1); + os.write(outBuffer.array()); + } - os.write(outBuffer.array()); } + } private BufferSizes cbufferSizes(ByteBuffer cbuffer) { diff --git a/src/test/java/com/bc/zarr/CompressorTest.java b/src/test/java/com/bc/zarr/CompressorTest.java index 8c9817c..628c0c6 100644 --- a/src/test/java/com/bc/zarr/CompressorTest.java +++ b/src/test/java/com/bc/zarr/CompressorTest.java @@ -30,12 +30,14 @@ import static org.hamcrest.MatcherAssert.*; import com.bc.zarr.chunk.ZarrInputStreamAdapter; +import java.util.function.Function; import org.junit.*; import javax.imageio.stream.MemoryCacheImageInputStream; import javax.imageio.stream.MemoryCacheImageOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.IOException; import java.nio.ByteOrder; import java.util.Arrays; @@ -151,4 +153,138 @@ public void writeRead_BloscCompressor() throws IOException { resultIis.readFully(uncompressed, 0, uncompressed.length); assertThat(input, is(equalTo(uncompressed))); } + + @Test + public void read_BloscCompressor_DefaultAvailable() throws IOException { + final Compressor compressor = CompressorFactory.create("blosc"); + final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + final int[] input = { + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 
100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100 + }; + final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new ByteArrayOutputStream()); + iis.setByteOrder(byteOrder); + iis.writeInts(input, 0, input.length); + iis.seek(0); + + ByteArrayOutputStream os; + InputStream is; + + //write + os = new ByteArrayOutputStream(); + compressor.compress(new ZarrInputStreamAdapter(iis), os); + final byte[] compressed = os.toByteArray(); + + //read + is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 16 ? 7 : len); + os = new ByteArrayOutputStream(); + compressor.uncompress(is, os); + final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray()); + final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais); + resultIis.setByteOrder(byteOrder); + final int[] uncompressed = new int[input.length]; + resultIis.readFully(uncompressed, 0, uncompressed.length); + assertThat(input, is(equalTo(uncompressed))); + } + + @Test + public void read_BloscCompressor_ReducedBytesRead() throws IOException { + final Compressor compressor = CompressorFactory.create("blosc"); + final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + final int[] input = { + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100, + 100, 22, 
100, 22, 22, 22, 100, 100, 100, 22, 100 + }; + final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new ByteArrayOutputStream()); + iis.setByteOrder(byteOrder); + iis.writeInts(input, 0, input.length); + iis.seek(0); + + ByteArrayOutputStream os; + InputStream is; + + //write + os = new ByteArrayOutputStream(); + compressor.compress(new ZarrInputStreamAdapter(iis), os); + final byte[] compressed = os.toByteArray(); + + //read + is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 1 ? len - 1 : len); + os = new ByteArrayOutputStream(); + compressor.uncompress(is, os); + final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray()); + final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais); + resultIis.setByteOrder(byteOrder); + final int[] uncompressed = new int[input.length]; + resultIis.readFully(uncompressed, 0, uncompressed.length); + assertThat(input, is(equalTo(uncompressed))); + } + + // Simulates a software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream which + // does not provide its own implementation of available() and always returns 0. + // Additionally, this class may return fewer bytes than requested from read(); this is allowed per + // the documentation + private static class MockAWSChecksumValidatingInputStream extends InputStream { + + private final InputStream in; + private final Function<Integer, Integer> getBytesRead; + + public MockAWSChecksumValidatingInputStream(InputStream in, Function<Integer, Integer> getBytesRead) { + this.in = in; + this.getBytesRead = getBytesRead; + } + + @Override + public int read() throws IOException { + return in.read(); + } + + /* + * From InputStream: + * + * Reads up to len bytes of data from the input stream into + * an array of bytes. An attempt is made to read as many as + * len bytes, but a smaller number may be read. 
+ */ + @Override + public int read(byte[] buf, int off, int len) throws IOException { + return in.read(buf, off, getBytesRead.apply(len)); + } + + @Override + public synchronized void reset() throws IOException { + in.reset(); + } + + @Override + public void close() throws IOException { + in.close(); + } + } } \ No newline at end of file