Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 6 additions & 27 deletions src/com/amazon/ion/impl/IonReaderBinaryIncremental.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
import com.amazon.ion.UnknownSymbolException;
import com.amazon.ion.ValueFactory;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.impl.bin.utf8.Utf8StringDecoder;
import com.amazon.ion.impl.bin.utf8.Utf8StringDecoderPool;
import com.amazon.ion.system.SimpleCatalog;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.util.Arrays;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -161,9 +160,6 @@ private static class SystemSymbolIDs {
private static final int MAX_ID_ID = 8;
}

// The size of the reusable UTF-8 decoding buffer.
private static final int UTF8_BUFFER_SIZE_IN_BYTES = 4 * 1024;

// The final byte of the binary IVM.
private static final int IVM_FINAL_BYTE = 0xEA;

Expand Down Expand Up @@ -233,8 +229,7 @@ private static class SystemSymbolIDs {
// Stack to hold container info. Stepping into a container results in a push; stepping out results in a pop.
private final _Private_RecyclingStack<ContainerInfo> containerStack;

// UTF-8 string decoder.
private final CharsetDecoder utf8CharsetDecoder = Charset.forName("UTF-8").newDecoder();
private final Utf8StringDecoder utf8Decoder = Utf8StringDecoderPool.getInstance().getOrCreate();

// The symbol IDs for the annotations on the current value.
private final List<Integer> annotationSids;
Expand Down Expand Up @@ -283,9 +278,6 @@ private static class SystemSymbolIDs {
// The number of bytes of a lob value that the user has consumed, allowing for piecewise reads.
private int lobBytesRead = 0;

// A reusable scratch space to hold decoded UTF-8 bytes.
private CharBuffer utf8DecodingBuffer = CharBuffer.allocate(UTF8_BUFFER_SIZE_IN_BYTES);

// The type of value at which the reader is currently positioned.
private IonType valueType = null;

Expand Down Expand Up @@ -1599,22 +1591,8 @@ public double doubleValue() {
*/
private String readString(int valueStart, int valueEnd) {
ByteBuffer utf8InputBuffer = buffer.getByteBuffer(valueStart, valueEnd);

int numberOfBytes = valueEnd - valueStart;
if (numberOfBytes > utf8DecodingBuffer.capacity()) {
utf8DecodingBuffer = CharBuffer.allocate(numberOfBytes);
}

utf8DecodingBuffer.position(0);
utf8DecodingBuffer.limit(utf8DecodingBuffer.capacity());

utf8CharsetDecoder.reset();
CoderResult coderResult = utf8CharsetDecoder.decode(utf8InputBuffer, utf8DecodingBuffer, true);
if (coderResult.isError()) {
throw new IonException("Illegal value encountered while validating UTF-8 data in input stream. " + coderResult.toString());
}
utf8DecodingBuffer.flip();
return utf8DecodingBuffer.toString();
return utf8Decoder.decode(utf8InputBuffer, numberOfBytes);
}

@Override
Expand Down Expand Up @@ -1974,6 +1952,7 @@ public void requireCompleteValue() {
public void close() throws IOException {
requireCompleteValue();
inputStream.close();
utf8Decoder.close();
}

}
55 changes: 16 additions & 39 deletions src/com/amazon/ion/impl/IonReaderBinaryRawX.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import com.amazon.ion.impl.UnifiedSavePointManagerX.SavePoint;
import com.amazon.ion.impl._Private_ScalarConversions.AS_TYPE;
import com.amazon.ion.impl._Private_ScalarConversions.ValueVariant;
import com.amazon.ion.impl.bin.utf8.ByteBufferPool;
import com.amazon.ion.impl.bin.utf8.PoolableByteBuffer;
import com.amazon.ion.impl.bin.utf8.Utf8StringDecoder;
import com.amazon.ion.impl.bin.utf8.Utf8StringDecoderPool;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;


/**
Expand All @@ -58,7 +58,6 @@ abstract class IonReaderBinaryRawX
static final int DEFAULT_CONTAINER_STACK_SIZE = 12; // a multiple of 3
static final int DEFAULT_ANNOTATION_SIZE = 10;
static final int NO_LIMIT = Integer.MIN_VALUE;
static final int UTF8_BUFFER_SIZE_IN_BYTES = 4 * 1024;

protected enum State {
S_INVALID,
Expand Down Expand Up @@ -104,18 +103,13 @@ protected enum State {
int _container_top;
long[] _container_stack; // triples of: position, type, local_end


// `StandardCharsets.UTF_8` wasn't introduced until Java 7, so we have to use Charset#forName(String) instead.
private static final Charset UTF8 = Charset.forName("UTF-8");
private CharsetDecoder utf8CharsetDecoder = UTF8.newDecoder();
// Pooled decoder for UTF-8 strings.
private final Utf8StringDecoder utf8Decoder = Utf8StringDecoderPool.getInstance().getOrCreate();

// Calling read() to pull in the next byte of a string requires an EOF check to be performed for each byte.
// This reusable buffer allows us to call read(utf8InputBuffer) instead, letting us can pay the cost of an EOF check
// once per buffer rather than once per byte.
private ByteBuffer utf8InputBuffer = ByteBuffer.allocate(UTF8_BUFFER_SIZE_IN_BYTES);

// A reusable scratch space to hold the decoded bytes as they're read from the utf8InputBuffer.
private CharBuffer utf8DecodingBuffer = CharBuffer.allocate(UTF8_BUFFER_SIZE_IN_BYTES);
private final PoolableByteBuffer pooledUtf8InputBuffer = ByteBufferPool.getInstance().getOrCreate();

protected IonReaderBinaryRawX() {
}
Expand Down Expand Up @@ -169,6 +163,8 @@ public void close()
throws IOException
{
_input.close();
utf8Decoder.close();
pooledUtf8InputBuffer.close();
}

static private final int POS_OFFSET = 0;
Expand Down Expand Up @@ -1200,16 +1196,14 @@ protected final Timestamp readTimestamp(int len) throws IOException

protected final String readString(int numberOfBytes) throws IOException
{
ByteBuffer utf8InputBuffer = pooledUtf8InputBuffer.getBuffer();
// If the string we're reading is small enough to fit in our reusable buffer, we can avoid the overhead
// of looping and bounds checking.
if (numberOfBytes <= utf8InputBuffer.capacity()) {
return readStringWithReusableBuffer(numberOfBytes);
return readStringWithReusableBuffer(numberOfBytes, utf8InputBuffer);
}

// Otherwise, allocate a one-off decoding buffer that's large enough to hold the string
// and prepare to decode the string in chunks.
CharBuffer decodingBuffer = CharBuffer.allocate(numberOfBytes);
utf8CharsetDecoder.reset();
utf8Decoder.prepareDecode(numberOfBytes);

int save_limit = NO_LIMIT;
if (_local_remaining != NO_LIMIT) {
Expand Down Expand Up @@ -1246,14 +1240,7 @@ protected final String readString(int numberOfBytes) throws IOException
utf8InputBuffer.position(0);
utf8InputBuffer.limit(carryoverBytes + bytesRead);

CoderResult coderResult = utf8CharsetDecoder.decode(
utf8InputBuffer,
decodingBuffer,
totalBytesRead >= numberOfBytes
);
if (coderResult.isError()) {
throw new IonException("Illegal value encountered while validating UTF-8 data in input stream. " + coderResult.toString());
}
utf8Decoder.partialDecode(utf8InputBuffer, totalBytesRead >= numberOfBytes);

// Shift leftover partial character bytes (if any) to the beginning of the buffer
carryoverBytes = utf8InputBuffer.remaining();
Expand All @@ -1270,11 +1257,10 @@ protected final String readString(int numberOfBytes) throws IOException

_local_remaining = save_limit;

decodingBuffer.flip();
return decodingBuffer.toString();
return utf8Decoder.finishDecode();
}

private String readStringWithReusableBuffer(int numberOfBytes) throws IOException {
private String readStringWithReusableBuffer(int numberOfBytes, ByteBuffer utf8InputBuffer) throws IOException {
int save_limit = NO_LIMIT;
if (_local_remaining != NO_LIMIT) {
save_limit = _local_remaining - numberOfBytes;
Expand All @@ -1286,16 +1272,7 @@ private String readStringWithReusableBuffer(int numberOfBytes) throws IOExceptio
utf8InputBuffer.position(0);
utf8InputBuffer.limit(numberOfBytes);

utf8DecodingBuffer.position(0);
utf8DecodingBuffer.limit(utf8DecodingBuffer.capacity());

utf8CharsetDecoder.reset();
CoderResult coderResult = utf8CharsetDecoder.decode(utf8InputBuffer, utf8DecodingBuffer, true);
if (coderResult.isError()) {
throw new IonException("Illegal value encountered while validating UTF-8 data in input stream. " + coderResult.toString());
}
utf8DecodingBuffer.flip();
return utf8DecodingBuffer.toString();
return utf8Decoder.decode(utf8InputBuffer, numberOfBytes);
}

private final void throwUnexpectedEOFException() throws IOException {
Expand Down
2 changes: 1 addition & 1 deletion src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private static byte[] bytes(int... vals) {

final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool
.getInstance()
.getOrCreateUtf8Encoder();
.getOrCreate();

private static final byte[] makeTypedPreallocatedBytes(final int typeDesc, final int length)
{
Expand Down
26 changes: 26 additions & 0 deletions src/com/amazon/ion/impl/bin/utf8/ByteBufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.amazon.ion.impl.bin.utf8;

/**
* A thread-safe shared pool of {@link PoolableByteBuffer}s.
*/
public class ByteBufferPool extends Pool<PoolableByteBuffer> {

private static final ByteBufferPool INSTANCE = new ByteBufferPool();

// Do not allow instantiation; all classes should share the singleton instance.
private ByteBufferPool() {
super(new Allocator<PoolableByteBuffer>() {
@Override
public PoolableByteBuffer newInstance(Pool<PoolableByteBuffer> pool) {
return new PoolableByteBuffer(pool);
}
});
}

/**
* @return a threadsafe shared instance of {@link ByteBufferPool}.
*/
public static ByteBufferPool getInstance() {
return INSTANCE;
}
}
64 changes: 64 additions & 0 deletions src/com/amazon/ion/impl/bin/utf8/Pool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.amazon.ion.impl.bin.utf8;

import java.util.concurrent.ArrayBlockingQueue;

abstract class Pool<T extends Poolable<?>> {

/**
* Allocates objects to be pooled.
* @param <T> the type of object.
*/
interface Allocator<T extends Poolable<?>> {

/**
* Allocate a new object and link it to the given pool.
* @param pool the pool to which the new object will be linked.
* @return a new instance.
*/
T newInstance(Pool<T> pool);
}

// The maximum number of objects that can be waiting in the queue before new ones will be discarded.
private static final int MAX_QUEUE_SIZE = 128;

// A queue of previously initialized objects that can be loaned out.
private final ArrayBlockingQueue<T> bufferQueue;

// Allocator of objects to be pooled.
private final Allocator<T> allocator;

Pool(Allocator<T> allocator) {
this.allocator = allocator;
bufferQueue = new ArrayBlockingQueue<T>(MAX_QUEUE_SIZE);
}

/**
* If the pool is not empty, removes an object from the pool and returns it;
* otherwise, constructs a new object.
*
* @return An object.
*/
public T getOrCreate() {
// The `poll` method does not block. If the queue is empty it returns `null` immediately.
T object = bufferQueue.poll();
if (object == null) {
// No buffers were available in the pool. Create a new one.
object = allocator.newInstance(this);
}
return object;
}

/**
* Adds the provided instance to the pool. If the pool is full, the instance will
* be discarded.
*
* Callers MUST NOT use an object after returning it to the pool.
*
* @param object An object to add to the pool.
*/
public void returnToPool(T object) {
// The `offer` method does not block. If the queue is full, it returns `false` immediately.
// If the provided instance cannot be added to the pool, we discard it silently.
bufferQueue.offer(object);
}
}
30 changes: 30 additions & 0 deletions src/com/amazon/ion/impl/bin/utf8/Poolable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.amazon.ion.impl.bin.utf8;

import java.io.Closeable;

/**
* Base class for types that may be pooled.
* @param <T> the concrete type.
*/
abstract class Poolable<T extends Poolable<T>> implements Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type bound is to ensure that only subclasses of Poolable can implement Poolable, right? Which provides the guarantee that you'll get instantiated with reference to an appropriate Pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it allows us to guarantee that we're getting a Pool<T> (i.e. a Pool for this type of Poolable) in the constructor.


// The pool to which this object is linked.
private final Pool<T> pool;

/**
* @param pool the pool to which the object will be returned upon {@link #close()}.
*/
Poolable(Pool<T> pool) {
this.pool = pool;
}

/**
* Attempts to return this instance to the pool with which it is associated, if any.
*
* Do not continue to use this instance after calling this method.
*/
@Override
public void close() {
pool.returnToPool((T) this);
}
}
33 changes: 33 additions & 0 deletions src/com/amazon/ion/impl/bin/utf8/PoolableByteBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazon.ion.impl.bin.utf8;

import java.nio.ByteBuffer;

/**
* Holds a reusable {@link ByteBuffer}. Instances of this class are reusable but are NOT threadsafe.
*
* Instances are vended by {@link ByteBufferPool#getOrCreate()}.
*
* Users are expected to call {@link #close()} when the decoder is no longer needed.
*/
public class PoolableByteBuffer extends Poolable<PoolableByteBuffer> {

static final int BUFFER_SIZE_IN_BYTES = 4 * 1024;

// The reusable buffer.
private final ByteBuffer buffer;

/**
* @param pool the pool to which the object will be returned upon {@link #close()}.
*/
PoolableByteBuffer(Pool<PoolableByteBuffer> pool) {
super(pool);
buffer = ByteBuffer.allocate(BUFFER_SIZE_IN_BYTES);
}

/**
* @return the buffer.
*/
public ByteBuffer getBuffer() {
return buffer;
}
}
Loading