From 7f151d953edb61b1d4a9fb7e35e3035f44a9de4a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 21 Dec 2020 17:54:42 -0800
Subject: [PATCH 1/2] Add latest offset.

---
 .../sql/kafka010/KafkaMicroBatchStream.scala  |  8 +++++-
 .../spark/sql/kafka010/KafkaSource.scala      | 10 ++++++++
 .../streaming/SupportsAdmissionControl.java   |  8 ++++++
 .../streaming/MicroBatchExecution.scala       | 25 +++++++++++++------
 .../streaming/ProgressReporter.scala          | 13 ++++++++--
 .../execution/streaming/StreamExecution.scala |  9 +++++++
 .../continuous/ContinuousExecution.scala      |  2 +-
 .../apache/spark/sql/streaming/progress.scala |  3 +++
 ...StreamingQueryStatusAndProgressSuite.scala |  4 +++
 9 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index c25b8b4e510a0..ff79d02bc4131 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -64,6 +64,8 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private var endPartitionOffsets: KafkaSourceOffset = _
 
+  private var latestPartitionOffsets: PartitionOffsetMap = _
+
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -77,6 +79,10 @@ private[kafka010] class KafkaMicroBatchStream(
     maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
   }
 
+  override def reportLatestOffset(): Offset = {
+    KafkaSourceOffset(latestPartitionOffsets)
+  }
+
   override def latestOffset(): Offset = {
     throw new UnsupportedOperationException(
       "latestOffset(Offset, ReadLimit) should be called instead of this method")
@@ -84,7 +90,7 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(readLimit match {
       case rows: ReadMaxRows =>
         rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 71ccb5f952f0a..a91bc296acce8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -115,8 +115,13 @@ private[kafka010] class KafkaSource(
     maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
   }
 
+  // The offsets for each topic-partition that are currently being read for processing. Note that
+  // these are not necessarily the latest offsets, because a read limit may have been applied.
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
 
+  // The latest offsets for each topic-partition.
+  private var latestPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   private val converter = new KafkaRecordToRowConverter()
 
   override def schema: StructType = KafkaRecordToRowConverter.kafkaSchema(includeHeaders)
@@ -127,6 +132,10 @@ private[kafka010] class KafkaSource(
       "latestOffset(Offset, ReadLimit) should be called instead of this method")
   }
 
+  override def reportLatestOffset(): streaming.Offset = {
+    latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
+  }
+
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
@@ -145,6 +154,7 @@ private[kafka010] class KafkaSource(
     }
 
     currentPartitionOffsets = Some(offsets)
+    latestPartitionOffsets = Some(latest)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
     KafkaSourceOffset(offsets)
   }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
index 027763ce6fcdf..c808b9a3066b0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
@@ -53,4 +53,12 @@ public interface SupportsAdmissionControl extends SparkDataStream {
    * for the very first micro-batch. The source can return `null` if there is no data to process.
    */
   Offset latestOffset(Offset startOffset, ReadLimit limit);
+
+  /**
+   * Returns the most recent offset available.
+   *
+   * The source can return `null` if there is no data to process or the source does not support
+   * this method.
+   */
+  default Offset reportLatestOffset() { return null; }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index c485d0f7d8b2d..a9cb345c4a06e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -212,7 +212,10 @@ class MicroBatchExecution(
         }
 
         // Record the trigger offset range for progress reporting *before* processing the batch
-        recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
+        recordTriggerOffsets(
+          from = committedOffsets,
+          to = availableOffsets,
+          latest = latestOffsets)
 
         // Remember whether the current batch has data or not. This will be required later
         // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
@@ -379,7 +382,7 @@ class MicroBatchExecution(
     if (isCurrentBatchConstructed) return true
 
     // Generate a map from each unique source to the next available offset.
-    val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
+    val (nextOffsets, recentOffsets) = uniqueSources.toSeq.map {
       case (s: SupportsAdmissionControl, limit) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("latestOffset") {
@@ -391,23 +394,31 @@ class MicroBatchExecution(
             startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
               .getOrElse(v2.initialOffset())
           }
-          (s, Option(s.latestOffset(startOffset, limit)))
+          val next = s.latestOffset(startOffset, limit)
+          val latest = s.reportLatestOffset()
+          ((s, Option(next)), (s, Option(latest)))
         }
       case (s: Source, _) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("getOffset") {
-          (s, s.getOffset)
+          val offset = s.getOffset
+          ((s, offset), (s, offset))
         }
       case (s: MicroBatchStream, _) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("latestOffset") {
-          (s, Option(s.latestOffset()))
+          val latest = s.latestOffset()
+          ((s, Option(latest)), (s, Option(latest)))
        }
      case (s, _) =>
        // for some reason, the compiler is unhappy and thinks the match is not exhaustive
        throw new IllegalStateException(s"Unexpected source: $s")
-    }
-    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
+    }.unzip
+
+    availableOffsets ++= nextOffsets.filter { case (_, o) => o.nonEmpty }
+      .map(p => p._1 -> p._2.get).toMap
+    latestOffsets ++= recentOffsets.filter { case (_, o) => o.nonEmpty }
+      .map(p => p._1 -> p._2.get).toMap
 
     // Update the query metadata
     offsetSeqMetadata = offsetSeqMetadata.copy(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 57cb551bba17d..2ab473d737a23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -71,6 +71,8 @@ trait ProgressReporter extends Logging {
   private var currentTriggerEndTimestamp = -1L
   private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _
   private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _
+  private var currentTriggerLatestOffsets: Map[SparkDataStream, String] = _
+
   // TODO: Restore this from the checkpoint when possible.
   private var lastTriggerStartTimestamp = -1L
 
@@ -119,6 +121,7 @@ trait ProgressReporter extends Logging {
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
     currentTriggerStartOffsets = null
     currentTriggerEndOffsets = null
+    currentTriggerLatestOffsets = null
     currentDurationsMs.clear()
   }
 
@@ -126,9 +129,13 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
    */
-  protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
+  protected def recordTriggerOffsets(
+      from: StreamProgress,
+      to: StreamProgress,
+      latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.mapValues(_.json).toMap
     currentTriggerEndOffsets = to.mapValues(_.json).toMap
+    currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
   }
 
   private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -151,7 +158,8 @@ trait ProgressReporter extends Logging {
    * though the sources don't have any new data.
    */
   protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
+    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
+      currentTriggerLatestOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
     val executionStats = extractExecutionStats(hasNewData, hasExecuted)
@@ -171,6 +179,7 @@ trait ProgressReporter extends Logging {
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
         endOffset = currentTriggerEndOffsets.get(source).orNull,
+        latestOffset = currentTriggerLatestOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
         processedRowsPerSecond = numRecords / processingTimeSec
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b0d33b819a20..c9f40fa22bf9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -161,6 +161,15 @@ abstract class StreamExecution(
   @volatile
   var availableOffsets = new StreamProgress
 
+  /**
+   * Tracks the latest offsets for each input source.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
+   */
+  @volatile
+  var latestOffsets = new StreamProgress
+
   @volatile
   var sinkCommitProgress: Option[StreamWriterCommitProgress] = None
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 6eb28d4c66ded..ad041ceeba723 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -332,7 +332,7 @@ class ContinuousExecution(
 
     synchronized {
       // Record offsets before updating `committedOffsets`
-      recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
+      recordTriggerOffsets(from = committedOffsets, to = availableOffsets, latest = latestOffsets)
       if (queryExecutionThread.isAlive) {
         commitLog.add(epoch, CommitMetadata())
         val offset =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 59dc5bc1f37df..1a8939e42a412 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -173,6 +173,7 @@ class StreamingQueryProgress private[sql](
  * @param description Description of the source.
  * @param startOffset The starting offset for data being read.
  * @param endOffset The ending offset for data being read.
+ * @param latestOffset The latest offset from this source.
  * @param numInputRows The number of records read from this source.
  * @param inputRowsPerSecond The rate at which data is arriving from this source.
 * @param processedRowsPerSecond The rate at which data from this source is being processed by
@@ -184,6 +185,7 @@ class SourceProgress protected[sql](
     val description: String,
     val startOffset: String,
     val endOffset: String,
+    val latestOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
     val processedRowsPerSecond: Double) extends Serializable {
@@ -204,6 +206,7 @@ class SourceProgress protected[sql](
       ("description" -> JString(description)) ~
       ("startOffset" -> tryParse(startOffset)) ~
       ("endOffset" -> tryParse(endOffset)) ~
+      ("latestOffset" -> tryParse(latestOffset)) ~
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index ec61102804ea3..c0aefb8120808 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -75,6 +75,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |    "description" : "source",
       |    "startOffset" : 123,
       |    "endOffset" : 456,
+      |    "latestOffset" : 789,
       |    "numInputRows" : 678,
       |    "inputRowsPerSecond" : 10.0
       |  } ],
@@ -121,6 +122,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |    "description" : "source",
       |    "startOffset" : 123,
       |    "endOffset" : 456,
+      |    "latestOffset" : 789,
       |    "numInputRows" : 678
       |  } ],
       |  "sink" : {
@@ -333,6 +335,7 @@ object StreamingQueryStatusAndProgressSuite {
         description = "source",
         startOffset = "123",
         endOffset = "456",
+        latestOffset = "789",
         numInputRows = 678,
         inputRowsPerSecond = 10.0,
         processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
@@ -361,6 +364,7 @@ object StreamingQueryStatusAndProgressSuite {
         description = "source",
         startOffset = "123",
         endOffset = "456",
+        latestOffset = "789",
         numInputRows = 678,
         inputRowsPerSecond = Double.NaN, // should not be present in the json
         processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json

From 6ba33233eb3b57bfa0502ad7ca4ffdd195854249 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 1 Jan 2021 14:19:42 -0800
Subject: [PATCH 2/2] Fix mima.

---
 project/MimaExcludes.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 33e65c9def41b..5a494a4f67973 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {
 
   // Exclude rules for 3.2.x
   lazy val v32excludes = v31excludes ++ Seq(
+    // [SPARK-33955] Add latest offsets to source progress
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceProgress.this")
   )
 
   // Exclude rules for 3.1.x
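
Note (illustrative only, not part of the patches above): the sketch below shows how a third-party MicroBatchStream that mixes in SupportsAdmissionControl could wire up the new reportLatestOffset() hook next to latestOffset(start, limit), mirroring the KafkaMicroBatchStream changes in PATCH 1/2. CounterOffset, CounterStream and maxAvailable are hypothetical names used only for this example; the code assumes the reportLatestOffset() default method added by this series.

import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadLimit, ReadMaxRows, SupportsAdmissionControl}

// Hypothetical offset type: a single monotonically increasing row counter.
case class CounterOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

// Hypothetical source backed by a counter of available rows; only the
// admission-control pieces are fleshed out, planning/reading is stubbed.
class CounterStream(maxAvailable: () => Long)
  extends MicroBatchStream with SupportsAdmissionControl {

  // Head of the stream observed by the last latestOffset(start, limit) call, so
  // reportLatestOffset() can surface it even when a read limit capped the batch end.
  private var lastSeenLatest: Option[CounterOffset] = None

  override def latestOffset(): Offset = {
    throw new UnsupportedOperationException(
      "latestOffset(Offset, ReadLimit) should be called instead of this method")
  }

  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    val startValue = start.asInstanceOf[CounterOffset].value
    val available = maxAvailable()
    lastSeenLatest = Some(CounterOffset(available))
    // Respect the read limit: cap the end offset of this batch if needed.
    val end = limit match {
      case rows: ReadMaxRows => math.min(available, startValue + rows.maxRows())
      case _ => available
    }
    CounterOffset(end)
  }

  // The new hook from PATCH 1/2: report the uncapped head of the stream so the
  // progress reporter can expose it as SourceProgress.latestOffset.
  override def reportLatestOffset(): Offset = lastSeenLatest.orNull

  override def initialOffset(): Offset = CounterOffset(0L)
  override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = ???
  override def createReaderFactory(): PartitionReaderFactory = ???
}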