python/pyspark/sql.py (2 additions, 2 deletions)
@@ -1888,7 +1888,7 @@ def toJSON(self, use_unicode=False):
         rdd = self._jschema_rdd.baseSchemaRDD().toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
-    def saveAsParquetFile(self, path):
+    def saveAsParquetFile(self, path, overwrite=False):
         """Save the contents as a Parquet file, preserving the schema.
 
         Files that are written out using this method can be read back in as
@@ -1903,7 +1903,7 @@ def saveAsParquetFile(self, path):
         >>> sorted(srdd2.collect()) == sorted(srdd.collect())
         True
         """
-        self._jschema_rdd.saveAsParquetFile(path)
+        self._jschema_rdd.saveAsParquetFile(path, overwrite)
 
     def registerTempTable(self, name):
         """Registers this RDD as a temporary table using the given name.
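The Python change simply forwards the new flag to the JVM-side SchemaRDD. For a sense of the resulting user-facing API, here is a minimal usage sketch (ours, not part of the PR; the SparkContext `sc` and the paths are illustrative assumptions):

// Hedged usage sketch of the API this PR adds; names and paths are assumptions.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val people = sqlContext.parquetFile("/data/people.parquet")  // any SchemaRDD works here

people.saveAsParquetFile("/tmp/people.parquet")                   // as before: fails if the path exists
people.saveAsParquetFile("/tmp/people.parquet", overwrite = true) // new: delete the path, then write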
@@ -297,7 +297,8 @@ package object dsl {
 
   object plans { // scalastyle:ignore
     implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions {
-      def writeToFile(path: String) = WriteToFile(path, logicalPlan)
+      def writeToFile(path: String, overwrite: Boolean) =
+        WriteToFile(path, logicalPlan, overwrite)
     }
   }
 
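For reference, a sketch of how the updated DSL method might be exercised in a Catalyst test (the LocalRelation setup and imports are our assumptions based on the test-style DSL of this era, not code from the PR):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Build a trivial logical plan and wrap it in the logical write node,
// threading the new overwrite flag through the DSL.
val relation = LocalRelation('a.int, 'b.string)
val write = relation.writeToFile("/tmp/out.parquet", overwrite = true)
// write is WriteToFile("/tmp/out.parquet", relation, true)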
@@ -126,7 +126,8 @@ case class CreateTableAsSelect[T](
 
 case class WriteToFile(
     path: String,
-    child: LogicalPlan) extends UnaryNode {
+    child: LogicalPlan,
+    overwrite: Boolean) extends UnaryNode {
   override def output = child.output
 }
 
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala (19 additions, 2 deletions)
@@ -68,12 +68,29 @@ private[sql] trait SchemaRDDLike {
   /**
    * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
    * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
-   * function.
+   * function. An exception is thrown if the specified path already exists.
    *
    * @param path The destination path.
    * @group schema
    */
   def saveAsParquetFile(path: String): Unit = {
-    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+    saveAsParquetFile(path, false)
   }
 
+  /**
+   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
+   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
+   * function.
+   *
+   * @param path The destination path.
+   * @param overwrite If false, an exception is thrown if the path already exists;
+   *                  otherwise the path is created.
+   *                  If true, the path is created if missing, or replaced (deleted and
+   *                  re-created) if it already exists.
+   * @group schema
+   */
+  def saveAsParquetFile(path: String, overwrite: Boolean): Unit = {
+    sqlContext.executePlan(WriteToFile(path, logicalPlan, overwrite)).toRdd
+  }
+
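A note on the design, as we read it (the PR does not spell this out): the Scala side adds an explicit two-argument overload rather than a default argument, which keeps the existing saveAsParquetFile(path) signature directly callable from Java, since Scala default arguments are awkward to use from Java code. The Python wrapper, by contrast, can simply take overwrite=False as a default parameter.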
@@ -205,11 +205,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object ParquetOperations extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       // TODO: need to support writing to other types of files. Unify the below code paths.
-      case logical.WriteToFile(path, child) =>
+      case logical.WriteToFile(path, child, overwrite) =>
         val relation =
-          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
-        // Note: overwrite=false because otherwise the metadata we just created will be deleted
-        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
+          ParquetRelation.createEmpty(
+            path,
+            child.output,
+            overwrite,
+            sparkContext.hadoopConfiguration,
+            sqlContext)
+        InsertIntoParquetTable(relation, planLater(child), overwrite) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
        InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>

Review thread on the switch from create to createEmpty:

Contributor: Is it safe to replace create with createEmpty? I see create will do some checks and then call createEmpty.

Author: It should be safe, as we believe the logical plan has already been resolved during the analysis phase.
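Context for the removed comment (our reading of this change, not stated elsewhere in the PR): previously, ParquetRelation.create wrote fresh metadata to the destination, so the planner pinned overwrite = false to keep InsertIntoParquetTable from deleting what had just been created. In the new code path, createEmpty performs the existing-path check and any cleanup up front, so the user's overwrite flag can be passed straight through to InsertIntoParquetTable.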
@@ -151,15 +151,20 @@ private[sql] object ParquetRelation {
    *
    * @param pathString The directory the Parquet file will be stored in.
    * @param attributes The schema of the relation.
+   * @param overwrite Whether to overwrite an existing path:
+   *                  if false, an exception is thrown when the path already exists;
+   *                  if true, the path is removed when it exists and then re-created.
    * @param conf A configuration to be used.
    * @param sqlContext SQLContext
    * @return An empty ParquetRelation.
    */
   def createEmpty(pathString: String,
                   attributes: Seq[Attribute],
-                  allowExisting: Boolean,
+                  overwrite: Boolean,
                   conf: Configuration,
                   sqlContext: SQLContext): ParquetRelation = {
-    val path = checkPath(pathString, allowExisting, conf)
+    val path = createPath(pathString, overwrite, conf)
     conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
       sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
     ParquetRelation.enableLogForwarding()
@@ -169,7 +174,7 @@ private[sql] object ParquetRelation {
     }
   }
 
-  private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
+  private def createPath(pathStr: String, overwrite: Boolean, conf: Configuration): Path = {
     if (pathStr == null) {
       throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
     }
@@ -179,9 +184,23 @@
       throw new IllegalArgumentException(
         s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
     }
+
     val path = origPath.makeQualified(fs)
-    if (!allowExisting && fs.exists(path)) {
-      sys.error(s"File $pathStr already exists.")
+    val pathExisted = fs.exists(path)
+
+    if (pathExisted) {
+      if (overwrite) {
+        try {
+          fs.delete(path, true)
+        } catch {
+          case e: IOException =>
+            throw new IOException(s"Unable to clear output directory ${path}")
+        }
+      } else {
+        sys.error(s"File ${path} already exists.")
+      }
+    } else {
+      fs.mkdirs(path)
     }
 
     if (fs.exists(path) &&
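The new branching reduces to three cases: the path exists and overwrite is set, so delete it; the path exists and overwrite is not set, so fail; the path does not exist, so create it. Below is a self-contained sketch of the same semantics (our own illustrative helper, not Spark code):

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PreparePathSketch {
  // Mirrors createPath above: fail on an existing path unless overwrite is set,
  // in which case the path is deleted; a missing path is created outright.
  def preparePath(pathStr: String, overwrite: Boolean, conf: Configuration): Path = {
    require(pathStr != null, "Unable to prepare path: path is null")
    val origPath = new Path(pathStr)
    val fs = origPath.getFileSystem(conf)
    val path = origPath.makeQualified(fs)
    if (fs.exists(path)) {
      if (overwrite) {
        // Recursive delete; the Parquet writer re-creates the directory with fresh metadata.
        if (!fs.delete(path, true)) {
          throw new IOException(s"Unable to clear output directory $path")
        }
      } else {
        sys.error(s"File $path already exists.")
      }
    } else {
      fs.mkdirs(path)
    }
    path
  }
}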
@@ -850,6 +850,45 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     assert(result2(0)(1) === "the answer")
   }
 
+  test("Test overwrite") {
+    val tmpdir = Utils.createTempDir()
+    Utils.deleteRecursively(tmpdir)
+    val result1 = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
+    val result2 = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
+
+    // path does not exist, overwrite = false: save succeeds
+    result1.saveAsParquetFile(tmpdir.toString, overwrite = false)
+    parquetFile(tmpdir.toString)
+      .toSchemaRDD
+      .registerTempTable("tmpcopy")
+    val tmpdata1 = sql("SELECT * FROM tmpcopy").collect()
+    assert(tmpdata1.size === 2) // the data from testNestedDir1
+
+    // path exists, overwrite = true: save replaces the previous contents
+    result2.saveAsParquetFile(tmpdir.toString, overwrite = true)
+    parquetFile(tmpdir.toString)
+      .toSchemaRDD
+      .registerTempTable("tmpcopy")
+    val tmpdata2 = sql("SELECT * FROM tmpcopy").collect()
+    assert(tmpdata2.size === 1) // the data from testNestedDir4
+
+    // path exists, overwrite = false: save fails
+    intercept[Exception] {
+      result1.saveAsParquetFile(tmpdir.toString, overwrite = false)
+    }
+
+    Utils.deleteRecursively(tmpdir)
+    // path does not exist, overwrite = true: save succeeds
+    result2.saveAsParquetFile(tmpdir.toString, overwrite = true)
+    parquetFile(tmpdir.toString)
+      .toSchemaRDD
+      .registerTempTable("tmpcopy")
+    val tmpdata3 = sql("SELECT * FROM tmpcopy").collect()
+    assert(tmpdata3.size === 1) // the data from testNestedDir4
+
+    Utils.deleteRecursively(tmpdir)
+  }
+
   test("Writing out Addressbook and reading it back in") {
     // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
     // has no effect in this test case