@@ -38,7 +38,7 @@ import org.apache.spark.sql.types._
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[avro] class AvroOutputWriter(
-    path: String,
+    val path: String,
     context: TaskAttemptContext,
     schema: StructType,
     avroSchema: Schema) extends OutputWriter {
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 private[libsvm] class LibSVMOutputWriter(
-    path: String,
+    val path: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter {
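In both writers the only change is promoting the `path` constructor parameter to a `val`, so every writer exposes the file it is bound to as a public member. Presumably this is what lets the write path, and the stats tracker further down, ask an open writer for its path when several files are open at once. A minimal sketch of a custom writer following the same pattern; the `ExampleTextOutputWriter` name and its body are illustrative, not part of this change:

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter

// Illustrative writer: `val path` makes the output location a member,
// mirroring the change to AvroOutputWriter and LibSVMOutputWriter above.
class ExampleTextOutputWriter(
    val path: String,
    context: TaskAttemptContext) extends OutputWriter {

  // Open the target file through the task's Hadoop configuration.
  private val out = {
    val file = new Path(path)
    file.getFileSystem(context.getConfiguration).create(file, false)
  }

  override def write(row: InternalRow): Unit = {
    out.write((row.getString(0) + "\n").getBytes(StandardCharsets.UTF_8))
  }

  override def close(): Unit = out.close()
}
```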
@@ -3150,6 +3150,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val MAX_CONCURRENT_OUTPUT_FILE_WRITERS = buildConf("spark.sql.maxConcurrentOutputFileWriters")
+    .internal()
+    .doc("Maximum number of output file writers to use concurrently. If the number of writers " +
+      "needed reaches this limit, the task will sort the rest of the output before writing it.")
+    .version("3.2.0")
+    .intConf
+    .createWithDefault(0)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3839,6 +3847,8 @@ class SQLConf extends Serializable with Logging {
 
   def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED)
 
+  def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
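Taken together, these two hunks add an internal knob plus its typed accessor. The default of 0 presumably leaves today's behaviour unchanged, while a positive value lets a task keep up to that many output files open at once before it falls back to sorting the remaining rows, as the doc string describes. A rough usage sketch; the threshold of 16, the demo job, and the output path are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("concurrent-writers-sketch")
  .getOrCreate()

// Internal conf added above; 0 (the default) keeps the existing behaviour.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "16")

// A dynamic-partition write that opens one file per partition value, so up to
// 32 writers could be needed; per the doc string, once the limit is reached
// the task sorts whatever output is left and writes it afterwards.
spark.range(0, 1000000)
  .selectExpr("id", "id % 32 AS part")
  .write
  .partitionBy("part")
  .mode("overwrite")
  .parquet("/tmp/concurrent_writers_sketch")
```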
@@ -53,11 +53,11 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
 
   private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
   private[this] var numFiles: Int = 0
-  private[this] var submittedFiles: Int = 0
+  private[this] var numSubmittedFiles: Int = 0
   private[this] var numBytes: Long = 0L
   private[this] var numRows: Long = 0L
 
-  private[this] var curFile: Option[String] = None
+  private[this] val submittedFiles = mutable.HashSet[String]()
 
   /**
    * Get the size of the file expected to have been written by a worker.
@@ -134,23 +134,20 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
     partitions.append(partitionValues)
   }
 
-  override def newBucket(bucketId: Int): Unit = {
-    // currently unhandled
+  override def newFile(filePath: String): Unit = {
+    submittedFiles += filePath
+    numSubmittedFiles += 1
   }
 
-  override def newFile(filePath: String): Unit = {
-    statCurrentFile()
-    curFile = Some(filePath)
-    submittedFiles += 1
+  override def closeFile(filePath: String): Unit = {
+    updateFileStats(filePath)
+    submittedFiles.remove(filePath)
   }
 
-  private def statCurrentFile(): Unit = {
-    curFile.foreach { path =>
-      getFileSize(path).foreach { len =>
-        numBytes += len
-        numFiles += 1
-      }
-      curFile = None
+  private def updateFileStats(filePath: String): Unit = {
+    getFileSize(filePath).foreach { len =>
+      numBytes += len
+      numFiles += 1
     }
   }
 
@@ -159,16 +159,17 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   }
 
   override def getFinalStats(): WriteTaskStats = {
-    statCurrentFile()
+    submittedFiles.foreach(updateFileStats)
+    submittedFiles.clear()
 
     // Reports bytesWritten and recordsWritten to the Spark output metrics.
     Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
       outputMetrics.setBytesWritten(numBytes)
       outputMetrics.setRecordsWritten(numRows)
     }
 
-    if (submittedFiles != numFiles) {
-      logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
+    if (numSubmittedFiles != numFiles) {
+      logInfo(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
         "This could be due to the output format not writing empty files, " +
         "or files being not immediately visible in the filesystem.")
     }
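With `submittedFiles` now a set of open paths instead of a single `curFile`, the tracker can account for several files being written at the same time: `newFile` registers a path, `closeFile` folds that file's size into the totals and forgets it, and `getFinalStats` sweeps up any paths that were never explicitly closed. A rough sketch of the expected call pattern; the Hadoop configuration and file paths are illustrative only:

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker

// Illustrative only: drive the tracker the way a task with two files open
// at the same time might.
val tracker = new BasicWriteTaskStatsTracker(new Configuration())

tracker.newFile("/tmp/out/part=a/part-00000.parquet") // added to submittedFiles
tracker.newFile("/tmp/out/part=b/part-00000.parquet") // a second, concurrently open file

// ... rows are written to both files here ...

// part=a is finished: its size (if the file exists) is added to numBytes/numFiles
// and the path is dropped from submittedFiles.
tracker.closeFile("/tmp/out/part=a/part-00000.parquet")

// Any path still in submittedFiles (part=b here) is measured during getFinalStats().
val stats = tracker.getFinalStats()
```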