Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .baseline/checkstyle/.checkstyle.xml.swp
Binary file not shown.
242 changes: 242 additions & 0 deletions arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package org.apache.iceberg.arrow.reader;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskCompletionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper class for Arrow reading. It provides two converter methods:
 * <ul>
 *   <li>{@link #toBatchIterator} converts an {@code Iterator<InternalRow>} (e.g. a Parquet
 *       FileIterator) into an iterator over {@link ArrowRecordBatch}es.</li>
 *   <li>{@link #fromBatchIterator} converts an iterator over {@link ArrowRecordBatch}es into an
 *       {@code Iterator<InternalRow>} backed by {@link ColumnarBatch}, to conform to Spark's
 *       current row-based interface. Once Spark adds native Arrow support, the second conversion
 *       can be removed and the batch iterator returned directly.</li>
 * </ul>
 */
public class ArrowReader {

  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);

  private ArrowReader() {
    // static utility class; no instances
  }

  /**
   * Accepts an iterator over {@link ArrowRecordBatch}es and exposes the rows as
   * {@link InternalRow}s via {@link ColumnarBatch#rowIterator()}.
   *
   * @param arrowBatchIter source of Arrow record batches; each batch is closed after its buffers
   *                       are loaded into the shared root
   * @param sparkSchema Spark schema describing the rows encoded in the batches
   * @param timeZoneId time zone id, required to map Spark's TimestampType into Arrow
   * @return a closeable iterator over InternalRow that releases its Arrow allocations once
   *         exhausted or closed
   */
  public static InternalRowOverArrowBatchIterator fromBatchIterator(
      Iterator<ArrowRecordBatch> arrowBatchIter,
      StructType sparkSchema,
      String timeZoneId) {

    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
    BufferAllocator allocator =
        ArrowUtils.rootAllocator().newChildAllocator("fromBatchIterator", 0, Long.MAX_VALUE);
    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);

    return new InternalRowOverArrowBatchIterator(arrowBatchIter, allocator, root);
  }

  /**
   * Iterator over {@link InternalRow} that lazily loads one Arrow batch at a time into a shared
   * {@link VectorSchemaRoot} and exposes it through Spark's {@link ColumnarBatch}. Rows returned
   * by {@link #next()} are only valid until the next batch is loaded.
   */
  @NotThreadSafe
  public static class InternalRowOverArrowBatchIterator implements Iterator<InternalRow>, Closeable {

    private final Iterator<ArrowRecordBatch> arrowBatchIter;
    private final BufferAllocator allocator;
    private final VectorSchemaRoot root;
    // created once: the loader always targets the same root, so there is no need to
    // re-create it for every incoming batch
    private final VectorLoader vectorLoader;

    private Iterator<InternalRow> rowIter;
    private boolean closed = false;

    InternalRowOverArrowBatchIterator(Iterator<ArrowRecordBatch> arrowBatchIter,
                                      BufferAllocator allocator,
                                      VectorSchemaRoot root) {
      this.arrowBatchIter = arrowBatchIter;
      this.allocator = allocator;
      this.root = root;
      this.vectorLoader = new VectorLoader(root);
    }

    @Override
    public boolean hasNext() {
      // skip over empty batches so that hasNext() == true guarantees next() succeeds
      while ((rowIter == null || !rowIter.hasNext()) && arrowBatchIter.hasNext()) {
        rowIter = nextBatch();
      }
      if (rowIter != null && rowIter.hasNext()) {
        return true;
      }
      // fully drained: eagerly release the Arrow buffers
      try {
        close();
      } catch (IOException ioe) {
        throw new UncheckedIOException("Encountered an error while closing iterator", ioe);
      }
      return false;
    }

    @Override
    public InternalRow next() {
      if (!hasNext()) {
        throw new NoSuchElementException("No more rows in Arrow batch iterator");
      }
      return rowIter.next();
    }

    /**
     * Loads the next ArrowRecordBatch into the shared root and returns a row view over it.
     * The source batch is closed once its buffers have been transferred.
     */
    private Iterator<InternalRow> nextBatch() {
      ArrowRecordBatch arrowRecordBatch = arrowBatchIter.next();
      long start = System.currentTimeMillis();
      root.setRowCount(0);
      vectorLoader.load(arrowRecordBatch);
      arrowRecordBatch.close();

      List<FieldVector> fieldVectors = root.getFieldVectors();
      ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
      for (int i = 0; i < fieldVectors.size(); i++) {
        columns[i] = new ArrowColumnVector(fieldVectors.get(i));
      }

      ColumnarBatch batch = new ColumnarBatch(columns);
      batch.setNumRows(root.getRowCount());

      LOG.info("[InternalRowOverArrowIterator] => Created Columnar Batch with {} rows. Took {} milliseconds.",
          root.getRowCount(), System.currentTimeMillis() - start);
      return batch.rowIterator();
    }

    @Override
    public void close() throws IOException {
      // idempotent: hasNext() may call close() repeatedly after exhaustion
      if (!closed) {
        closed = true;
        root.close();
        allocator.close();
      }
    }
  }

  /**
   * Accepts an iterator over {@link InternalRow} (coming in from ParquetReader's FileIterator)
   * and creates {@link ArrowRecordBatch}es over it by collecting rows from the input iterator.
   * Each {@code next()} call collects up to {@code maxRecordsPerBatch} rows into one Arrow batch.
   *
   * @param rowIter source rows
   * @param sparkSchema Spark schema of the incoming rows
   * @param maxRecordsPerBatch maximum rows per batch; a value &lt;= 0 means no limit (all
   *                           remaining rows go into a single batch)
   * @param timeZoneId time zone id, required to map Spark's TimestampType into Arrow
   * @return a closeable iterator over ArrowRecordBatch
   */
  public static ArrowRecordBatchIterator toBatchIterator(
      Iterator<InternalRow> rowIter,
      StructType sparkSchema, int maxRecordsPerBatch,
      String timeZoneId) {

    TaskContext context = TaskContext.get();

    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
    BufferAllocator allocator = ArrowUtils.rootAllocator().newChildAllocator(
        "toBatchIterator",
        0,
        Long.MAX_VALUE);
    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);

    if (context != null) {
      // release the Arrow buffers even if the task dies mid-read.
      // NOTE(review): this can close root/allocator a second time when the iterator was already
      // closed normally -- confirm the Arrow version in use tolerates double-close
      context.addTaskCompletionListener(new TaskCompletionListener() {
        @Override
        public void onTaskCompletion(TaskContext taskContext) {
          root.close();
          allocator.close();
        }
      });
    }

    return new ArrowRecordBatchIterator(rowIter, root, allocator, maxRecordsPerBatch);
  }

  /**
   * Iterator that drains an {@code Iterator<InternalRow>} into {@link ArrowRecordBatch}es of at
   * most {@code maxRecordsPerBatch} rows each, reusing a single {@link VectorSchemaRoot}.
   */
  @NotThreadSafe
  public static class ArrowRecordBatchIterator implements Iterator<ArrowRecordBatch>, Closeable {

    final Iterator<InternalRow> rowIterator;
    final VectorSchemaRoot root;
    final BufferAllocator allocator;
    final int maxRecordsPerBatch;
    final ArrowWriter arrowWriter;
    final VectorUnloader unloader;

    private boolean closed = false;

    ArrowRecordBatchIterator(Iterator<InternalRow> rowIterator,
                             VectorSchemaRoot root,
                             BufferAllocator allocator,
                             int maxRecordsPerBatch) {
      this.unloader = new VectorUnloader(root);
      this.arrowWriter = ArrowWriter.create(root);
      this.rowIterator = rowIterator;
      this.root = root;
      this.allocator = allocator;
      this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    @Override
    public boolean hasNext() {
      if (rowIterator.hasNext()) {
        return true;
      }
      // fully drained: eagerly release the Arrow buffers
      try {
        close();
      } catch (IOException ioe) {
        throw new UncheckedIOException("Encountered an error while closing iterator", ioe);
      }
      return false;
    }

    @Override
    public ArrowRecordBatch next() {
      if (!hasNext()) {
        throw new NoSuchElementException("No more rows to batch");
      }

      int rowCount = 0;
      long start = System.currentTimeMillis();
      while (rowIterator.hasNext() && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) {
        InternalRow row = rowIterator.next();
        arrowWriter.write(row);
        rowCount += 1;
      }
      arrowWriter.finish();
      LOG.info("[ArrowRecordBatchIterator] => Created batch with {} rows. Took {} milliseconds.",
          rowCount, System.currentTimeMillis() - start);
      ArrowRecordBatch batch = unloader.getRecordBatch();
      // reset the writer so the next call starts from an empty root; without this the second
      // next() would write on top of the already-finished vectors and corrupt the batch
      arrowWriter.reset();
      return batch;
    }

    @Override
    public void close() throws IOException {
      // idempotent: hasNext() may call close() repeatedly after exhaustion
      if (!closed) {
        closed = true;
        root.close();
        allocator.close();
      }
    }
  }
}
31 changes: 26 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ project(':iceberg-core') {
compile("org.apache.avro:avro") {
exclude group: 'org.tukaani' // xz compression is not supported
}
compile("org.apache.arrow:arrow-vector") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
}
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}


compile "com.fasterxml.jackson.core:jackson-databind"
compile "com.fasterxml.jackson.core:jackson-core"
Expand All @@ -237,7 +245,11 @@ project(':iceberg-data') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
compileOnly project(':iceberg-spark')
compileOnly project(':iceberg-parquet')
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}

testCompile("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -323,14 +335,12 @@ project(':iceberg-parquet') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
compile project(':iceberg-arrow')

compile("org.apache.parquet:parquet-avro") {
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
exclude group: 'it.unimi.dsi'
exclude group: 'org.codehaus.jackson'
}

compile "org.apache.parquet:parquet-avro"
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -363,6 +373,17 @@ project(':iceberg-spark') {
}
}

project(':iceberg-arrow') {
dependencies {
// compile project(':iceberg-spark')
compile project(':iceberg-api')

compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}
}
}

project(':iceberg-pig') {
dependencies {
compile project(':iceberg-api')
Expand Down
Loading