Closed

Changes from all commits (28 commits)
e024ba0
Wireframe.
SreeramGarlapati May 19, 2021
61611fd
remove StreamingOffset related change - as it is handled in another P…
SreeramGarlapati May 20, 2021
31f2a03
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati May 20, 2021
6a2adcb
test changes
SreeramGarlapati May 21, 2021
f1565cc
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati May 21, 2021
def1bc0
rudimentary implementation for spark3 streaming reads from iceberg table
SreeramGarlapati May 21, 2021
a96b3e6
rudimentary implementation for spark3 streaming reads from iceberg table
SreeramGarlapati May 22, 2021
b18cfdc
unit test
SreeramGarlapati May 22, 2021
993bd9e
works!
SreeramGarlapati May 22, 2021
94dd103
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati May 24, 2021
7063fd3
Unit test.
SreeramGarlapati May 26, 2021
96fbc87
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati May 26, 2021
17f6eb8
Unit test.
SreeramGarlapati May 27, 2021
2044d94
checkpoint done!
SreeramGarlapati May 27, 2021
15e33e9
checkpoint done!
SreeramGarlapati May 28, 2021
b8e5b34
refactor
SreeramGarlapati May 28, 2021
fa4f2ae
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati May 28, 2021
633afbb
test batchSize option
SreeramGarlapati May 29, 2021
16d3984
refactor
SreeramGarlapati May 29, 2021
919e386
checkstyle
SreeramGarlapati May 29, 2021
e3fb1fe
checkstyle
SreeramGarlapati May 29, 2021
bee1690
fix indent
SreeramGarlapati May 29, 2021
daee48a
unit test - full coverage
SreeramGarlapati May 29, 2021
0a65617
add logic for ignoring deletes and replace
SreeramGarlapati May 29, 2021
f9e9e66
minor refactor
SreeramGarlapati Jun 1, 2021
c7658d3
Merge branch 'master' of https://github.com/apache/iceberg into spark…
SreeramGarlapati Jun 2, 2021
67e2d27
minor refactor
SreeramGarlapati Jun 2, 2021
072c911
remove ignoreDelete and ignoreReplace.
SreeramGarlapati Jun 2, 2021
SparkBatchScan.java
@@ -52,6 +52,7 @@
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -62,6 +63,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);

private final JavaSparkContext sparkContext;
private final SparkSession spark;
private final Table table;
private final boolean caseSensitive;
private final boolean localityPreferred;
@@ -76,6 +78,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
List<Expression> filters, CaseInsensitiveStringMap options) {
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.spark = spark;
this.table = table;
this.caseSensitive = caseSensitive;
this.expectedSchema = expectedSchema;
@@ -108,6 +111,12 @@ public Batch toBatch() {
return this;
}

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new SparkMicroBatchStream(
spark, sparkContext, table, caseSensitive, expectedSchema, options, checkpointLocation);
}

@Override
public StructType readSchema() {
if (readSchema == null) {
@@ -213,10 +222,10 @@ public String description() {
return String.format("%s [filters=%s]", table, filters);
}

private static class ReaderFactory implements PartitionReaderFactory {
public static class ReaderFactory implements PartitionReaderFactory {
private final int batchSize;

private ReaderFactory(int batchSize) {
ReaderFactory(int batchSize) {
this.batchSize = batchSize;
}

@@ -256,7 +265,7 @@ private static class BatchReader extends BatchDataReader implements PartitionRea
}
}

private static class ReadTask implements InputPartition, Serializable {
public static class ReadTask implements InputPartition, Serializable {
private final CombinedScanTask task;
private final Broadcast<Table> tableBroadcast;
private final String expectedSchemaString;
SparkMicroBatchStream.java (new file)
@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.MicroBatches.MicroBatchBuilder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeqLog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;

public class SparkMicroBatchStream implements MicroBatchStream {
private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

private final JavaSparkContext sparkContext;
private final Table table;
private final boolean caseSensitive;
private final Schema expectedSchema;
private final int batchSize;
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
private final boolean localityPreferred;
private final OffsetLog offsetLog;

private StreamingOffset initialOffset = null;
private PlannedEndOffset previousEndOffset = null;

SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext,
Table table, boolean caseSensitive, Schema expectedSchema,
CaseInsensitiveStringMap options, String checkpointLocation) {
this.sparkContext = sparkContext;
this.table = table;
this.caseSensitive = caseSensitive;
this.expectedSchema = expectedSchema;
this.batchSize = Spark3Util.batchSize(table.properties(), options);
this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null))
.orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null))
.orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
this.splitOpenFileCost = Optional.ofNullable(
Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null))
.orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST,
SPLIT_OPEN_FILE_COST_DEFAULT));
this.offsetLog = OffsetLog.getInstance(spark, checkpointLocation);
}

@Override
public Offset latestOffset() {
initialOffset();

if (isTableEmpty()) {
return StreamingOffset.START_OFFSET;
}

StreamingOffset microBatchStartOffset = isFirstBatch() ? initialOffset : previousEndOffset;
if (isEndOfSnapshot(microBatchStartOffset)) {
microBatchStartOffset = getNextAvailableSnapshot(microBatchStartOffset);
}

previousEndOffset = calculateEndOffset(microBatchStartOffset);
return previousEndOffset;
}

@Override
public InputPartition[] planInputPartitions(Offset start, Offset end) {
if (end.equals(StreamingOffset.START_OFFSET)) {
return new InputPartition[0];
}

// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
String expectedSchemaString = SchemaParser.toJson(expectedSchema);

Preconditions.checkState(
end instanceof PlannedEndOffset,
"The end offset passed to planInputPartitions() is not the one that is returned by lastOffset()");
PlannedEndOffset endOffset = (PlannedEndOffset) end;

List<FileScanTask> fileScanTasks = endOffset.getMicroBatch().tasks();

CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
CloseableIterable.withNoopClose(fileScanTasks),
splitSize);
List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];

for (int i = 0; i < combinedScanTasks.size(); i++) {
readTasks[i] = new ReadTask(
combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
caseSensitive, localityPreferred);
}

return readTasks;
}

@Override
public PartitionReaderFactory createReaderFactory() {
int batchSizeValueToDisableColumnarReads = 0;
return new ReaderFactory(batchSizeValueToDisableColumnarReads);
}

@Override
public Offset initialOffset() {
if (isInitialOffsetResolved()) {
return initialOffset;
}

if (isStreamResumedFromCheckpoint()) {
initialOffset = calculateInitialOffsetFromCheckpoint();
return initialOffset;
}

List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
if (snapshotIds.isEmpty()) {
initialOffset = StreamingOffset.START_OFFSET;
Preconditions.checkState(isTableEmpty(),
"criteria behind isTableEmpty() changed.");
} else {
initialOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true);
}

return initialOffset;
}

@Override
public Offset deserializeOffset(String json) {
return StreamingOffset.fromJson(json);
}

@Override
public void commit(Offset end) {
}

@Override
public void stop() {
}

private boolean isInitialOffsetResolved() {
return initialOffset != null;
}

private StreamingOffset calculateInitialOffsetFromCheckpoint() {
Preconditions.checkState(isStreamResumedFromCheckpoint(),
"Stream is not resumed from checkpoint.");

return offsetLog.getLatest();
}

private boolean isStreamResumedFromCheckpoint() {
Preconditions.checkState(!isInitialOffsetResolved(),
"isStreamResumedFromCheckpoint() is invoked without resolving initialOffset");

return offsetLog.isOffsetLogInitialized();
}

private boolean isFirstBatch() {
return previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET);
}

private boolean isTableEmpty() {
Preconditions.checkState(isInitialOffsetResolved(),
"isTableEmpty() is invoked without resolving initialOffset");

return initialOffset.equals(StreamingOffset.START_OFFSET);
}

private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStartOffset) {
if (table.currentSnapshot().snapshotId() == microBatchStartOffset.snapshotId()) {
return microBatchStartOffset;
}

Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId());
Snapshot pointer = table.currentSnapshot();
while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
[Review comment by SreeramGarlapati (author): TODO: Add unittest coverage for overwrite operation.]
"Encountered Snapshot DataOperation other than APPEND.");

pointer = table.snapshot(pointer.parentId());
}

Preconditions.checkState(pointer != null,
"snapshot on which the stream operated has been garbage collected.");

return new StreamingOffset(pointer.snapshotId(), 0L, false);
}

private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) {
MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles());

return new PlannedEndOffset(
microBatch.snapshotId(),
microBatch.endFileIndex(),
microBatchStartOffset.shouldScanAllFiles(),
microBatch);
}

private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) {
MicroBatchBuilder microBatchBuilder = MicroBatches.from(
table.snapshot(microBatchStartOffset.snapshotId()), table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs());

MicroBatch microBatchStart = microBatchBuilder.generate(
microBatchStartOffset.position(),
1,
microBatchStartOffset.shouldScanAllFiles());

return microBatchStartOffset.position() == microBatchStart.startFileIndex() &&
microBatchStartOffset.position() == microBatchStart.endFileIndex() &&
microBatchStart.lastIndexOfSnapshot();
}

private static class PlannedEndOffset extends StreamingOffset {

private final MicroBatch microBatch;

PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, MicroBatch microBatch) {
super(snapshotId, position, scanAllFiles);
this.microBatch = microBatch;
}

public MicroBatch getMicroBatch() {
return microBatch;
}
}

interface OffsetLog {
static OffsetLog getInstance(SparkSession spark, String checkpointLocation) {
return new OffsetLogImpl(spark, checkpointLocation);
}

boolean isOffsetLogInitialized();

StreamingOffset getLatest();
}

private static class OffsetLogImpl implements OffsetLog {
private final OffsetSeqLog offsetSeqLog;

OffsetLogImpl(SparkSession spark, String checkpointLocation) {
this.offsetSeqLog = checkpointLocation != null ?
new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) :
null;
}

@Override
public boolean isOffsetLogInitialized() {
return offsetSeqLog != null &&
offsetSeqLog.getLatest() != null &&
offsetSeqLog.getLatest().isDefined();
}

@Override
public StreamingOffset getLatest() {
OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2;

List<Option<Offset>> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets());
Option<Offset> optionalOffset = offsetSeqCol.get(0);

return StreamingOffset.fromJson(optionalOffset.get().json());
}

private String getOffsetLogLocation(String checkpointLocation) {
return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString();
}
}
}
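
For orientation (not part of the diff): SparkMicroBatchStream implements Spark 3's MicroBatchStream contract, so the streaming engine drives it through a fixed sequence of calls. The sketch below is illustrative only, not Spark's MicroBatchExecution code; the wrapper class and method name are invented for the example, and the comments describe this PR's behavior.

import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

final class MicroBatchLifecycleSketch {
  // Roughly one planning/execution cycle as the micro-batch engine sees it.
  static void runOneBatch(MicroBatchStream stream) {
    // Resolved from the checkpoint's offset log when resuming, otherwise from the
    // table's oldest ancestor snapshot (or START_OFFSET for an empty table).
    Offset start = stream.initialOffset();

    // Plans the next micro-batch of appended files past the previous end offset.
    Offset end = stream.latestOffset();

    // The planned FileScanTasks are split and combined into input partitions.
    InputPartition[] partitions = stream.planInputPartitions(start, end);

    // Row-based readers; columnar reads are disabled by passing batchSize = 0.
    PartitionReaderFactory readerFactory = stream.createReaderFactory();

    // ... executors create a PartitionReader per partition and read rows ...

    stream.commit(end);  // a no-op in this implementation
    stream.stop();       // also a no-op here
  }
}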
SparkTable.java
@@ -70,6 +70,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.MICRO_BATCH_READ,
TableCapability.STREAMING_WRITE,
TableCapability.OVERWRITE_BY_FILTER,
TableCapability.OVERWRITE_DYNAMIC);
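
With TableCapability.MICRO_BATCH_READ advertised, an Iceberg table can be used as a Structured Streaming source. A minimal usage sketch follows; the table identifier, checkpoint path, and trigger interval are placeholders, not values taken from this PR.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class IcebergStreamingReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-micro-batch-read")
        .getOrCreate();

    // Each trigger reads the files appended to the table since the last committed offset.
    Dataset<Row> events = spark.readStream()
        .format("iceberg")
        .load("db.events");  // placeholder table identifier

    StreamingQuery query = events.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/iceberg-demo")  // placeholder path
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .start();

    query.awaitTermination();
  }
}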