Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions java/core/src/java/org/apache/orc/TypeDescription.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
Expand All @@ -45,6 +46,7 @@ public class TypeDescription
private static final int MAX_SCALE = 38;
private static final int DEFAULT_PRECISION = 38;
private static final int DEFAULT_SCALE = 10;
public static final int MAX_DECIMAL64_PRECISION = 18;
private static final int DEFAULT_LENGTH = 256;
static final Pattern UNQUOTED_NAMES = Pattern.compile("^[a-zA-Z0-9_]+$");

Expand Down Expand Up @@ -622,7 +624,7 @@ public int getMaximumId() {
return maxId;
}

private ColumnVector createColumn(int maxSize) {
private ColumnVector createColumn(RowBatchVersion version, int maxSize) {
switch (category) {
case BOOLEAN:
case BYTE:
Expand All @@ -637,7 +639,12 @@ private ColumnVector createColumn(int maxSize) {
case DOUBLE:
return new DoubleColumnVector(maxSize);
case DECIMAL:
return new DecimalColumnVector(maxSize, precision, scale);
if (version == RowBatchVersion.ORIGINAL ||
precision > MAX_DECIMAL64_PRECISION) {
return new DecimalColumnVector(maxSize, precision, scale);
} else {
return new Decimal64ColumnVector(maxSize, precision, scale);
}
case STRING:
case BINARY:
case CHAR:
Expand All @@ -646,48 +653,80 @@ private ColumnVector createColumn(int maxSize) {
case STRUCT: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
fieldVector[i] = children.get(i).createColumn(maxSize);
fieldVector[i] = children.get(i).createColumn(version, maxSize);
}
return new StructColumnVector(maxSize,
fieldVector);
}
case UNION: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
fieldVector[i] = children.get(i).createColumn(maxSize);
fieldVector[i] = children.get(i).createColumn(version, maxSize);
}
return new UnionColumnVector(maxSize,
fieldVector);
}
case LIST:
return new ListColumnVector(maxSize,
children.get(0).createColumn(maxSize));
children.get(0).createColumn(version, maxSize));
case MAP:
return new MapColumnVector(maxSize,
children.get(0).createColumn(maxSize),
children.get(1).createColumn(maxSize));
children.get(0).createColumn(version, maxSize),
children.get(1).createColumn(version, maxSize));
default:
throw new IllegalArgumentException("Unknown type " + category);
}
}

