From b9a7bf03b312da151e1d7e37338092bbf5bcb38a Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Fri, 30 Mar 2018 20:35:04 +0100
Subject: [PATCH 1/6] [SPARK-19018][SQL] Add support for custom encoding on csv writer

---
 python/pyspark/sql/readwriter.py             |  7 +++-
 .../apache/spark/sql/DataFrameWriter.scala   |  1 +
 .../datasources/csv/CSVFileFormat.scala      | 10 ++++-
 .../execution/datasources/csv/CSVSuite.scala | 37 +++++++++++++++++++
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e5288636c596e..3520e826a68d2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -826,7 +826,7 @@ def text(self, path, compression=None, lineSep=None):
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
             timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
-            charToEscapeQuoteEscaping=None):
+            charToEscapeQuoteEscaping=None, encoding=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -876,6 +876,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
                                           the quote character. If None is set, the default value is
                                           escape character when escape and quote characters are
                                           different, ``\0`` otherwise..
+        :param encoding: sets encoding used for encoding the file. If None is set, it
+                         uses the default value, ``UTF-8``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
@@ -885,7 +887,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
                        dateFormat=dateFormat, timestampFormat=timestampFormat,
                        ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
                        ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
-                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
+                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
+                       encoding=encoding)
         self._jwrite.csv(path)
 
     @since(1.5)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bb93889dc55e9..38bd40e7154d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -623,6 +623,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * enclosed in quotes. Default is to only escape values containing a quote character.
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+   * <li>`encoding` (default `UTF-8`): encoding to use when saving to file.</li>
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`).</li>
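
As an illustration of the user-facing API this patch enables (not part of the
patch itself): a minimal Scala sketch exercising the new writer-side
"encoding" option. The local session setup, the example charset, and the
output path are assumptions for the sketch, and the option only takes effect
with this series applied.

    import org.apache.spark.sql.SparkSession

    object EncodingOptionDemo {
      def main(args: Array[String]): Unit = {
        // Local session for the sketch; any SparkSession works.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("csv-encoding-demo")
          .getOrCreate()
        import spark.implicits._

        // Write non-ASCII data with the new option; when "encoding" is unset,
        // the writer keeps its previous behavior and emits UTF-8.
        Seq("µß áâä ÁÂÄ").toDF("_c0").write
          .mode("overwrite")
          .option("header", "false")
          .option("encoding", "windows-1250") // one of the charsets the tests cover
          .csv("/tmp/csv-windows-1250")       // hypothetical output directory

        spark.stop()
      }
    }
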
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index e20977a4ec79f..395b0dcf3409b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.nio.charset.Charset
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
@@ -146,7 +148,13 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
+  private val charset = Charset.forName(params.charset)
+
+  private val writer = CodecStreams.createOutputStreamWriter(
+    context,
+    new Path(path),
+    charset
+  )
 
   private val gen = new UnivocityGenerator(dataSchema, writer, params)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 4398e547d9217..95c53d20f355b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -513,6 +513,43 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("Save csv with custom charset") {
+    Seq("iso-8859-1", "utf-8", "windows-1250").foreach { encoding =>
+      withTempDir { dir =>
+        val csvDir = new File(dir, "csv").getCanonicalPath
+        // scalastyle:off
+        val originalDF = Seq("µß áâä ÁÂÄ").toDF("_c0")
+        // scalastyle:on
+        originalDF.write
+          .option("header", "false")
+          .option("encoding", encoding)
+          .csv(csvDir)
+
+        val df = spark
+          .read
+          .option("header", "false")
+          .option("encoding", encoding)
+          .csv(csvDir)
+
+        checkAnswer(df, originalDF)
+      }
+    }
+  }
+
+  test("bad encoding name on writer") {
+    val exception = intercept[SparkException] {
+      withTempDir { dir =>
+        val csvDir = new File(dir, "csv").getCanonicalPath
+        Seq("a,A,c,A,b,B").toDF().write
+          .option("header", "false")
+          .option("encoding", "1-9588-osi")
+          .csv(csvDir)
+      }
+    }
+
+    assert(exception.getCause.getMessage.contains("1-9588-osi"))
+  }
+
   test("commented lines in CSV data") {
     Seq("false", "true").foreach { multiLine =>

From 0d0addfd45dac4dc5fe4b32a0a2097990ba24cb5 Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Sun, 6 May 2018 12:09:56 +0100
Subject: [PATCH 2/6] Fix checkstyle issue

---
 .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 395b0dcf3409b..0bf5fa7ab0120 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -149,7 +149,7 @@ private[csv] class CsvOutputWriter(
     params: CSVOptions) extends OutputWriter with Logging {
 
   private val charset = Charset.forName(params.charset)
-  
+
   private val writer = CodecStreams.createOutputStreamWriter(
     context,
     new Path(path),

From 91f4750ff2f4781cea2fb23b1339659abf65009a Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Mon, 16 Jul 2018 15:14:55 +0100
Subject: [PATCH 3/6] Add test to check UTF-16 and UTF-32

---
 python/pyspark/sql/readwriter.py             |  2 +-
 .../datasources/csv/CSVFileFormat.scala      |  3 +-
 .../execution/datasources/csv/CSVSuite.scala | 32 ++++++++++---------
 3 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 56d0795128e6a..589fe4b8f7713 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -895,7 +895,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
                                           the quote character. If None is set, the default value is
                                           escape character when escape and quote characters are
                                           different, ``\0`` otherwise..
-        :param encoding: sets encoding used for encoding the file. If None is set, it
+        :param encoding: sets the encoding (charset) to be used on the csv file. If None is set, it
                          uses the default value, ``UTF-8``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 0bf5fa7ab0120..1d2751b0d5cc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -153,8 +153,7 @@ private[csv] class CsvOutputWriter(
   private val writer = CodecStreams.createOutputStreamWriter(
     context,
     new Path(path),
-    charset
-  )
+    charset)
 
   private val gen = new UnivocityGenerator(dataSchema, writer, params)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 600813d3b2d02..b283c4d6cad63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.File
-import java.nio.charset.UnsupportedCharsetException
+import java.nio.charset.{Charset, UnsupportedCharsetException}
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
@@ -513,24 +514,25 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
   test("Save csv with custom charset") {
-    Seq("iso-8859-1", "utf-8", "windows-1250").foreach { encoding =>
+
+    // scalastyle:off nonascii
+    val content = "µß áâä ÁÂÄ"
+    // scalastyle:on nonascii
+
+    Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding =>
       withTempDir { dir =>
-        val csvDir = new File(dir, "csv").getCanonicalPath
-        // scalastyle:off
-        val originalDF = Seq("µß áâä ÁÂÄ").toDF("_c0")
-        // scalastyle:on
-        originalDF.write
-          .option("header", "false")
-          .option("encoding", encoding)
-          .csv(csvDir)
+        val csvDir = new File(dir, "csv")
 
-        val df = spark
-          .read
-          .option("header", "false")
+        val originalDF = Seq(content).toDF("_c0").repartition(1)
+        originalDF.write
           .option("encoding", encoding)
-          .csv(csvDir)
+          .csv(csvDir.getCanonicalPath)
 
-        checkAnswer(df, originalDF)
+        csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
+          val readback = Files.readAllBytes(csvFile.toPath)
+          val expected = (content + "\n").getBytes(Charset.forName(encoding))
+          assert(readback === expected)
+        })
       }
     }
   }

From 349e132770f3a61e453841c82ddc06d34fe51649 Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Tue, 17 Jul 2018 10:27:55 +0100
Subject: [PATCH 4/6] Add jira ticket (SPARK-19018:) as prefix on the related tests

---
 .../spark/sql/execution/datasources/csv/CSVSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b283c4d6cad63..3d523d9ca0832 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -513,7 +513,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
-  test("Save csv with custom charset") {
+  test("SPARK-19018: Save csv with custom charset") {
 
     // scalastyle:off nonascii
     val content = "µß áâä ÁÂÄ"
@@ -537,12 +537,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
-  test("bad encoding name on writer") {
+  test("SPARK-19018: error handling for unsupported charsets") {
     val exception = intercept[SparkException] {
       withTempDir { dir =>
         val csvDir = new File(dir, "csv").getCanonicalPath
         Seq("a,A,c,A,b,B").toDF().write
-          .option("header", "false")
           .option("encoding", "1-9588-osi")
           .csv(csvDir)
       }

From fd857b005abba233eb7409479436c0abe4e23e4f Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Wed, 18 Jul 2018 10:11:01 +0100
Subject: [PATCH 5/6] Add styling improvements

- Improve method documentation
- Inline method calls that are not too big
- Use platform newline instead of hardcoded one.
- Replace withTempDir with withTempPath
---
 python/pyspark/sql/readwriter.py                 |  4 ++--
 .../org/apache/spark/sql/DataFrameWriter.scala   |  3 ++-
 .../datasources/csv/CSVFileFormat.scala          |  5 +----
 .../sql/execution/datasources/csv/CSVSuite.scala | 16 ++++++++--------
 4 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 589fe4b8f7713..0f2c11824ea12 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -895,8 +895,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
                                           the quote character. If None is set, the default value is
                                           escape character when escape and quote characters are
                                           different, ``\0`` otherwise..
-        :param encoding: sets the encoding (charset) to be used on the csv file. If None is set, it
-                         uses the default value, ``UTF-8``.
+        :param encoding: sets the encoding (charset) of saved csv files. If None is set,
+                         the default UTF-8 charset will be used.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 995de85b76f1a..2e2c33f83ffaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -625,7 +625,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * enclosed in quotes. Default is to only escape values containing a quote character.
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
-   * <li>`encoding` (default `UTF-8`): encoding to use when saving to file.</li>
+   * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
+   * files. If it is not set, the UTF-8 charset will be used.</li>
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`).</li>
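
The writer resolves this option eagerly through `Charset.forName(params.charset)`
(see the `CsvOutputWriter` change below), which is why the unsupported-charset
test fails fast with the bad name in the exception cause. A standalone sketch of
that JDK behavior, reusing the charset names from the tests (the object name is
an assumption for the example):

    import java.nio.charset.{Charset, UnsupportedCharsetException}

    object CharsetResolution {
      def main(args: Array[String]): Unit = {
        // A supported alias resolves to a concrete Charset.
        println(Charset.forName("windows-1250")) // prints: windows-1250

        // "1-9588-osi" is syntactically legal but unknown, so forName throws
        // UnsupportedCharsetException whose message is the offending name --
        // the string the writer test asserts on via exception.getCause.
        try Charset.forName("1-9588-osi")
        catch {
          case e: UnsupportedCharsetException => println(s"unsupported: ${e.getMessage}")
        }
      }
    }
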
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2751b0d5cc9..01bffdeb9a5d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -150,10 +150,7 @@ private[csv] class CsvOutputWriter(
 
   private val charset = Charset.forName(params.charset)
 
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context,
-    new Path(path),
-    charset)
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
 
   private val gen = new UnivocityGenerator(dataSchema, writer, params)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 3d523d9ca0832..e6088a2d11f88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -24,6 +24,8 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
 
+import scala.util.Properties
+
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
@@ -520,17 +522,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     // scalastyle:on nonascii
 
     Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding =>
-      withTempDir { dir =>
-        val csvDir = new File(dir, "csv")
-
-        val originalDF = Seq(content).toDF("_c0").repartition(1)
-        originalDF.write
+      withTempPath { path =>
+        val csvDir = new File(path, "csv")
+        Seq(content).toDF().write
           .option("encoding", encoding)
           .csv(csvDir.getCanonicalPath)
 
         csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
           val readback = Files.readAllBytes(csvFile.toPath)
-          val expected = (content + "\n").getBytes(Charset.forName(encoding))
+          val expected = (content + Properties.lineSeparator).getBytes(Charset.forName(encoding))
           assert(readback === expected)
         })
       }
@@ -539,8 +539,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
   test("SPARK-19018: error handling for unsupported charsets") {
     val exception = intercept[SparkException] {
-      withTempDir { dir =>
-        val csvDir = new File(dir, "csv").getCanonicalPath
+      withTempPath { path =>
+        val csvDir = new File(path, "csv").getCanonicalPath
         Seq("a,A,c,A,b,B").toDF().write
           .option("encoding", "1-9588-osi")
           .csv(csvDir)
       }

From 025958a7d9e8a741875db2af8878f60cb07409d3 Mon Sep 17 00:00:00 2001
From: crafty-coder
Date: Wed, 18 Jul 2018 10:45:43 +0100
Subject: [PATCH 6/6] Fix import order on CSVSuite

---
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 6d7e88e8252ef..456b4535a0dcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -24,8 +24,8 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
 
-import scala.util.Properties
 import scala.collection.JavaConverters._
+import scala.util.Properties
 
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
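
Taken together, the series lets CSV data round-trip through a non-default
charset, since the reader side already accepts an "encoding" option. A hedged
end-to-end sketch with the series applied; the session setup and output path
are illustrative, and windows-1250 is one of the charsets the tests exercise:

    import org.apache.spark.sql.SparkSession

    object CsvEncodingRoundTrip {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("csv-roundtrip")
          .getOrCreate()
        import spark.implicits._

        val out = "/tmp/csv-encoding-roundtrip" // hypothetical output directory

        // Write with the new writer-side option from this series...
        Seq("µß áâä ÁÂÄ").toDF("value").write
          .mode("overwrite")
          .option("encoding", "windows-1250")
          .csv(out)

        // ...and read it back with the matching reader-side option,
        // which pre-dates this series.
        spark.read
          .option("encoding", "windows-1250")
          .csv(out)
          .show(false)

        spark.stop()
      }
    }
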