Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fefe794
First cut impl of reading Parquet FileIterator into ArrowRecordBatch …
prodeezy Jun 5, 2019
5b90780
made num records per arrow batch configurable
prodeezy Jun 5, 2019
f58e545
addressed comments
prodeezy Jun 5, 2019
47e436a
Added docs for public methods and ArrowReader class
prodeezy Jun 6, 2019
19c7cb9
Fixed javadoc
prodeezy Jun 6, 2019
15c7cd8
WIP first stab at reading into Arrow and returning as InternalRow ite…
prodeezy Jun 14, 2019
4a9efd6
Add publish to snapshot repository by replacing version to `1.0-adobe…
Mar 4, 2019
3bc8f38
Merge branch 'master' into issue-9-support-arrow-based-reading-WIP
prodeezy Jul 9, 2019
5a45320
Adding arrow schema conversion utility
prodeezy Jul 17, 2019
bc19e0b
adding arrow-vector dep to tests
prodeezy Jul 17, 2019
6ade18b
Merge branch 'master' into issue-9-support-arrow-based-reading-WIP
prodeezy Jul 17, 2019
af5aa5e
[WIP] Working vectorization for primitive types. Added test for Vecto…
prodeezy Jul 22, 2019
18084e2
[WIP] Added Decimal types to vectorization
prodeezy Jul 23, 2019
36da977
[WIP] added remaining primitive type vectorization and tests
prodeezy Jul 23, 2019
7121ecd
[WIP] unused imports fixed
prodeezy Jul 23, 2019
e2bdf33
Add argument validation to HadoopTables#create (#298)
chenjunjiedada Jul 25, 2019
1596d61
Install source JAR when running install target (#310)
electrum Jul 25, 2019
1845097
Merge branch 'branch-master-adobe' of git.corp.adobe.com:hstack/incub…
prodeezy Jul 26, 2019
0896610
Merge remote-tracking branch 'origin/issue-9-support-arrow-based-read…
prodeezy Jul 26, 2019
a404923
Merge remote-tracking branch 'prodeezy/vectorized-read' into vectoriz…
prodeezy Jul 26, 2019
ceae2fd
Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT
prodeezy Jul 26, 2019
83b94e6
Temporarily ignore applying style check
prodeezy Jul 26, 2019
3b8c43a
Fixing javadoc error
prodeezy Jul 26, 2019
c01cb71
Updating versions.lock
prodeezy Jul 26, 2019
5f63918
fixed checkstyle errors
prodeezy Jul 26, 2019
bcf2e2d
Revert "Bump version to 1.0-adobe-3.0-vectorized-SNAPSHOT"
prodeezy Jul 26, 2019
515ef85
cleanup
prodeezy Jul 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .baseline/checkstyle/.checkstyle.xml.swp
Binary file not shown.
242 changes: 242 additions & 0 deletions arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package org.apache.iceberg.arrow.reader;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskCompletionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper class for Arrow reading. It provides two main converter methods.
 * These converter methods are currently used to first convert a Parquet FileIterator
 * into an Iterator over {@link ArrowRecordBatch}es. Second, each ArrowRecordBatch is
 * turned into a {@link ColumnarBatch} and exposed as an Iterator over
 * {@link InternalRow}. The second step is done only to conform to Spark's current
 * interface; when Spark adds Arrow support the second iterator can be removed and
 * the first one returned directly.
 */
public class ArrowReader {

  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);

  private ArrowReader() {
    // static utility class; no instances
  }

  /**
   * Accepts an iterator over ArrowRecordBatches and copies them into ColumnarBatches.
   * Since Spark consumes an Iterator over InternalRow, the ColumnarBatches are exposed
   * row by row through the returned iterator.
   *
   * @param arrowBatchIter source of Arrow record batches
   * @param sparkSchema Spark schema of the rows, used to derive the Arrow schema
   * @param timeZoneId time zone id; required to convert TimestampType fields in sparkSchema
   * @return an iterator over InternalRow backed by the incoming Arrow batches
   */
  public static InternalRowOverArrowBatchIterator fromBatchIterator(
      Iterator<ArrowRecordBatch> arrowBatchIter,
      StructType sparkSchema,
      String timeZoneId) {

    // timeZoneId required for TimestampType in StructType
    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
    BufferAllocator allocator =
        ArrowUtils.rootAllocator().newChildAllocator("fromBatchIterator", 0, Long.MAX_VALUE);

    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);

    return new InternalRowOverArrowBatchIterator(arrowBatchIter, allocator, root);
  }

  /**
   * Iterator over InternalRow that lazily loads one ArrowRecordBatch at a time into a
   * shared {@link VectorSchemaRoot} and iterates its rows. Closes its Arrow resources
   * automatically once the source iterator is exhausted.
   */
  @NotThreadSafe
  public static class InternalRowOverArrowBatchIterator implements Iterator<InternalRow>, Closeable {

    private final Iterator<ArrowRecordBatch> arrowBatchIter;
    private final BufferAllocator allocator;
    private final VectorSchemaRoot root;

    // rows of the batch currently loaded into root; null until the first batch is loaded
    private Iterator<InternalRow> rowIter;

    InternalRowOverArrowBatchIterator(Iterator<ArrowRecordBatch> arrowBatchIter,
                                      BufferAllocator allocator,
                                      VectorSchemaRoot root) {
      this.arrowBatchIter = arrowBatchIter;
      this.allocator = allocator;
      this.root = root;
    }

    @Override
    public boolean hasNext() {
      if (rowIter != null && rowIter.hasNext()) {
        return true;
      }
      if (arrowBatchIter.hasNext()) {
        rowIter = nextBatch();
        return true;
      }
      // source exhausted: release Arrow buffers eagerly rather than relying on
      // callers to invoke close() explicitly
      try {
        close();
      } catch (IOException ioe) {
        throw new UncheckedIOException("Encountered an error while closing iterator", ioe);
      }
      return false;
    }

    @Override
    public InternalRow next() {
      // honor the Iterator contract: load the next batch if the caller skipped
      // hasNext(), and fail clearly instead of NPE-ing when exhausted
      if (!hasNext()) {
        throw new NoSuchElementException("No more rows");
      }
      return rowIter.next();
    }

    /**
     * Loads the next ArrowRecordBatch into the shared VectorSchemaRoot, wraps its
     * vectors in a ColumnarBatch and returns that batch's row iterator.
     */
    private Iterator<InternalRow> nextBatch() {
      ArrowRecordBatch arrowRecordBatch = arrowBatchIter.next();
      long start = System.currentTimeMillis();
      root.setRowCount(0);
      VectorLoader vectorLoader = new VectorLoader(root);
      vectorLoader.load(arrowRecordBatch);
      // the data now lives in root's vectors; release the record batch's buffers
      arrowRecordBatch.close();

      List<FieldVector> fieldVectors = root.getFieldVectors();
      ColumnVector[] columns = new ColumnVector[fieldVectors.size()];
      for (int i = 0; i < fieldVectors.size(); i++) {
        columns[i] = new ArrowColumnVector(fieldVectors.get(i));
      }

      ColumnarBatch batch = new ColumnarBatch(columns);
      batch.setNumRows(root.getRowCount());

      LOG.info("[InternalRowOverArrowIterator] => Created Columnar Batch with {} rows. Took {} milliseconds.",
          root.getRowCount(), System.currentTimeMillis() - start);
      return batch.rowIterator();
    }

    @Override
    public void close() throws IOException {
      // close the allocator even if closing the root fails
      try {
        root.close();
      } finally {
        allocator.close();
      }
    }
  }

  /**
   * Accepts an Iterator over InternalRow coming in from ParquetReader's FileIterator
   * and creates ArrowRecordBatches over it by collecting rows from the input iterator.
   * Each next() call over the returned iterator collects up to maxRecordsPerBatch rows
   * (all remaining rows when maxRecordsPerBatch is non-positive) and creates one Arrow
   * batch from them.
   *
   * @param rowIter source rows
   * @param sparkSchema Spark schema of the rows, used to derive the Arrow schema
   * @param maxRecordsPerBatch maximum rows per batch; {@code <= 0} means unbounded
   * @param timeZoneId time zone id; required to convert TimestampType fields in sparkSchema
   * @return an iterator over ArrowRecordBatch built from the input rows
   */
  public static ArrowRecordBatchIterator toBatchIterator(
      Iterator<InternalRow> rowIter,
      StructType sparkSchema, int maxRecordsPerBatch,
      String timeZoneId) {

    TaskContext context = TaskContext.get();

    Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId);
    BufferAllocator allocator = ArrowUtils.rootAllocator().newChildAllocator(
        "toBatchIterator",
        0,
        Long.MAX_VALUE);
    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator);

    // when running inside a Spark task, make sure the Arrow buffers are freed even if
    // the task finishes (or fails) without draining the iterator
    if (context != null) {
      context.addTaskCompletionListener(new TaskCompletionListener() {
        @Override
        public void onTaskCompletion(TaskContext context) {
          root.close();
          allocator.close();
        }
      });
    }

    return new ArrowRecordBatchIterator(rowIter, root, allocator, maxRecordsPerBatch);
  }

  /**
   * Iterator over ArrowRecordBatch that fills a shared {@link VectorSchemaRoot} from the
   * source rows and unloads it into one record batch per next() call. Closes its Arrow
   * resources automatically once the source iterator is exhausted.
   */
  public static class ArrowRecordBatchIterator implements Iterator<ArrowRecordBatch>, Closeable {

    final Iterator<InternalRow> rowIterator;
    final VectorSchemaRoot root;
    final BufferAllocator allocator;
    final int maxRecordsPerBatch;
    final ArrowWriter arrowWriter;
    final VectorUnloader unloader;

    ArrowRecordBatchIterator(Iterator<InternalRow> rowIterator,
                             VectorSchemaRoot root,
                             BufferAllocator allocator,
                             int maxRecordsPerBatch) {
      this.unloader = new VectorUnloader(root);
      this.arrowWriter = ArrowWriter.create(root);
      this.rowIterator = rowIterator;
      this.root = root;
      this.allocator = allocator;
      this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    @Override
    public boolean hasNext() {
      if (!rowIterator.hasNext()) {
        // source exhausted: release Arrow buffers eagerly rather than relying on
        // callers to invoke close() explicitly
        try {
          close();
        } catch (IOException ioe) {
          throw new UncheckedIOException("Encountered an error while closing iterator", ioe);
        }
        return false;
      }
      return true;
    }

    @Override
    public ArrowRecordBatch next() {
      // honor the Iterator contract instead of returning an empty batch when exhausted
      if (!rowIterator.hasNext()) {
        throw new NoSuchElementException("No more rows");
      }

      int rowCount = 0;
      long start = System.currentTimeMillis();
      while (rowIterator.hasNext() && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) {
        InternalRow row = rowIterator.next();
        arrowWriter.write(row);
        rowCount += 1;
      }
      arrowWriter.finish();
      LOG.info("[ArrowRecordBatchIterator] => Created batch with {} rows. Took {} milliseconds.",
          rowCount, System.currentTimeMillis() - start);
      ArrowRecordBatch batch = unloader.getRecordBatch();
      // reset the writer so the next batch starts from empty vectors instead of
      // appending to this batch's data (mirrors Spark's ArrowConverters)
      arrowWriter.reset();
      return batch;
    }

    @Override
    public void close() throws IOException {
      // close the allocator even if closing the root fails
      try {
        root.close();
      } finally {
        allocator.close();
      }
    }
  }
}
35 changes: 30 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ subprojects {
apply plugin: 'maven' // make pom files for deployment
apply plugin: 'nebula.maven-base-publish'

artifacts {
archives sourceJar
}

compileJava {
options.encoding = "UTF-8"
}
Expand Down Expand Up @@ -222,6 +226,14 @@ project(':iceberg-core') {
compile("org.apache.avro:avro") {
exclude group: 'org.tukaani' // xz compression is not supported
}
compile("org.apache.arrow:arrow-vector") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
}
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}


compile "com.fasterxml.jackson.core:jackson-databind"
compile "com.fasterxml.jackson.core:jackson-core"
Expand All @@ -237,7 +249,11 @@ project(':iceberg-data') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
compileOnly project(':iceberg-spark')
compileOnly project(':iceberg-parquet')
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}

testCompile("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -323,14 +339,12 @@ project(':iceberg-parquet') {
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
compile project(':iceberg-arrow')

compile("org.apache.parquet:parquet-avro") {
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
exclude group: 'it.unimi.dsi'
exclude group: 'org.codehaus.jackson'
}

compile "org.apache.parquet:parquet-avro"
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -363,6 +377,17 @@ project(':iceberg-spark') {
}
}

project(':iceberg-arrow') {
dependencies {
// compile project(':iceberg-spark')
compile project(':iceberg-api')

compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
}
}
}

project(':iceberg-pig') {
dependencies {
compile project(':iceberg-api')
Expand Down
Loading