@@ -64,6 +64,8 @@ private[kafka010] class KafkaMicroBatchStream(

private var endPartitionOffsets: KafkaSourceOffset = _

private var latestPartitionOffsets: PartitionOffsetMap = _

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -77,14 +79,18 @@ private[kafka010] class KafkaMicroBatchStream(
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

override def reportLatestOffset(): Offset = {
KafkaSourceOffset(latestPartitionOffsets)
}

override def latestOffset(): Offset = {
throw new UnsupportedOperationException(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(readLimit match {
case rows: ReadMaxRows =>
rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
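Note: in the hunk above, `latestOffset(start, readLimit)` now caches the unconstrained fetch result in the new `latestPartitionOffsets` field while the batch end offsets may be clamped by the read limit, and `reportLatestOffset()` simply exposes that cached value. A minimal usage sketch of the user-visible effect, assuming a SparkSession `spark`, a local broker at `localhost:9092`, and a topic named `events` (all hypothetical):

// With a per-trigger read limit, a micro-batch's endOffset can trail the
// reported latestOffset; the gap shows up in the query's source progress.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumption: local broker
  .option("subscribe", "events")                        // assumption: example topic
  .option("maxOffsetsPerTrigger", "1000")               // read limit per micro-batch
  .load()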
@@ -115,8 +115,13 @@ private[kafka010] class KafkaSource(
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
}

// The offsets each topic-partition has been read up to for the current batch. Note these are
// not necessarily the latest offsets, because a read limit may have been applied.
private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

// The latest offsets for each topic-partition.
private var latestPartitionOffsets: Option[Map[TopicPartition, Long]] = None

private val converter = new KafkaRecordToRowConverter()

override def schema: StructType = KafkaRecordToRowConverter.kafkaSchema(includeHeaders)
@@ -127,6 +132,10 @@ private[kafka010] class KafkaSource(
"latestOffset(Offset, ReadLimit) should be called instead of this method")
}

override def reportLatestOffset(): streaming.Offset = {
latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
@@ -145,6 +154,7 @@
}

currentPartitionOffsets = Some(offsets)
latestPartitionOffsets = Some(latest)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
KafkaSourceOffset(offsets)
}
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -37,7 +37,10 @@ object MimaExcludes {
// Exclude rules for 3.2.x
lazy val v32excludes = v31excludes ++ Seq(
// [SPARK-33808][SQL] DataSource V2: Build logical writes in the optimizer
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder"),

// [SPARK-33955] Add latest offsets to source progress
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceProgress.this")
)

// Exclude rules for 3.1.x
@@ -53,4 +53,12 @@ public interface SupportsAdmissionControl extends SparkDataStream {
* for the very first micro-batch. The source can return `null` if there is no data to process.
*/
Offset latestOffset(Offset startOffset, ReadLimit limit);

/**
* Returns the most recent offset available.
*
* The source can return `null` if there is no data to process or the source does not support
* this method.
*/
default Offset reportLatestOffset() { return null; }
}
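A sketch of how a driver-side caller could combine the two methods, assuming an already-constructed stream; the helper name `endAndLatest` is made up for illustration:

import org.apache.spark.sql.connector.read.streaming.{Offset, ReadLimit, SupportsAdmissionControl}

// Pair the rate-limited batch end with the unconstrained latest offset; a null
// return from reportLatestOffset() means the source has nothing to report.
def endAndLatest(
    stream: SupportsAdmissionControl,
    start: Offset,
    limit: ReadLimit): (Offset, Option[Offset]) = {
  val end = stream.latestOffset(start, limit)       // honors the read limit
  val latest = Option(stream.reportLatestOffset())  // None if unsupported or no data
  (end, latest)
}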
@@ -212,7 +212,10 @@ class MicroBatchExecution(
}

// Record the trigger offset range for progress reporting *before* processing the batch
recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
recordTriggerOffsets(
from = committedOffsets,
to = availableOffsets,
latest = latestOffsets)

// Remember whether the current batch has data or not. This will be required later
// for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
@@ -379,7 +382,7 @@ class MicroBatchExecution(
if (isCurrentBatchConstructed) return true

// For each unique source, get the next offset to process along with the latest offset
// the source reports as available.
val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
val (nextOffsets, recentOffsets) = uniqueSources.toSeq.map {
case (s: SupportsAdmissionControl, limit) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
Expand All @@ -391,23 +394,31 @@ class MicroBatchExecution(
startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
.getOrElse(v2.initialOffset())
}
(s, Option(s.latestOffset(startOffset, limit)))
val next = s.latestOffset(startOffset, limit)
val latest = s.reportLatestOffset()
((s, Option(next)), (s, Option(latest)))
}
case (s: Source, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
val offset = s.getOffset
((s, offset), (s, offset))
}
case (s: MicroBatchStream, _) =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("latestOffset") {
(s, Option(s.latestOffset()))
val latest = s.latestOffset()
((s, Option(latest)), (s, Option(latest)))
}
case (s, _) =>
// for some reason, the compiler is unhappy and thinks the match is not exhaustive
throw new IllegalStateException(s"Unexpected source: $s")
}
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
}.unzip

availableOffsets ++= nextOffsets.filter { case (_, o) => o.nonEmpty }
.map(p => p._1 -> p._2.get).toMap
latestOffsets ++= recentOffsets.filter { case (_, o) => o.nonEmpty }
.map(p => p._1 -> p._2.get).toMap

// Update the query metadata
offsetSeqMetadata = offsetSeqMetadata.copy(
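The hunk above replaces the single map of next offsets with two collections built in one pass: each source is mapped to a pair of pairs and the result is split with `unzip`. A tiny self-contained sketch of that idiom, with strings and integers standing in for sources and offsets:

// Map each element to ((key, end), (key, latest)), then split the sequence of
// pairs into two parallel sequences that can be turned into maps independently.
val fetched = Seq("s1" -> (10, 12), "s2" -> (5, 5))
val (ends, latests) = fetched.map { case (src, (end, latest)) =>
  ((src, end), (src, latest))
}.unzip
// ends    == Seq(("s1", 10), ("s2", 5))
// latests == Seq(("s1", 12), ("s2", 5))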
@@ -71,6 +71,8 @@ trait ProgressReporter extends Logging {
private var currentTriggerEndTimestamp = -1L
private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _
private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _
private var currentTriggerLatestOffsets: Map[SparkDataStream, String] = _

// TODO: Restore this from the checkpoint when possible.
private var lastTriggerStartTimestamp = -1L

@@ -119,16 +121,21 @@
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
currentTriggerStartOffsets = null
currentTriggerEndOffsets = null
currentTriggerLatestOffsets = null
currentDurationsMs.clear()
}

/**
* Record the offset range this trigger will process, along with the latest offsets reported by
* the sources. Call this before updating `committedOffsets` in `StreamExecution` to make sure
* that the correct range is recorded.
*/
protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
protected def recordTriggerOffsets(
from: StreamProgress,
to: StreamProgress,
latest: StreamProgress): Unit = {
currentTriggerStartOffsets = from.mapValues(_.json).toMap
currentTriggerEndOffsets = to.mapValues(_.json).toMap
currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
}

private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -151,7 +158,8 @@
* though the sources don't have any new data.
*/
protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
currentTriggerLatestOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

val executionStats = extractExecutionStats(hasNewData, hasExecuted)
@@ -171,6 +179,7 @@
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
latestOffset = currentTriggerLatestOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec
@@ -161,6 +161,15 @@ abstract class StreamExecution(
@volatile
var availableOffsets = new StreamProgress

/**
* Tracks the latest offsets for each input source.
* Only the scheduler thread should modify this field, and only in atomic steps.
* Other threads should make a shallow copy if they are going to access this field more than
* once, since the field's value may change at any time.
*/
@volatile
var latestOffsets = new StreamProgress

@volatile
var sinkCommitProgress: Option[StreamWriterCommitProgress] = None

@@ -332,7 +332,7 @@ class ContinuousExecution(

synchronized {
// Record offsets before updating `committedOffsets`
recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
recordTriggerOffsets(from = committedOffsets, to = availableOffsets, latest = latestOffsets)
if (queryExecutionThread.isAlive) {
commitLog.add(epoch, CommitMetadata())
val offset =
@@ -173,6 +173,7 @@ class StreamingQueryProgress private[sql](
* @param description Description of the source.
* @param startOffset The starting offset for data being read.
* @param endOffset The ending offset for data being read.
* @param latestOffset The latest offset from this source.
* @param numInputRows The number of records read from this source.
* @param inputRowsPerSecond The rate at which data is arriving from this source.
* @param processedRowsPerSecond The rate at which data from this source is being processed by
@@ -184,6 +185,7 @@ class SourceProgress protected[sql](
val description: String,
val startOffset: String,
val endOffset: String,
val latestOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
val processedRowsPerSecond: Double) extends Serializable {
@@ -204,6 +206,7 @@ class SourceProgress protected[sql](
("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("latestOffset" -> tryParse(latestOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
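With `latestOffset` wired into `SourceProgress` and its JSON output, monitoring code can compare how far a batch read against what the source reported as available. A minimal sketch, assuming a running `StreamingQuery` named `query`:

// lastProgress may be null before the first completed trigger; endOffset and
// latestOffset are JSON strings (or null) per source.
val progress = query.lastProgress
if (progress != null) {
  progress.sources.foreach { s =>
    println(s"source=${s.description} end=${s.endOffset} latest=${s.latestOffset}")
  }
}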
@@ -75,6 +75,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "description" : "source",
| "startOffset" : 123,
| "endOffset" : 456,
| "latestOffset" : 789,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0
| } ],
@@ -121,6 +122,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "description" : "source",
| "startOffset" : 123,
| "endOffset" : 456,
| "latestOffset" : 789,
| "numInputRows" : 678
| } ],
| "sink" : {
@@ -333,6 +335,7 @@ object StreamingQueryStatusAndProgressSuite {
description = "source",
startOffset = "123",
endOffset = "456",
latestOffset = "789",
numInputRows = 678,
inputRowsPerSecond = 10.0,
processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
@@ -361,6 +364,7 @@ object StreamingQueryStatusAndProgressSuite {
description = "source",
startOffset = "123",
endOffset = "456",
latestOffset = "789",
numInputRows = 678,
inputRowsPerSecond = Double.NaN, // should not be present in the json
processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json