220 changes: 60 additions & 160 deletions spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -20,8 +20,6 @@
package org.apache.iceberg.spark

import com.google.common.collect.Maps
import java.nio.ByteBuffer
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter}
@@ -153,14 +151,15 @@ object SparkTableUtil {
* @param partition a partition
* @param conf a serializable Hadoop conf
* @param metricsConfig a metrics conf
* @return a Seq of [[SparkDataFile]]
* @return a Seq of [[DataFile]]
*/
def listPartition(
partition: SparkPartition,
spec: PartitionSpec,
conf: SerializableConfiguration,
metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
metricsConfig: MetricsConfig): Seq[DataFile] = {

listPartition(partition.values, partition.uri, partition.format, conf.get(), metricsConfig)
listPartition(partition.values, partition.uri, partition.format, spec, conf.get(), metricsConfig)
}

/**
@@ -174,22 +173,23 @@
* @param format partition format, avro or parquet
* @param conf a Hadoop conf
* @param metricsConfig a metrics conf
* @return a seq of [[SparkDataFile]]
* @return a seq of [[DataFile]]
*/
def listPartition(
partition: Map[String, String],
uri: String,
format: String,
spec: PartitionSpec,
conf: Configuration = new Configuration(),
metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[DataFile] = {

if (format.contains("avro")) {
listAvroPartition(partition, uri, conf)
listAvroPartition(partition, uri, spec, conf)
} else if (format.contains("parquet")) {
listParquetPartition(partition, uri, conf, metricsConfig)
listParquetPartition(partition, uri, spec, conf, metricsConfig)
} else if (format.contains("orc")) {
// TODO: use MetricsConfig in listOrcPartition
listOrcPartition(partition, uri, conf)
listOrcPartition(partition, uri, spec, conf)
} else {
throw new UnsupportedOperationException(s"Unknown partition format: $format")
}
@@ -200,113 +200,6 @@ object SparkTableUtil {
*/
case class SparkPartition(values: Map[String, String], uri: String, format: String)
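
A minimal usage sketch (not part of the diff) of the Map-based listPartition overload above; the partition values, table location, and the targetTable/spec variables are assumptions for illustration:

// Hypothetical call: list one Hive-style Parquet partition and get back Iceberg
// DataFile entries built against the target table's PartitionSpec.
val spec: PartitionSpec = targetTable.spec()            // spec of the Iceberg table being imported into
val files: Seq[DataFile] = SparkTableUtil.listPartition(
  Map("ds" -> "2020-02-16"),                            // partition column -> value
  "hdfs://nn/warehouse/db.db/tbl/ds=2020-02-16",        // partition directory URI
  "parquet",                                            // file format of the partition
  spec)                                                 // conf and metricsConfig fall back to their defaults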

/**
* Case class representing a data file.
*/
case class SparkDataFile(
path: String,
partition: collection.Map[String, String],
format: String,
fileSize: Long,
rowGroupSize: Long,
rowCount: Long,
columnSizes: Array[Long],
valueCounts: Array[Long],
nullValueCounts: Array[Long],
lowerBounds: Seq[Array[Byte]],
upperBounds: Seq[Array[Byte]]
) {

/**
* Convert this to a [[DataFile]] that can be added to a [[org.apache.iceberg.Table]].
*
* @param spec a [[PartitionSpec]] that will be used to parse the partition key
* @return a [[DataFile]] that can be passed to [[org.apache.iceberg.AppendFiles]]
*/
def toDataFile(spec: PartitionSpec): DataFile = {
// values are strings, so pass a path to let the builder coerce to the right types
val partitionKey = spec.fields.asScala.map(_.name).map { name =>
s"$name=${partition(name)}"
}.mkString("/")

DataFiles.builder(spec)
.withPath(path)
.withFormat(format)
.withFileSizeInBytes(fileSize)
.withMetrics(new Metrics(rowCount,
arrayToMap(columnSizes),
arrayToMap(valueCounts),
arrayToMap(nullValueCounts),
arrayToMap(lowerBounds),
arrayToMap(upperBounds)))
.withPartitionPath(partitionKey)
.build()
}
}

private def bytesMapToArray(map: java.util.Map[Integer, ByteBuffer]): Seq[Array[Byte]] = {
if (map != null && !map.isEmpty) {
val keys = map.keySet.asScala
val max = keys.max
val arr = Array.fill(max + 1)(null.asInstanceOf[Array[Byte]])

keys.foreach { key =>
val buffer = map.get(key)

val copy = if (buffer.hasArray) {
val bytes = buffer.array()
if (buffer.arrayOffset() == 0 && buffer.position() == 0 &&
bytes.length == buffer.remaining()) {
bytes
} else {
val start = buffer.arrayOffset() + buffer.position()
val end = start + buffer.remaining()
util.Arrays.copyOfRange(bytes, start, end);
}
} else {
val bytes = Array.fill(buffer.remaining())(0.asInstanceOf[Byte])
buffer.get(bytes)
bytes
}

arr.update(key, copy)
}

arr
} else {
null
}
}

private def mapToArray(map: java.util.Map[Integer, java.lang.Long]): Array[Long] = {
if (map != null && !map.isEmpty) {
val keys = map.keySet.asScala
val max = keys.max
val arr = Array.fill(max + 1)(-1L)

keys.foreach { key =>
arr.update(key, map.get(key))
}

arr
} else {
null
}
}

private def arrayToMap(arr: Seq[Array[Byte]]): java.util.Map[Integer, ByteBuffer] = {
if (arr != null) {
val map: java.util.Map[Integer, ByteBuffer] = Maps.newHashMap()
arr.zipWithIndex.foreach {
case (null, _) => // skip
case (value, index) => map.put(index, ByteBuffer.wrap(value))
}
map
} else {
null
}
}

private def arrayToMap(arr: Array[Long]): java.util.Map[Integer, java.lang.Long] = {
if (arr != null) {
val map: java.util.Map[Integer, java.lang.Long] = Maps.newHashMap()
@@ -329,70 +222,74 @@
private def listAvroPartition(
partitionPath: Map[String, String],
partitionUri: String,
conf: Configuration): Seq[SparkDataFile] = {
spec: PartitionSpec,
conf: Configuration): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)

fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
SparkDataFile(
stat.getPath.toString,
partitionPath, "avro", stat.getLen,
stat.getBlockSize,
-1,
null,
null,
null,
null,
null)
val metrics = new Metrics(-1L, arrayToMap(null), arrayToMap(null), arrayToMap(null))
Review thread on this line:

@rdsr (Contributor, Feb 16, 2020): Shouldn't rowCount be a positive number?

Author: I think the metric is not intended to be used, so it is set to an invalid value. We might need to read through the whole file to get the row count, right?

Contributor: Let's try to keep the logic as close to what we had before as possible.

@rdsr (Contributor, Feb 17, 2020): I think anything positive should do. Keeping it <= 0 may affect some scan planning code and filter out this particular file, e.g. see org.apache.iceberg.expressions.InclusiveMetricsEvaluator. @aokolnychyi, thoughts?

Contributor: Good catch, @rdsr. This is definitely a problem. Right now, the InclusiveMetricsEvaluator will remove files with negative or 0 row counts.

I don't think the solution is to use a positive number here. The reason this was required is that we want good stats for job planning. Setting this to -1 causes a correctness bug, but setting it to some other constant will introduce bad behavior when using the stats that Iceberg provides. I think we should either count the number of records, use a heuristic (file size / est. row size?), or remove support for importing Avro tables. I'm leaning toward counting the number of records.

We should also change the check in InclusiveMetricsEvaluator to check for files with 0 rows and allow files with -1 rows through, to fix the correctness bug for existing tables that used this path to import Avro data.

Contributor: I think the number of records must be correct and precise, as we want to answer some data queries with metadata (e.g. give me the number of records per partition). Updating our metrics evaluators to handle -1 seems reasonable to me as well.

Contributor: @rdsr, could you create follow-up issues so that we don't forget?

Contributor: +1, I'll do that!

Author: Thank you guys for the detailed explanation!

Contributor: Created #809 to track this.

val partitionKey = spec.fields.asScala.map(_.name).map { name =>
s"$name=${partitionPath(name)}"
}.mkString("/")

DataFiles.builder(spec)
.withPath(stat.getPath.toString)
.withFormat("avro")
.withFileSizeInBytes(stat.getLen)
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build()
}
}
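
To make the proposal in the review thread above concrete, here is a minimal Scala sketch of the kind of row-count guard being discussed; the method name and shape are illustrative assumptions, not the actual org.apache.iceberg.expressions.InclusiveMetricsEvaluator code:

// Illustrative sketch only: distinguish "provably empty" from "row count unknown".
// 0 rows  -> the file is provably empty and safe to prune from a scan.
// -1 rows -> the count was never collected (e.g. Avro data imported above), so keep the file.
private def mayContainMatchingRows(recordCount: Long): Boolean =
  recordCount != 0L

With a check like this, files produced by listAvroPartition (which report a record count of -1) would no longer be silently dropped during scan planning, while genuinely empty files would still be pruned.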

//noinspection ScalaDeprecation
private def listParquetPartition(
partitionPath: Map[String, String],
partitionUri: String,
spec: PartitionSpec,
conf: Configuration,
metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
metricsSpec: MetricsConfig): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)

fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
val partitionKey = spec.fields.asScala.map(_.name).map { name =>
s"$name=${partitionPath(name)}"
}.mkString("/")

SparkDataFile(
stat.getPath.toString,
partitionPath, "parquet", stat.getLen,
stat.getBlockSize,
metrics.recordCount,
mapToArray(metrics.columnSizes),
mapToArray(metrics.valueCounts),
mapToArray(metrics.nullValueCounts),
bytesMapToArray(metrics.lowerBounds),
bytesMapToArray(metrics.upperBounds))
DataFiles.builder(spec)
.withPath(stat.getPath.toString)
.withFormat("parquet")
.withFileSizeInBytes(stat.getLen)
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build()
}
}

private def listOrcPartition(
partitionPath: Map[String, String],
partitionUri: String,
conf: Configuration): Seq[SparkDataFile] = {
spec: PartitionSpec,
conf: Configuration): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)

fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
val metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
val partitionKey = spec.fields.asScala.map(_.name).map { name =>
s"$name=${partitionPath(name)}"
}.mkString("/")

SparkDataFile(
stat.getPath.toString,
partitionPath, "orc", stat.getLen,
stat.getBlockSize,
metrics.recordCount,
mapToArray(metrics.columnSizes),
mapToArray(metrics.valueCounts),
mapToArray(metrics.nullValueCounts),
bytesMapToArray(metrics.lowerBounds()),
bytesMapToArray(metrics.upperBounds())
)
DataFiles.builder(spec)
.withPath(stat.getPath.toString)
.withFormat("orc")
.withFileSizeInBytes(stat.getLen)
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build()
}
}

@@ -421,17 +318,15 @@
private def buildManifest(
conf: SerializableConfiguration,
spec: PartitionSpec,
basePath: String): Iterator[SparkDataFile] => Iterator[ManifestFile] = { files =>
basePath: String): Iterator[DataFile] => Iterator[ManifestFile] = { files =>
if (files.hasNext) {
val io = new HadoopFileIO(conf.get())
val ctx = TaskContext.get()
val location = new Path(basePath, s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
val writer = ManifestWriter.write(spec, outputFile)
try {
files.foreach { file =>
writer.add(file.toDataFile(spec))
}
files.foreach(writer.add)
} finally {
writer.close()
}
@@ -489,13 +384,15 @@
val format = sourceTable.storage.serde.orElse(sourceTable.provider)
require(format.nonEmpty, "Could not determine table format")

val partition = Map.empty[String, String]
val spec = PartitionSpec.unpartitioned()
val conf = spark.sessionState.newHadoopConf()
val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)

val files = listPartition(Map.empty, sourceTable.location.toString, format.get, conf, metricsConfig)
val files = listPartition(partition, sourceTable.location.toString, format.get, spec, conf, metricsConfig)

val append = targetTable.newAppend()
files.foreach(file => append.appendFile(file.toDataFile(PartitionSpec.unpartitioned)))
files.foreach(append.appendFile)
append.commit()
}

@@ -516,6 +413,8 @@
stagingDir: String): Unit = {

implicit val manifestFileEncoder: Encoder[ManifestFile] = Encoders.javaSerialization[ManifestFile]
implicit val dataFileEncoder: Encoder[DataFile] = Encoders.javaSerialization[DataFile]
implicit val pathDataFileEncoder: Encoder[(String, DataFile)] = Encoders.tuple(Encoders.STRING, dataFileEncoder)

import spark.implicits._

@@ -527,10 +426,11 @@
val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)

val manifests = partitionDS
.flatMap(partition => listPartition(partition, serializableConf, metricsConfig))
.flatMap(partition => listPartition(partition, spec, serializableConf, metricsConfig))
.repartition(numShufflePartitions)
.orderBy($"path")
.mapPartitions(buildManifest(serializableConf, spec, stagingDir))
.map(file => (file.path.toString, file))
.orderBy($"_1")
.mapPartitions(files => buildManifest(serializableConf, spec, stagingDir)(files.map(_._2)))
.collect()

try {