Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions src/main/java/com/bc/zarr/CompressorFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public void uncompress(InputStream is, OutputStream os) throws IOException {
}

private static class BloscCompressor extends Compressor {

private final int clevel;
private final int blocksize;
private final int shuffle;
Expand Down Expand Up @@ -312,21 +313,37 @@ public void compress(InputStream is, OutputStream os) throws IOException {

@Override
public void uncompress(InputStream is, OutputStream os) throws IOException {
while (is.available() >= 16) {
byte[] header = new byte[16];
is.read(header);

BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header));
int chunkSize = (int) bs.getCbytes();
byte[] header = new byte[JBlosc.OVERHEAD];
int lastHeaderRead = 0;
int headerSize = 0;
while(lastHeaderRead >= 0 && headerSize < header.length) {
lastHeaderRead = is.read(header, headerSize, header.length - headerSize);
headerSize += lastHeaderRead;
}

byte[] inBytes = Arrays.copyOf(header, header.length + chunkSize);
is.read(inBytes, header.length, chunkSize);
if(headerSize == header.length) {

ByteBuffer outBuffer = ByteBuffer.allocate((int) bs.getNbytes());
JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), outBuffer, outBuffer.limit(), 1);
BufferSizes bs = cbufferSizes(ByteBuffer.wrap(header));
int compressedSize = (int) bs.getCbytes();
int uncompressedSize = (int) bs.getNbytes();

int lastRead = 0;
int totalRead = header.length;
byte[] inBytes = Arrays.copyOf(header, compressedSize);
while(lastRead >= 0 && totalRead < compressedSize) {
lastRead = is.read(inBytes, totalRead, compressedSize - totalRead);
totalRead += lastRead;
}

if(totalRead == compressedSize) {
ByteBuffer outBuffer = ByteBuffer.allocate(uncompressedSize);
JBlosc.decompressCtx(ByteBuffer.wrap(inBytes), outBuffer, outBuffer.limit(), 1);
os.write(outBuffer.array());
}

os.write(outBuffer.array());
}

}

private BufferSizes cbufferSizes(ByteBuffer cbuffer) {
Expand Down
136 changes: 136 additions & 0 deletions src/test/java/com/bc/zarr/CompressorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import static org.hamcrest.MatcherAssert.*;

import com.bc.zarr.chunk.ZarrInputStreamAdapter;
import java.util.function.Function;
import org.junit.*;

import javax.imageio.stream.MemoryCacheImageInputStream;
import javax.imageio.stream.MemoryCacheImageOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteOrder;
import java.util.Arrays;
Expand Down Expand Up @@ -151,4 +153,138 @@ public void writeRead_BloscCompressor() throws IOException {
resultIis.readFully(uncompressed, 0, uncompressed.length);
assertThat(input, is(equalTo(uncompressed)));
}

@Test
public void read_BloscCompressor_DefaultAvailable() throws IOException {
final Compressor compressor = CompressorFactory.create("blosc");
final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
final int[] input = {
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100
};
final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new ByteArrayOutputStream());
iis.setByteOrder(byteOrder);
iis.writeInts(input, 0, input.length);
iis.seek(0);

ByteArrayOutputStream os;
InputStream is;

//write
os = new ByteArrayOutputStream();
compressor.compress(new ZarrInputStreamAdapter(iis), os);
final byte[] compressed = os.toByteArray();

//read
is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 16 ? 7 : len);
os = new ByteArrayOutputStream();
compressor.uncompress(is, os);
final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray());
final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais);
resultIis.setByteOrder(byteOrder);
final int[] uncompressed = new int[input.length];
resultIis.readFully(uncompressed, 0, uncompressed.length);
assertThat(input, is(equalTo(uncompressed)));
}

@Test
public void read_BloscCompressor_ReducedBytesRead() throws IOException {
final Compressor compressor = CompressorFactory.create("blosc");
final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
final int[] input = {
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100,
100, 22, 100, 22, 22, 22, 100, 100, 100, 22, 100
};
final MemoryCacheImageOutputStream iis = new MemoryCacheImageOutputStream(new ByteArrayOutputStream());
iis.setByteOrder(byteOrder);
iis.writeInts(input, 0, input.length);
iis.seek(0);

ByteArrayOutputStream os;
InputStream is;

//write
os = new ByteArrayOutputStream();
compressor.compress(new ZarrInputStreamAdapter(iis), os);
final byte[] compressed = os.toByteArray();

//read
is = new MockAWSChecksumValidatingInputStream(new ByteArrayInputStream(compressed), len -> len > 1 ? len - 1 : len);
os = new ByteArrayOutputStream();
compressor.uncompress(is, os);
final ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray());
final MemoryCacheImageInputStream resultIis = new MemoryCacheImageInputStream(bais);
resultIis.setByteOrder(byteOrder);
final int[] uncompressed = new int[input.length];
resultIis.readFully(uncompressed, 0, uncompressed.length);
assertThat(input, is(equalTo(uncompressed)));
}

// Simulates a software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream which
// does not provide it's own implementation of available() and always returns 0.
// Additionally, this class may return less bytes than requested from read(), this is allowed per
// the documentation
private static class MockAWSChecksumValidatingInputStream extends InputStream {

private final InputStream in;
private final Function<Integer, Integer> getBytesRead;

public MockAWSChecksumValidatingInputStream(InputStream in, Function<Integer, Integer> getBytesRead) {
this.in = in;
this.getBytesRead = getBytesRead;
}

@Override
public int read() throws IOException {
return in.read();
}

/*
* From InputStream:
*
* Reads up to <code>len</code> bytes of data from the input stream into
* an array of bytes. An attempt is made to read as many as
* <code>len</code> bytes, but a smaller number may be read.
*/
@Override
public int read(byte[] buf, int off, int len) throws IOException {
return in.read(buf, off, getBytesRead.apply(len));
}

@Override
public synchronized void reset() throws IOException {
in.reset();
}

@Override
public void close() throws IOException {
in.close();
}
}
}