Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
OFF_HEAP,
ON_HEAP_UNSAFE
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

Expand Down Expand Up @@ -58,6 +62,8 @@
* ColumnVectors are intended to be reused.
*/
public abstract class ColumnVector implements AutoCloseable {
ColumnVector() { }

/**
* Allocates a column to store elements of `type` on or off heap.
* Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
Expand All @@ -66,6 +72,8 @@ public abstract class ColumnVector implements AutoCloseable {
public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
if (mode == MemoryMode.OFF_HEAP) {
return new OffHeapColumnVector(capacity, type);
} else if (mode == MemoryMode.ON_HEAP_UNSAFE) {
return new OnHeapUnsafeColumnVector(capacity, type);
} else {
return new OnHeapColumnVector(capacity, type);
}
Expand Down Expand Up @@ -548,18 +556,69 @@ public ColumnarBatch.Row getStruct(int rowId) {
* Returns a utility object to get structs.
* provided to keep API compatibility with InternalRow for code generation
*/
public ColumnarBatch.Row getStruct(int rowId, int size) {
resultStruct.rowId = rowId;
return resultStruct;
public InternalRow getStruct(int rowId, int size) {
if (!unsafeDirectCopy) {
resultStruct.rowId = rowId;
return resultStruct;
}
resultArray.data.loadBytes(resultArray);
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeRow map = new UnsafeRow(size);
map.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return map;
}

public int putStruct(int rowId, InternalRow row) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert (row instanceof UnsafeRow);
UnsafeRow unsafeRow = (UnsafeRow) row;
Object base = unsafeRow.getBaseObject();
long offset = unsafeRow.getBaseOffset();
int length = unsafeRow.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this map to ColumnVector as " +
"it's too big.");
}
putUnsafeArray(rowId, base, offset, length);
return length;
}

/**
* Returns the array at rowid.
*/
public final Array getArray(int rowId) {
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
return resultArray;
public final ArrayData getArray(int rowId) {
if (!unsafeDirectCopy) {
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
return resultArray;
} else {
resultArray.data.loadBytes(resultArray); // update resultArray.byteData
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeArrayData array = new UnsafeArrayData();
array.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return array;
}
}

public final int putArray(int rowId, ArrayData array) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert(array instanceof UnsafeArrayData);
UnsafeArrayData unsafeArray = (UnsafeArrayData)array;
Object base = unsafeArray.getBaseObject();
long offset = unsafeArray.getBaseOffset();
int length = unsafeArray.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this array to ColumnVector as " +
"it's too big.");
}
putUnsafeArray(rowId, base, offset, length);
return length;
}

/**
Expand All @@ -579,16 +638,43 @@ public final int putByteArray(int rowId, byte[] value) {
* Returns the value for rowId.
*/
private Array getByteArray(int rowId) {
Array array = getArray(rowId);
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
Array array = resultArray;
array.data.loadBytes(array);
return array;
}

/**
* Returns the value for rowId.
*/
public MapData getMap(int ordinal) {
throw new UnsupportedOperationException();
public MapData getMap(int rowId) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
resultArray.data.loadBytes(resultArray);
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeMapData map = new UnsafeMapData();
map.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return map;
}

public int putMap(int rowId, MapData map) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert(map instanceof UnsafeMapData);
UnsafeMapData unsafeMap = (UnsafeMapData)map;
byte[] value = (byte[])unsafeMap.getBaseObject();
long offset = unsafeMap.getBaseOffset() - Platform.BYTE_ARRAY_OFFSET;
int length = unsafeMap.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this map to ColumnVector as " +
"it's too big.");
}
putByteArray(rowId, value, (int)offset, length);
return length;
}

/**
Expand All @@ -609,14 +695,18 @@ public final Decimal getDecimal(int rowId, int precision, int scale) {
}


public final void putDecimal(int rowId, Decimal value, int precision) {
public final int putDecimal(int rowId, Decimal value, int precision) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
putInt(rowId, (int) value.toUnscaledLong());
return 4;
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
putLong(rowId, value.toUnscaledLong());
return 8;
} else {
BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
putByteArray(rowId, bigInteger.toByteArray());
byte[] array = bigInteger.toByteArray();
putByteArray(rowId, array);
return array.length;
}
}

Expand All @@ -633,6 +723,13 @@ public final UTF8String getUTF8String(int rowId) {
}
}

public final int putUTF8String(int rowId, UTF8String string) {
assert(dictionary == null);
byte[] array = string.getBytes();
putByteArray(rowId, array);
return array.length;
}

/**
* Returns the byte array for rowId.
*/
Expand All @@ -648,6 +745,22 @@ public final byte[] getBinary(int rowId) {
}
}

