From 26f3cf93e0321f48967462dce6436f3dbb072622 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Dec 2016 19:05:21 -0800 Subject: [PATCH 01/13] Made StreamingQuery.id persist across restart, added StreamingQuery.runId --- python/pyspark/sql/streaming.py | 14 +- .../sql/execution/streaming/OffsetSeq.scala | 31 ++++- .../execution/streaming/OffsetSeqLog.scala | 2 +- .../streaming/ProgressReporter.scala | 6 +- .../execution/streaming/StreamExecution.scala | 126 ++++++++++++------ .../execution/streaming/StreamProgress.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 13 +- .../streaming/StreamingQueryListener.scala | 10 +- .../sql/streaming/StreamingQueryManager.scala | 8 ++ .../apache/spark/sql/streaming/progress.scala | 7 +- .../query-metadata-logs-version-2.1.0.txt | 3 + .../streaming/OffsetSeqLogSuite.scala | 13 +- .../StreamExecutionMetadataSuite.scala | 55 ++++++++ .../StreamExecutionMetadataSuite.scala | 35 ----- .../StreamingQueryListenerSuite.scala | 7 +- ...StreamingQueryStatusAndProgressSuite.scala | 4 +- .../sql/streaming/StreamingQuerySuite.scala | 93 ++++++++----- 17 files changed, 307 insertions(+), 122 deletions(-) create mode 100644 sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 84f01d3d9ac0b..1e8b75d011291 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -50,10 +50,22 @@ def __init__(self, jsq): @property @since(2.0) def id(self): - """The id of the streaming query. + """Returns the unique id of this query that persists across restarts from checkpoint data. + That is, this id is generated when a query is started for the first time, and + will be the same every time it is restarted from checkpoint data. + There can only be one query with the same id active in a Spark cluster. + Also see, `runId`. """ return self._jsq.id().toString() + @property + @since(2.1) + def runId(self): + """Returns the unique id of this query that does not persist across restarts. That is, every + query that is started (or restarted from checkpoint) will have a different runId. + """ + return self._jsq.runId().toString() + @property @since(2.0) def name(self): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 7469caeee3be5..8d073ecc8bec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -17,13 +17,20 @@ package org.apache.spark.sql.execution.streaming +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + /** * An ordered collection of offsets, used to track the progress of processing data from one or more * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance * vector clock that must progress linearly forward. 
+ * + * @param offsets Sequence of Offsets + * @param metadata Optional, metadata infomation as a Json string, generated from + * [[OffsetSeqMetadata]] */ -case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) { +case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) { /** * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of @@ -54,6 +61,26 @@ object OffsetSeq { * `nulls` in the sequence are converted to `None`s. */ def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = { - OffsetSeq(offsets.map(Option(_)), metadata) + OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply)) } } + + +/** + * Contains metadata associated with a [[OffsetSeq]]. This information is + * persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field. + * + * @param batchWatermarkMs: The current eventTime watermark, used to + * bound the lateness of data that will processed. Time unit: milliseconds + * @param batchTimestampMs: The current batch processing timestamp. + * Time unit: milliseconds + */ +case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) { + def json: String = Serialization.write(this)(OffsetSeqMetadata.format) +} + +object OffsetSeqMetadata { + private implicit val format = Serialization.formats(NoTypeHints) + def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json) +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index cc25b4474ba2c..3210d8ad64e22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) // write metadata out.write('\n') - out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8)) + out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8)) // write offsets, one per line offsetSeq.offsets.map(_.map(_.json)).foreach { offset => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index ba77e7c7bf2b3..7d0d086746c79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -43,6 +43,7 @@ trait ProgressReporter extends Logging { // Internal state of the stream, required for computing metrics. 
protected def id: UUID + protected def runId: UUID protected def name: String protected def triggerClock: Clock protected def logicalPlan: LogicalPlan @@ -52,7 +53,7 @@ trait ProgressReporter extends Logging { protected def committedOffsets: StreamProgress protected def sources: Seq[Source] protected def sink: Sink - protected def streamExecutionMetadata: StreamExecutionMetadata + protected def offsetSeqMetadata: OffsetSeqMetadata protected def currentBatchId: Long protected def sparkSession: SparkSession @@ -134,11 +135,12 @@ trait ProgressReporter extends Logging { val newProgress = new StreamingQueryProgress( id = id, + runId = runId, name = name, timestamp = currentTriggerStartTimestamp, batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, - currentWatermark = streamExecutionMetadata.batchWatermarkMs, + currentWatermark = offsetSeqMetadata.batchWatermarkMs, stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, sink = sinkProgress) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6d0e269d341ee..6b7310ded2b13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.io.{InputStreamReader, OutputStreamWriter} +import java.nio.charset.StandardCharsets import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -24,7 +26,9 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.hadoop.fs.Path +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -58,9 +62,6 @@ class StreamExecution( import org.apache.spark.sql.streaming.StreamingQueryListener._ - // TODO: restore this from the checkpoint directory. - override val id: UUID = UUID.randomUUID() - private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay /** @@ -95,8 +96,23 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ protected var currentBatchId: Long = -1 - /** Stream execution metadata */ - protected var streamExecutionMetadata = StreamExecutionMetadata() + /** Metadata associated with the whole query */ + protected val queryMetadata: StreamExecutionMetadata = { + val metadataPath = new Path(checkpointFile("metadata")) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + StreamExecutionMetadata.read(metadataPath, hadoopConf).getOrElse { + val newMetadata = new StreamExecutionMetadata(UUID.randomUUID.toString) + StreamExecutionMetadata.write(newMetadata, metadataPath, hadoopConf) + newMetadata + } + } + + /** Metadata associated with the offset seq of a batch in the query. */ + protected var offsetSeqMetadata = OffsetSeqMetadata() + + override val id: UUID = UUID.fromString(queryMetadata.id) + + override val runId: UUID = UUID.randomUUID /** All stream sources present in the query plan. 
*/ protected val sources = @@ -188,7 +204,7 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } - postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception. + postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -247,7 +263,7 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json))) + Some(committedOffsets.toOffsetSeq(sources, offsetSeqMetadata))) logError(s"Query $name terminated with error", e) updateStatusMessage(s"Terminated with exception: ${e.getMessage}") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to @@ -265,7 +281,7 @@ class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } @@ -284,9 +300,9 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}")) + offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata()) logDebug(s"Found possibly unprocessed offsets $availableOffsets " + - s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}") + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -342,15 +358,15 @@ class StreamExecution( } if (hasNewData) { // Current batch timestamp in milliseconds - streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis() + offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { assert(offsetLog.add( currentBatchId, - availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)), + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId. " + - s"Metadata ${streamExecutionMetadata.toString}") + s"Metadata ${offsetSeqMetadata.toString}") // NOTE: The following code is correct because runBatches() processes exactly one // batch at a time. 
If we add pipeline parallelism (multiple batches in flight at @@ -420,10 +436,10 @@ class StreamExecution( val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) case ct: CurrentTimestamp => - CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) case cd: CurrentDate => - CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, cd.dataType) } @@ -434,7 +450,7 @@ class StreamExecution( outputMode, checkpointFile("state"), currentBatchId, - streamExecutionMetadata.batchWatermarkMs) + offsetSeqMetadata.batchWatermarkMs) lastExecution.executedPlan // Force the lazy generation of execution plan } @@ -451,12 +467,12 @@ class StreamExecution( logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") (e.maxEventTime.value / 1000) - e.delay.milliseconds() }.headOption.foreach { newWatermark => - if (newWatermark > streamExecutionMetadata.batchWatermarkMs) { + if (newWatermark > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermark ms") - streamExecutionMetadata.batchWatermarkMs = newWatermark + offsetSeqMetadata.batchWatermarkMs = newWatermark } else { logTrace(s"Event time didn't move: $newWatermark < " + - s"$streamExecutionMetadata.currentEventTimeWatermark") + s"$offsetSeqMetadata.currentEventTimeWatermark") } } @@ -605,34 +621,64 @@ class StreamExecution( case object TERMINATED extends State } + /** - * Contains metadata associated with a stream execution. This information is - * persisted to the offset log via the OffsetSeq metadata field. Current - * information contained in this object includes: + * Contains metadata associated with a [[StreamingQuery]]. This information is written + * in the checkpoint location the first time a query is started and recovered every time the query + * is restarted. * - * @param batchWatermarkMs: The current eventTime watermark, used to - * bound the lateness of data that will processed. Time unit: milliseconds - * @param batchTimestampMs: The current batch processing timestamp. - * Time unit: milliseconds + * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts */ -case class StreamExecutionMetadata( - var batchWatermarkMs: Long = 0, - var batchTimestampMs: Long = 0) { - private implicit val formats = StreamExecutionMetadata.formats - - /** - * JSON string representation of this object. 
- */ - def json: String = Serialization.write(this) +case class StreamExecutionMetadata(id: String) { + def json: String = Serialization.write(this)(StreamExecutionMetadata.format) } -object StreamExecutionMetadata { - private implicit val formats = Serialization.formats(NoTypeHints) +object StreamExecutionMetadata extends Logging { + implicit val format = Serialization.formats(NoTypeHints) + + /** Read the metadata from file if it exists */ + def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamExecutionMetadata] = { + val fs = FileSystem.get(hadoopConf) + if (fs.exists(metadataFile)) { + var input: FSDataInputStream = null + try { + input = fs.open(metadataFile) + val reader = new InputStreamReader(input, StandardCharsets.UTF_8) + val metadata = Serialization.read[StreamExecutionMetadata](reader) + Some(metadata) + } catch { + case NonFatal(e) => + logError(s"Error reading stream metadata from $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(input) + } + } else None + } - def apply(json: String): StreamExecutionMetadata = - Serialization.read[StreamExecutionMetadata](json) + /** Write metadata to file, overwrite if it exists */ + def write( + metadata: StreamExecutionMetadata, + metadataFile: Path, + hadoopConf: Configuration): Unit = { + var output: FSDataOutputStream = null + try { + val fs = FileSystem.get(hadoopConf) + output = fs.create(metadataFile, true) // overwrite if exists + val writer = new OutputStreamWriter(output) + Serialization.write(metadata, writer) + writer.close() + } catch { + case NonFatal(e) => + logError(s"Error writing stream metedata $metadata to $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(output) + } + } } + /** * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread * and will use `classOf[StreamExecutionThread]` to check. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 21b8750ca913d..a3f3662e6f4c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,7 +26,7 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - def toOffsetSeq(source: Seq[Source], metadata: String): OffsetSeq = { + def toOffsetSeq(source: Seq[Source], metadata: OffsetSeqMetadata): OffsetSeq = { OffsetSeq(source.map(get), Some(metadata)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 8fc4e43b6de53..e4e8b3b911d0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -41,11 +41,22 @@ trait StreamingQuery { def name: String /** - * Returns the unique id of this query. + * Returns the unique id of this query that persists across restarts from checkpoint data. + * That is, this id is generated when a query is started for the first time, and + * will be the same every time it is restarted from checkpoint data. + * There can only be one query with the same id active in a Spark cluster. 
+   *
    * @since 2.1.0
    */
  def id: UUID
 
+  /**
+   * Returns the unique id of this run of the query. That is, every start/restart of a query will
+   * generate a unique runId. Therefore, every time a query is restarted from
+   * checkpoint, it will have the same [[id]] but different [[runId]]s.
+   */
+  def runId: UUID
+
   /**
    * Returns the `SparkSession` associated with `this`.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index d9ee75c064065..6fc859d88d97e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -86,7 +86,10 @@ object StreamingQueryListener {
    * @since 2.1.0
    */
   @Experimental
-  class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
+  class QueryStartedEvent private[sql](
+      val id: UUID,
+      val runId: UUID,
+      val name: String) extends Event
 
   /**
    * :: Experimental ::
@@ -106,5 +109,8 @@ object StreamingQueryListener {
    * @since 2.1.0
    */
   @Experimental
-  class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
+  class QueryTerminatedEvent private[sql](
+      val id: UUID,
+      val runId: UUID,
+      val exception: Option[String]) extends Event
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c448468bea519..1610a61781b8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -268,6 +268,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         trigger,
         triggerClock,
         outputMode)
+
+    if (activeQueries.values.exists(_.id == query.id)) {
+      throw new IllegalStateException(
+        s"Cannot start query with id ${query.id} as another query with the same id is " +
+          s"already active. Perhaps you are attempting to restart a query from checkpoint " +
+          s"that is already active.")
+    }
+
     query.start()
     activeQueries.put(query.id, query)
     query
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c8247458fcfe..3b541193b4b18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -50,8 +50,9 @@ class StateOperatorProgress private[sql](
  * a trigger. Each event relates to processing done for a single trigger of the streaming
  * query. Events are emitted even when no new data is available to be processed.
  *
- * @param id A unique id of the query.
- * @param name Name of the query. This name is unique across all active queries.
+ * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name User-specified name of the query.
  * @param timestamp Timestamp (ms) of the beginning of the trigger.
  * @param batchId A unique id for the current batch of data being processed. Note that in the
  *        case of retries after a failure a given batchId my be executed more than once.
@@ -66,6 +67,7 @@ class StateOperatorProgress private[sql]( @Experimental class StreamingQueryProgress private[sql]( val id: UUID, + val runId: UUID, val name: String, val timestamp: Long, val batchId: Long, @@ -98,6 +100,7 @@ class StreamingQueryProgress private[sql]( } ("id" -> JString(id.toString)) ~ + ("runId" -> JString(runId.toString)) ("name" -> JString(name)) ~ ("timestamp" -> JInt(timestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ diff --git a/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt new file mode 100644 index 0000000000000..79613e2362164 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt @@ -0,0 +1,3 @@ +{ + "id": "d366a8bf-db79-42ca-b5a4-d9ca0a11d63e" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 3afd11fa4686d..d3a83ea0b922f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -27,10 +27,19 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { /** test string offset type */ case class StringOffset(override val json: String) extends Offset - testWithUninterruptibleThread("serialization - deserialization") { + test("OffsetSeqMetadata - deserialization") { + assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}""")) + assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) + assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) + assert( + OffsetSeqMetadata(1, 2) === + OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + } + + testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") { withTempDir { temp => val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir - val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) + val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2)) val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala new file mode 100644 index 0000000000000..e5cf1e01e2317 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File +import java.util.UUID + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.streaming.StreamTest + +class StreamExecutionMetadataSuite extends StreamTest { + + test("writing and reading") { + withTempDir { dir => + val id = UUID.randomUUID.toString + val metadata = StreamExecutionMetadata(id) + val file = new Path(new File(dir, "test").toString) + StreamExecutionMetadata.write(metadata, file, hadoopConf) + val readMetadata = StreamExecutionMetadata.read(file, hadoopConf) + assert(readMetadata.nonEmpty) + assert(readMetadata.get.id === id) + } + } + + test("read Spark 2.1.0 format") { + // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0 + assert( + readForResource("query-metadata-logs-version-2.1.0.txt") === + StreamExecutionMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e")) + } + + private def readForResource(fileName: String): StreamExecutionMetadata = { + val input = getClass.getResource(s"/structured-streaming/$fileName") + StreamExecutionMetadata.read(new Path(input.toString), hadoopConf).get + } + + private val hadoopConf = new Configuration() +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala deleted file mode 100644 index c7139c588d1d3..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata - -class StreamExecutionMetadataSuite extends StreamTest { - - test("stream execution metadata") { - assert(StreamExecutionMetadata(0, 0) === - StreamExecutionMetadata("""{}""")) - assert(StreamExecutionMetadata(1, 0) === - StreamExecutionMetadata("""{"batchWatermarkMs":1}""")) - assert(StreamExecutionMetadata(0, 2) === - StreamExecutionMetadata("""{"batchTimestampMs":2}""")) - assert(StreamExecutionMetadata(1, 2) === - StreamExecutionMetadata( - """{"batchWatermarkMs":1,"batchTimestampMs":2}""")) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 08b93e7d0b498..76b6f0b6a5a75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -67,6 +67,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnQuery { query => assert(listener.startEvent !== null) assert(listener.startEvent.id === query.id) + assert(listener.startEvent.runId === query.runId) assert(listener.startEvent.name === query.name) assert(listener.progressEvents.isEmpty) assert(listener.terminationEvent === null) @@ -90,6 +91,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { eventually(Timeout(streamingTimeout)) { assert(listener.terminationEvent !== null) assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.runId === query.runId) assert(listener.terminationEvent.exception === None) } listener.checkAsyncErrors() @@ -165,7 +167,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStartedEvent serialization") { - val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name") + val queryStarted = new StreamingQueryListener.QueryStartedEvent( + UUID.randomUUID, UUID.randomUUID, "name") val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryStartedEvent] @@ -183,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryTerminatedEvent serialization") { val exception = new RuntimeException("exception") val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( - UUID.randomUUID, Some(exception.getMessage)) + UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)) val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 4da712fa0f7e0..8df25f94f2bcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -36,6 +36,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { s""" |{ | "id" : "${testProgress.id.toString}", + | "runId" : "${testProgress.runId.toString}", | "name" : "name", | 
"timestamp" : 1, | "numInputRows" : 678, @@ -95,7 +96,8 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { object StreamingQueryStatusAndProgressSuite { val testProgress = new StreamingQueryProgress( - id = UUID.randomUUID(), + id = UUID.randomUUID, + runId = UUID.randomUUID, name = "name", timestamp = 1L, batchId = 2L, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 56abe1201c0cc..d43dab69b5f7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import scala.util.Random + +import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ import org.scalatest.BeforeAndAfter @@ -28,7 +31,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ -import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.util.ManualClock class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { @@ -43,38 +46,68 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { sqlContext.streams.active.foreach(_.stop()) } - test("names unique across active queries, ids unique across all started queries") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map { 6 / _} - - def startQuery(queryName: String): StreamingQuery = { - val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath - val writer = mapped.writeStream - writer - .queryName(queryName) - .format("memory") - .option("checkpointLocation", metadataRoot) - .start() + test("name unique in active queries") { + withTempDir { dir => + def startQuery(name: Option[String]): StreamingQuery = { + val writer = MemoryStream[Int].toDS.groupBy().count().writeStream + name.foreach(writer.queryName) + writer + .format("memory") + .outputMode("complete") + .start() + } + val q1 = startQuery(name = Some("q1")) + assert(q1.name === "q1") + val q2 = startQuery(name = Some("q2")) + assert(q2.name === "q2") + val e = intercept[IllegalArgumentException] { + startQuery(name = Some("q2")) + } + q1.stop() + q2.stop() } + } - val q1 = startQuery("q1") - assert(q1.name === "q1") + test( + "id unique in active queries + persists across restarts, runId unique across start/restarts") { + val inputData = MemoryStream[Int] + withTempDir { dir => + var cpDir: String = null + + def startQuery(restart: Boolean): StreamingQuery = { + if (cpDir == null || !restart) cpDir = s"$dir/${RandomStringUtils.randomAlphabetic(10)}" + MemoryStream[Int].toDS().groupBy().count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName(s"name${RandomStringUtils.randomAlphabetic(10)}") + .option("checkpointLocation", cpDir) + .start() + } - // Verify that another query with same name cannot be started - val e1 = intercept[IllegalArgumentException] { - startQuery("q1") + // id and runId unique for new queries + val q1 = startQuery(restart = false) + val q2 = startQuery(restart = false) + assert(q1.id !== q2.id) + assert(q1.runId !== q2.runId) + q1.stop() + q2.stop() + + // id persists across restarts, runId unique across restarts + val q3 = startQuery(restart 
= false)
+      q3.stop()
+
+      val q4 = startQuery(restart = true)
+      q4.stop()
+      assert(q3.id === q4.id)
+      assert(q3.runId !== q4.runId)
+
+      // Only one query with same id can be active
+      val q5 = startQuery(restart = false)
+      val e = intercept[IllegalStateException] {
+        startQuery(restart = true)
+      }
     }
-    Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
-
-    // Verify q1 was unaffected by the above exception and stop it
-    assert(q1.isActive)
-    q1.stop()
-
-    // Verify another query can be started with name q1, but will have different id
-    val q2 = startQuery("q1")
-    assert(q2.name === "q1")
-    assert(q2.id !== q1.id)
-    q2.stop()
   }
 
   testQuietly("isActive, exception, and awaitTermination") {
@@ -105,7 +138,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
       AssertOnQuery(
         q => q.exception.get.startOffset.get.offsets ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
+          q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).offsets,
         "incorrect start offset on exception")
     )
   }

From f103def70f5db8abec8ed8905db4e71a418b89c1 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 1 Dec 2016 19:21:57 -0800
Subject: [PATCH 02/13] Fixed test

---
 .../main/scala/org/apache/spark/sql/streaming/progress.scala | 2 +-
 .../sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3b541193b4b18..bbc0cc683a1e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -100,7 +100,7 @@ class StreamingQueryProgress private[sql](
     }
 
     ("id" -> JString(id.toString)) ~
-    ("runId" -> JString(runId.toString))
+    ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JInt(timestamp)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 8df25f94f2bcd..e02fefac8f66e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -37,7 +37,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
        |{
        |  "id" : "${testProgress.id.toString}",
        |  "runId" : "${testProgress.runId.toString}",
-       |  "name" : "name",
+       |  "name" : "myName",
        |  "timestamp" : 1,
        |  "numInputRows" : 678,
        |  "inputRowsPerSecond" : 10.0,
@@ -98,7 +98,7 @@ object StreamingQueryStatusAndProgressSuite {
   val testProgress = new StreamingQueryProgress(
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
-    name = "name",
+    name = "myName",
     timestamp = 1L,
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,

From f55b8525cb29f25d43628cd2b7e3ec1e294d73eb Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 1 Dec 2016 19:25:11 -0800
Subject: [PATCH 03/13] Made codahale metrics use id instead of name

---
 .../spark/sql/execution/streaming/StreamExecution.scala  | 6 +++---
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6b7310ded2b13..91c867f03c0d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -141,8 +141,8 @@ class StreamExecution( /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() - /** Used to report metrics to coda-hale. */ - lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name") + /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */ + lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$id") /** * The thread that runs the micro-batches of this stream. Note that this thread must be @@ -443,7 +443,7 @@ class StreamExecution( cd.dataType) } - val executedPlan = reportTimeTaken("queryPlanning") { + reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSession, triggerLogicalPlan, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index d43dab69b5f7c..ce6a7abb8b1ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -305,7 +305,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { /** Whether metrics of a query is registered for reporting */ def isMetricsRegistered(query: StreamingQuery): Boolean = { - val sourceName = s"spark.streaming.${query.name}" + val sourceName = s"spark.streaming.${query.id}" val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName) require(sources.size <= 1) sources.nonEmpty From c20f4fe28d30768857c094d28473656065263a23 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Dec 2016 19:47:04 -0800 Subject: [PATCH 04/13] Made name to be default null --- .../execution/streaming/StreamExecution.scala | 22 ++++++++++----- .../sql/streaming/StreamingQueryManager.scala | 17 ++++++------ .../sql/streaming/StreamingQuerySuite.scala | 27 ++++++++++++------- 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 91c867f03c0d9..717b822b22deb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -114,6 +114,13 @@ class StreamExecution( override val runId: UUID = UUID.randomUUID + /** + * Pretty identified string of printing in logs. Format is + * If name is set "queryName [id = xyz, runId = abc]" else "[id = xyz, runId = abc]" + */ + private val prettyIdString = + Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + /** All stream sources present in the query plan. */ protected val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } @@ -142,7 +149,8 @@ class StreamExecution( private val callSite = Utils.getCallSite() /** Used to report metrics to coda-hale. 
This uses id for easier tracking across restarts. */ - lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$id") + lazy val streamMetrics = new MetricsReporter( + this, s"spark.streaming.${Option(name).getOrElse(id)}") /** * The thread that runs the micro-batches of this stream. Note that this thread must be @@ -150,7 +158,7 @@ class StreamExecution( * [[HDFSMetadataLog]]. See SPARK-14131 for more details. */ val microBatchThread = - new StreamExecutionThread(s"stream execution thread for $name") { + new StreamExecutionThread(s"stream execution thread for $prettyIdString") { override def run(): Unit = { // To fix call site like "run at :0", we bridge the call site from the caller // thread to this micro batch thread @@ -261,10 +269,10 @@ class StreamExecution( case e: Throwable => streamDeathCause = new StreamingQueryException( this, - s"Query $name terminated with exception: ${e.getMessage}", + s"Query $prettyIdString terminated with exception: ${e.getMessage}", e, Some(committedOffsets.toOffsetSeq(sources, offsetSeqMetadata))) - logError(s"Query $name terminated with error", e) + logError(s"Query $prettyIdString terminated with error", e) updateStatusMessage(s"Terminated with exception: ${e.getMessage}") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them @@ -502,7 +510,7 @@ class StreamExecution( microBatchThread.join() } uniqueSources.foreach(_.stop()) - logInfo(s"Query $name was stopped") + logInfo(s"Query $prettyIdString was stopped") } /** @@ -593,7 +601,7 @@ class StreamExecution( override def explain(): Unit = explain(extended = false) override def toString: String = { - s"Streaming Query - $name [state = $state]" + s"Streaming Query $prettyIdString [state = $state]" } def toDebugString: String = { @@ -602,7 +610,7 @@ class StreamExecution( } else "" s""" |=== Streaming Query === - |Name: $name + |Identifier: $prettyIdString |Current Offsets: $committedOffsets | |Current State: $state diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 1610a61781b8e..c6ab41655f5ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -207,10 +207,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()): StreamingQuery = { activeQueriesLock.synchronized { - val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}") - if (activeQueries.values.exists(_.name == name)) { - throw new IllegalArgumentException( - s"Cannot start query with name $name as a query with that name is already active") + val name = userSpecifiedName match { + case Some(n) => + if (activeQueries.values.exists(_.name == userSpecifiedName.get)) { + throw new IllegalArgumentException( + s"Cannot start query with name $n as a query with that name is already active") + } + n + case None => null } val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => new Path(userSpecified).toUri.toString @@ -295,8 +299,3 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } } } - -private object StreamingQueryManager { - private val _nextId = new AtomicLong(0) - private def nextId: Long = _nextId.getAndIncrement() -} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index ce6a7abb8b1ef..07d6b789284c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.streaming -import scala.util.Random - import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ @@ -49,22 +47,31 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { test("name unique in active queries") { withTempDir { dir => def startQuery(name: Option[String]): StreamingQuery = { - val writer = MemoryStream[Int].toDS.groupBy().count().writeStream + val writer = MemoryStream[Int].toDS.writeStream name.foreach(writer.queryName) writer - .format("memory") - .outputMode("complete") + .foreach(new TestForeachWriter) .start() } - val q1 = startQuery(name = Some("q1")) - assert(q1.name === "q1") - val q2 = startQuery(name = Some("q2")) - assert(q2.name === "q2") + + // No name by default, multiple active queries can have no name + val q1 = startQuery(name = None) + assert(q1.name === null) + val q2 = startQuery(name = None) + assert(q2.name === null) + + // Can be set by user + val q3 = startQuery(name = Some("q3")) + assert(q3.name === "q3") + + // Multiple active queries cannot have same name val e = intercept[IllegalArgumentException] { - startQuery(name = Some("q2")) + startQuery(name = Some("q3")) } + q1.stop() q2.stop() + q3.stop() } } From bec2fb392e995a517578341b1addfbe00bf04330 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Dec 2016 20:23:07 -0800 Subject: [PATCH 05/13] Some more changes --- .../execution/streaming/StreamExecution.scala | 22 +++--- ...aSuite.scala => StreamMetadataSuite.scala} | 14 ++-- .../StreamingQueryListenerSuite.scala | 2 +- ...StreamingQueryStatusAndProgressSuite.scala | 72 ++++++++++++++++--- 4 files changed, 82 insertions(+), 28 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/{StreamExecutionMetadataSuite.scala => StreamMetadataSuite.scala} (77%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 717b822b22deb..fffb634d20901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -97,12 +97,12 @@ class StreamExecution( protected var currentBatchId: Long = -1 /** Metadata associated with the whole query */ - protected val queryMetadata: StreamExecutionMetadata = { + protected val streamMetadata: StreamMetadata = { val metadataPath = new Path(checkpointFile("metadata")) val hadoopConf = sparkSession.sessionState.newHadoopConf() - StreamExecutionMetadata.read(metadataPath, hadoopConf).getOrElse { - val newMetadata = new StreamExecutionMetadata(UUID.randomUUID.toString) - StreamExecutionMetadata.write(newMetadata, metadataPath, hadoopConf) + StreamMetadata.read(metadataPath, hadoopConf).getOrElse { + val newMetadata = new StreamMetadata(UUID.randomUUID.toString) + StreamMetadata.write(newMetadata, metadataPath, hadoopConf) newMetadata } } @@ -110,7 +110,7 @@ class StreamExecution( /** Metadata associated with the 
offset seq of a batch in the query. */ protected var offsetSeqMetadata = OffsetSeqMetadata() - override val id: UUID = UUID.fromString(queryMetadata.id) + override val id: UUID = UUID.fromString(streamMetadata.id) override val runId: UUID = UUID.randomUUID @@ -637,22 +637,22 @@ class StreamExecution( * * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts */ -case class StreamExecutionMetadata(id: String) { - def json: String = Serialization.write(this)(StreamExecutionMetadata.format) +case class StreamMetadata(id: String) { + def json: String = Serialization.write(this)(StreamMetadata.format) } -object StreamExecutionMetadata extends Logging { +object StreamMetadata extends Logging { implicit val format = Serialization.formats(NoTypeHints) /** Read the metadata from file if it exists */ - def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamExecutionMetadata] = { + def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { val fs = FileSystem.get(hadoopConf) if (fs.exists(metadataFile)) { var input: FSDataInputStream = null try { input = fs.open(metadataFile) val reader = new InputStreamReader(input, StandardCharsets.UTF_8) - val metadata = Serialization.read[StreamExecutionMetadata](reader) + val metadata = Serialization.read[StreamMetadata](reader) Some(metadata) } catch { case NonFatal(e) => @@ -666,7 +666,7 @@ object StreamExecutionMetadata extends Logging { /** Write metadata to file, overwrite if it exists */ def write( - metadata: StreamExecutionMetadata, + metadata: StreamMetadata, metadataFile: Path, hadoopConf: Configuration): Unit = { var output: FSDataOutputStream = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala similarity index 77% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala index e5cf1e01e2317..87f8004ab9588 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala @@ -25,15 +25,15 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.streaming.StreamTest -class StreamExecutionMetadataSuite extends StreamTest { +class StreamMetadataSuite extends StreamTest { test("writing and reading") { withTempDir { dir => val id = UUID.randomUUID.toString - val metadata = StreamExecutionMetadata(id) + val metadata = StreamMetadata(id) val file = new Path(new File(dir, "test").toString) - StreamExecutionMetadata.write(metadata, file, hadoopConf) - val readMetadata = StreamExecutionMetadata.read(file, hadoopConf) + StreamMetadata.write(metadata, file, hadoopConf) + val readMetadata = StreamMetadata.read(file, hadoopConf) assert(readMetadata.nonEmpty) assert(readMetadata.get.id === id) } @@ -43,12 +43,12 @@ class StreamExecutionMetadataSuite extends StreamTest { // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0 assert( readForResource("query-metadata-logs-version-2.1.0.txt") === - StreamExecutionMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e")) + StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e")) } - private def readForResource(fileName: String): StreamExecutionMetadata = { + 
private def readForResource(fileName: String): StreamMetadata = { val input = getClass.getResource(s"/structured-streaming/$fileName") - StreamExecutionMetadata.read(new Path(input.toString), hadoopConf).get + StreamMetadata.read(new Path(input.toString), hadoopConf).get } private val hadoopConf = new Configuration() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 159726ac8add8..1bcdbfb5bd674 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -176,7 +176,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryProgressEvent serialization") { val event = new StreamingQueryListener.QueryProgressEvent( - StreamingQueryStatusAndProgressSuite.testProgress) + StreamingQueryStatusAndProgressSuite.testProgress2) val json = JsonProtocol.sparkEventToJson(event) val newEvent = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryProgressEvent] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index e02fefac8f66e..96f19db1a90e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -31,12 +31,12 @@ import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { test("StreamingQueryProgress - prettyJson") { - val json = testProgress.prettyJson - assert(json === + val json1 = testProgress1.prettyJson + assert(json1 === s""" |{ - | "id" : "${testProgress.id.toString}", - | "runId" : "${testProgress.runId.toString}", + | "id" : "${testProgress1.id.toString}", + | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", | "timestamp" : 1, | "numInputRows" : 678, @@ -61,16 +61,48 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | } |} """.stripMargin.trim) - assert(compact(parse(json)) === testProgress.json) - + assert(compact(parse(json1)) === testProgress1.json) + + val json2 = testProgress2.prettyJson + assert( + json2 === + s""" + |{ + | "id" : "${testProgress2.id.toString}", + | "runId" : "${testProgress2.runId.toString}", + | "name" : null, + | "timestamp" : 1, + | "numInputRows" : 678, + | "durationMs" : { + | "total" : 0 + | }, + | "currentWatermark" : 3, + | "stateOperators" : [ { + | "numRowsTotal" : 0, + | "numRowsUpdated" : 1 + | } ], + | "sources" : [ { + | "description" : "source", + | "startOffset" : 123, + | "endOffset" : 456, + | "numInputRows" : 678 + | } ], + | "sink" : { + | "description" : "sink" + | } + |} + """.stripMargin.trim) + assert(compact(parse(json2)) === testProgress2.json) } test("StreamingQueryProgress - json") { - assert(compact(parse(testProgress.json)) === testProgress.json) + assert(compact(parse(testProgress1.json)) === testProgress1.json) + assert(compact(parse(testProgress2.json)) === testProgress2.json) } test("StreamingQueryProgress - toString") { - assert(testProgress.toString === testProgress.prettyJson) + assert(testProgress1.toString === testProgress1.prettyJson) + 
assert(testProgress2.toString === testProgress2.prettyJson) } test("StreamingQueryStatus - prettyJson") { @@ -95,7 +127,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { } object StreamingQueryStatusAndProgressSuite { - val testProgress = new StreamingQueryProgress( + val testProgress1 = new StreamingQueryProgress( id = UUID.randomUUID, runId = UUID.randomUUID, name = "myName", @@ -117,6 +149,28 @@ object StreamingQueryStatusAndProgressSuite { sink = new SinkProgress("sink") ) + val testProgress2 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = null, // should not be present in the json + timestamp = 1L, + batchId = 2L, + durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, + currentWatermark = 3L, + stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), + sources = Array( + new SourceProgress( + description = "source", + startOffset = "123", + endOffset = "456", + numInputRows = 678, + inputRowsPerSecond = Double.NaN, // should not be present in the json + processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json + ) + ), + sink = new SinkProgress("sink") + ) + val testStatus = new StreamingQueryStatus("active", true, false) } From 0554e5e0c73ddc26bafd7013270d44a8f754f553 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 13:12:42 -0800 Subject: [PATCH 06/13] Added tests --- project/MimaExcludes.scala | 3 ++ python/pyspark/sql/tests.py | 24 ++++++++++ .../StreamingQueryListenerSuite.scala | 45 +++++++++++-------- 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4995af034f654..8ebb4842b3efe 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -101,6 +101,9 @@ object MimaExcludes { // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") + + // [SPARK-18657] Add StreamingQuery.runId + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId") ) } diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b7b2a5923c07f..5b95c3b7bee00 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1082,6 +1082,30 @@ def test_stream_save_options_overwrite(self): q.stop() shutil.rmtree(tmpPath) + def test_id_runid_name(self): + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') + for q in self.spark._wrapped.streams.active: + q.stop() + tmpPath = tempfile.mkdtemp() + shutil.rmtree(tmpPath) + out = os.path.join(tmpPath, 'out') + chk = os.path.join(tmpPath, 'chk') + try: + q = df.writeStream \ + .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) + self.assertTrue(any(x.id == q.id for x in self.spark.streams.active)) + self.assertTrue(any(x.runId == q.runId for x in self.spark.streams.active)) + self.assertTrue(any(x.name == 'this_query' for x in self.spark.streams.active)) + q.stop() + q2 = df.writeStream \ + .start(path=out, format='parquet', checkpointLocation=chk) + self.assertTrue(q2.name == None) + q2.stop() + finally: + for q in self.spark.streams.active: + q.stop() + shutil.rmtree(tmpPath) + def test_stream_status_and_progress(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') for q in self.spark._wrapped.streams.active: diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 1bcdbfb5bd674..2c4e57270bcb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -167,31 +167,40 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStartedEvent serialization") { - val queryStarted = new StreamingQueryListener.QueryStartedEvent( - UUID.randomUUID, UUID.randomUUID, "name") - val json = JsonProtocol.sparkEventToJson(queryStarted) - val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryStartedEvent] + def testSerialization(event: QueryStartedEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryStartedEvent] + assert(newEvent.id === event.id) + assert(newEvent.runId === event.runId) + assert(newEvent.name === event.name) + } + + testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name")) + testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null)) } test("QueryProgressEvent serialization") { - val event = new StreamingQueryListener.QueryProgressEvent( - StreamingQueryStatusAndProgressSuite.testProgress2) - val json = JsonProtocol.sparkEventToJson(event) - val newEvent = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryProgressEvent] - assert(event.progress.json === newEvent.progress.json) + def testSerialization(event: QueryProgressEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] + assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality + } + testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) + testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) } test("QueryTerminatedEvent serialization") { + def testSerialization(event: QueryTerminatedEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryTerminatedEvent] + assert(newEvent.id === event.id) + assert(newEvent.runId === event.runId) + assert(newEvent.exception === event.exception) + } + val exception = new RuntimeException("exception") - val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( - UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)) - val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) - val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] - assert(queryQueryTerminated.id === newQueryTerminated.id) - assert(queryQueryTerminated.exception === newQueryTerminated.exception) + testSerialization( + new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage))) } testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { From 6e1bbddb18c0790ebd3484ca2d4ded3d379e2bb1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 13:28:30 -0800 Subject: [PATCH 07/13] Fix mima --- project/MimaExcludes.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8ebb4842b3efe..22be4fecf6b43 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -100,10 +100,10 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="), // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"), // [SPARK-18657] Add StreamingQuery.runId - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId") ) } From c9224efbde07d004542c065cb72fa3b50067c690 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 14:06:29 -0800 Subject: [PATCH 08/13] Fix python style --- python/pyspark/sql/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5b95c3b7bee00..a1a344a52a3a1 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1099,7 +1099,7 @@ def test_id_runid_name(self): q.stop() q2 = df.writeStream \ .start(path=out, format='parquet', checkpointLocation=chk) - self.assertTrue(q2.name == None) + self.assertTrue(q2.name is None) q2.stop() finally: for q in self.spark.streams.active: From afd5c0ff8bda1dc796a60bd090f4dbff6bab2e37 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 15:35:13 -0800 Subject: [PATCH 09/13] Improve docs --- python/pyspark/sql/streaming.py | 5 ++++- .../org/apache/spark/sql/streaming/StreamingQuery.scala | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 1e8b75d011291..4ec7e9fc11454 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -69,7 +69,10 @@ def runId(self): @property @since(2.0) def name(self): - """The name of the streaming query. This name is unique across all active queries. + """Returns the user-specified name of the query, or null if not specified. + This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` + as `dataframe.writeStream.queryName("query").start()`. + This name, if set, must be unique across all active queries. """ return self._jsq.name() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index e4e8b3b911d0c..57ffc6885ee29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -32,9 +32,10 @@ import org.apache.spark.sql.SparkSession trait StreamingQuery { /** - * Returns the name of the query. This name is unique across all active queries. This can be - * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as - * `dataframe.writeStream.queryName("query").start()`. + * Returns the user-specified name of the query, or null if not specified. 
+ * This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` + * as `dataframe.writeStream.queryName("query").start()`. + * This name, if set, must be unique across all active queries. * * @since 2.0.0 */ From 7ee4cf1b04452a0ef5ba068f5aaf1c146bf0bbec Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 15:49:57 -0800 Subject: [PATCH 10/13] Fix indent --- .../execution/streaming/StreamExecution.scala | 36 +++++++++---------- .../apache/spark/sql/streaming/progress.scala | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index fffb634d20901..d9648a5bf1799 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -647,24 +647,24 @@ object StreamMetadata extends Logging { /** Read the metadata from file if it exists */ def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { val fs = FileSystem.get(hadoopConf) - if (fs.exists(metadataFile)) { - var input: FSDataInputStream = null - try { - input = fs.open(metadataFile) - val reader = new InputStreamReader(input, StandardCharsets.UTF_8) - val metadata = Serialization.read[StreamMetadata](reader) - Some(metadata) - } catch { - case NonFatal(e) => - logError(s"Error reading stream metadata from $metadataFile", e) - throw e - } finally { - IOUtils.closeQuietly(input) - } - } else None + if (fs.exists(metadataFile)) { + var input: FSDataInputStream = null + try { + input = fs.open(metadataFile) + val reader = new InputStreamReader(input, StandardCharsets.UTF_8) + val metadata = Serialization.read[StreamMetadata](reader) + Some(metadata) + } catch { + case NonFatal(e) => + logError(s"Error reading stream metadata from $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(input) + } + } else None } - /** Write metadata to file, overwrite if it exists */ + /** Write metadata to file */ def write( metadata: StreamMetadata, metadataFile: Path, @@ -672,13 +672,13 @@ object StreamMetadata extends Logging { var output: FSDataOutputStream = null try { val fs = FileSystem.get(hadoopConf) - output = fs.create(metadataFile, true) // overwrite if exists + output = fs.create(metadataFile) val writer = new OutputStreamWriter(output) Serialization.write(metadata, writer) writer.close() } catch { case NonFatal(e) => - logError(s"Error writing stream metedata $metadata to $metadataFile", e) + logError(s"Error writing stream metadata $metadata to $metadataFile", e) throw e } finally { IOUtils.closeQuietly(output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index bbc0cc683a1e0..2661277e1db13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -52,7 +52,7 @@ class StateOperatorProgress private[sql]( * * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. - * @param name User-specified name of the query. + * @param name User-specified name of the query, null if not specified. 
* @param timestamp Timestamp (ms) of the beginning of the trigger. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. From a2a6d90c8c8d967525ce93f3193f129af9f958b4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Dec 2016 23:10:05 -0800 Subject: [PATCH 11/13] Addressed comments --- .../sql/execution/streaming/OffsetSeq.scala | 4 - .../execution/streaming/StreamExecution.scala | 65 +------------- .../execution/streaming/StreamMetadata.scala | 88 +++++++++++++++++++ .../spark/sql/streaming/StreamingQuery.scala | 3 +- 4 files changed, 90 insertions(+), 70 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 8d073ecc8bec5..e5a1997d6b808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -25,10 +25,6 @@ import org.json4s.jackson.Serialization * An ordered collection of offsets, used to track the progress of processing data from one or more * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance * vector clock that must progress linearly forward. - * - * @param offsets Sequence of Offsets - * @param metadata Optional, metadata infomation as a Json string, generated from - * [[OffsetSeqMetadata]] */ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d9648a5bf1799..e954be79b5e8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import java.io.{InputStreamReader, OutputStreamWriter} -import java.nio.charset.StandardCharsets import java.util.UUID import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.locks.ReentrantLock @@ -26,11 +24,7 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.commons.io.IOUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} -import org.json4s.NoTypeHints -import org.json4s.jackson.Serialization +import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -630,63 +624,6 @@ class StreamExecution( } -/** - * Contains metadata associated with a [[StreamingQuery]]. This information is written - * in the checkpoint location the first time a query is started and recovered every time the query - * is restarted. 
- * - * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts - */ -case class StreamMetadata(id: String) { - def json: String = Serialization.write(this)(StreamMetadata.format) -} - -object StreamMetadata extends Logging { - implicit val format = Serialization.formats(NoTypeHints) - - /** Read the metadata from file if it exists */ - def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { - val fs = FileSystem.get(hadoopConf) - if (fs.exists(metadataFile)) { - var input: FSDataInputStream = null - try { - input = fs.open(metadataFile) - val reader = new InputStreamReader(input, StandardCharsets.UTF_8) - val metadata = Serialization.read[StreamMetadata](reader) - Some(metadata) - } catch { - case NonFatal(e) => - logError(s"Error reading stream metadata from $metadataFile", e) - throw e - } finally { - IOUtils.closeQuietly(input) - } - } else None - } - - /** Write metadata to file */ - def write( - metadata: StreamMetadata, - metadataFile: Path, - hadoopConf: Configuration): Unit = { - var output: FSDataOutputStream = null - try { - val fs = FileSystem.get(hadoopConf) - output = fs.create(metadataFile) - val writer = new OutputStreamWriter(output) - Serialization.write(metadata, writer) - writer.close() - } catch { - case NonFatal(e) => - logError(s"Error writing stream metadata $metadata to $metadataFile", e) - throw e - } finally { - IOUtils.closeQuietly(output) - } - } -} - - /** * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread * and will use `classOf[StreamExecutionThread]` to check. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala new file mode 100644 index 0000000000000..7807c9fae840a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStreamReader, OutputStreamWriter} +import java.nio.charset.StandardCharsets + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.streaming.StreamingQuery + +/** + * Contains metadata associated with a [[StreamingQuery]]. This information is written + * in the checkpoint location the first time a query is started and recovered every time the query + * is restarted. 
+ * + * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts + */ +case class StreamMetadata(id: String) { + def json: String = Serialization.write(this)(StreamMetadata.format) +} + +object StreamMetadata extends Logging { + implicit val format = Serialization.formats(NoTypeHints) + + /** Read the metadata from file if it exists */ + def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { + val fs = FileSystem.get(hadoopConf) + if (fs.exists(metadataFile)) { + var input: FSDataInputStream = null + try { + input = fs.open(metadataFile) + val reader = new InputStreamReader(input, StandardCharsets.UTF_8) + val metadata = Serialization.read[StreamMetadata](reader) + Some(metadata) + } catch { + case NonFatal(e) => + logError(s"Error reading stream metadata from $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(input) + } + } else None + } + + /** Write metadata to file */ + def write( + metadata: StreamMetadata, + metadataFile: Path, + hadoopConf: Configuration): Unit = { + var output: FSDataOutputStream = null + try { + val fs = FileSystem.get(hadoopConf) + output = fs.create(metadataFile) + val writer = new OutputStreamWriter(output) + Serialization.write(metadata, writer) + writer.close() + } catch { + case NonFatal(e) => + logError(s"Error writing stream metadata $metadata to $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(output) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 57ffc6885ee29..1794e75462cfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -44,8 +44,7 @@ trait StreamingQuery { /** * Returns the unique id of this query that persists across restarts from checkpoint data. * That is, this id is generated when a query is started for the first time, and - * will be the same every time it is restarted from checkpoint data. - * There can only be one query with the same id active in a Spark cluster. + * will be the same every time it is restarted from checkpoint data. Also see [[runId]]. 
* * @since 2.1.0 */ From 0e3b0acbb1817e2e9a17cc20af088a43fa727312 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Dec 2016 14:44:58 -0800 Subject: [PATCH 12/13] Removed python test --- python/pyspark/sql/tests.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a1a344a52a3a1..b7b2a5923c07f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1082,30 +1082,6 @@ def test_stream_save_options_overwrite(self): q.stop() shutil.rmtree(tmpPath) - def test_id_runid_name(self): - df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for q in self.spark._wrapped.streams.active: - q.stop() - tmpPath = tempfile.mkdtemp() - shutil.rmtree(tmpPath) - out = os.path.join(tmpPath, 'out') - chk = os.path.join(tmpPath, 'chk') - try: - q = df.writeStream \ - .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) - self.assertTrue(any(x.id == q.id for x in self.spark.streams.active)) - self.assertTrue(any(x.runId == q.runId for x in self.spark.streams.active)) - self.assertTrue(any(x.name == 'this_query' for x in self.spark.streams.active)) - q.stop() - q2 = df.writeStream \ - .start(path=out, format='parquet', checkpointLocation=chk) - self.assertTrue(q2.name is None) - q2.stop() - finally: - for q in self.spark.streams.active: - q.stop() - shutil.rmtree(tmpPath) - def test_stream_status_and_progress(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') for q in self.spark._wrapped.streams.active: From 4041a2297289aa6186a07e1ea74541d625174599 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 Dec 2016 14:59:56 -0800 Subject: [PATCH 13/13] Fix test --- .../spark/sql/streaming/StreamingQuerySuite.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7c28fcecd1a3f..893cb762c6580 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -143,19 +143,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { TestAwaitTermination(ExpectException[SparkException]), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), -<<<<<<< HEAD - AssertOnQuery( - q => q.exception.get.startOffset.get.offsets === - q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).offsets, - "incorrect start offset on exception") -======= AssertOnQuery(q => { q.exception.get.startOffset === - q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString && + q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString && q.exception.get.endOffset === - q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString + q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString }, "incorrect start offset or end offset on exception") ->>>>>>> apache-github/master ) }