public VectorizedRowBatch createRowBatch(int maxSize) {
/**
 * Specify the version of the VectorizedRowBatch that the user desires.
 */
enum RowBatchVersion {
  // Original behavior: every DECIMAL column is read into a DecimalColumnVector.
  ORIGINAL,
  // Use Decimal64ColumnVector for DECIMAL columns whose precision is at most
  // MAX_DECIMAL64_PRECISION (18); wider decimals still use DecimalColumnVector.
  USE_DECIMAL64;
}

/**
 * Create a VectorizedRowBatch for this schema.
 * @param version selects the ColumnVector implementation used for DECIMAL
 *                columns (ORIGINAL vs USE_DECIMAL64)
 * @param size the maximum number of rows in the batch
 * @return a new, reset VectorizedRowBatch
 */
VectorizedRowBatch createRowBatch(RowBatchVersion version, int size) {
  VectorizedRowBatch result;
  if (category == Category.STRUCT) {
    // A top-level struct maps each of its fields to one column of the batch.
    result = new VectorizedRowBatch(children.size(), size);
    for(int i=0; i < result.cols.length; ++i) {
      result.cols[i] = children.get(i).createColumn(version, size);
    }
  } else {
    // Any other root type becomes a single-column batch.
    result = new VectorizedRowBatch(1, size);
    result.cols[0] = createColumn(version, size);
  }
  result.reset();
  return result;
}

/**
 * Create a VectorizedRowBatch that uses Decimal64ColumnVector for
 * short ({@code precision <= 18}) decimals; wider decimals still get
 * DecimalColumnVector. Uses the default batch size.
 * @return a new VectorizedRowBatch
 */
public VectorizedRowBatch createRowBatchV2() {
  return createRowBatch(RowBatchVersion.USE_DECIMAL64,
      VectorizedRowBatch.DEFAULT_SIZE);
}

/**
 * Create a VectorizedRowBatch with the original ColumnVector types
 * (all DECIMAL columns use DecimalColumnVector).
 * @param maxSize the maximum number of rows in the batch
 * @return a new VectorizedRowBatch
 */
public VectorizedRowBatch createRowBatch(int maxSize) {
  return createRowBatch(RowBatchVersion.ORIGINAL, maxSize);
}

/**
 * Create a VectorizedRowBatch with the original ColumnVector types and
 * the default batch size.
 * @return a new VectorizedRowBatch
 */
public VectorizedRowBatch createRowBatch() {
  return createRowBatch(RowBatchVersion.ORIGINAL,
      VectorizedRowBatch.DEFAULT_SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ public static class AnyIntegerFromDecimalTreeReader extends ConvertTreeReader {
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
this.readerType = readerType;
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -848,7 +848,7 @@ public static class FloatFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1054,7 +1054,7 @@ public static class DoubleFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1387,7 +1387,7 @@ public static class DecimalFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
filePrecision = fileType.getPrecision();
fileScale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, filePrecision, fileScale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down Expand Up @@ -1565,7 +1565,7 @@ public static class StringGroupFromDecimalTreeReader extends ConvertTreeReader {
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
this.readerType = readerType;
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
}
Expand Down Expand Up @@ -1904,7 +1904,7 @@ public static class TimestampFromDecimalTreeReader extends ConvertTreeReader {
super(columnId);
this.precision = fileType.getPrecision();
this.scale = fileType.getScale();
decimalTreeReader = new DecimalTreeReader(columnId, context);
decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context);
setConvertTreeReader(decimalTreeReader);
}

Expand Down
12 changes: 7 additions & 5 deletions java/core/src/java/org/apache/orc/impl/SerializationUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ public SerializationUtils() {

/**
 * Write an unsigned variable-length long (base-128 varint, little-endian
 * groups of 7 bits) to the stream. The encoded bytes are staged in
 * writeBuffer so the stream receives a single write(byte[],int,int) call
 * instead of one write per byte.
 * NOTE(review): assumes writeBuffer (declared elsewhere in this class) is at
 * least 10 bytes long — the maximum varint length of a 64-bit value; confirm.
 * @param output the stream to write to
 * @param value the value to encode, treated as unsigned
 * @throws IOException if the underlying stream write fails
 */
public void writeVulong(OutputStream output,
                        long value) throws IOException {
  int posn = 0;
  while (true) {
    if ((value & ~0x7f) == 0) {
      // Fewer than 8 significant bits remain: final byte, high bit clear.
      writeBuffer[posn++] = (byte) value;
      break;
    } else {
      // Emit the low 7 bits with the continuation (high) bit set.
      writeBuffer[posn++] = (byte)(0x80 | (value & 0x7f));
      value >>>= 7;
    }
  }
  output.write(writeBuffer, 0, posn);
}

public void writeVslong(OutputStream output,
Expand All @@ -55,7 +57,7 @@ public void writeVslong(OutputStream output,
}


public long readVulong(InputStream in) throws IOException {
public static long readVulong(InputStream in) throws IOException {
long result = 0;
long b;
int offset = 0;
Expand All @@ -70,7 +72,7 @@ public long readVulong(InputStream in) throws IOException {
return result;
}

public long readVslong(InputStream in) throws IOException {
public static long readVslong(InputStream in) throws IOException {
long result = readVulong(in);
return (result >>> 1) ^ -(result & 1);
}
Expand Down
88 changes: 72 additions & 16 deletions java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
Expand Down Expand Up @@ -1096,19 +1097,31 @@ void skipRows(long items) throws IOException {
}

public static class DecimalTreeReader extends TreeReader {
protected final int precision;
protected final int scale;
protected InStream valueStream;
protected IntegerReader scaleReader = null;
private int[] scratchScaleVector;
private byte[] scratchBytes;

DecimalTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, null, context);
}

protected DecimalTreeReader(int columnId, InStream present,
InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding, Context context)
throws IOException {
DecimalTreeReader(int columnId,
int precision,
int scale,
Context context) throws IOException {
this(columnId, null, null, null, null, precision, scale, context);
}

protected DecimalTreeReader(int columnId,
InStream present,
InStream valueStream,
InStream scaleStream,
OrcProto.ColumnEncoding encoding,
int precision,
int scale,
Context context) throws IOException {
super(columnId, present, context);
this.precision = precision;
this.scale = scale;
this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
this.valueStream = valueStream;
this.scratchBytes = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_SERIALIZATION_UTILS_READ];
Expand Down Expand Up @@ -1150,14 +1163,9 @@ public void seek(PositionProvider index) throws IOException {
scaleReader.seek(index);
}

@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final DecimalColumnVector result = (DecimalColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);

private void nextVector(DecimalColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
if (batchSize > scratchScaleVector.length) {
scratchScaleVector = new int[(int) batchSize];
}
Expand Down Expand Up @@ -1193,6 +1201,53 @@ public void nextVector(ColumnVector previousVector,
}
}

/**
 * Read batchSize decimal values into a Decimal64ColumnVector, which stores
 * each value as a long scaled by the column's scale. Only legal when the
 * file type's precision fits (precision <= MAX_DECIMAL64_PRECISION).
 * The caller has already applied the present stream to result.isNull via
 * super.nextVector, so the isNull parameter is not consulted here.
 * @param result the vector to fill; its precision/scale fields are set from
 *               the reader's file schema at the end
 * @param isNull caller-supplied null flags (unused in this overload)
 * @param batchSize the number of rows to read
 * @throws IOException if reading the value or scale streams fails
 */
private void nextVector(Decimal64ColumnVector result,
                        boolean[] isNull,
                        final int batchSize) throws IOException {
  if (precision > TypeDescription.MAX_DECIMAL64_PRECISION) {
    throw new IllegalArgumentException("Reading large precision type into" +
        " Decimal64ColumnVector.");
  }

  // Grow the scratch array if this batch is larger than any seen so far.
  if (batchSize > scratchScaleVector.length) {
    scratchScaleVector = new int[(int) batchSize];
  }
  // read the scales
  scaleReader.nextVector(result, scratchScaleVector, batchSize);
  if (result.noNulls) {
    for (int r=0; r < batchSize; ++r) {
      result.vector[r] = SerializationUtils.readVslong(valueStream);
      // Rescale from the stored per-value scale up to the column scale by
      // multiplying by powers of ten.
      for(int s=scratchScaleVector[r]; s < scale; ++s) {
        result.vector[r] *= 10;
      }
    }
  } else if (!result.isRepeating || !result.isNull[0]) {
    // Some rows are null; only non-null rows have a value in the stream.
    for (int r=0; r < batchSize; ++r) {
      if (!result.isNull[r]) {
        result.vector[r] = SerializationUtils.readVslong(valueStream);
        for(int s=scratchScaleVector[r]; s < scale; ++s) {
          result.vector[r] *= 10;
        }
      }
    }
  }
  // Publish the schema's precision/scale so consumers can interpret vector[].
  result.precision = (short) precision;
  result.scale = (short) scale;
}

/**
 * Read the next batch of decimal values, dispatching to the reader that
 * matches the concrete vector representation supplied by the row batch.
 * @param result either a Decimal64ColumnVector or a DecimalColumnVector
 * @param isNull optional caller-supplied null flags
 * @param batchSize the number of rows to read
 * @throws IOException if reading any of the underlying streams fails
 */
@Override
public void nextVector(ColumnVector result,
                       boolean[] isNull,
                       final int batchSize) throws IOException {
  // Consume the present stream first so result.isNull is populated.
  super.nextVector(result, isNull, batchSize);
  if (result instanceof Decimal64ColumnVector) {
    nextVector((Decimal64ColumnVector) result, isNull, batchSize);
    return;
  }
  nextVector((DecimalColumnVector) result, isNull, batchSize);
}

@Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
Expand Down Expand Up @@ -2186,7 +2241,8 @@ public static TreeReader createTreeReader(TypeDescription readerType,
case DATE:
return new DateTreeReader(fileType.getId(), context);
case DECIMAL:
return new DecimalTreeReader(fileType.getId(), context);
return new DecimalTreeReader(fileType.getId(), fileType.getPrecision(),
fileType.getScale(), context);
case STRUCT:
return new StructTreeReader(fileType.getId(), readerType, context);
case LIST:
Expand Down
Loading