diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index d425bcc1256f6..0940b7eccfbb4 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -536,6 +536,11 @@ Here are the details of all the sources in Spark.
href="api/R/read.stream.html">R).
E.g. for "parquet" format options see DataStreamReader.parquet().
+ ignoreFileStreamSinkMetadata: whether to ignore the metadata left behind by a file stream sink, which forces the source to always use an in-memory file index. (default: false)
+
+ This option is useful when the metadata grows too large and reading it becomes slower than listing files from the filesystem.
+ NOTE: This option must be set to "true" if the file source reads output files written by a file stream sink with the "retainOnlyLastBatchInMetadata" option set to "true".
+
In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for "parquet", see Parquet configuration section.
Yes |
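
For illustration, here is a minimal sketch of reading a file stream sink's output with this option; the paths, schema, and app name are placeholders and not part of this patch. The option applies to both streaming and batch reads of the output directory:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, TimestampType}

val spark = SparkSession.builder().appName("ignore-sink-metadata").getOrCreate()

// Streaming read of another query's file sink output, skipping its _spark_metadata
// log and falling back to an in-memory file index.
val streamDf = spark.readStream
  .format("parquet")
  .schema(new StructType().add("timestamp", TimestampType))
  .option("ignoreFileStreamSinkMetadata", "true")
  .load("/path/to/sink/output")  // placeholder path

// The same option applies to a batch read of the sink's output directory.
val batchDf = spark.read
  .option("ignoreFileStreamSinkMetadata", "true")
  .parquet("/path/to/sink/output")
```
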
@@ -1812,6 +1817,12 @@ Here are the details of all the sinks in Spark.
(Scala/Java/Python/R).
E.g. for "parquet" format options see DataFrameWriter.parquet()
+
+ retainOnlyLastBatchInMetadata: whether to retain metadata information only for the last succeeded batch.
+
+ This option greatly reduces the overhead of compacting metadata files, which can be non-trivial when the query processes many files in each batch.
+ NOTE: Since only the last batch is retained, the metadata is not readable by the file source: you must set the "ignoreFileStreamSinkMetadata" option
+ to "true" when reading the sink's output files from another query, whether it is a batch or streaming source.
Yes (exactly-once) |
Supports writes to partitioned tables. Partitioning by time may be useful. |
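
As a rough end-to-end sketch of how the two options pair up (the rate source, trigger, and paths below are illustrative assumptions, not taken from this patch): the sink keeps only the last batch in its metadata log, so any reader of its output must set "ignoreFileStreamSinkMetadata":

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("retain-last-batch").getOrCreate()

// Writer: retain only the latest batch in the sink's metadata log.
val query = spark.readStream
  .format("rate")        // illustrative source, any streaming source works
  .load()
  .select("timestamp")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/path/to/checkpoint")   // placeholder path
  .option("retainOnlyLastBatchInMetadata", "true")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start("/path/to/sink/output")                         // placeholder path

// Reader: the metadata log is now incomplete, so ignore it and list files directly.
val readBack = spark.read
  .option("ignoreFileStreamSinkMetadata", "true")
  .parquet("/path/to/sink/output")
```
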
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 10dae8a55b47a..239a265bef369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -344,8 +344,9 @@ case class DataSource(
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
- if FileStreamSink.hasMetadata(
- caseInsensitiveOptions.get("path").toSeq ++ paths,
+ if !caseInsensitiveOptions.getOrElse(
+ "ignoreFileStreamSinkMetadata", "false").toBoolean &&
+ FileStreamSink.hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 1d57cb084df9e..95ae7caae6b05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -74,6 +74,12 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+ /**
+ * Whether to ignore FileStreamSink metadata in the source, which forces an in-memory file index.
+ */
+ val ignoreFileStreamSinkMetadata: Boolean = withBooleanParameter("ignoreFileStreamSinkMetadata",
+ default = false)
+
private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b3d12f67b5d63..daba013fcda86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -97,6 +97,8 @@ class FileStreamSink(
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ private val retainOnlyLastBatchInMetadata: Boolean =
+ options.getOrElse("retainOnlyLastBatchInMetadata", "false").toBoolean
private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
@@ -114,7 +116,7 @@ class FileStreamSink(
committer match {
case manifestCommitter: ManifestFileCommitProtocol =>
- manifestCommitter.setupManifestOptions(fileLog, batchId)
+ manifestCommitter.setupManifestOptions(fileLog, batchId, retainOnlyLastBatchInMetadata)
case _ => // Do nothing
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 103fa7ce9066d..c43c09b6602e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -184,8 +184,14 @@ class FileStreamSource(
* Some(true) means we know for sure the source DOES have metadata
* Some(false) means we know for sure the source DOES NOT have metadata
*/
- @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
- if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+ @volatile private[sql] var sourceHasMetadata: Option[Boolean] = {
+ if (sourceOptions.ignoreFileStreamSinkMetadata ||
+ SparkHadoopUtil.get.isGlobPath(new Path(path))) {
+ Some(false)
+ } else {
+ None
+ }
+ }
private def allFilesUsingInMemoryFileIndex() = {
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8b64b72..2132dd3687746 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -42,14 +42,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
@transient private var fileLog: FileStreamSinkLog = _
private var batchId: Long = _
+ private var retainOnlyLastBatch: Boolean = _
/**
* Sets up the manifest log output and the batch id for this job.
* Must be called before any other function.
*/
- def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = {
+ def setupManifestOptions(
+ fileLog: FileStreamSinkLog,
+ batchId: Long,
+ retainOnlyLastBatch: Boolean): Unit = {
this.fileLog = fileLog
this.batchId = batchId
+ this.retainOnlyLastBatch = retainOnlyLastBatch
}
override def setupJob(jobContext: JobContext): Unit = {
@@ -63,6 +68,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
+ if (retainOnlyLastBatch) {
+ // purge entries older than batchId, so the file log always keeps only one batch
+ fileLog.purge(batchId)
+ }
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index ed53def556cb8..18f63fd3acda6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -190,6 +190,75 @@ class FileStreamSinkSuite extends StreamTest {
}
}
+ test("SPARK-24295 retain only last batch for file log metadata") {
+ val inputData = MemoryStream[Long]
+ val inputDF = inputData.toDF.toDF("time")
+ val outputDf = inputDF
+ .selectExpr("CAST(time AS timestamp) AS timestamp")
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: StreamingQuery = null
+
+ try {
+ query =
+ outputDf.writeStream
+ .option("checkpointLocation", checkpointDir)
+ .option("retainOnlyLastBatchInMetadata", true)
+ .format("parquet")
+ .start(outputDir)
+
+ def addTimestamp(timestampInSecs: Int*): Unit = {
+ inputData.addData(timestampInSecs.map(_ * 1L): _*)
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ }
+
+ def check(expectedResult: Long*): Unit = {
+ val outputDf = spark.read
+ // This option must be provided when 'retainOnlyLastBatchInMetadata' is enabled
+ // on FileStreamSink to purge metadata, otherwise the query will fail while loading
+ // due to the incomplete metadata.
+ .option("ignoreFileStreamSinkMetadata", "true")
+ .parquet(outputDir)
+ .selectExpr("timestamp")
+ .sort("timestamp")
+ checkDataset(outputDf.as[Long], expectedResult: _*)
+ }
+
+ val logPath = new Path(outputDir, FileStreamSink.metadataDir)
+ val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toUri.toString)
+
+ addTimestamp(100)
+ check(100)
+
+ // only the latest batch is retained, hence length should be 1
+ assert(fileLog.get(None, None).length == 1)
+ assert(fileLog.get(None, None).head._1 === 0)
+
+ addTimestamp(104, 123)
+ check(100, 104, 123)
+
+ // only the latest batch is retained, hence length should be 1
+ assert(fileLog.get(None, None).length === 1)
+ assert(fileLog.get(None, None).head._1 === 1)
+
+ addTimestamp(140)
+ check(100, 104, 123, 140)
+
+ // only the latest batch is retained, hence length should be 1
+ assert(fileLog.get(None, None).length === 1)
+ assert(fileLog.get(None, None).head._1 === 2)
+
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
+ }
+
test("partitioned writing and batch reading with 'basePath'") {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9235c6d7c896f..a6de2a7aade18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -908,6 +908,56 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("ignore metadata when reading data from outputs of another streaming query") {
+ withTempDirs { case (outputDir, checkpointDir) =>
+ // q1 is a streaming query that reads from memory and writes to text files
+ val q1Source = MemoryStream[String]
+ val q1 =
+ q1Source
+ .toDF()
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("text")
+ .start(outputDir.getCanonicalPath)
+
+ // q2 is a streaming query that reads q1's text outputs
+ // even though q1 stores metadata in its output location, we intend to ignore it
+ val q2 =
+ createFileStream("text", outputDir.getCanonicalPath,
+ options = Map("ignoreFileStreamSinkMetadata" -> "true"))
+ .filter($"value" contains "keep")
+
+ def q1AddData(data: String*): StreamAction =
+ Execute { _ =>
+ q1Source.addData(data)
+ q1.processAllAvailable()
+ }
+ def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+ testStream(q2)(
+ // batch 0
+ q1AddData("drop1", "keep2"),
+ q2ProcessAllAvailable(),
+ CheckAnswer("keep2"),
+
+ // batch 1
+ Assert {
+ // create a text file that won't be in q1's sink log
+ // since we are ignoring sink metadata, its content should appear in q2's answer
+ val shouldNotKeep = new File(outputDir, "keep.txt")
+ stringToFile(shouldNotKeep, "keep")
+ shouldNotKeep.exists()
+ },
+ q1AddData("keep3"),
+ q2ProcessAllAvailable(),
+ // here we should see "keep", whereas with the metadata index it would not appear
+ CheckAnswer("keep", "keep2", "keep3"),
+
+ Execute { _ => q1.stop() }
+ )
+ }
+ }
+
test("start before another streaming query, and read its output") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files