diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 1999b568b9e0..f5b5b5d41777 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -52,6 +52,7 @@
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -62,6 +63,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
 
   private final JavaSparkContext sparkContext;
+  private final SparkSession spark;
   private final Table table;
   private final boolean caseSensitive;
   private final boolean localityPreferred;
@@ -76,6 +78,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema,
                  List<Expression> filters, CaseInsensitiveStringMap options) {
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.spark = spark;
     this.table = table;
     this.caseSensitive = caseSensitive;
     this.expectedSchema = expectedSchema;
@@ -108,6 +111,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(
+        spark, sparkContext, table, caseSensitive, expectedSchema, options, checkpointLocation);
+  }
+
   @Override
   public StructType readSchema() {
     if (readSchema == null) {
@@ -213,10 +222,10 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  private static class ReaderFactory implements PartitionReaderFactory {
+  public static class ReaderFactory implements PartitionReaderFactory {
     private final int batchSize;
 
-    private ReaderFactory(int batchSize) {
+    ReaderFactory(int batchSize) {
       this.batchSize = batchSize;
     }
 
@@ -256,7 +265,7 @@ private static class BatchReader extends BatchDataReader implements PartitionRea
     }
   }
 
-  private static class ReadTask implements InputPartition, Serializable {
+  public static class ReadTask implements InputPartition, Serializable {
     private final CombinedScanTask task;
     private final Broadcast<Table> tableBroadcast;
     private final String expectedSchemaString;
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
new file mode 100644
index 000000000000..82da87d242d9
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.MicroBatches.MicroBatchBuilder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.execution.streaming.OffsetSeq; +import org.apache.spark.sql.execution.streaming.OffsetSeqLog; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final int batchSize; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final OffsetLog offsetLog; + + private StreamingOffset initialOffset = null; + private PlannedEndOffset previousEndOffset = null; + + SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, + Table table, boolean 
caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.batchSize = Spark3Util.batchSize(table.properties(), options); + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) + .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + this.splitOpenFileCost = Optional.ofNullable( + Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); + this.offsetLog = OffsetLog.getInstance(spark, checkpointLocation); + } + + @Override + public Offset latestOffset() { + initialOffset(); + + if (isTableEmpty()) { + return StreamingOffset.START_OFFSET; + } + + StreamingOffset microBatchStartOffset = isFirstBatch() ? initialOffset : previousEndOffset; + if (isEndOfSnapshot(microBatchStartOffset)) { + microBatchStartOffset = getNextAvailableSnapshot(microBatchStartOffset); + } + + previousEndOffset = calculateEndOffset(microBatchStartOffset); + return previousEndOffset; + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + if (end.equals(StreamingOffset.START_OFFSET)) { + return new InputPartition[0]; + } + + // broadcast the table metadata as input partitions will be sent to executors + Broadcast
<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    Preconditions.checkState(
+        end instanceof PlannedEndOffset,
+        "The end offset passed to planInputPartitions() is not the one that is returned by latestOffset()");
+    PlannedEndOffset endOffset = (PlannedEndOffset) end;
+
+    List<FileScanTask> fileScanTasks = endOffset.getMicroBatch().tasks();
+
+    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+        CloseableIterable.withNoopClose(fileScanTasks),
+        splitSize);
+    List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
+    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+    for (int i = 0; i < combinedScanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+          caseSensitive, localityPreferred);
+    }
+
+    return readTasks;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    int batchSizeValueToDisableColumnarReads = 0;
+    return new ReaderFactory(batchSizeValueToDisableColumnarReads);
+  }
+
+  @Override
+  public Offset initialOffset() {
+    if (isInitialOffsetResolved()) {
+      return initialOffset;
+    }
+
+    if (isStreamResumedFromCheckpoint()) {
+      initialOffset = calculateInitialOffsetFromCheckpoint();
+      return initialOffset;
+    }
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    if (snapshotIds.isEmpty()) {
+      initialOffset = StreamingOffset.START_OFFSET;
+      Preconditions.checkState(isTableEmpty(),
+          "criteria behind isTableEmpty() changed.");
+    } else {
+      initialOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true);
+    }
+
+    return initialOffset;
+  }
+
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  private boolean isInitialOffsetResolved() {
+    return initialOffset != null;
+  }
+
+  private StreamingOffset calculateInitialOffsetFromCheckpoint() {
+    Preconditions.checkState(isStreamResumedFromCheckpoint(),
+        "Stream is not resumed from checkpoint.");
+
+    return offsetLog.getLatest();
+  }
+
+  private boolean isStreamResumedFromCheckpoint() {
+    Preconditions.checkState(!isInitialOffsetResolved(),
+        "isStreamResumedFromCheckpoint() is invoked without resolving initialOffset");
+
+    return offsetLog.isOffsetLogInitialized();
+  }
+
+  private boolean isFirstBatch() {
+    return previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET);
+  }
+
+  private boolean isTableEmpty() {
+    Preconditions.checkState(isInitialOffsetResolved(),
+        "isTableEmpty() is invoked without resolving initialOffset");
+
+    return initialOffset.equals(StreamingOffset.START_OFFSET);
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStartOffset) {
+    if (table.currentSnapshot().snapshotId() == microBatchStartOffset.snapshotId()) {
+      return microBatchStartOffset;
+    }
+
+    Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId());
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
+      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "snapshot on which the stream operated has been garbage collected.");
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) {
+    MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io())
+        .caseSensitive(caseSensitive)
+        .specsById(table.specs())
+        .generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles());
+
+    return new PlannedEndOffset(
+        microBatch.snapshotId(),
+        microBatch.endFileIndex(),
+        microBatchStartOffset.shouldScanAllFiles(),
+        microBatch);
+  }
+
+  private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) {
+    MicroBatchBuilder microBatchBuilder = MicroBatches.from(
+        table.snapshot(microBatchStartOffset.snapshotId()), table.io())
+        .caseSensitive(caseSensitive)
+        .specsById(table.specs());
+
+    MicroBatch microBatchStart = microBatchBuilder.generate(
+        microBatchStartOffset.position(),
+        1,
+        microBatchStartOffset.shouldScanAllFiles());
+
+    return microBatchStartOffset.position() == microBatchStart.startFileIndex() &&
+        microBatchStartOffset.position() == microBatchStart.endFileIndex() &&
+        microBatchStart.lastIndexOfSnapshot();
+  }
+
+  private static class PlannedEndOffset extends StreamingOffset {
+
+    private final MicroBatch microBatch;
+
+    PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, MicroBatch microBatch) {
+      super(snapshotId, position, scanAllFiles);
+      this.microBatch = microBatch;
+    }
+
+    public MicroBatch getMicroBatch() {
+      return microBatch;
+    }
+  }
+
+  interface OffsetLog {
+    static OffsetLog getInstance(SparkSession spark, String checkpointLocation) {
+      return new OffsetLogImpl(spark, checkpointLocation);
+    }
+
+    boolean isOffsetLogInitialized();
+
+    StreamingOffset getLatest();
+  }
+
+  private static class OffsetLogImpl implements OffsetLog {
+    private final OffsetSeqLog offsetSeqLog;
+
+    OffsetLogImpl(SparkSession spark, String checkpointLocation) {
+      this.offsetSeqLog = checkpointLocation != null ?
+          new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) :
+          null;
+    }
+
+    @Override
+    public boolean isOffsetLogInitialized() {
+      return offsetSeqLog != null &&
+          offsetSeqLog.getLatest() != null &&
+          offsetSeqLog.getLatest().isDefined();
+    }
+
+    @Override
+    public StreamingOffset getLatest() {
+      OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2;
+
+      List<Option<Offset>> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets());
+      Option<Offset> optionalOffset = offsetSeqCol.get(0);
+
+      return StreamingOffset.fromJson(optionalOffset.get().json());
+    }
+
+    private String getOffsetLogLocation(String checkpointLocation) {
+      return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString();
+    }
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 52fbf7a1bf2d..c535a3534954 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -70,6 +70,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
       TableCapability.BATCH_WRITE,
+      TableCapability.MICRO_BATCH_READ,
       TableCapability.STREAMING_WRITE,
       TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.OVERWRITE_DYNAMIC);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
new file mode 100644
index 000000000000..487c7afce2f1
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.DataStreamWriter; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.StreamingQueryException; +import org.apache.spark.sql.streaming.Trigger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public final class TestStructuredStreamingRead3 { + private static final Configuration CONF = new Configuration(); + private static final Schema SCHEMA = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()) + ); + private static SparkSession spark = null; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @BeforeClass + public static void startSpark() { + TestStructuredStreamingRead3.spark = SparkSession.builder() + .master("local[2]") + .config("spark.sql.shuffle.partitions", 4) + .getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestStructuredStreamingRead3.spark; + TestStructuredStreamingRead3.spark = null; + currentSpark.stop(); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetAllInsertsAcrossIcebergSnapshots() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + 
df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + List actual; + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("test12") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + actual = spark.sql("select * from test12") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals( + expected.stream().flatMap(List::stream).collect(Collectors.toList()), + actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File writerCheckpoint = new File(parent, "writer-checkpoint"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + final String tempView = "microBatchView"; + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")), + Lists.newArrayList( + new SimpleRecord(8, "eight"), + new SimpleRecord(9, "nine")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset ds = spark.createDataFrame(l, SimpleRecord.class); + ds.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + DataStreamWriter singleBatchWriter = df.writeStream() + .trigger(Trigger.Once()) + .option("checkpointLocation", writerCheckpoint.toString()) + .foreachBatch((batchDF, batchId) -> { + batchDF.createOrReplaceGlobalTempView(tempView); + }); + + String globalTempView = "global_temp." 
+ tempView; + Assert.assertEquals(expected.get(0), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(1), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(2), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(3), processMicroBatch(singleBatchWriter, globalTempView)); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void validateWhenBatchSizeEquals1ThenOneFileAtATimeIsStreamed() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File writerCheckpoint = new File(parent, "writer-checkpoint"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + final String tempView = "microBatchSingleRecordView"; + + // produce unique file per record - to test BatchSize=1 + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")), + Lists.newArrayList( + new SimpleRecord(8, "eight"), + new SimpleRecord(9, "nine")) + ); + + // generate multiple snapshots - each snapshot with multiple files + for (List l : expected) { + Dataset ds = spark.createDataFrame(l, SimpleRecord.class); + ds.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .option(SparkReadOptions.VECTORIZATION_BATCH_SIZE, 1) + .load(location.toString()); + DataStreamWriter singleBatchWriter = df.writeStream() + .trigger(Trigger.Once()) + .option("checkpointLocation", writerCheckpoint.toString()) + .foreachBatch((batchDF, batchId) -> { + batchDF.createOrReplaceGlobalTempView(tempView); + }); + + String globalTempView = "global_temp." 
+ tempView; + for (SimpleRecord simpleRecord : + expected.stream().flatMap(List::stream).collect(Collectors.toList())) { + Assert.assertEquals( + Collections.singletonList(simpleRecord), + processMicroBatch(singleBatchWriter, globalTempView)); + } + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testParquetOrcAvroDataInOneTable() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List parquetFileRecords = Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")); + + List orcFileRecords = Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")); + + List avroFileRecords = Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")); + + // generate multiple snapshots + Dataset df = spark.createDataFrame(parquetFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "parquet") + .mode("append") + .save(location.toString()); + + df = spark.createDataFrame(orcFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "orc") + .mode("append") + .save(location.toString()); + + df = spark.createDataFrame(avroFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "avro") + .mode("append") + .save(location.toString()); + + table.refresh(); + List actual; + + try { + Dataset ds = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = ds.writeStream() + .format("memory") + .queryName("test12") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + actual = spark.sql("select * from test12") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals(Stream.concat(Stream.concat(parquetFileRecords.stream(), orcFileRecords.stream()), + avroFileRecords.stream()) + .collect(Collectors.toList()), + actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamFromEmptyTable() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + table.refresh(); + List actual; + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testemptytable") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + actual = spark.sql("select * from testemptytable") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals(Collections.emptyList(), actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void 
testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = Lists.newArrayList( + dataDelete.copy("data", "one") // id = 1 + ); + + DeleteFile eqDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); + + table.newRowDelta() + .addDeletes(eqDeletes) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithoverwrites") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + // this should create a snapshot with type Replace. 
+ table.rewriteManifests() + .clusterBy(f -> 1) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithreplace") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + // this should create a snapshot with type delete. + table.newDelete() + .deleteFromRowFilter(Expressions.equal("id", 4)) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithdelete") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + private static List processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) + throws TimeoutException { + StreamingQuery streamingQuery = singleBatchWriter.start(); + streamingQuery.processAllAvailable(); + + return spark.sql(String.format("select * from %s", viewName)) + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + } +}
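
For reviewers, a minimal end-to-end sketch (not part of the diff) of the read path this change enables, mirroring the usage exercised in TestStructuredStreamingRead3. The table location, checkpoint directory, application name, and console sink are placeholder choices for illustration, not values taken from this patch.

// Sketch only: MICRO_BATCH_READ lets an Iceberg table back a Spark Structured
// Streaming source; SparkMicroBatchStream plans appended data files per trigger.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergStreamingReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("iceberg-micro-batch-read")   // placeholder app name
        .getOrCreate();

    // Read the Iceberg table as a stream; the location below is a placeholder.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("/tmp/warehouse/test-table");

    // Write each micro-batch to the console sink; checkpoint path is a placeholder.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .outputMode(OutputMode.Append())
        .option("checkpointLocation", "/tmp/checkpoints/test-table")
        .start();

    query.awaitTermination();
  }
}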