From 53aa2f9dfaa38b1d8c3cefe7883272a8472a1450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Wed, 21 Oct 2015 15:01:39 +0200 Subject: [PATCH 01/11] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes) --- parquet-cascading3/.cache | Bin 0 -> 2492 bytes parquet-cascading3/REVIEWERS.md | 27 +++ parquet-cascading3/pom.xml | 119 +++++++++++ .../parquet/cascading/ParquetTBaseScheme.java | 80 ++++++++ .../parquet/cascading/ParquetTupleScheme.java | 191 ++++++++++++++++++ .../parquet/cascading/ParquetValueScheme.java | 184 +++++++++++++++++ .../parquet/cascading/SchemaIntersection.java | 63 ++++++ .../parquet/cascading/TupleReadSupport.java | 80 ++++++++ .../parquet/cascading/TupleWriteSupport.java | 106 ++++++++++ .../cascading/convert/TupleConverter.java | 115 +++++++++++ .../convert/TupleRecordMaterializer.java | 46 +++++ .../cascading/TestParquetTBaseScheme.java | 186 +++++++++++++++++ .../cascading/TestParquetTupleScheme.java | 182 +++++++++++++++++ .../src/test/resources/names.txt | 3 + .../src/test/thrift/test.thrift | 25 +++ pom.xml | 26 +++ 16 files changed, 1433 insertions(+) create mode 100644 parquet-cascading3/.cache create mode 100644 parquet-cascading3/REVIEWERS.md create mode 100644 parquet-cascading3/pom.xml create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java create mode 100644 parquet-cascading3/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java create mode 100644 parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java create mode 100644 parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java create mode 100644 parquet-cascading3/src/test/resources/names.txt create mode 100644 parquet-cascading3/src/test/thrift/test.thrift diff --git a/parquet-cascading3/.cache b/parquet-cascading3/.cache new file mode 100644 index 0000000000000000000000000000000000000000..916a4dd2ba8c6c0590282ca9f5ad300ebd1865bf GIT binary patch literal 2492 zcmV;t2}AZDiwFP!000000PS0Ua1>V^f4SuHgC8M;BosOnun~f7c9T?SG1O^DK!O)Z zxddy|DYLita(QHLcX|6ZBv5Hbt<=$WDq~G&tPYH(j58?JDbNX>z}XsU;`Vgi$gk zTU3=zh3>R5Yfe+|(Nd{lq_E5sn!-$G*kaA9rdSrXTpHP>>2==Y&TCv9!f7lC;z$WX zD1uVgCX)|yf$|PzM42QlBW5dB4VvsU^bt%MrV^(GVW{rKGc6OP z8vk?UFA$ze4f5LS$-im;PglQode!MuHG96@e`){I=WnU0gZ^?vu1*IwM9Cm?9iQv- zAth-Trko+_6aQ?*N-}Pnra_tGcJO-1-LLi^-GsKt@iB|xv>YP|s?afcGdGAg+X!X0 zqV*{15Ya(EtF%UHk#;19W$-D(IgKU?LTqn||>7N6@EptdY`fGR3B05oUlK zgQjG~1m@UP+4-j#qJ}|Oi`3%pX%;=Mgw@868V(?H>iw08oa44`-L%<8gQk^9Zquhe zNv;4}zu*^JQ2z&eRaGoiNfJGkeR3Sy2F|-Pr?XjfdQp3|<&#Yz;}EUPkV_7^qGd2ua;Ca=rWKRoq_o*cfRcmZBN<8g9e<@+VFMnm zaP`YjOZlmbln-c;Hv$TX@*OAoaHpbap{`el|AC?wV0F|=Fai)FQrxhqicem8oh`q$ z@vR7ocB2aD-=tVW_oK=o#ezJbMhulgCqwlyF%xi0WM^haZ+CkP%~p-HNi>BKND;Kq zN7b1fXdggjz;KA6$iVth^M&R2{vClb3v!hKmPusryh{g zny1J$G6`1Q>+(ffA$q_U1*&izZV)O6Hvmi_uMb~Xc&WUy?=LPKr*n5ald9d3&hdZF z+-esIY9{tKF*1yQIBV>Yc5uw>luFbPvGYDC_1PqDGTTsYRuhg z;Gz~Fp15uQhBZ(0tcjrVp!>y26+;n{q9elqk9iv8nTM*_P>r=y1ZD+2l=UK2I#` zvjVW|r3ODffCb+_`u4iFs{U3Xmg}q;<*?NsB%=&9`0Eo%n;I5|1W}wFIQWXu{g8fY zi35-7pgSL}S@+ztzrHvjJkC(Xrwj(ipHxo8f*uVsL;uN%w_h5*-0*n_J;2md*ig`+ zz0lJ>%Jprs0;2f9`%hkIJNCJEzAMsm7O$m-&W}|&smVz@-9e|EEe>wT{NtT}sAA2j z2IjA5bF#$U$;K~NKJ?tzZ#yDXcAYsIfK`Cw?KLA$+)TgtXvQH?XQ>bRZHQPGo;iF> ze`LwOIT>fU+UD)j$!QaJ>)7JTpS?T2XJtnOl>(HK7*=06x{2xrWr65JP|ldwQ2`L* z&T*ttg$#Q2S!JzqixUw$9>cB8oo!r)BUXi_rL*a8M`XTd_^oKbm^ z!6S;s(EOM%P~K?k0(&hSMh(9Dkbpw0sEmvI?XIIG+fST5;AN+ zG(R`Z+u7xb`}e$et|Ob>oPGsEbDVRc<0{d4yJ%%0piS&<`O`-kDh1rQ&lPMJVQ$9q z_e{F+*R|$eifXe8xJimKf3;3BWDoBfbUW=IgxFVg>rTcq;B`~J1%jL9pTs3^WvEgt zq(BC4$Si)2kW3l>-fy;@J-1}fCp<@h3q%M(hAu>c| zjjgT*0^$`ARvS0Aauy#K>xB=D8`35l<4Qk&kGVAQ)I@^|-zL9E{`i1s+b4BrLw?Oe zUGQFNt0S$q`j^{wF=DgR#)F)gSl&3uF{-g*1vGxy3f?N62`*f zS6KK8l+9y5r};ipvr|O3l7ym|++ms;QJvp0$yn8hY2jt7W!2CJNy?^TfM2;tnK8+* zQbbPJdQyX9{2h0-%B`}bnzBX`sHz$3s=?2{DB9n*vH32sF^{%MYo$B*(|-Z})X*NL GH2?rC@!&H6 literal 0 HcmV?d00001 diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md new file mode 100644 index 0000000000..f7972357e9 --- /dev/null +++ b/parquet-cascading3/REVIEWERS.md @@ -0,0 +1,27 @@ + + +The following reviewers had reviewed the parquet-cascading (pre-Cascading 3.0) project: + +| Name | Apache Id | github id | +|--------------------|------------|-------------| +| Dmitriy Ryaboy | dvryaboy | dvryaboy | +| Tianshuo Deng | tianshuo | tsdeng | + + diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml new file mode 100644 index 0000000000..521a7e747e --- /dev/null +++ b/parquet-cascading3/pom.xml @@ -0,0 +1,119 @@ + + + + org.apache.parquet + parquet + ../pom.xml + 1.8.1 + + + 4.0.0 + + parquet-cascading3 + jar + + Apache Parquet Cascading (for Cascading 3.0 onwards) + https://parquet.apache.org + + + + conjars.org + http://conjars.org/repo + + + + + + org.apache.parquet + parquet-column + ${project.version} + + + org.apache.parquet + parquet-hadoop + ${project.version} + + + org.apache.parquet + parquet-thrift + ${project.version} + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + + + log4j + log4j + ${log4j.version} + provided + + + org.apache.parquet + parquet-column + ${project.version} + test-jar + test + + + org.mockito + mockito-all + 1.9.5 + test + + + cascading + cascading-hadoop + ${cascading3.version} + provided + + + + + + + maven-enforcer-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.thrift.tools + maven-thrift-plugin + 0.1.10 + + ${thrift.executable} + + + + thrift-sources + generate-test-sources + + testCompile + + + + + + + diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java new file mode 100644 index 0000000000..af04b47c8e --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.thrift.TBase; + +import cascading.flow.FlowProcess; +import cascading.tap.Tap; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; +import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; +import org.apache.parquet.thrift.TBaseRecordConverter; + +public class ParquetTBaseScheme> extends ParquetValueScheme { + + // In the case of reads, we can read the thrift class from the file metadata + public ParquetTBaseScheme() { + this(new Config()); + } + + public ParquetTBaseScheme(Class thriftClass) { + this(new Config().withRecordClass(thriftClass)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate) { + this(new Config().withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate, Class thriftClass) { + this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(Config config) { + super(config); + } + + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + super.sourceConfInit(fp, tap, jobConf); + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); + ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); + } + + @Override + public void sinkConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (this.config.getKlass() == null) { + throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); + } + + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); + TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass()); + } +} diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java new file mode 100644 index 0000000000..4532d3b3f8 --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.parquet.cascading; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.CompositeTap; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that converts Parquet groups into Cascading tuples. + * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. + * The names must match the names in the Parquet schema. + * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the + * Parquet schema. + * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be + * flattened to a top-level field in the Cascading tuple. + * + * @author Avi Bryant + */ + +public class ParquetTupleScheme extends Scheme{ + + private static final long serialVersionUID = 0L; + private String parquetSchema; + private final FilterPredicate filterPredicate; + + public ParquetTupleScheme() { + super(); + this.filterPredicate = null; + } + + public ParquetTupleScheme(Fields sourceFields) { + super(sourceFields); + this.filterPredicate = null; + } + + public ParquetTupleScheme(FilterPredicate filterPredicate) { + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { + super(sourceFields); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. + */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + this.filterPredicate = null; + } + + @SuppressWarnings("rawtypes") + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); + } + + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); + TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); + } + + @Override + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { + MessageType schema = readSchema(flowProcess, tap); + SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); + + setSourceFields(intersection.getSourceFields()); + + return getSourceFields(); + } + + private MessageType readSchema(FlowProcess flowProcess, Tap tap) { + try { + Hfs hfs; + + if( tap instanceof CompositeTap ) + hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); + else + hfs = (Hfs) tap; + + List