Skip to content

Commit 7b86642

Browse files
committed
Fixes #88 by introducing thread-safe BufferAllocator
1 parent 5eebef8 commit 7b86642

File tree

8 files changed

+132
-190
lines changed

8 files changed

+132
-190
lines changed

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
sbt.version=0.13.5
1+
sbt.version=0.13.6
22

src/main/java/org/xerial/snappy/BufferRecycler.java

Lines changed: 0 additions & 176 deletions
This file was deleted.

src/main/java/org/xerial/snappy/SnappyOutputStream.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
//--------------------------------------
2525
package org.xerial.snappy;
2626

27+
import org.xerial.snappy.buffer.BufferAllocatorFactory;
28+
import org.xerial.snappy.buffer.BufferAllocator;
29+
import org.xerial.snappy.buffer.CachedBufferAllocator;
30+
2731
import java.io.IOException;
2832
import java.io.OutputStream;
2933

@@ -56,9 +60,11 @@ public class SnappyOutputStream extends OutputStream {
5660
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
5761

5862
protected final OutputStream out;
59-
60-
private final BufferRecycler recycler;
6163
private final int blockSize;
64+
65+
private final BufferAllocator inputBufferAllocator;
66+
private final BufferAllocator outputBufferAllocator;
67+
6268
protected final byte[] inputBuffer;
6369
protected final byte[] outputBuffer;
6470
private int inputCursor = 0;
@@ -74,14 +80,25 @@ public SnappyOutputStream(OutputStream out) {
7480
* @throws IOException
7581
*/
7682
public SnappyOutputStream(OutputStream out, int blockSize) {
83+
this(out, blockSize, CachedBufferAllocator.factory);
84+
}
85+
86+
public SnappyOutputStream(OutputStream out, int blockSize, BufferAllocatorFactory bufferAllocatorFactory) {
7787
this.out = out;
78-
this.recycler = BufferRecycler.instance();
7988
this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
80-
inputBuffer = recycler.allocInputBuffer(this.blockSize);
81-
outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize));
89+
int inputSize = blockSize;
90+
int outputSize = SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize);
91+
92+
this.inputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(inputSize);
93+
this.outputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(outputSize);
94+
95+
inputBuffer = inputBufferAllocator.allocate(inputSize);
96+
outputBuffer = inputBufferAllocator.allocate(outputSize);
97+
8298
outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
8399
}
84100

101+
85102
/* (non-Javadoc)
86103
* @see java.io.OutputStream#write(byte[], int, int)
87104
*/
@@ -265,9 +282,9 @@ public void flush() throws IOException {
265282

266283
static void writeInt(byte[] dst, int offset, int v) {
267284
dst[offset] = (byte) ((v >> 24) & 0xFF);
268-
dst[offset+1] = (byte) ((v >> 16) & 0xFF);
269-
dst[offset+2] = (byte) ((v >> 8) & 0xFF);
270-
dst[offset+3] = (byte) ((v >> 0) & 0xFF);
285+
dst[offset + 1] = (byte) ((v >> 16) & 0xFF);
286+
dst[offset + 2] = (byte) ((v >> 8) & 0xFF);
287+
dst[offset + 3] = (byte) ((v >> 0) & 0xFF);
271288
}
272289

273290
static int readInt(byte[] buffer, int pos) {
@@ -312,10 +329,9 @@ public void close() throws IOException {
312329
try {
313330
flush();
314331
out.close();
315-
}
316-
finally {
317-
recycler.releaseInputBuffer(inputBuffer);
318-
recycler.releaseOutputBuffer(outputBuffer);
332+
} finally {
333+
inputBufferAllocator.release(inputBuffer);
334+
outputBufferAllocator.release(outputBuffer);
319335
}
320336
}
321337

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.xerial.snappy.buffer;
2+
3+
/**
4+
* BufferAllocator interface. The implementation of this interface must be thread-safe
5+
*/
6+
public interface BufferAllocator {
7+
8+
public byte[] allocate(int size);
9+
public void release(byte[] buffer);
10+
11+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.xerial.snappy.buffer;
2+
3+
/**
4+
*
5+
*/
6+
public interface BufferAllocatorFactory {
7+
8+
BufferAllocator getBufferAllocator(int minSize);
9+
}
10+
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.xerial.snappy.buffer;
2+
3+
import java.lang.ref.SoftReference;
4+
import java.util.*;
5+
6+
/**
7+
* Cached buffer
8+
*/
9+
public class CachedBufferAllocator implements BufferAllocator {
10+
11+
public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
12+
@Override
13+
public BufferAllocator getBufferAllocator(int bufferSize) {
14+
return CachedBufferAllocator.getAllocator(bufferSize);
15+
}
16+
};
17+
18+
/**
19+
* Use SoftReference so that having this queueTable does not prevent the GC of CachedBufferAllocator instances
20+
*/
21+
public static Map<Integer, SoftReference<CachedBufferAllocator>> queueTable = new HashMap<Integer, SoftReference<CachedBufferAllocator>>();
22+
23+
private final int bufferSize;
24+
private final Deque<byte[]> bufferQueue;
25+
26+
public CachedBufferAllocator(int bufferSize) {
27+
this.bufferSize = bufferSize;
28+
this.bufferQueue = new ArrayDeque<byte[]>();
29+
}
30+
31+
public static synchronized CachedBufferAllocator getAllocator(int bufferSize) {
32+
if(!queueTable.containsKey(bufferSize)) {
33+
queueTable.put(bufferSize, new SoftReference<CachedBufferAllocator>(new CachedBufferAllocator(bufferSize)));
34+
}
35+
return queueTable.get(bufferSize).get();
36+
}
37+
38+
@Override
39+
public byte[] allocate(int size) {
40+
synchronized(this) {
41+
if(bufferQueue.isEmpty()) {
42+
return new byte[size];
43+
}
44+
else {
45+
return bufferQueue.pollFirst();
46+
}
47+
}
48+
}
49+
@Override
50+
public void release(byte[] buffer) {
51+
synchronized(this) {
52+
bufferQueue.addLast(buffer);
53+
}
54+
}
55+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.xerial.snappy.buffer;
2+
3+
/**
4+
* Simple buffer allocator, which does not reuse the allocated buffer
5+
*/
6+
public class DefaultBufferAllocator implements BufferAllocator {
7+
8+
public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
9+
public BufferAllocator singleton = new DefaultBufferAllocator();
10+
@Override
11+
public BufferAllocator getBufferAllocator(int bufferSize) {
12+
return singleton;
13+
}
14+
};
15+
16+
@Override
17+
public byte[] allocate(int size) {
18+
return new byte[size];
19+
}
20+
21+
@Override
22+
public void release(byte[] buffer) {
23+
// do nothing
24+
}
25+
26+
}

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "1.1.1.3"
1+
version in ThisBuild := "1.1.1.4-SNAPSHOT"

0 commit comments

Comments
 (0)