From e024ba01b7d513bef3152488df3b30733b67517a Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Wed, 19 May 2021 00:40:38 -0700 Subject: [PATCH 01/22] Wireframe. --- .../source/SparkMicroBatchStreamScan.java | 109 ++++++++++++++ .../spark/source/SparkScanBuilder.java | 10 +- .../iceberg/spark/source/SparkTable.java | 1 + .../iceberg/spark/source/StreamingOffset.java | 135 +++++++++++++++++ .../source/TestStructuredStreamingRead3.java | 137 ++++++++++++++++++ 5 files changed, 390 insertions(+), 2 deletions(-) create mode 100644 spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java create mode 100644 spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java new file mode 100644 index 000000000000..d133cc5156bf --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkMicroBatchStreamScan implements Scan, MicroBatchStream { + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final boolean localityPreferred; + private final Schema expectedSchema; + private final List filterExpressions; + private final CaseInsensitiveStringMap options; + + private StructType readSchema = null; + + SparkMicroBatchStreamScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema, + List filters, CaseInsensitiveStringMap options) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.filterExpressions = filters != null ? filters : Collections.emptyList(); + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.options = options; + } + + @Override + public StructType readSchema() { + if (readSchema == null) { + this.readSchema = SparkSchemaUtil.convert(expectedSchema); + } + return readSchema; + } + + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { + return this; + } + + @Override + public Offset latestOffset() { + return null; + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + return new InputPartition[0]; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + return null; + } + + @Override + public Offset initialOffset() { + return null; + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + + } + + @Override + public void stop() { + + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 633a17143f52..a2a810237599 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -159,8 +159,14 @@ private Schema schemaWithMetadataColumns() { @Override public Scan build() { - return new SparkBatchQueryScan( - spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); + // TODO: understand how to differentiate that this is a spark streaming microbatch scan. 
+ if (false) { + return new SparkBatchQueryScan( + spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); + } else { + return new SparkMicroBatchStreamScan( + spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); + } } public Scan buildMergeScan() { diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 52fbf7a1bf2d..c535a3534954 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -70,6 +70,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, private static final Set CAPABILITIES = ImmutableSet.of( TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, + TableCapability.MICRO_BATCH_READ, TableCapability.STREAMING_WRITE, TableCapability.OVERWRITE_BY_FILTER, TableCapability.OVERWRITE_DYNAMIC); diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java new file mode 100644 index 000000000000..21fdb0202a30 --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; +import org.apache.spark.sql.connector.read.streaming.Offset; + +class StreamingOffset extends Offset { + static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false); + + private static final int CURR_VERSION = 1; + private static final String VERSION = "version"; + private static final String SNAPSHOT_ID = "snapshot_id"; + private static final String POSITION = "position"; + private static final String SCAN_ALL_FILES = "scan_all_files"; + + private final long snapshotId; + private final long position; + private final boolean scanAllFiles; + + /** + * An implementation of Spark Structured Streaming Offset, to track the current processed files of + * Iceberg table. + * + * @param snapshotId The current processed snapshot id. + * @param position The position of last scanned file in snapshot. + * @param scanAllFiles whether to scan all files in a snapshot; for example, to read + * all data when starting a stream. 
+ */ + StreamingOffset(long snapshotId, long position, boolean scanAllFiles) { + this.snapshotId = snapshotId; + this.position = position; + this.scanAllFiles = scanAllFiles; + } + + static StreamingOffset fromJson(String json) { + Preconditions.checkNotNull(json, "Cannot parse StreamingOffset JSON: null"); + + try { + JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class); + // The version of StreamingOffset. The offset was created with a version number + // used to validate when deserializing from json string. + int version = JsonUtil.getInt(VERSION, node); + Preconditions.checkArgument(version == CURR_VERSION, + "Cannot parse offset JSON: offset version %s is not supported", version); + + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + int position = JsonUtil.getInt(POSITION, node); + boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node); + + return new StreamingOffset(snapshotId, position, shouldScanAllFiles); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e); + } + } + + @Override + public String json() { + StringWriter writer = new StringWriter(); + try { + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + generator.writeStartObject(); + generator.writeNumberField(VERSION, CURR_VERSION); + generator.writeNumberField(SNAPSHOT_ID, snapshotId); + generator.writeNumberField(POSITION, position); + generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles); + generator.writeEndObject(); + generator.flush(); + + } catch (IOException e) { + throw new UncheckedIOException("Failed to write StreamingOffset to json", e); + } + + return writer.toString(); + } + + long snapshotId() { + return snapshotId; + } + + long position() { + return position; + } + + boolean shouldScanAllFiles() { + return scanAllFiles; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof StreamingOffset) { + StreamingOffset offset = (StreamingOffset) obj; + return offset.snapshotId == snapshotId && + offset.position == position && + offset.scanAllFiles == scanAllFiles; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hashCode(snapshotId, position, scanAllFiles); + } + + @Override + public String toString() { + return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]", + snapshotId, position, scanAllFiles); + } +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java new file mode 100644 index 000000000000..1dfb28314099 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import org.apache.spark.sql.execution.streaming.MemoryStream; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import scala.Option; +import scala.collection.JavaConversions; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public final class TestStructuredStreamingRead3 { + private static final Configuration CONF = new Configuration(); + private static final Schema SCHEMA = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()) + ); + private static SparkSession spark = null; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @BeforeClass + public static void startSpark() { + TestStructuredStreamingRead3.spark = SparkSession.builder() + .master("local[2]") + .config("spark.sql.shuffle.partitions", 4) + .getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestStructuredStreamingRead3.spark; + TestStructuredStreamingRead3.spark = null; + currentSpark.stop(); + } + + @SuppressWarnings("unchecked") + @Test + public void testGetChangesFromStart() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File checkpoint = new File(parent, "checkpoint"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List expected = Lists.newArrayList( + new SimpleRecord(2, "1"), + new SimpleRecord(3, "2"), + new SimpleRecord(1, "3") + ); + + // Write records one by one to generate multiple snapshots + for (SimpleRecord l : expected) { + Dataset df = spark.createDataFrame(Collections.singletonList(l), SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + table.refresh(); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .option("checkpointLocation", 
checkpoint.toString()) + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("test12") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + Object actual = spark.sql("select * from test12").collect(); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } +} From 61611fd0a6177e6cf1f51a5c338758b37bbd8d47 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Wed, 19 May 2021 19:24:06 -0700 Subject: [PATCH 02/22] remove StreamingOffset related change - as it is handled in another PR #2615 --- .../iceberg/spark/source/StreamingOffset.java | 135 ------------------ 1 file changed, 135 deletions(-) delete mode 100644 spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java deleted file mode 100644 index 21fdb0202a30..000000000000 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.spark.source; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; -import java.io.StringWriter; -import java.io.UncheckedIOException; -import org.apache.iceberg.relocated.com.google.common.base.Objects; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.util.JsonUtil; -import org.apache.spark.sql.connector.read.streaming.Offset; - -class StreamingOffset extends Offset { - static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false); - - private static final int CURR_VERSION = 1; - private static final String VERSION = "version"; - private static final String SNAPSHOT_ID = "snapshot_id"; - private static final String POSITION = "position"; - private static final String SCAN_ALL_FILES = "scan_all_files"; - - private final long snapshotId; - private final long position; - private final boolean scanAllFiles; - - /** - * An implementation of Spark Structured Streaming Offset, to track the current processed files of - * Iceberg table. - * - * @param snapshotId The current processed snapshot id. - * @param position The position of last scanned file in snapshot. - * @param scanAllFiles whether to scan all files in a snapshot; for example, to read - * all data when starting a stream. 
- */ - StreamingOffset(long snapshotId, long position, boolean scanAllFiles) { - this.snapshotId = snapshotId; - this.position = position; - this.scanAllFiles = scanAllFiles; - } - - static StreamingOffset fromJson(String json) { - Preconditions.checkNotNull(json, "Cannot parse StreamingOffset JSON: null"); - - try { - JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class); - // The version of StreamingOffset. The offset was created with a version number - // used to validate when deserializing from json string. - int version = JsonUtil.getInt(VERSION, node); - Preconditions.checkArgument(version == CURR_VERSION, - "Cannot parse offset JSON: offset version %s is not supported", version); - - long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); - int position = JsonUtil.getInt(POSITION, node); - boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node); - - return new StreamingOffset(snapshotId, position, shouldScanAllFiles); - } catch (IOException e) { - throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e); - } - } - - @Override - public String json() { - StringWriter writer = new StringWriter(); - try { - JsonGenerator generator = JsonUtil.factory().createGenerator(writer); - generator.writeStartObject(); - generator.writeNumberField(VERSION, CURR_VERSION); - generator.writeNumberField(SNAPSHOT_ID, snapshotId); - generator.writeNumberField(POSITION, position); - generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles); - generator.writeEndObject(); - generator.flush(); - - } catch (IOException e) { - throw new UncheckedIOException("Failed to write StreamingOffset to json", e); - } - - return writer.toString(); - } - - long snapshotId() { - return snapshotId; - } - - long position() { - return position; - } - - boolean shouldScanAllFiles() { - return scanAllFiles; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof StreamingOffset) { - StreamingOffset offset = (StreamingOffset) obj; - return offset.snapshotId == snapshotId && - offset.position == position && - offset.scanAllFiles == scanAllFiles; - } else { - return false; - } - } - - @Override - public int hashCode() { - return Objects.hashCode(snapshotId, position, scanAllFiles); - } - - @Override - public String toString() { - return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]", - snapshotId, position, scanAllFiles); - } -} From 6a2adcb4616fce616f617bed74dea1ce35423e31 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 20 May 2021 19:10:06 -0700 Subject: [PATCH 03/22] test changes --- .../source/SparkMicroBatchStreamScan.java | 144 ++++++++++-------- .../source/TestStructuredStreamingRead3.java | 127 +++++++-------- 2 files changed, 141 insertions(+), 130 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java index d133cc5156bf..86c1477f92e3 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java @@ -35,75 +35,85 @@ import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SparkMicroBatchStreamScan implements Scan, MicroBatchStream { - private final 
JavaSparkContext sparkContext; - private final Table table; - private final boolean caseSensitive; - private final boolean localityPreferred; - private final Schema expectedSchema; - private final List filterExpressions; - private final CaseInsensitiveStringMap options; - - private StructType readSchema = null; - - SparkMicroBatchStreamScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema, - List filters, CaseInsensitiveStringMap options) { - this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); - this.table = table; - this.caseSensitive = caseSensitive; - this.expectedSchema = expectedSchema; - this.filterExpressions = filters != null ? filters : Collections.emptyList(); - this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); - this.options = options; - } - - @Override - public StructType readSchema() { - if (readSchema == null) { - this.readSchema = SparkSchemaUtil.convert(expectedSchema); - } - return readSchema; - } - - @Override - public MicroBatchStream toMicroBatchStream(String checkpointLocation) { - return this; - } - - @Override - public Offset latestOffset() { - return null; - } - - @Override - public InputPartition[] planInputPartitions(Offset start, Offset end) { - return new InputPartition[0]; - } - - @Override - public PartitionReaderFactory createReaderFactory() { - return null; - } - - @Override - public Offset initialOffset() { - return null; - } - - @Override - public Offset deserializeOffset(String json) { - return StreamingOffset.fromJson(json); - } - - @Override - public void commit(Offset end) { - - } - - @Override - public void stop() { - + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStreamScan.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final boolean localityPreferred; + private final Schema expectedSchema; + private final List filterExpressions; + private final CaseInsensitiveStringMap options; + + private StructType readSchema = null; + + SparkMicroBatchStreamScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema, + List filters, CaseInsensitiveStringMap options) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.filterExpressions = filters != null ? 
filters : Collections.emptyList(); + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.options = options; + } + + @Override + public StructType readSchema() { + LOG.info("---------- readSchema"); + if (readSchema == null) { + this.readSchema = SparkSchemaUtil.convert(expectedSchema); } + return readSchema; + } + + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { + LOG.info("---------- toMicroBatchStream: {}", checkpointLocation); + return this; + } + + @Override + public Offset latestOffset() { + LOG.info("---------- latestOffset"); + return new StreamingOffset(Long.MAX_VALUE, Long.MAX_VALUE, true); + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + LOG.info("---------- planInputPartitions: {}, {}", start, end); + return new InputPartition[0]; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + LOG.info("---------- createReaderFactory"); + return null; + } + + @Override + public Offset initialOffset() { + return StreamingOffset.START_OFFSET; + } + + @Override + public Offset deserializeOffset(String json) { + LOG.info("---------- deserializeOffset {}", json); + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + LOG.info("---------- commit {}", end.toString()); + } + + @Override + public void stop() { + LOG.info("---------- stop"); + } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 1dfb28314099..13ddc766c1a4 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -62,76 +62,77 @@ import static org.apache.iceberg.types.Types.NestedField.optional; public final class TestStructuredStreamingRead3 { - private static final Configuration CONF = new Configuration(); - private static final Schema SCHEMA = new Schema( - optional(1, "id", Types.IntegerType.get()), - optional(2, "data", Types.StringType.get()) - ); - private static SparkSession spark = null; + private static final Configuration CONF = new Configuration(); + private static final Schema SCHEMA = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()) + ); + private static SparkSession spark = null; - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - @Rule - public ExpectedException exceptionRule = ExpectedException.none(); + @Rule + public TemporaryFolder temp = new TemporaryFolder(); - @BeforeClass - public static void startSpark() { - TestStructuredStreamingRead3.spark = SparkSession.builder() - .master("local[2]") - .config("spark.sql.shuffle.partitions", 4) - .getOrCreate(); - } + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); - @AfterClass - public static void stopSpark() { - SparkSession currentSpark = TestStructuredStreamingRead3.spark; - TestStructuredStreamingRead3.spark = null; - currentSpark.stop(); - } + @BeforeClass + public static void startSpark() { + TestStructuredStreamingRead3.spark = SparkSession.builder() + .master("local[2]") + .config("spark.sql.shuffle.partitions", 4) + .getOrCreate(); + } - @SuppressWarnings("unchecked") - @Test - public void testGetChangesFromStart() throws IOException, TimeoutException { - File parent = temp.newFolder("parent"); - 
File location = new File(parent, "test-table"); - File checkpoint = new File(parent, "checkpoint"); + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestStructuredStreamingRead3.spark; + TestStructuredStreamingRead3.spark = null; + currentSpark.stop(); + } - HadoopTables tables = new HadoopTables(CONF); - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); - Table table = tables.create(SCHEMA, spec, location.toString()); + @SuppressWarnings("unchecked") + @Test + public void testGetChangesFromStart() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File checkpoint = new File(parent, "checkpoint"); - List expected = Lists.newArrayList( - new SimpleRecord(2, "1"), - new SimpleRecord(3, "2"), - new SimpleRecord(1, "3") - ); + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); - // Write records one by one to generate multiple snapshots - for (SimpleRecord l : expected) { - Dataset df = spark.createDataFrame(Collections.singletonList(l), SimpleRecord.class); - df.select("id", "data").write() - .format("iceberg") - .mode("append") - .save(location.toString()); - } - table.refresh(); + List expected = Lists.newArrayList( + new SimpleRecord(2, "1"), + new SimpleRecord(3, "2"), + new SimpleRecord(1, "3") + ); + + // Write records one by one to generate multiple snapshots + for (SimpleRecord l : expected) { + Dataset df = spark.createDataFrame(Collections.singletonList(l), SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + table.refresh(); - try { - Dataset df = spark.readStream() - .format("iceberg") - .option("checkpointLocation", checkpoint.toString()) - .load(location.toString()); - StreamingQuery streamingQuery = df.writeStream() - .format("memory") - .queryName("test12") - .outputMode(OutputMode.Append()) - .start(); - streamingQuery.processAllAvailable(); - Object actual = spark.sql("select * from test12").collect(); - } finally { - for (StreamingQuery query : spark.streams().active()) { - query.stop(); - } - } + try { + Dataset df = spark.readStream() + .format("iceberg") + .option("checkpointLocation", checkpoint.toString()) + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("test12") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + Object actual = spark.sql("select * from test12").collect(); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } } + } } From def1bc06d9942a44e34deb82ec7b033135390e05 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 21 May 2021 15:31:07 -0700 Subject: [PATCH 04/22] rudimentary implementation for spark3 streaming reads from iceberg table --- .../iceberg/spark/source/SparkBatchScan.java | 8 + .../spark/source/SparkMicroBatchStream.java | 309 ++++++++++++++++++ .../source/SparkMicroBatchStreamScan.java | 119 ------- .../spark/source/SparkScanBuilder.java | 6 - 4 files changed, 317 insertions(+), 125 deletions(-) create mode 100644 spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java delete mode 100644 spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 1999b568b9e0..9ba763a2cf5a 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -52,6 +52,8 @@ import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.Statistics; import org.apache.spark.sql.connector.read.SupportsReportStatistics; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -108,6 +110,12 @@ public Batch toBatch() { return this; } + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { + return new SparkMicroBatchStream( + sparkContext, table, caseSensitive, expectedSchema, filterExpressions, options, checkpointLocation); + } + @Override public StructType readSchema() { if (readSchema == null) { diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java new file mode 100644 index 000000000000..fc36ceb490f7 --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.execution.streaming.HDFSMetadataLog; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.reflect.ClassTag; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final List filterExpressions; + private final int batchSize; + private final CaseInsensitiveStringMap options; + private final String checkpointLocation; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + + // lazy variables + private StructType readSchema = null; + + // state + private StreamingOffset committedOffset = null; + + SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, + List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.filterExpressions = filterExpressions; + this.batchSize = Spark3Util.batchSize(table.properties(), options); + this.options = options; + this.checkpointLocation = checkpointLocation; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null); + this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null); + this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
SparkReadOptions.FILE_OPEN_COST, null); + } + + @Override + public Offset latestOffset() { + if (committedOffset == null) { + // Spark MicroBatchStream stateMachine invokes latestOffset as its first step. + // this makes sense for sources like SocketStream - when there is a running stream of data. + // in case of iceberg - particularly in case of full table reads from start + // the amount of data to read could be very large. + // so - do not participate in the statemachine unless spark specifies StartingOffset + // - via planInputPartitions - in its 2nd step + return StreamingOffset.START_OFFSET; + } + + MicroBatch microBatch = MicroBatches.from(table.snapshot(committedOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(committedOffset.position(), batchSize, committedOffset.shouldScanAllFiles()); + + return new PlannedEndOffset( + microBatch.snapshotId(), microBatch.endFileIndex(), + committedOffset.shouldScanAllFiles(), committedOffset, microBatch); + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + if (end.equals(StreamingOffset.START_OFFSET)) { + // TODO: validate that this is exercised - when a stream is being resumed from a checkpoint + this.committedOffset = (StreamingOffset) start; + return new InputPartition[0]; + } + + // broadcast the table metadata as input partitions will be sent to executors + Broadcast tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); + String expectedSchemaString = SchemaParser.toJson(expectedSchema); + + PlannedEndOffset endOffset = (PlannedEndOffset) end; + // Preconditions.checkState(endOffset.getStartOffset().equals(start), "The cached MicroBatch doesn't match requested planInputPartitions"); + + List fileScanTasks = endOffset.getMicroBatch().tasks(); + + CloseableIterable splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), + splitSize); + List combinedScanTasks = Lists.newArrayList( + TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); + InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()]; + + for (int i = 0; i < combinedScanTasks.size(); i++) { + readTasks[i] = new ReadTask( + combinedScanTasks.get(i), tableBroadcast, expectedSchemaString, + caseSensitive, localityPreferred); + } + + return readTasks; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + // TODO: what about batchSize? + return new ReaderFactory(batchSize); + } + + @Override + public Offset initialOffset() { + // TODO: should this read initial offset from checkpoint location? + if (committedOffset != null) { + return committedOffset; + } else { + List snapshotIds = SnapshotUtil.currentAncestors(table); + if (snapshotIds.isEmpty()) { + return StreamingOffset.START_OFFSET; + } else { + return new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); + } + } + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + committedOffset = (StreamingOffset) end; + } + + @Override + public void stop() { + LOG.info("---------- stop"); + } + + // TODO: is this needed? 
+ // https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala + private static final class IcebergMetadataLog extends HDFSMetadataLog { + + public IcebergMetadataLog(SparkSession sparkSession, String path, ClassTag evidence$1) { + super(sparkSession, path, evidence$1); + } + } + + private static class ReaderFactory implements PartitionReaderFactory { + private final int batchSize; + + private ReaderFactory(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public PartitionReader createReader(InputPartition partition) { + if (partition instanceof ReadTask) { + return new RowReader((ReadTask) partition); + } else { + throw new UnsupportedOperationException("Incorrect input partition type: " + partition); + } + } + + @Override + public PartitionReader createColumnarReader(InputPartition partition) { + if (partition instanceof ReadTask) { + return new BatchReader((ReadTask) partition, batchSize); + } else { + throw new UnsupportedOperationException("Incorrect input partition type: " + partition); + } + } + + @Override + public boolean supportColumnarReads(InputPartition partition) { + return batchSize > 1; + } + } + + private static class RowReader extends RowDataReader implements PartitionReader { + RowReader(ReadTask task) { + super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); + } + } + + private static class BatchReader extends BatchDataReader implements PartitionReader { + BatchReader(ReadTask task, int batchSize) { + super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); + } + } + + private static class ReadTask implements InputPartition, Serializable { + private final CombinedScanTask task; + private final Broadcast
tableBroadcast; + private final String expectedSchemaString; + private final boolean caseSensitive; + + private transient Schema expectedSchema = null; + private transient String[] preferredLocations = null; + + ReadTask(CombinedScanTask task, Broadcast
tableBroadcast, String expectedSchemaString, + boolean caseSensitive, boolean localityPreferred) { + this.task = task; + this.tableBroadcast = tableBroadcast; + this.expectedSchemaString = expectedSchemaString; + this.caseSensitive = caseSensitive; + if (localityPreferred) { + Table table = tableBroadcast.value(); + this.preferredLocations = Util.blockLocations(table.io(), task); + } else { + this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE; + } + } + + @Override + public String[] preferredLocations() { + return preferredLocations; + } + + public Collection files() { + return task.files(); + } + + public Table table() { + return tableBroadcast.value(); + } + + public boolean isCaseSensitive() { + return caseSensitive; + } + + private Schema expectedSchema() { + if (expectedSchema == null) { + this.expectedSchema = SchemaParser.fromJson(expectedSchemaString); + } + return expectedSchema; + } + } + + private static class PlannedEndOffset extends StreamingOffset { + + private final StreamingOffset startOffset; + private final MicroBatch microBatch; + + PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, StreamingOffset startOffset, MicroBatch microBatch) { + super(snapshotId, position, scanAllFiles); + + this.startOffset = startOffset; + this.microBatch = microBatch; + } + + public StreamingOffset getStartOffset() { + return startOffset; + } + + public MicroBatch getMicroBatch() { + return microBatch; + } + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java deleted file mode 100644 index 86c1477f92e3..000000000000 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStreamScan.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.spark.source; - -import java.util.Collections; -import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.spark.Spark3Util; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.connector.read.InputPartition; -import org.apache.spark.sql.connector.read.PartitionReaderFactory; -import org.apache.spark.sql.connector.read.Scan; -import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; -import org.apache.spark.sql.connector.read.streaming.Offset; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SparkMicroBatchStreamScan implements Scan, MicroBatchStream { - - private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStreamScan.class); - - private final JavaSparkContext sparkContext; - private final Table table; - private final boolean caseSensitive; - private final boolean localityPreferred; - private final Schema expectedSchema; - private final List filterExpressions; - private final CaseInsensitiveStringMap options; - - private StructType readSchema = null; - - SparkMicroBatchStreamScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema, - List filters, CaseInsensitiveStringMap options) { - this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); - this.table = table; - this.caseSensitive = caseSensitive; - this.expectedSchema = expectedSchema; - this.filterExpressions = filters != null ? 
filters : Collections.emptyList(); - this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); - this.options = options; - } - - @Override - public StructType readSchema() { - LOG.info("---------- readSchema"); - if (readSchema == null) { - this.readSchema = SparkSchemaUtil.convert(expectedSchema); - } - return readSchema; - } - - @Override - public MicroBatchStream toMicroBatchStream(String checkpointLocation) { - LOG.info("---------- toMicroBatchStream: {}", checkpointLocation); - return this; - } - - @Override - public Offset latestOffset() { - LOG.info("---------- latestOffset"); - return new StreamingOffset(Long.MAX_VALUE, Long.MAX_VALUE, true); - } - - @Override - public InputPartition[] planInputPartitions(Offset start, Offset end) { - LOG.info("---------- planInputPartitions: {}, {}", start, end); - return new InputPartition[0]; - } - - @Override - public PartitionReaderFactory createReaderFactory() { - LOG.info("---------- createReaderFactory"); - return null; - } - - @Override - public Offset initialOffset() { - return StreamingOffset.START_OFFSET; - } - - @Override - public Offset deserializeOffset(String json) { - LOG.info("---------- deserializeOffset {}", json); - return StreamingOffset.fromJson(json); - } - - @Override - public void commit(Offset end) { - LOG.info("---------- commit {}", end.toString()); - } - - @Override - public void stop() { - LOG.info("---------- stop"); - } -} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index a2a810237599..f47ac55579e9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -159,14 +159,8 @@ private Schema schemaWithMetadataColumns() { @Override public Scan build() { - // TODO: understand how to differentiate that this is a spark streaming microbatch scan. 
- if (false) { return new SparkBatchQueryScan( spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); - } else { - return new SparkMicroBatchStreamScan( - spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); - } } public Scan buildMergeScan() { From a96b3e6cd4f5d01410162ab0ce07b821505e3207 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 21 May 2021 17:32:27 -0700 Subject: [PATCH 05/22] rudimentary implementation for spark3 streaming reads from iceberg table --- .../spark/source/SparkMicroBatchStream.java | 51 ++++++++++++------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index fc36ceb490f7..53ef9841a4d3 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; +import java.util.Optional; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MicroBatches; @@ -39,6 +40,7 @@ import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.api.java.JavaSparkContext; @@ -58,6 +60,13 @@ import org.slf4j.LoggerFactory; import scala.reflect.ClassTag; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + public class SparkMicroBatchStream implements MicroBatchStream { private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); @@ -79,6 +88,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { // state private StreamingOffset committedOffset = null; + private StreamingOffset startOffset = null; SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { @@ -91,14 +101,18 @@ public class SparkMicroBatchStream implements MicroBatchStream { this.options = options; this.checkpointLocation = checkpointLocation; this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); - this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null); - this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null); - this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) + 
.orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + this.splitOpenFileCost = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); } @Override public Offset latestOffset() { - if (committedOffset == null) { + if (startOffset == null || startOffset.equals(StreamingOffset.START_OFFSET)) { // Spark MicroBatchStream stateMachine invokes latestOffset as its first step. // this makes sense for sources like SocketStream - when there is a running stream of data. // in case of iceberg - particularly in case of full table reads from start @@ -108,21 +122,24 @@ public Offset latestOffset() { return StreamingOffset.START_OFFSET; } - MicroBatch microBatch = MicroBatches.from(table.snapshot(committedOffset.snapshotId()), table.io()) + final StreamingOffset startReadingFrom = committedOffset == null || committedOffset.equals(StreamingOffset.START_OFFSET) + ? startOffset : committedOffset; + + MicroBatch microBatch = MicroBatches.from(table.snapshot(startReadingFrom.snapshotId()), table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) - .generate(committedOffset.position(), batchSize, committedOffset.shouldScanAllFiles()); + .generate(startReadingFrom.position(), batchSize, startReadingFrom.shouldScanAllFiles()); return new PlannedEndOffset( microBatch.snapshotId(), microBatch.endFileIndex(), - committedOffset.shouldScanAllFiles(), committedOffset, microBatch); + startReadingFrom.shouldScanAllFiles(), startReadingFrom, microBatch); } @Override public InputPartition[] planInputPartitions(Offset start, Offset end) { if (end.equals(StreamingOffset.START_OFFSET)) { // TODO: validate that this is exercised - when a stream is being resumed from a checkpoint - this.committedOffset = (StreamingOffset) start; + startOffset = (StreamingOffset) start; return new InputPartition[0]; } @@ -158,17 +175,15 @@ public PartitionReaderFactory createReaderFactory() { @Override public Offset initialOffset() { - // TODO: should this read initial offset from checkpoint location? 
- if (committedOffset != null) { - return committedOffset; + // TODO: read snapshot from spark options + List snapshotIds = SnapshotUtil.currentAncestors(table); + if (snapshotIds.isEmpty()) { + startOffset = StreamingOffset.START_OFFSET; } else { - List snapshotIds = SnapshotUtil.currentAncestors(table); - if (snapshotIds.isEmpty()) { - return StreamingOffset.START_OFFSET; - } else { - return new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); - } + startOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); } + + return startOffset; } @Override @@ -178,7 +193,7 @@ public Offset deserializeOffset(String json) { @Override public void commit(Offset end) { - committedOffset = (StreamingOffset) end; + this.committedOffset = (StreamingOffset) end; } @Override From b18cfdc5c115399af5c5092f7f7bc6de6a9df05d Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 21 May 2021 18:31:16 -0700 Subject: [PATCH 06/22] unit test --- .../iceberg/spark/source/SparkMicroBatchStream.java | 12 ++++++++---- .../spark/source/TestStructuredStreamingRead3.java | 7 ++++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 53ef9841a4d3..fb07b557090f 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -86,9 +86,10 @@ public class SparkMicroBatchStream implements MicroBatchStream { // lazy variables private StructType readSchema = null; - // state + // state variables private StreamingOffset committedOffset = null; private StreamingOffset startOffset = null; + private StreamingOffset previousEndOffset = null; SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { @@ -122,17 +123,20 @@ public Offset latestOffset() { return StreamingOffset.START_OFFSET; } - final StreamingOffset startReadingFrom = committedOffset == null || committedOffset.equals(StreamingOffset.START_OFFSET) - ? startOffset : committedOffset; + final StreamingOffset startReadingFrom = previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET) + ? 
startOffset : previousEndOffset; + // TODO: detect end of microBatch and graduate to next batch MicroBatch microBatch = MicroBatches.from(table.snapshot(startReadingFrom.snapshotId()), table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate(startReadingFrom.position(), batchSize, startReadingFrom.shouldScanAllFiles()); - return new PlannedEndOffset( + previousEndOffset = new PlannedEndOffset( microBatch.snapshotId(), microBatch.endFileIndex(), startReadingFrom.shouldScanAllFiles(), startReadingFrom, microBatch); + + return previousEndOffset; } @Override diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 13ddc766c1a4..992863db8675 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -116,6 +116,7 @@ public void testGetChangesFromStart() throws IOException, TimeoutException { .save(location.toString()); } table.refresh(); + List actual; try { Dataset df = spark.readStream() @@ -128,7 +129,11 @@ public void testGetChangesFromStart() throws IOException, TimeoutException { .outputMode(OutputMode.Append()) .start(); streamingQuery.processAllAvailable(); - Object actual = spark.sql("select * from test12").collect(); + actual = spark.sql("select * from test12") + .orderBy("id").as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals(expected, actual); } finally { for (StreamingQuery query : spark.streams().active()) { query.stop(); From 993bd9eadb3c1f5d79198fc5689b5ea29950a76b Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 21 May 2021 20:06:06 -0700 Subject: [PATCH 07/22] works! 
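For reference, a minimal sketch of the offset round trip the checkpoint handling above depends on: Spark persists the JSON form of each planned offset and later hands the same JSON back through deserializeOffset(), which delegates to StreamingOffset.fromJson(). Illustrative only, assuming StreamingOffset exposes json() (inherited from Spark's Offset) and a value-based equals(), as the code above relies on; the class name OffsetRoundTrip and the literal values are made up.

    package org.apache.iceberg.spark.source;

    // Illustrative only: round-trip a StreamingOffset through its JSON form, the same path the
    // offsets above take when Spark writes them to the checkpoint and replays them on restart.
    public class OffsetRoundTrip {
      public static void main(String[] args) {
        StreamingOffset planned = new StreamingOffset(4444L, 2L, false);   // made-up values
        StreamingOffset restored = StreamingOffset.fromJson(planned.json());
        System.out.println(planned.equals(restored));                      // expected: true
      }
    }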
--- .../spark/source/SparkMicroBatchStream.java | 47 ++++++++++++++----- .../source/TestStructuredStreamingRead3.java | 2 +- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index fb07b557090f..6049461b52c1 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -27,6 +27,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -89,7 +90,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { // state variables private StreamingOffset committedOffset = null; private StreamingOffset startOffset = null; - private StreamingOffset previousEndOffset = null; + private PlannedEndOffset previousEndOffset = null; SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { @@ -115,26 +116,48 @@ public class SparkMicroBatchStream implements MicroBatchStream { public Offset latestOffset() { if (startOffset == null || startOffset.equals(StreamingOffset.START_OFFSET)) { // Spark MicroBatchStream stateMachine invokes latestOffset as its first step. - // this makes sense for sources like SocketStream - when there is a running stream of data. - // in case of iceberg - particularly in case of full table reads from start - // the amount of data to read could be very large. - // so - do not participate in the statemachine unless spark specifies StartingOffset - // - via planInputPartitions - in its 2nd step + // this makes sense for sources like SocketStream - when there is no persistent backing store + // but a continuous stream of data. + // in case of iceberg - particularly in case of "full table reads from that start" + // the amount of data to read in a given batch could be very large. + // so - latestOffset will not participate in the statemachine unless StartingOffset + // is found - via initialOffset/deserializeOffset is invoked - in its 2nd step return StreamingOffset.START_OFFSET; } - final StreamingOffset startReadingFrom = previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET) + final StreamingOffset startReadingFrom = + previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET) ? 
startOffset : previousEndOffset; - // TODO: detect end of microBatch and graduate to next batch - MicroBatch microBatch = MicroBatches.from(table.snapshot(startReadingFrom.snapshotId()), table.io()) + final Snapshot startingSnapshot; + final long startFileIndex; + final boolean shouldScanAllFiles; + + if (startReadingFrom instanceof PlannedEndOffset + && ((PlannedEndOffset)startReadingFrom).getMicroBatch().lastIndexOfSnapshot() + && table.currentSnapshot().snapshotId() != startReadingFrom.snapshotId()) { + Snapshot previousSnapshot = table.snapshot(startReadingFrom.snapshotId()); + Snapshot pointer = table.currentSnapshot(); + while (pointer != null && pointer.parentId() != previousSnapshot.snapshotId()) { + pointer = table.snapshot(pointer.parentId()); + } + + startingSnapshot = pointer; + startFileIndex = 0L; + shouldScanAllFiles = false; + } else { + startingSnapshot = table.snapshot(startReadingFrom.snapshotId()); + startFileIndex = startReadingFrom.position(); + shouldScanAllFiles = startReadingFrom.shouldScanAllFiles(); + } + + MicroBatch microBatch = MicroBatches.from(startingSnapshot, table.io()) .caseSensitive(caseSensitive) .specsById(table.specs()) - .generate(startReadingFrom.position(), batchSize, startReadingFrom.shouldScanAllFiles()); + .generate(startFileIndex, batchSize, shouldScanAllFiles); previousEndOffset = new PlannedEndOffset( - microBatch.snapshotId(), microBatch.endFileIndex(), - startReadingFrom.shouldScanAllFiles(), startReadingFrom, microBatch); + microBatch.snapshotId(), microBatch.endFileIndex(), shouldScanAllFiles, startReadingFrom, microBatch); return previousEndOffset; } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 992863db8675..074c6f18cff4 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -130,7 +130,7 @@ public void testGetChangesFromStart() throws IOException, TimeoutException { .start(); streamingQuery.processAllAvailable(); actual = spark.sql("select * from test12") - .orderBy("id").as(Encoders.bean(SimpleRecord.class)) + .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); Assert.assertEquals(expected, actual); From 7063fd3ff88385717f6f29351cc0f81c846ff956 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Tue, 25 May 2021 21:04:26 -0700 Subject: [PATCH 08/22] Unit test. 
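For reference, the snapshot walk that the change above performs inline in latestOffset(), pulled out as an isolated sketch: once a micro-batch reaches the last file index of its snapshot, the stream graduates to the snapshot committed immediately after it, found by walking back from the table's current snapshot along parentId() links. A rough sketch assuming a linear snapshot history; the class and method names (NextSnapshot.after) are illustrative.

    package org.apache.iceberg.spark.source;

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    // Illustrative helper: find the snapshot committed immediately after finishedSnapshotId,
    // or null if that snapshot is still the current one or not in the current ancestor chain.
    final class NextSnapshot {
      private NextSnapshot() {
      }

      static Snapshot after(Table table, long finishedSnapshotId) {
        Snapshot pointer = table.currentSnapshot();
        while (pointer != null && pointer.snapshotId() != finishedSnapshotId) {
          Long parentId = pointer.parentId();
          if (parentId != null && parentId == finishedSnapshotId) {
            return pointer;                                   // the immediate child
          }
          pointer = parentId == null ? null : table.snapshot(parentId);
        }
        return null;
      }
    }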
--- .../source/TestStructuredStreamingRead3.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 074c6f18cff4..bbccf1e3f5cf 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -92,29 +92,37 @@ public static void stopSpark() { @SuppressWarnings("unchecked") @Test - public void testGetChangesFromStart() throws IOException, TimeoutException { + public void testGetAllAppendsFromStartAcrossMultipleSnapshots() throws IOException, TimeoutException { File parent = temp.newFolder("parent"); File location = new File(parent, "test-table"); File checkpoint = new File(parent, "checkpoint"); HadoopTables tables = new HadoopTables(CONF); - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); Table table = tables.create(SCHEMA, spec, location.toString()); - List expected = Lists.newArrayList( - new SimpleRecord(2, "1"), - new SimpleRecord(3, "2"), - new SimpleRecord(1, "3") + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")) ); - // Write records one by one to generate multiple snapshots - for (SimpleRecord l : expected) { - Dataset df = spark.createDataFrame(Collections.singletonList(l), SimpleRecord.class); + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); df.select("id", "data").write() .format("iceberg") .mode("append") .save(location.toString()); } + table.refresh(); List actual; @@ -133,7 +141,9 @@ public void testGetChangesFromStart() throws IOException, TimeoutException { .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); - Assert.assertEquals(expected, actual); + Assert.assertEquals( + expected.stream().flatMap(List::stream).collect(Collectors.toList()), + actual); } finally { for (StreamingQuery query : spark.streams().active()) { query.stop(); From 17f6eb8448a5d9f01c4e3fa7e7a31791b4950a38 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Wed, 26 May 2021 23:12:55 -0700 Subject: [PATCH 09/22] Unit test. 
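As a side note on the test just above: it leans on every append committing exactly one Iceberg snapshot, so after writing N batches the table history should hold N snapshots. A small illustrative check, reusing the table handle from the test; the helper name SnapshotCount is made up.

    package org.apache.iceberg.spark.source;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

    // Illustrative check: count the snapshots currently reachable in the table metadata.
    final class SnapshotCount {
      private SnapshotCount() {
      }

      static int of(Table table) {
        table.refresh();
        return Iterables.size(table.snapshots());
      }
    }

In the test this could back an extra assertion such as Assert.assertEquals(expected.size(), SnapshotCount.of(table)) before the stream is started.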
--- .../iceberg/spark/source/SparkBatchScan.java | 4 +- .../spark/source/SparkMicroBatchStream.java | 59 ++++++------- .../source/TestStructuredStreamingRead3.java | 84 ++++++++++++++++++- 3 files changed, 114 insertions(+), 33 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 9ba763a2cf5a..62fd648f3d0a 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -64,6 +64,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class); private final JavaSparkContext sparkContext; + private final SparkSession spark; private final Table table; private final boolean caseSensitive; private final boolean localityPreferred; @@ -78,6 +79,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { SparkBatchScan(SparkSession spark, Table table, boolean caseSensitive, Schema expectedSchema, List filters, CaseInsensitiveStringMap options) { this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.spark = spark; this.table = table; this.caseSensitive = caseSensitive; this.expectedSchema = expectedSchema; @@ -113,7 +115,7 @@ public Batch toBatch() { @Override public MicroBatchStream toMicroBatchStream(String checkpointLocation) { return new SparkMicroBatchStream( - sparkContext, table, caseSensitive, expectedSchema, filterExpressions, options, checkpointLocation); + spark, sparkContext, table, caseSensitive, expectedSchema, filterExpressions, options, checkpointLocation); } @Override diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 6049461b52c1..4a36b36128b9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -21,8 +21,10 @@ import java.io.Serializable; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MicroBatches; @@ -53,12 +55,17 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.execution.streaming.CommitLog; import org.apache.spark.sql.execution.streaming.HDFSMetadataLog; +import org.apache.spark.sql.execution.streaming.OffsetSeq; +import org.apache.spark.sql.execution.streaming.OffsetSeqLog; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; import scala.reflect.ClassTag; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; @@ -71,6 +78,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + private final SparkSession spark; private final 
JavaSparkContext sparkContext; private final Table table; private final boolean caseSensitive; @@ -83,18 +91,16 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final Integer splitLookback; private final Long splitOpenFileCost; private final boolean localityPreferred; + private final OffsetSeqLog offsetSeqLog; - // lazy variables - private StructType readSchema = null; - - // state variables - private StreamingOffset committedOffset = null; + private long batchId = 0L; private StreamingOffset startOffset = null; private PlannedEndOffset previousEndOffset = null; - SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, + SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { this.sparkContext = sparkContext; + this.spark = spark; this.table = table; this.caseSensitive = caseSensitive; this.expectedSchema = expectedSchema; @@ -110,21 +116,14 @@ public class SparkMicroBatchStream implements MicroBatchStream { this.splitOpenFileCost = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT)); + this.offsetSeqLog = new OffsetSeqLog(spark, new Path(checkpointLocation, "offsets").toString()); } @Override public Offset latestOffset() { - if (startOffset == null || startOffset.equals(StreamingOffset.START_OFFSET)) { - // Spark MicroBatchStream stateMachine invokes latestOffset as its first step. - // this makes sense for sources like SocketStream - when there is no persistent backing store - // but a continuous stream of data. - // in case of iceberg - particularly in case of "full table reads from that start" - // the amount of data to read in a given batch could be very large. - // so - latestOffset will not participate in the statemachine unless StartingOffset - // is found - via initialOffset/deserializeOffset is invoked - in its 2nd step - return StreamingOffset.START_OFFSET; - } - + // lastoffset() is the first step in spark MicroBatchStream statemachine + // to control the batch size - calculate startOffset first + initialOffset(); final StreamingOffset startReadingFrom = previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET) ? startOffset : previousEndOffset; @@ -196,13 +195,22 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { @Override public PartitionReaderFactory createReaderFactory() { - // TODO: what about batchSize? 
return new ReaderFactory(batchSize); } @Override public Offset initialOffset() { // TODO: read snapshot from spark options + // HDFSMetadataLog checkpointLog = new HDFSMetadataLog<>(spark, checkpointLocation, null); + // StreamingOffset offset = checkpointLog.getLatest().get()._2; + + if (offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined()) { + batchId = (long) offsetSeqLog.getLatest().get()._1; + OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; + + return JavaConverters.asJavaCollection(offsetSeq.offsets()).stream().findFirst().get().get(); + } + List snapshotIds = SnapshotUtil.currentAncestors(table); if (snapshotIds.isEmpty()) { startOffset = StreamingOffset.START_OFFSET; @@ -220,21 +228,14 @@ public Offset deserializeOffset(String json) { @Override public void commit(Offset end) { - this.committedOffset = (StreamingOffset) end; + batchId++; + OffsetSeq offsetSeq = OffsetSeq.fill( + JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(end)).asScala().toSeq()); + this.offsetSeqLog.add(batchId, offsetSeq); } @Override public void stop() { - LOG.info("---------- stop"); - } - - // TODO: is this needed? - // https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala - private static final class IcebergMetadataLog extends HDFSMetadataLog { - - public IcebergMetadataLog(SparkSession sparkSession, String path, ClassTag evidence$1) { - super(sparkSession, path, evidence$1); - } } private static class ReaderFactory implements PartitionReaderFactory { diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index bbccf1e3f5cf..9deb41a8a0c6 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.MicroBatches.MicroBatch; @@ -47,8 +48,10 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.execution.streaming.MemoryStream; +import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.Trigger; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -92,10 +95,9 @@ public static void stopSpark() { @SuppressWarnings("unchecked") @Test - public void testGetAllAppendsFromStartAcrossMultipleSnapshots() throws IOException, TimeoutException { + public void testGetAllInsertsAcrossIcebergSnapshots() throws IOException, TimeoutException { File parent = temp.newFolder("parent"); File location = new File(parent, "test-table"); - File checkpoint = new File(parent, "checkpoint"); HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); @@ -129,7 +131,6 @@ public void testGetAllAppendsFromStartAcrossMultipleSnapshots() throws IOExcepti try { Dataset df = spark.readStream() .format("iceberg") - .option("checkpointLocation", checkpoint.toString()) 
.load(location.toString()); StreamingQuery streamingQuery = df.writeStream() .format("memory") @@ -150,4 +151,81 @@ public void testGetAllAppendsFromStartAcrossMultipleSnapshots() throws IOExcepti } } } + + @SuppressWarnings("unchecked") + @Test + public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File writerCheckpoint = new File(parent, "writer-checkpoint"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + final String tempView = "microBatchView"; + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")), + Lists.newArrayList( + new SimpleRecord(8, "eight"), + new SimpleRecord(9, "nine")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset ds = spark.createDataFrame(l, SimpleRecord.class); + ds.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + List actual; + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + DataStreamWriter singleBatchWriter = df.writeStream() + .trigger(Trigger.Once()) + .option("checkpointLocation", writerCheckpoint.toString()) + .foreachBatch((batchDF, batchId) -> + { + batchDF.createOrReplaceGlobalTempView(tempView); + }); + + String globalTempView = "global_temp." + tempView; + Assert.assertEquals(expected.get(0), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(1), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(2), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals(expected.get(3), processMicroBatch(singleBatchWriter, globalTempView)); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @Test + public void testAllFormats() throws IOException, TimeoutException{} + + private static List processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) + throws TimeoutException { + StreamingQuery streamingQuery = singleBatchWriter.start(); + streamingQuery.processAllAvailable(); + + return spark.sql(String.format("select * from %s", viewName)) + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + } } From 2044d945270f00d56f184384380d33b7566d23a3 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Wed, 26 May 2021 23:51:23 -0700 Subject: [PATCH 10/22] checkpoint done! 
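For reference, a sketch of the resume path the code above wires up: on restart, the query-level offset log under the checkpoint is consulted and, if a batch was planned before the restart, the first source's offset is read back as JSON and fed to StreamingOffset.fromJson(). OffsetSeqLog and OffsetSeq are Spark-internal classes, so the calls below only mirror what the patch itself uses and may differ across Spark versions; the class and method names (CheckpointedOffset.latestJson) are illustrative.

    package org.apache.iceberg.spark.source;

    import java.util.List;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.connector.read.streaming.Offset;
    import org.apache.spark.sql.execution.streaming.OffsetSeq;
    import org.apache.spark.sql.execution.streaming.OffsetSeqLog;
    import scala.Option;
    import scala.collection.JavaConverters;

    // Illustrative sketch: return the latest checkpointed offset of the first (and only) source
    // as JSON, or null when the query has no planned batch to resume from.
    final class CheckpointedOffset {
      private CheckpointedOffset() {
      }

      static String latestJson(SparkSession spark, String offsetsDir) {
        OffsetSeqLog offsetLog = new OffsetSeqLog(spark, offsetsDir);
        if (offsetLog.getLatest() == null || !offsetLog.getLatest().isDefined()) {
          return null;                                        // fresh query, nothing to resume
        }

        OffsetSeq offsetSeq = offsetLog.getLatest().get()._2();
        List<?> offsets = JavaConverters.seqAsJavaList(offsetSeq.offsets());
        Option<?> first = (Option<?>) offsets.get(0);         // single-source query
        return first.isDefined() ? ((Offset) first.get()).json() : null;
      }
    }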
--- .../spark/source/SparkMicroBatchStream.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 4a36b36128b9..8af029663de0 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -59,6 +59,7 @@ import org.apache.spark.sql.execution.streaming.HDFSMetadataLog; import org.apache.spark.sql.execution.streaming.OffsetSeq; import org.apache.spark.sql.execution.streaming.OffsetSeqLog; +import org.apache.spark.sql.execution.streaming.SerializedOffset; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -93,7 +94,6 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean localityPreferred; private final OffsetSeqLog offsetSeqLog; - private long batchId = 0L; private StreamingOffset startOffset = null; private PlannedEndOffset previousEndOffset = null; @@ -116,7 +116,8 @@ public class SparkMicroBatchStream implements MicroBatchStream { this.splitOpenFileCost = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT)); - this.offsetSeqLog = new OffsetSeqLog(spark, new Path(checkpointLocation, "offsets").toString()); + this.offsetSeqLog = new OffsetSeqLog(spark, + new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString()); } @Override @@ -205,10 +206,14 @@ public Offset initialOffset() { // StreamingOffset offset = checkpointLog.getLatest().get()._2; if (offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined()) { - batchId = (long) offsetSeqLog.getLatest().get()._1; + // batchId = (long) offsetSeqLog.getLatest().get()._1; OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; - return JavaConverters.asJavaCollection(offsetSeq.offsets()).stream().findFirst().get().get(); + List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); + Option optionalOffset = offsetSeqCol.get(0); + + startOffset = StreamingOffset.fromJson(optionalOffset.get().json()); + return startOffset; } List snapshotIds = SnapshotUtil.currentAncestors(table); @@ -228,10 +233,6 @@ public Offset deserializeOffset(String json) { @Override public void commit(Offset end) { - batchId++; - OffsetSeq offsetSeq = OffsetSeq.fill( - JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(end)).asScala().toSeq()); - this.offsetSeqLog.add(batchId, offsetSeq); } @Override From 15e33e93e075092344f998ff37a655c5e95c8983 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 15:47:05 -0700 Subject: [PATCH 11/22] checkpoint done! 
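For reference, the path arithmetic behind the offset log location introduced above: Spark passes each source a per-source checkpoint directory (typically <checkpoint>/sources/0), while the query-level offset log lives under <checkpoint>/offsets, hence the replace("/sources/0", "") before appending "offsets". A minimal sketch; the literal paths and the class name OffsetLogLocation are made up.

    import org.apache.hadoop.fs.Path;

    // Illustrative only: derive the query-level offsets directory from the per-source
    // checkpoint location handed to toMicroBatchStream().
    public class OffsetLogLocation {
      public static void main(String[] args) {
        String sourceCheckpoint = "/tmp/query-checkpoint/sources/0";
        String offsetsDir = new Path(sourceCheckpoint.replace("/sources/0", ""), "offsets").toString();
        System.out.println(offsetsDir);   // prints /tmp/query-checkpoint/offsets
      }
    }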
--- .../spark/source/SparkMicroBatchStream.java | 125 +++++++++++++----- 1 file changed, 91 insertions(+), 34 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 8af029663de0..93857ea59432 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Optional; import org.apache.hadoop.fs.Path; @@ -29,6 +28,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.MicroBatches.MicroBatchBuilder; import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -55,19 +55,14 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.connector.read.streaming.Offset; -import org.apache.spark.sql.execution.streaming.CommitLog; -import org.apache.spark.sql.execution.streaming.HDFSMetadataLog; import org.apache.spark.sql.execution.streaming.OffsetSeq; import org.apache.spark.sql.execution.streaming.OffsetSeqLog; -import org.apache.spark.sql.execution.streaming.SerializedOffset; -import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; -import scala.reflect.ClassTag; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; @@ -79,14 +74,12 @@ public class SparkMicroBatchStream implements MicroBatchStream { private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); - private final SparkSession spark; private final JavaSparkContext sparkContext; private final Table table; private final boolean caseSensitive; private final Schema expectedSchema; private final List filterExpressions; private final int batchSize; - private final CaseInsensitiveStringMap options; private final String checkpointLocation; private final Long splitSize; private final Integer splitLookback; @@ -94,19 +87,17 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final boolean localityPreferred; private final OffsetSeqLog offsetSeqLog; - private StreamingOffset startOffset = null; + private StreamingOffset initialOffset = null; private PlannedEndOffset previousEndOffset = null; SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { this.sparkContext = sparkContext; - this.spark = spark; this.table = table; this.caseSensitive = caseSensitive; this.expectedSchema = expectedSchema; this.filterExpressions = filterExpressions; this.batchSize = Spark3Util.batchSize(table.properties(), options); - this.options = options; this.checkpointLocation = checkpointLocation; this.localityPreferred = 
Spark3Util.isLocalityEnabled(table.io(), table.location(), options); this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) @@ -122,21 +113,29 @@ public class SparkMicroBatchStream implements MicroBatchStream { @Override public Offset latestOffset() { - // lastoffset() is the first step in spark MicroBatchStream statemachine - // to control the batch size - calculate startOffset first initialOffset(); - final StreamingOffset startReadingFrom = - previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET) - ? startOffset : previousEndOffset; - final Snapshot startingSnapshot; + if (isTableEmpty()) { + return StreamingOffset.START_OFFSET; + } + + StreamingOffset microBatchStartOffset = isFirstBatch() ? initialOffset : previousEndOffset; + + if (isEndOfSnapshot(microBatchStartOffset)) { + microBatchStartOffset = getNextAvailableSnapshot(microBatchStartOffset); + } + + previousEndOffset = calculateEndOffset(microBatchStartOffset); + return previousEndOffset; + + /*final Snapshot startingSnapshot; final long startFileIndex; final boolean shouldScanAllFiles; - if (startReadingFrom instanceof PlannedEndOffset - && ((PlannedEndOffset)startReadingFrom).getMicroBatch().lastIndexOfSnapshot() - && table.currentSnapshot().snapshotId() != startReadingFrom.snapshotId()) { - Snapshot previousSnapshot = table.snapshot(startReadingFrom.snapshotId()); + if (microBatchStartOffset instanceof PlannedEndOffset + && ((PlannedEndOffset)microBatchStartOffset).getMicroBatch().lastIndexOfSnapshot() + && table.currentSnapshot().snapshotId() != microBatchStartOffset.snapshotId()) { + Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); Snapshot pointer = table.currentSnapshot(); while (pointer != null && pointer.parentId() != previousSnapshot.snapshotId()) { pointer = table.snapshot(pointer.parentId()); @@ -146,9 +145,9 @@ public Offset latestOffset() { startFileIndex = 0L; shouldScanAllFiles = false; } else { - startingSnapshot = table.snapshot(startReadingFrom.snapshotId()); - startFileIndex = startReadingFrom.position(); - shouldScanAllFiles = startReadingFrom.shouldScanAllFiles(); + startingSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); + startFileIndex = microBatchStartOffset.position(); + shouldScanAllFiles = microBatchStartOffset.shouldScanAllFiles(); } MicroBatch microBatch = MicroBatches.from(startingSnapshot, table.io()) @@ -157,16 +156,16 @@ public Offset latestOffset() { .generate(startFileIndex, batchSize, shouldScanAllFiles); previousEndOffset = new PlannedEndOffset( - microBatch.snapshotId(), microBatch.endFileIndex(), shouldScanAllFiles, startReadingFrom, microBatch); + microBatch.snapshotId(), microBatch.endFileIndex(), shouldScanAllFiles, microBatchStartOffset, microBatch); - return previousEndOffset; + return previousEndOffset;*/ } @Override public InputPartition[] planInputPartitions(Offset start, Offset end) { if (end.equals(StreamingOffset.START_OFFSET)) { // TODO: validate that this is exercised - when a stream is being resumed from a checkpoint - startOffset = (StreamingOffset) start; + initialOffset = (StreamingOffset) start; return new InputPartition[0]; } @@ -201,9 +200,11 @@ public PartitionReaderFactory createReaderFactory() { @Override public Offset initialOffset() { + if (initialOffset != null) { + return initialOffset; + } + // TODO: read snapshot from spark options - // HDFSMetadataLog checkpointLog = new HDFSMetadataLog<>(spark, checkpointLocation, null); - // 
StreamingOffset offset = checkpointLog.getLatest().get()._2; if (offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined()) { // batchId = (long) offsetSeqLog.getLatest().get()._1; @@ -212,18 +213,18 @@ public Offset initialOffset() { List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); Option optionalOffset = offsetSeqCol.get(0); - startOffset = StreamingOffset.fromJson(optionalOffset.get().json()); - return startOffset; + initialOffset = StreamingOffset.fromJson(optionalOffset.get().json()); + return initialOffset; } List snapshotIds = SnapshotUtil.currentAncestors(table); if (snapshotIds.isEmpty()) { - startOffset = StreamingOffset.START_OFFSET; + initialOffset = StreamingOffset.START_OFFSET; } else { - startOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); + initialOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); } - return startOffset; + return initialOffset; } @Override @@ -239,6 +240,62 @@ public void commit(Offset end) { public void stop() { } + private boolean isFirstBatch() { + return previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET); + } + + private boolean isTableEmpty() { + if (initialOffset == null) { + throw new IllegalStateException("isTableEmpty shouldn't be invoked without invoking initialOffset()"); + } + + return initialOffset.equals(StreamingOffset.START_OFFSET); + } + + private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStartOffset) { + if (table.currentSnapshot().snapshotId() == microBatchStartOffset.snapshotId()) { + return microBatchStartOffset; + } + + Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); + Snapshot pointer = table.currentSnapshot(); + while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { + pointer = table.snapshot(pointer.parentId()); + } + + return new StreamingOffset(pointer.snapshotId(), 0L, false); + } + + private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) { + MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles()); + + return new PlannedEndOffset( + microBatch.snapshotId(), + microBatch.endFileIndex(), + microBatchStartOffset.shouldScanAllFiles(), + microBatchStartOffset, + microBatch); + } + + private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) { + MicroBatchBuilder microBatchBuilder = MicroBatches.from( + table.snapshot(microBatchStartOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()); + + MicroBatch microBatchStart = microBatchBuilder.generate( + microBatchStartOffset.position(), + 1, + microBatchStartOffset.shouldScanAllFiles()); + + return microBatchStartOffset.position() == microBatchStart.startFileIndex() + && microBatchStartOffset.position() == microBatchStart.endFileIndex() + && microBatchStart.lastIndexOfSnapshot(); + } + private static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; From b8e5b34c9b2e451fd8d1f3a99ccddfcf84b502c8 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 16:52:00 -0700 Subject: [PATCH 12/22] refactor --- .../iceberg/spark/source/SparkBatchScan.java | 8 +- .../spark/source/SparkMicroBatchStream.java | 203 ++++-------------- 
.../source/TestStructuredStreamingRead3.java | 14 -- 3 files changed, 44 insertions(+), 181 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 62fd648f3d0a..0bc1f938518c 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -115,7 +115,7 @@ public Batch toBatch() { @Override public MicroBatchStream toMicroBatchStream(String checkpointLocation) { return new SparkMicroBatchStream( - spark, sparkContext, table, caseSensitive, expectedSchema, filterExpressions, options, checkpointLocation); + spark, sparkContext, table, caseSensitive, expectedSchema, options, checkpointLocation); } @Override @@ -223,10 +223,10 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } - private static class ReaderFactory implements PartitionReaderFactory { + public static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; - private ReaderFactory(int batchSize) { + public ReaderFactory(int batchSize) { this.batchSize = batchSize; } @@ -266,7 +266,7 @@ private static class BatchReader extends BatchDataReader implements PartitionRea } } - private static class ReadTask implements InputPartition, Serializable { + public static class ReadTask implements InputPartition, Serializable { private final CombinedScanTask task; private final Broadcast
tableBroadcast; private final String expectedSchemaString; diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 93857ea59432..60e3ff408199 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -19,8 +19,6 @@ package org.apache.iceberg.spark.source; -import java.io.Serializable; -import java.util.Collection; import java.util.List; import java.util.Optional; import org.apache.hadoop.fs.Path; @@ -39,8 +37,6 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.util.PropertyUtil; @@ -49,20 +45,19 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.InputPartition; -import org.apache.spark.sql.connector.read.PartitionReader; import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.execution.streaming.OffsetSeq; import org.apache.spark.sql.execution.streaming.OffsetSeqLog; import org.apache.spark.sql.util.CaseInsensitiveStringMap; -import org.apache.spark.sql.vectorized.ColumnarBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; @@ -78,9 +73,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final Table table; private final boolean caseSensitive; private final Schema expectedSchema; - private final List filterExpressions; private final int batchSize; - private final String checkpointLocation; private final Long splitSize; private final Integer splitLookback; private final Long splitOpenFileCost; @@ -91,14 +84,12 @@ public class SparkMicroBatchStream implements MicroBatchStream { private PlannedEndOffset previousEndOffset = null; SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, - List filterExpressions, CaseInsensitiveStringMap options, String checkpointLocation) { + CaseInsensitiveStringMap options, String checkpointLocation) { this.sparkContext = sparkContext; this.table = table; this.caseSensitive = caseSensitive; this.expectedSchema = expectedSchema; - this.filterExpressions = filterExpressions; this.batchSize = Spark3Util.batchSize(table.properties(), options); - this.checkpointLocation = checkpointLocation; this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) .orElseGet(() -> 
PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); @@ -120,52 +111,17 @@ public Offset latestOffset() { } StreamingOffset microBatchStartOffset = isFirstBatch() ? initialOffset : previousEndOffset; - if (isEndOfSnapshot(microBatchStartOffset)) { microBatchStartOffset = getNextAvailableSnapshot(microBatchStartOffset); } previousEndOffset = calculateEndOffset(microBatchStartOffset); return previousEndOffset; - - /*final Snapshot startingSnapshot; - final long startFileIndex; - final boolean shouldScanAllFiles; - - if (microBatchStartOffset instanceof PlannedEndOffset - && ((PlannedEndOffset)microBatchStartOffset).getMicroBatch().lastIndexOfSnapshot() - && table.currentSnapshot().snapshotId() != microBatchStartOffset.snapshotId()) { - Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); - Snapshot pointer = table.currentSnapshot(); - while (pointer != null && pointer.parentId() != previousSnapshot.snapshotId()) { - pointer = table.snapshot(pointer.parentId()); - } - - startingSnapshot = pointer; - startFileIndex = 0L; - shouldScanAllFiles = false; - } else { - startingSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); - startFileIndex = microBatchStartOffset.position(); - shouldScanAllFiles = microBatchStartOffset.shouldScanAllFiles(); - } - - MicroBatch microBatch = MicroBatches.from(startingSnapshot, table.io()) - .caseSensitive(caseSensitive) - .specsById(table.specs()) - .generate(startFileIndex, batchSize, shouldScanAllFiles); - - previousEndOffset = new PlannedEndOffset( - microBatch.snapshotId(), microBatch.endFileIndex(), shouldScanAllFiles, microBatchStartOffset, microBatch); - - return previousEndOffset;*/ } @Override public InputPartition[] planInputPartitions(Offset start, Offset end) { if (end.equals(StreamingOffset.START_OFFSET)) { - // TODO: validate that this is exercised - when a stream is being resumed from a checkpoint - initialOffset = (StreamingOffset) start; return new InputPartition[0]; } @@ -173,12 +129,15 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { Broadcast
tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); String expectedSchemaString = SchemaParser.toJson(expectedSchema); + Preconditions.checkState( + end instanceof PlannedEndOffset, + "The end offset passed to planInputPartitions() is not the one that is returned by lastOffset()"); PlannedEndOffset endOffset = (PlannedEndOffset) end; - // Preconditions.checkState(endOffset.getStartOffset().equals(start), "The cached MicroBatch doesn't match requested planInputPartitions"); List fileScanTasks = endOffset.getMicroBatch().tasks(); - CloseableIterable splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), + CloseableIterable splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(fileScanTasks), splitSize); List combinedScanTasks = Lists.newArrayList( TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); @@ -200,26 +159,20 @@ public PartitionReaderFactory createReaderFactory() { @Override public Offset initialOffset() { - if (initialOffset != null) { + if (isInitialOffsetResolved()) { return initialOffset; } - // TODO: read snapshot from spark options - - if (offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined()) { - // batchId = (long) offsetSeqLog.getLatest().get()._1; - OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; - - List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); - Option optionalOffset = offsetSeqCol.get(0); - - initialOffset = StreamingOffset.fromJson(optionalOffset.get().json()); + if (isStreamResumedFromCheckpoint()) { + initialOffset = calculateInitialOffsetFromCheckpoint(); return initialOffset; } List snapshotIds = SnapshotUtil.currentAncestors(table); if (snapshotIds.isEmpty()) { initialOffset = StreamingOffset.START_OFFSET; + Preconditions.checkState(isTableEmpty(), + "criteria behind isTableEmpty() changed."); } else { initialOffset = new StreamingOffset(Iterables.getLast(snapshotIds), 0, true); } @@ -240,14 +193,37 @@ public void commit(Offset end) { public void stop() { } + private boolean isInitialOffsetResolved() { + return initialOffset != null; + } + + private StreamingOffset calculateInitialOffsetFromCheckpoint() { + Preconditions.checkState(isStreamResumedFromCheckpoint(), + "Stream is not resumed from checkpoint."); + + OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; + + List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); + Option optionalOffset = offsetSeqCol.get(0); + + StreamingOffset checkpointedOffset = StreamingOffset.fromJson(optionalOffset.get().json()); + return checkpointedOffset; + } + + private boolean isStreamResumedFromCheckpoint() { + Preconditions.checkState(!isInitialOffsetResolved(), + "isStreamResumedFromCheckpoint() is invoked without resolving initialOffset"); + + return offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined(); + } + private boolean isFirstBatch() { return previousEndOffset == null || previousEndOffset.equals(StreamingOffset.START_OFFSET); } private boolean isTableEmpty() { - if (initialOffset == null) { - throw new IllegalStateException("isTableEmpty shouldn't be invoked without invoking initialOffset()"); - } + Preconditions.checkState(isInitialOffsetResolved(), + "isTableEmpty() is invoked without resolving initialOffset"); return initialOffset.equals(StreamingOffset.START_OFFSET); } @@ -276,7 +252,6 @@ private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffse 
microBatch.snapshotId(), microBatch.endFileIndex(), microBatchStartOffset.shouldScanAllFiles(), - microBatchStartOffset, microBatch); } @@ -296,113 +271,15 @@ private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) { && microBatchStart.lastIndexOfSnapshot(); } - private static class ReaderFactory implements PartitionReaderFactory { - private final int batchSize; - - private ReaderFactory(int batchSize) { - this.batchSize = batchSize; - } - - @Override - public PartitionReader createReader(InputPartition partition) { - if (partition instanceof ReadTask) { - return new RowReader((ReadTask) partition); - } else { - throw new UnsupportedOperationException("Incorrect input partition type: " + partition); - } - } - - @Override - public PartitionReader createColumnarReader(InputPartition partition) { - if (partition instanceof ReadTask) { - return new BatchReader((ReadTask) partition, batchSize); - } else { - throw new UnsupportedOperationException("Incorrect input partition type: " + partition); - } - } - - @Override - public boolean supportColumnarReads(InputPartition partition) { - return batchSize > 1; - } - } - - private static class RowReader extends RowDataReader implements PartitionReader { - RowReader(ReadTask task) { - super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); - } - } - - private static class BatchReader extends BatchDataReader implements PartitionReader { - BatchReader(ReadTask task, int batchSize) { - super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); - } - } - - private static class ReadTask implements InputPartition, Serializable { - private final CombinedScanTask task; - private final Broadcast
tableBroadcast; - private final String expectedSchemaString; - private final boolean caseSensitive; - - private transient Schema expectedSchema = null; - private transient String[] preferredLocations = null; - - ReadTask(CombinedScanTask task, Broadcast
tableBroadcast, String expectedSchemaString, - boolean caseSensitive, boolean localityPreferred) { - this.task = task; - this.tableBroadcast = tableBroadcast; - this.expectedSchemaString = expectedSchemaString; - this.caseSensitive = caseSensitive; - if (localityPreferred) { - Table table = tableBroadcast.value(); - this.preferredLocations = Util.blockLocations(table.io(), task); - } else { - this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE; - } - } - - @Override - public String[] preferredLocations() { - return preferredLocations; - } - - public Collection files() { - return task.files(); - } - - public Table table() { - return tableBroadcast.value(); - } - - public boolean isCaseSensitive() { - return caseSensitive; - } - - private Schema expectedSchema() { - if (expectedSchema == null) { - this.expectedSchema = SchemaParser.fromJson(expectedSchemaString); - } - return expectedSchema; - } - } - private static class PlannedEndOffset extends StreamingOffset { - private final StreamingOffset startOffset; private final MicroBatch microBatch; - PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, StreamingOffset startOffset, MicroBatch microBatch) { + PlannedEndOffset(long snapshotId, long position, boolean scanAllFiles, MicroBatch microBatch) { super(snapshotId, position, scanAllFiles); - - this.startOffset = startOffset; this.microBatch = microBatch; } - public StreamingOffset getStartOffset() { - return startOffset; - } - public MicroBatch getMicroBatch() { return microBatch; } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 9deb41a8a0c6..4da55db1ed44 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -21,19 +21,10 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.MicroBatches.MicroBatch; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -41,13 +32,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; - -import org.apache.spark.sql.execution.streaming.MemoryStream; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; @@ -59,8 +47,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; -import scala.Option; -import scala.collection.JavaConversions; import static org.apache.iceberg.types.Types.NestedField.optional; From 633afbb5553a2dd5cb5a1ea87558b6999e276430 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati 
Date: Fri, 28 May 2021 17:12:21 -0700 Subject: [PATCH 13/22] test batchSize option --- .../source/TestStructuredStreamingRead3.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 4da55db1ed44..0eab2fb68089 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -28,9 +29,11 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -202,6 +205,71 @@ public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, Timeou } } + @SuppressWarnings("unchecked") + @Test + public void testBatchSizeOption() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + File writerCheckpoint = new File(parent, "writer-checkpoint"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + final String tempView = "microBatchSingleRecordView"; + + // produce unique file per record - to test BatchSize=1 + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")), + Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")), + Lists.newArrayList( + new SimpleRecord(8, "eight"), + new SimpleRecord(9, "nine")) + ); + + // generate multiple snapshots - each snapshot with multiple files + for (List l : expected) { + Dataset ds = spark.createDataFrame(l, SimpleRecord.class); + ds.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .option(SparkReadOptions.VECTORIZATION_BATCH_SIZE, 1) + .load(location.toString()); + DataStreamWriter singleBatchWriter = df.writeStream() + .trigger(Trigger.Once()) + .option("checkpointLocation", writerCheckpoint.toString()) + .foreachBatch((batchDF, batchId) -> + { + batchDF.createOrReplaceGlobalTempView(tempView); + }); + + String globalTempView = "global_temp." 
+ tempView; + for (SimpleRecord simpleRecord: + expected.stream().flatMap(List::stream).collect(Collectors.toList())) { + Assert.assertEquals(Collections.singletonList(simpleRecord), processMicroBatch(singleBatchWriter, globalTempView)); + } + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + @Test public void testAllFormats() throws IOException, TimeoutException{} From 16d398464918e9e402b0cc2ebe87682d4b0db03b Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 17:22:46 -0700 Subject: [PATCH 14/22] refactor --- .../iceberg/spark/source/SparkMicroBatchStream.java | 10 ++++++++-- .../spark/source/TestStructuredStreamingRead3.java | 1 - 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 60e3ff408199..6b5b11663171 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -98,8 +98,9 @@ public class SparkMicroBatchStream implements MicroBatchStream { this.splitOpenFileCost = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT)); - this.offsetSeqLog = new OffsetSeqLog(spark, - new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString()); + this.offsetSeqLog = checkpointLocation != null + ? new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) + : null; } @Override @@ -187,12 +188,17 @@ public Offset deserializeOffset(String json) { @Override public void commit(Offset end) { + int i=0; } @Override public void stop() { } + private String getOffsetLogLocation(String checkpointLocation) { + return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString(); + } + private boolean isInitialOffsetResolved() { return initialOffset != null; } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 0eab2fb68089..d93cc6aa3c3b 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -29,7 +29,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; From 919e386125eab3e4dcd5f5c09cfd97d935af32bb Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 18:56:26 -0700 Subject: [PATCH 15/22] checkstyle --- .../iceberg/spark/source/SparkBatchScan.java | 3 +- .../spark/source/SparkMicroBatchStream.java | 36 +++++++++---------- .../spark/source/SparkScanBuilder.java | 2 +- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 0bc1f938518c..f5b5b5d41777 100644 --- 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -53,7 +53,6 @@ import org.apache.spark.sql.connector.read.Statistics; import org.apache.spark.sql.connector.read.SupportsReportStatistics; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; -import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -226,7 +225,7 @@ public String description() { public static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; - public ReaderFactory(int batchSize) { + ReaderFactory(int batchSize) { this.batchSize = batchSize; } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 6b5b11663171..6cae241b49c9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -24,21 +24,22 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; import org.apache.iceberg.MicroBatches.MicroBatchBuilder; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.SerializableTable; -import org.apache.iceberg.Table; -import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.Table; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; @@ -56,8 +57,6 @@ import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; -import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; -import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; @@ -83,7 +82,8 @@ public class SparkMicroBatchStream implements MicroBatchStream { private StreamingOffset initialOffset = null; private PlannedEndOffset previousEndOffset = null; - SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, Table table, boolean caseSensitive, Schema expectedSchema, + SparkMicroBatchStream(SparkSession spark, JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) { this.sparkContext = sparkContext; this.table = table; @@ -95,12 +95,13 @@ public class SparkMicroBatchStream implements MicroBatchStream { .orElseGet(() 
-> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); - this.splitOpenFileCost = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + this.splitOpenFileCost = Optional.ofNullable( + Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT)); - this.offsetSeqLog = checkpointLocation != null - ? new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) - : null; + this.offsetSeqLog = checkpointLocation != null ? + new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) : + null; } @Override @@ -188,7 +189,6 @@ public Offset deserializeOffset(String json) { @Override public void commit(Offset end) { - int i=0; } @Override @@ -250,9 +250,9 @@ private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStart private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) { MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io()) - .caseSensitive(caseSensitive) - .specsById(table.specs()) - .generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles()); + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(microBatchStartOffset.position(), batchSize, microBatchStartOffset.shouldScanAllFiles()); return new PlannedEndOffset( microBatch.snapshotId(), @@ -272,9 +272,9 @@ private boolean isEndOfSnapshot(StreamingOffset microBatchStartOffset) { 1, microBatchStartOffset.shouldScanAllFiles()); - return microBatchStartOffset.position() == microBatchStart.startFileIndex() - && microBatchStartOffset.position() == microBatchStart.endFileIndex() - && microBatchStart.lastIndexOfSnapshot(); + return microBatchStartOffset.position() == microBatchStart.startFileIndex() && + microBatchStartOffset.position() == microBatchStart.endFileIndex() && + microBatchStart.lastIndexOfSnapshot(); } private static class PlannedEndOffset extends StreamingOffset { diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index f47ac55579e9..bff65b4cbb66 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -159,7 +159,7 @@ private Schema schemaWithMetadataColumns() { @Override public Scan build() { - return new SparkBatchQueryScan( + return new SparkBatchQueryScan( spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); } From e3fb1fe32c712318ed898fae237da89fd20d9de6 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 19:05:36 -0700 Subject: [PATCH 16/22] checkstyle --- .../spark/source/SparkMicroBatchStream.java | 12 ++++++------ .../source/TestStructuredStreamingRead3.java | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 6cae241b49c9..70a976628757 100644 --- 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -24,22 +24,22 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; import org.apache.iceberg.MicroBatches.MicroBatchBuilder; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; -import org.apache.iceberg.Table; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index d93cc6aa3c3b..73963ce8c83b 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -30,7 +30,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.types.Types; @@ -135,7 +134,7 @@ public void testGetAllInsertsAcrossIcebergSnapshots() throws IOException, Timeou actual); } finally { for (StreamingQuery query : spark.streams().active()) { - query.stop(); + query.stop(); } } } @@ -187,8 +186,7 @@ public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, Timeou DataStreamWriter singleBatchWriter = df.writeStream() .trigger(Trigger.Once()) .option("checkpointLocation", writerCheckpoint.toString()) - .foreachBatch((batchDF, batchId) -> - { + .foreachBatch((batchDF, batchId) -> { batchDF.createOrReplaceGlobalTempView(tempView); }); @@ -252,15 +250,16 @@ public void testBatchSizeOption() throws IOException, TimeoutException { DataStreamWriter singleBatchWriter = df.writeStream() .trigger(Trigger.Once()) .option("checkpointLocation", writerCheckpoint.toString()) - .foreachBatch((batchDF, batchId) -> - { + .foreachBatch((batchDF, batchId) -> { batchDF.createOrReplaceGlobalTempView(tempView); }); String globalTempView = "global_temp." 
+ tempView; - for (SimpleRecord simpleRecord: + for (SimpleRecord simpleRecord : expected.stream().flatMap(List::stream).collect(Collectors.toList())) { - Assert.assertEquals(Collections.singletonList(simpleRecord), processMicroBatch(singleBatchWriter, globalTempView)); + Assert.assertEquals( + Collections.singletonList(simpleRecord), + processMicroBatch(singleBatchWriter, globalTempView)); } } finally { for (StreamingQuery query : spark.streams().active()) { @@ -270,7 +269,8 @@ public void testBatchSizeOption() throws IOException, TimeoutException { } @Test - public void testAllFormats() throws IOException, TimeoutException{} + public void testAllFormats() throws IOException, TimeoutException{ + } private static List processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) throws TimeoutException { From bee16901503f046a5b7c80417c9075324974652d Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 19:07:10 -0700 Subject: [PATCH 17/22] fix indent --- .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index bff65b4cbb66..633a17143f52 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -160,7 +160,7 @@ private Schema schemaWithMetadataColumns() { @Override public Scan build() { return new SparkBatchQueryScan( - spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); + spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options); } public Scan buildMergeScan() { From daee48ad61fd0683654e78dd673452324806c1c3 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Fri, 28 May 2021 22:52:54 -0700 Subject: [PATCH 18/22] unit test - full coverage --- .../spark/source/SparkMicroBatchStream.java | 3 +- .../source/TestStructuredStreamingRead3.java | 108 +++++++++++++++++- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 70a976628757..f2c13f5460bc 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -156,7 +156,8 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { @Override public PartitionReaderFactory createReaderFactory() { - return new ReaderFactory(batchSize); + int batchSizeValueToDisableColumnarReads = 0; + return new ReaderFactory(batchSizeValueToDisableColumnarReads); } @Override diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 73963ce8c83b..bd054d6f6052 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.PartitionSpec; import 
org.apache.iceberg.Schema; @@ -177,7 +178,6 @@ public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, Timeou } table.refresh(); - List actual; try { Dataset df = spark.readStream() @@ -268,8 +268,112 @@ public void testBatchSizeOption() throws IOException, TimeoutException { } } + @SuppressWarnings("unchecked") @Test - public void testAllFormats() throws IOException, TimeoutException{ + public void testParquetOrcAvroDataInOneTable() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List parquetFileRecords = Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")); + + List orcFileRecords = Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")); + + List avroFileRecords = Lists.newArrayList( + new SimpleRecord(6, "six"), + new SimpleRecord(7, "seven")); + + // generate multiple snapshots + Dataset df = spark.createDataFrame(parquetFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "parquet") + .mode("append") + .save(location.toString()); + + df = spark.createDataFrame(orcFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "orc") + .mode("append") + .save(location.toString()); + + df = spark.createDataFrame(avroFileRecords, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .option("write-format", "avro") + .mode("append") + .save(location.toString()); + + table.refresh(); + List actual; + + try { + Dataset ds = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = ds.writeStream() + .format("memory") + .queryName("test12") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + actual = spark.sql("select * from test12") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals(Stream.concat(Stream.concat(parquetFileRecords.stream(), orcFileRecords.stream()), + avroFileRecords.stream()) + .collect(Collectors.toList()), + actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamFromEmtpyTable() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + table.refresh(); + List actual; + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testemptytable") + .outputMode(OutputMode.Append()) + .start(); + streamingQuery.processAllAvailable(); + actual = spark.sql("select * from testemptytable") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals(Collections.emptyList(), actual); + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } } private static List 
processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) From 0a6561781c4a59700707db5974cc6d358a6f7fb4 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 29 May 2021 00:20:27 -0700 Subject: [PATCH 19/22] add logic for ignoring deletes and replace --- .../spark/source/SparkMicroBatchStream.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index f2c13f5460bc..75b106a51778 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -23,6 +23,7 @@ import java.util.Optional; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MicroBatches; import org.apache.iceberg.MicroBatches.MicroBatch; @@ -243,12 +244,26 @@ private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStart Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId()); Snapshot pointer = table.currentSnapshot(); while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { + Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND), + "Encountered Snapshot DataOperation other than APPEND, REWRITE and DELETE."); + pointer = table.snapshot(pointer.parentId()); + while (pointer != null && isIgnorableStreamOperation(pointer)) { + pointer = table.snapshot(pointer.parentId()); + } } + Preconditions.checkState(pointer != null, + "snapshot on which the stream operated has been garbage collected."); + return new StreamingOffset(pointer.snapshotId(), 0L, false); } + private boolean isIgnorableStreamOperation(Snapshot snapshot) { + return snapshot.operation().equals(DataOperations.DELETE) || + snapshot.operation().equals(DataOperations.REPLACE); + } + private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) { MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io()) .caseSensitive(caseSensitive) From f9e9e6623a58a97554baeb15cc9b3eb1e2cd90ee Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Mon, 31 May 2021 23:15:56 -0700 Subject: [PATCH 20/22] minor refactor --- .../iceberg/spark/source/TestStructuredStreamingRead3.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index bd054d6f6052..871a138ff179 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -204,7 +204,7 @@ public void testGetAllInsertsAcrossSparkCheckpoints() throws IOException, Timeou @SuppressWarnings("unchecked") @Test - public void testBatchSizeOption() throws IOException, TimeoutException { + public void validateWhenBatchSizeEquals1ThenOneFileAtATimeIsStreamed() throws IOException, TimeoutException { File parent = temp.newFolder("parent"); File location = new File(parent, "test-table"); File writerCheckpoint = new File(parent, "writer-checkpoint"); @@ -343,7 +343,7 @@ public void testParquetOrcAvroDataInOneTable() throws IOException, 
TimeoutExcept @SuppressWarnings("unchecked") @Test - public void testReadStreamFromEmtpyTable() throws IOException, TimeoutException { + public void testReadStreamFromEmptyTable() throws IOException, TimeoutException { File parent = temp.newFolder("parent"); File location = new File(parent, "test-table"); From 67e2d270ce2f5e0e38292b720ea20279eb5119ae Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Tue, 1 Jun 2021 21:37:58 -0700 Subject: [PATCH 21/22] minor refactor --- .../spark/source/SparkMicroBatchStream.java | 61 ++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 75b106a51778..22fb55e69a93 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -78,7 +78,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final Integer splitLookback; private final Long splitOpenFileCost; private final boolean localityPreferred; - private final OffsetSeqLog offsetSeqLog; + private final OffsetLog offsetLog; private StreamingOffset initialOffset = null; private PlannedEndOffset previousEndOffset = null; @@ -100,9 +100,7 @@ public class SparkMicroBatchStream implements MicroBatchStream { Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT)); - this.offsetSeqLog = checkpointLocation != null ? - new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) : - null; + this.offsetLog = OffsetLog.getInstance(spark, checkpointLocation); } @Override @@ -197,10 +195,6 @@ public void commit(Offset end) { public void stop() { } - private String getOffsetLogLocation(String checkpointLocation) { - return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString(); - } - private boolean isInitialOffsetResolved() { return initialOffset != null; } @@ -209,20 +203,14 @@ private StreamingOffset calculateInitialOffsetFromCheckpoint() { Preconditions.checkState(isStreamResumedFromCheckpoint(), "Stream is not resumed from checkpoint."); - OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; - - List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); - Option optionalOffset = offsetSeqCol.get(0); - - StreamingOffset checkpointedOffset = StreamingOffset.fromJson(optionalOffset.get().json()); - return checkpointedOffset; + return offsetLog.getLatest(); } private boolean isStreamResumedFromCheckpoint() { Preconditions.checkState(!isInitialOffsetResolved(), "isStreamResumedFromCheckpoint() is invoked without resolving initialOffset"); - return offsetSeqLog != null && offsetSeqLog.getLatest() != null && offsetSeqLog.getLatest().isDefined(); + return offsetLog.isOffsetLogInitialized(); } private boolean isFirstBatch() { @@ -306,4 +294,45 @@ public MicroBatch getMicroBatch() { return microBatch; } } + + interface OffsetLog { + static OffsetLog getInstance(SparkSession spark, String checkpointLocation) { + return new OffsetLogImpl(spark, checkpointLocation); + } + + boolean isOffsetLogInitialized(); + + StreamingOffset getLatest(); + } + + private static class OffsetLogImpl implements OffsetLog { + private final OffsetSeqLog offsetSeqLog; + + OffsetLogImpl(SparkSession spark, String checkpointLocation) { 
+ this.offsetSeqLog = checkpointLocation != null ? + new OffsetSeqLog(spark, getOffsetLogLocation(checkpointLocation)) : + null; + } + + @Override + public boolean isOffsetLogInitialized() { + return offsetSeqLog != null && + offsetSeqLog.getLatest() != null && + offsetSeqLog.getLatest().isDefined(); + } + + @Override + public StreamingOffset getLatest() { + OffsetSeq offsetSeq = offsetSeqLog.getLatest().get()._2; + + List> offsetSeqCol = JavaConverters.seqAsJavaList(offsetSeq.offsets()); + Option optionalOffset = offsetSeqCol.get(0); + + return StreamingOffset.fromJson(optionalOffset.get().json()); + } + + private String getOffsetLogLocation(String checkpointLocation) { + return new Path(checkpointLocation.replace("/sources/0", ""), "offsets").toString(); + } + } } From 072c911a46869b27ba4cab44c862f68399b2b5e3 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Tue, 1 Jun 2021 23:22:12 -0700 Subject: [PATCH 22/22] remove ignoreDelete and ignoreReplace. --- .../spark/source/SparkMicroBatchStream.java | 10 +- .../source/TestStructuredStreamingRead3.java | 214 ++++++++++++++++++ 2 files changed, 215 insertions(+), 9 deletions(-) diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 22fb55e69a93..82da87d242d9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -233,12 +233,9 @@ private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStart Snapshot pointer = table.currentSnapshot(); while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND), - "Encountered Snapshot DataOperation other than APPEND, REWRITE and DELETE."); + "Encountered Snapshot DataOperation other than APPEND."); pointer = table.snapshot(pointer.parentId()); - while (pointer != null && isIgnorableStreamOperation(pointer)) { - pointer = table.snapshot(pointer.parentId()); - } } Preconditions.checkState(pointer != null, @@ -247,11 +244,6 @@ private StreamingOffset getNextAvailableSnapshot(StreamingOffset microBatchStart return new StreamingOffset(pointer.snapshotId(), 0L, false); } - private boolean isIgnorableStreamOperation(Snapshot snapshot) { - return snapshot.operation().equals(DataOperations.DELETE) || - snapshot.operation().equals(DataOperations.REPLACE); - } - private PlannedEndOffset calculateEndOffset(StreamingOffset microBatchStartOffset) { MicroBatch microBatch = MicroBatches.from(table.snapshot(microBatchStartOffset.snapshotId()), table.io()) .caseSensitive(caseSensitive) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 871a138ff179..487c7afce2f1 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -27,9 +27,20 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import 
org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadOptions; @@ -41,6 +52,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.streaming.Trigger; import org.junit.AfterClass; import org.junit.Assert; @@ -376,6 +388,208 @@ public void testReadStreamFromEmptyTable() throws IOException, TimeoutException } } + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = Lists.newArrayList( + dataDelete.copy("data", "one") // id = 1 + ); + + DeleteFile eqDeletes = FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); + + table.newRowDelta() + .addDeletes(eqDeletes) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithoverwrites") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four"), + new SimpleRecord(5, "five")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + // this should create a snapshot with type Replace. + table.rewriteManifests() + .clusterBy(f -> 1) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithreplace") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws IOException, TimeoutException { + File parent = temp.newFolder("parent"); + File location = new File(parent, "test-table"); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table table = tables.create(SCHEMA, spec, location.toString()); + + List> expected = Lists.newArrayList( + Lists.newArrayList( + new SimpleRecord(1, "one"), + new SimpleRecord(2, "two"), + new SimpleRecord(3, "three")), + Lists.newArrayList( + new SimpleRecord(4, "four")) + ); + + // generate multiple snapshots + for (List l : expected) { + Dataset df = spark.createDataFrame(l, SimpleRecord.class); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + } + + table.refresh(); + + // this should create a snapshot with type delete. + table.newDelete() + .deleteFromRowFilter(Expressions.equal("id", 4)) + .commit(); + + // check pre-condition + Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); + + try { + Dataset df = spark.readStream() + .format("iceberg") + .load(location.toString()); + StreamingQuery streamingQuery = df.writeStream() + .format("memory") + .queryName("testtablewithdelete") + .outputMode(OutputMode.Append()) + .start(); + + try { + streamingQuery.processAllAvailable(); + Assert.assertTrue(false); // should be unreachable + } catch (Exception exception) { + Assert.assertTrue(exception instanceof StreamingQueryException); + Assert.assertTrue(((StreamingQueryException) exception).cause() instanceof IllegalStateException); + } + + } finally { + for (StreamingQuery query : spark.streams().active()) { + query.stop(); + } + } + } + private static List processMicroBatch(DataStreamWriter singleBatchWriter, String viewName) throws TimeoutException { StreamingQuery streamingQuery = singleBatchWriter.start();