Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,6 @@ package org.apache.spark.sql.execution.streaming
* vector clock that must progress linearly forward.
*/
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
/**
* Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
* or greater than the specified object.
*/
override def compareTo(other: Offset): Int = other match {
case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
val comparisons = offsets.zip(otherComposite.offsets).map {
case (Some(a), Some(b)) => a compareTo b
case (None, None) => 0
case (None, _) => -1
case (_, None) => 1
}
val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
nonZeroSigns.size match {
case 0 => 0 // if both empty or only 0s
case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s)
case _ => // there are both 1s and -1s
throw new IllegalArgumentException(
s"Invalid comparison between non-linear histories: $this <=> $other")
}
case _ =>
throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
}

private def sign(num: Int): Int = num match {
case i if i < 0 => -1
case i if i == 0 => 0
case i if i > 0 => 1
}

/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
* sources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming
*/
case class LongOffset(offset: Long) extends Offset {

override def compareTo(other: Offset): Int = other match {
case l: LongOffset => offset.compareTo(l.offset)
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,8 @@ package org.apache.spark.sql.execution.streaming

/**
* An offset is a monotonically increasing metric used to track progress in the computation of a
* stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
* with `equals` and `hashcode`.
* stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global
* ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no
* new data has arrived.
*/
trait Offset extends Serializable {

/**
* Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
* or greater than the specified object.
*/
def compareTo(other: Offset): Int

def >(other: Offset): Boolean = compareTo(other) > 0
def <(other: Offset): Boolean = compareTo(other) < 0
def <=(other: Offset): Boolean = compareTo(other) <= 0
def >=(other: Offset): Boolean = compareTo(other) >= 0
}
trait Offset extends Serializable {}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class StreamExecution(
case (source, available) =>
committedOffsets
.get(source)
.map(committed => committed < available)
.map(committed => committed != available)
.getOrElse(true)
}
}
Expand Down Expand Up @@ -318,7 +318,8 @@ class StreamExecution(

// Request unprocessed data from all sources.
val newData = availableOffsets.flatMap {
case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
logDebug(s"Retrieving data from $source: $current -> $available")
Expand Down Expand Up @@ -404,10 +405,10 @@ class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is indented for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
val localCommittedOffsets = committedOffsets
!localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
}

while (notDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,12 @@ trait OffsetSuite extends SparkFunSuite {
/** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
def compare(one: Offset, two: Offset): Unit = {
test(s"comparison $one <=> $two") {
assert(one < two)
assert(one <= two)
assert(one <= one)
assert(two > one)
assert(two >= one)
assert(one >= one)
assert(one == one)
assert(two == two)
assert(one != two)
assert(two != one)
}
}

/** Creates test to check that non-equality comparisons throw exception. */
def compareInvalid(one: Offset, two: Offset): Unit = {
test(s"invalid comparison $one <=> $two") {
intercept[IllegalArgumentException] {
assert(one < two)
}

intercept[IllegalArgumentException] {
assert(one <= two)
}

intercept[IllegalArgumentException] {
assert(one > two)
}

intercept[IllegalArgumentException] {
assert(one >= two)
}

assert(!(one == two))
assert(!(two == one))
assert(one != two)
assert(two != one)
}
}
}

class LongOffsetSuite extends OffsetSuite {
Expand All @@ -79,10 +47,6 @@ class CompositeOffsetSuite extends OffsetSuite {
one = CompositeOffset(None :: Nil),
two = CompositeOffset(Some(LongOffset(2)) :: Nil))

compareInvalid( // sizes must be same
one = CompositeOffset(Nil),
two = CompositeOffset(Some(LongOffset(2)) :: Nil))

compare(
one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
Expand All @@ -91,8 +55,5 @@ class CompositeOffsetSuite extends OffsetSuite {
one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))

compareInvalid(
one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent
two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
}