From 1a986606576d4ca09cf68816fa338a89f8cbcf62 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 24 Apr 2021 23:36:03 -0700 Subject: [PATCH 1/3] Update metric per certain rows and at the end of the task. --- .../datasources/v2/DataSourceRDD.scala | 29 ++++++++++++++----- .../sql/execution/metric/CustomMetrics.scala | 2 ++ .../continuous/ContinuousDataSourceRDD.scala | 14 +++++++-- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 7850dfa39d161..92a4b742a8021 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.vectorized.ColumnarBatch class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition) @@ -66,7 +66,17 @@ class DataSourceRDD( new PartitionIterator[InternalRow](rowReader, customMetrics)) (iter, rowReader) } - context.addTaskCompletionListener[Unit](_ => reader.close()) + context.addTaskCompletionListener[Unit] { _ => + // In case of early stopping before consuming the entire iterator, + // we need to do one more metric update at the end of the task. + reader.currentMetricsValues.foreach { metric => + assert(customMetrics.contains(metric.name()), + s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + + s"${metric.name()}") + customMetrics(metric.name()).set(metric.value()) + } + reader.close() + } // TODO: SPARK-25083 remove the type erasure hack in data source scan new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) } @@ -81,6 +91,8 @@ private class PartitionIterator[T]( customMetrics: Map[String, SQLMetric]) extends Iterator[T] { private[this] var valuePrepared = false + private var numRow = 0L + override def hasNext: Boolean = { if (!valuePrepared) { valuePrepared = reader.next() @@ -92,12 +104,15 @@ private class PartitionIterator[T]( if (!hasNext) { throw QueryExecutionErrors.endOfStreamError() } - reader.currentMetricsValues.foreach { metric => - assert(customMetrics.contains(metric.name()), - s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + - s"${metric.name()}") - customMetrics(metric.name()).set(metric.value()) + if (numRow % CustomMetrics.numRowsPerUpdate == 0) { + reader.currentMetricsValues.foreach { metric => + assert(customMetrics.contains(metric.name()), + s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + + s"${metric.name()}") + customMetrics(metric.name()).set(metric.value()) + } } + numRow += 1 valuePrepared = false reader.get() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index cc28be3ca8ed7..cde9eabc3d1b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.connector.CustomMetric object CustomMetrics { private[spark] val V2_CUSTOM = "v2Custom" + private[spark] val numRowsPerUpdate = 100L + /** * Given a class name, builds and returns a metric type for a V2 custom metric class * `CustomMetric`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index 4e32cefbe31a2..78c0ac2b207c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory -import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.types.StructType import org.apache.spark.util.NextIterator @@ -92,10 +92,18 @@ class ContinuousDataSourceRDD( val partitionReader = readerForPartition.getPartitionReader() new NextIterator[InternalRow] { + private var numRow = 0L + override def getNext(): InternalRow = { - partitionReader.currentMetricsValues.foreach { metric => - customMetrics(metric.name()).set(metric.value()) + if (numRow % CustomMetrics.numRowsPerUpdate == 0) { + partitionReader.currentMetricsValues.foreach { metric => + assert(customMetrics.contains(metric.name()), + s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + + s"${metric.name()}") + customMetrics(metric.name()).set(metric.value()) + } } + numRow += 1 readerForPartition.next() match { case null => finished = true From 4f0be6c267abebb1fa3f3375678e1cd146f62bc9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 4 May 2021 14:00:12 -0700 Subject: [PATCH 2/3] For review comment. --- .../execution/datasources/v2/DataSourceRDD.scala | 14 ++------------ .../sql/execution/metric/CustomMetrics.scala | 15 +++++++++++++-- .../continuous/ContinuousDataSourceRDD.scala | 7 +------ 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 92a4b742a8021..3a8f74db57eab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -69,12 +69,7 @@ class DataSourceRDD( context.addTaskCompletionListener[Unit] { _ => // In case of early stopping before consuming the entire iterator, // we need to do one more metric update at the end of the task. 
- reader.currentMetricsValues.foreach { metric => - assert(customMetrics.contains(metric.name()), - s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + - s"${metric.name()}") - customMetrics(metric.name()).set(metric.value()) - } + CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics) reader.close() } // TODO: SPARK-25083 remove the type erasure hack in data source scan @@ -105,12 +100,7 @@ private class PartitionIterator[T]( throw QueryExecutionErrors.endOfStreamError() } if (numRow % CustomMetrics.numRowsPerUpdate == 0) { - reader.currentMetricsValues.foreach { metric => - assert(customMetrics.contains(metric.name()), - s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + - s"${metric.name()}") - customMetrics(metric.name()).set(metric.value()) - } + CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics) } numRow += 1 valuePrepared = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index bc2003ae4eea7..268db41b049ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.execution.metric -import org.apache.spark.sql.connector.metric.CustomMetric +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} object CustomMetrics { private[spark] val V2_CUSTOM = "v2Custom" - private[spark] val numRowsPerUpdate = 100L + private[spark] val numRowsPerUpdate = 100 /** * Given a class name, builds and returns a metric type for a V2 custom metric class @@ -43,4 +43,15 @@ object CustomMetrics { None } } + + /** + * Updates given custom metrics. + */ + def updateMetrics( + currentMetricsValues: Seq[CustomTaskMetric], + customMetrics: Map[String, SQLMetric]): Unit = { + currentMetricsValues.foreach { metric => + customMetrics(metric.name()).set(metric.value()) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index 78c0ac2b207c0..da3b33a23b11c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -96,12 +96,7 @@ class ContinuousDataSourceRDD( override def getNext(): InternalRow = { if (numRow % CustomMetrics.numRowsPerUpdate == 0) { - partitionReader.currentMetricsValues.foreach { metric => - assert(customMetrics.contains(metric.name()), - s"Custom metrics ${customMetrics.keys.mkString(", ")} do not contain the metric " + - s"${metric.name()}") - customMetrics(metric.name()).set(metric.value()) - } + CustomMetrics.updateMetrics(partitionReader.currentMetricsValues, customMetrics) } numRow += 1 readerForPartition.next() match { From 5f6cf5a6a9de14765ab3570f5516ebd74ecdb3af Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 6 May 2021 00:46:17 -0700 Subject: [PATCH 3/3] For review comment. 
--- .../spark/sql/execution/datasources/v2/DataSourceRDD.scala | 2 +- .../org/apache/spark/sql/execution/metric/CustomMetrics.scala | 2 +- .../streaming/continuous/ContinuousDataSourceRDD.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 3a8f74db57eab..217a1d5750d42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -99,7 +99,7 @@ private class PartitionIterator[T]( if (!hasNext) { throw QueryExecutionErrors.endOfStreamError() } - if (numRow % CustomMetrics.numRowsPerUpdate == 0) { + if (numRow % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) { CustomMetrics.updateMetrics(reader.currentMetricsValues, customMetrics) } numRow += 1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala index 268db41b049ac..3e6cad2676e15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} object CustomMetrics { private[spark] val V2_CUSTOM = "v2Custom" - private[spark] val numRowsPerUpdate = 100 + private[spark] val NUM_ROWS_PER_UPDATE = 100 /** * Given a class name, builds and returns a metric type for a V2 custom metric class diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index da3b33a23b11c..6d27961fa0bcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -95,7 +95,7 @@ class ContinuousDataSourceRDD( private var numRow = 0L override def getNext(): InternalRow = { - if (numRow % CustomMetrics.numRowsPerUpdate == 0) { + if (numRow % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) { CustomMetrics.updateMetrics(partitionReader.currentMetricsValues, customMetrics) } numRow += 1
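
Taken together, the three patches converge on one pattern: refresh V2 custom metrics only once per CustomMetrics.NUM_ROWS_PER_UPDATE rows while iterating, and do one final refresh from a task-completion listener so a task that stops consuming the iterator early (for example under a limit) still reports up-to-date values. The standalone Scala sketch below illustrates that pattern under simplified assumptions; TaskMetric, Metric, Reader, and onTaskCompletion are hypothetical stand-ins for Spark's CustomTaskMetric, SQLMetric, PartitionReader, and the listener registered in DataSourceRDD.compute, not the actual Spark APIs.

// Minimal, self-contained sketch of the metric-update pattern in this patch series.
// Spark-internal types are replaced with hypothetical stand-ins so the example compiles
// on its own; only the shape of the logic mirrors the patch.
object MetricsSketch {

  // Stand-in for Spark's CustomTaskMetric: a named, task-side metric value.
  trait TaskMetric { def name: String; def value: Long }

  // Stand-in for Spark's SQLMetric: a mutable accumulator keyed by metric name.
  final class Metric {
    private var v = 0L
    def set(x: Long): Unit = v = x
    def get: Long = v
  }

  object CustomMetrics {
    // The interval the patch settles on after review: an Int constant with an
    // upper-case name.
    val NUM_ROWS_PER_UPDATE: Int = 100

    // Copy the reader's current metric values into the named metrics map,
    // as the updateMetrics helper introduced in patch 2 does.
    def updateMetrics(current: Seq[TaskMetric], metrics: Map[String, Metric]): Unit =
      current.foreach(m => metrics(m.name).set(m.value))
  }

  // Stand-in for a PartitionReader that can report its current metric values.
  trait Reader[T] {
    def next(): Boolean
    def get(): T
    def currentMetricsValues: Seq[TaskMetric]
  }

  // Iterator wrapper that updates metrics once every NUM_ROWS_PER_UPDATE rows
  // instead of on every row, following PartitionIterator after the patch.
  final class PartitionIterator[T](
      reader: Reader[T],
      metrics: Map[String, Metric]) extends Iterator[T] {
    private var valuePrepared = false
    private var numRow = 0L

    override def hasNext: Boolean = {
      if (!valuePrepared) {
        valuePrepared = reader.next()
      }
      valuePrepared
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      if (numRow % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
        CustomMetrics.updateMetrics(reader.currentMetricsValues, metrics)
      }
      numRow += 1
      valuePrepared = false
      reader.get()
    }
  }

  // Analogue of the task-completion hook added in DataSourceRDD.compute: if the
  // iterator is abandoned early, do one last metric update before closing the reader.
  def onTaskCompletion[T](reader: Reader[T], metrics: Map[String, Metric]): Unit = {
    CustomMetrics.updateMetrics(reader.currentMetricsValues, metrics)
    // reader.close() would follow here in the real code.
  }
}

Updating once per 100 rows rather than per row keeps the per-row cost of the metric plumbing negligible, while the completion-listener flush covers the early-stopping case called out in the patch's own comment.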