public final int putBinary(int rowId, byte[] bytes) {
putByteArray(rowId, bytes);
return bytes.length;
}

/**
* APIs dealing with Unsafe Data
*/
public long putUnsafeArray(int rowId, Object base, long offset, int length) {
throw new UnsupportedOperationException();
}

public void putUnsafeData(int rowId, int count, Object src, long offset) {
throw new UnsupportedOperationException();
}

/**
* Append APIs. These APIs all behave similarly and will append data to the current vector. It
* is not valid to mix the put and append APIs. The append APIs are slower and should only be
Expand Down Expand Up @@ -858,6 +971,14 @@ public final int appendStruct(boolean isNull) {
return elementsAppended;
}

public final int appendUnsafeData(int length, Object base, long offset) {
reserve(elementsAppended + length);
int result = elementsAppended;
putUnsafeData(elementsAppended, length, base, offset);
elementsAppended += length;
return result;
}

/**
* Returns the data for the underlying array.
*/
Expand Down Expand Up @@ -894,10 +1015,12 @@ public final int appendStruct(boolean isNull) {
@VisibleForTesting
protected int MAX_CAPACITY = Integer.MAX_VALUE;

protected boolean unsafeDirectCopy;

/**
* Data type for this column.
*/
protected final DataType type;
protected DataType type;

/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
Expand Down Expand Up @@ -929,17 +1052,17 @@ public final int appendStruct(boolean isNull) {
/**
* If this is a nested type (array or struct), the column for the child data.
*/
protected final ColumnVector[] childColumns;
protected ColumnVector[] childColumns;

/**
* Reusable Array holder for getArray().
*/
protected final Array resultArray;
protected Array resultArray;

/**
* Reusable Struct holder for getStruct().
*/
protected final ColumnarBatch.Row resultStruct;
protected ColumnarBatch.Row resultStruct;

/**
* The Dictionary for this column.
Expand Down Expand Up @@ -991,14 +1114,20 @@ public ColumnVector getDictionaryIds() {
* type.
*/
protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
this(capacity, type, memMode, false);
}

protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean unsafeDirectCopy) {
this.capacity = capacity;
this.type = type;
this.unsafeDirectCopy = unsafeDirectCopy;

if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
|| DecimalType.isByteArrayDecimalType(type)) {
|| DecimalType.isByteArrayDecimalType(type)
|| unsafeDirectCopy && (type instanceof MapType || type instanceof StructType)) {
DataType childType;
int childCapacity = capacity;
if (type instanceof ArrayType) {
if (!unsafeDirectCopy && type instanceof ArrayType) {
childType = ((ArrayType)type).elementType();
} else {
childType = DataTypes.ByteType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
Expand Down Expand Up @@ -98,20 +99,25 @@ public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
* For example, an array of IntegerType will return an int[].
* Throws exceptions for unhandled schemas.
*/
public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
DataType dt = array.data.dataType();
if (dt instanceof IntegerType) {
int[] result = new int[array.length];
ColumnVector data = array.data;
for (int i = 0; i < result.length; i++) {
if (data.isNullAt(array.offset + i)) {
throw new RuntimeException("Cannot handle NULL values.");
public static Object toPrimitiveJavaArray(ArrayData a) {
if (!(a instanceof ColumnVector.Array)) {
return a.toIntArray();
} else {
ColumnVector.Array array = (ColumnVector.Array)a;
DataType dt = array.data.dataType();
if (dt instanceof IntegerType) {
int[] result = new int[array.length];
ColumnVector data = array.data;
for (int i = 0; i < result.length; i++) {
if (data.isNullAt(array.offset + i)) {
throw new RuntimeException("Cannot handle NULL values.");
}
result[i] = data.getInt(array.offset + i);
}
result[i] = data.getInt(array.offset + i);
return result;
} else {
throw new UnsupportedOperationException();
}
return result;
} else {
throw new UnsupportedOperationException();
}
}

Expand Down
Loading