diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 0645b2b533..9580891a3b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -66,6 +66,8 @@ public class ParquetProperties {
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
+ public static final boolean DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED = false;
+
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
private static final int MIN_SLAB_SIZE = 64;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 37a551cdea..b3854c5932 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -20,6 +20,7 @@
import static org.apache.parquet.column.ParquetProperties.DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
@@ -98,7 +99,14 @@
*
*
* if none of those is set the data is uncompressed.
- *
+ *
+ * This class also generates the committer required to manifest the work in the
+ * destination directory if and when the job is committed.
+ * This has historically always created an instance of {@link ParquetOutputCommitter}.
+ * If {@link #PAGE_PATH_OUTPUT_COMMITTER_ENABLED} is true, the superclass is used
+ * to create the committer, which on Hadoop 3.1 and later uses the
+ * {@code PathOutputCommitterFactory} mechanism to dynamically choose a committer
+ * for the target filesystem. Such committers do not generate summary files.
* @param <T> the type of the materialized records
*/
public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
@@ -158,6 +166,13 @@ public static enum JobSummaryLevel {
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
+ /**
+ * Use the output committer created by the superclass, rather than a {@link ParquetOutputCommitter}.
+ * This delivers correctness and scalability on cloud storage, but will not write summary files.
+ * Value: {@value}.
+ */
+ public static final String PAGE_PATH_OUTPUT_COMMITTER_ENABLED = "parquet.path.outputcommitter.enabled";
+
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY);
@@ -390,7 +405,7 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
}
private WriteSupport<T> writeSupport;
- private ParquetOutputCommitter committer;
+ private OutputCommitter committer;
/**
* constructor used when this OutputFormat in wrapped in another one (In Pig for example)
@@ -555,7 +570,22 @@ public WriteSupport<T> getWriteSupport(Configuration configuration) {
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
- committer = new ParquetOutputCommitter(output, context);
+ final Configuration conf = context.getConfiguration();
+ if (conf.getBoolean(PAGE_PATH_OUTPUT_COMMITTER_ENABLED, DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED)) {
+ // hand off creation of the committer to the superclass.
+ // On Hadoop 3.1+ this will use a factory mechanism to dynamically
+ // bind to a filesystem-specific committer, an explicit override,
+ // or fall back to the classic FileOutputCommitter.
+ committer = super.getOutputCommitter(context);
+ LOG.debug("Writing to {} with output committer {}", output, committer);
+
+ if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE) {
+ // warn if summary files have been requested, as they will not be created.
+ LOG.warn("Committer {} does not support summary files", committer);
+ }
+ } else {
+ committer = new ParquetOutputCommitter(output, context);
+ }
}
return committer;
}
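
For anyone trying the change out, here is a minimal usage sketch (not part of the patch) of how a MapReduce job could opt in to the new setting. The class name, job name, write support and output path are placeholders; with GroupWriteSupport a write schema would still have to be configured, and on Hadoop 3.1+ the committer actually chosen depends on the PathOutputCommitterFactory bindings for the destination filesystem.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;

public class PathCommitterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt in to the superclass-provided committer; on Hadoop 3.1+ this goes
    // through the PathOutputCommitterFactory mechanism.
    conf.setBoolean(ParquetOutputFormat.PAGE_PATH_OUTPUT_COMMITTER_ENABLED, true);
    // Such committers do not write summary files, so switch the request off
    // up front rather than rely on the warning logged in getOutputCommitter().
    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "NONE");

    Job job = Job.getInstance(conf, "parquet-path-committer-example"); // placeholder job name
    job.setOutputFormatClass(ParquetOutputFormat.class);
    ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
    // GroupWriteSupport also needs a schema, e.g. GroupWriteSupport.setSchema(schema, job.getConfiguration());
    FileOutputFormat.setOutputPath(job, new Path("s3a://example-bucket/output")); // placeholder destination
    // ... configure mapper/reducer and input as usual, then submit the job.
  }
}

The flag is purely opt-in and defaults to false, so jobs that still need summary files keep the existing ParquetOutputCommitter behaviour unchanged.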