
PARQUET-480: Update for Cascading 3.0#284

Closed
cchepelov wants to merge 11 commits into apache:master from cchepelov:try_cascading3

Conversation

@cchepelov

The code in parquet-cascading is adapted to the API as of Cascading 2.5.3

Some incompatible changes were introduced in Cascading 3.0. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is nearly identical save for overloads which changed from requiring a Foo<JobConf> to requiring a Foo<? extends JobConf>
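
To illustrate the kind of signature change described above, here is a minimal, self-contained sketch. It does not use the Cascading API: `Conf`, `SubConf` and `Flow` are hypothetical stand-ins for `JobConf`, a subclass of it, and a `FlowProcess`-like generic type. It only shows how a `Foo<JobConf>` parameter and a `Foo<? extends JobConf>` parameter differ in what they accept.

```java
public class WildcardSketch {
    // Hypothetical stand-ins for JobConf and a subclass of it.
    public static class Conf {
        public String name() { return "conf"; }
    }

    public static class SubConf extends Conf {
        @Override
        public String name() { return "sub-conf"; }
    }

    // Hypothetical stand-in for a FlowProcess-like generic type.
    public interface Flow<C> {
        C getConfigCopy();
    }

    // 2.x-style parameter: accepts a Flow<Conf> and nothing else.
    public static String invariant(Flow<Conf> flow) {
        return flow.getConfigCopy().name();
    }

    // 3.0-style parameter: accepts Flow<Conf> and Flow<T> for any subclass T of Conf.
    public static String wildcard(Flow<? extends Conf> flow) {
        return flow.getConfigCopy().name();
    }

    public static void main(String[] args) {
        Flow<Conf> plain = Conf::new;
        Flow<SubConf> sub = SubConf::new;

        System.out.println(invariant(plain));
        System.out.println(wildcard(plain));
        System.out.println(wildcard(sub));
        // invariant(sub) would be rejected by the compiler:
        // Flow<SubConf> is not a subtype of Flow<Conf>.
    }
}
```

Because the two parameter types are different types, a subclass method declared with one does not override a base-class method declared with the other, which is why the same source cannot be compiled unchanged against both versions.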

@julienledem
Member

thanks @cchepelov
I'd like to avoid just plain duplicating all the code.
Could you separate code that does not change and code that does?
Surely every user of Cascading will have trouble with those incompatible changes.

@cchepelov
Author

Thanks for your feedback @julienledem

Agreed, making a "cp -R" fork as I did is not very tasty; let's find the best way to resolve this.

$ diff -urN parquet-cascading parquet-cascading3|diffstat
 REVIEWERS.md                                                           |    4 ++
 pom.xml                                                                |    8 ++---
 src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java     |    4 +-
 src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java     |   14 +++++-----
 src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java     |    6 ++--
 src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java |    2 -
 6 files changed, 21 insertions(+), 17 deletions(-)

out of 15 files.

Now the tricky things:

  • Cascading 3.0 broke a couple of APIs, which it really had to.
  • you can't compile something with both Cascading 2.x and 3.0 jars on the classpath
  • you can't run something with both Cascading 2.x and 3.0 jars on the classpath
  • there is no #ifdef in Java

So, the question is: do you want me to split things that don't change into a "parquet-cascading-23common.jar" (built with e.g. cascading 2.x "provided"), keeping the respective 2.0 and 3.0-specific code in parquet-cascading / parquet-cascading3?

I'd be happy to do so if this is needed. Provisionally, it'd look like this:

parquet-cascading-23common:
  - src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
  - src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
  - src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
  - src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
  - src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
  - src/test/thrift/test.thrift
  - src/test/resources/names.txt
  - src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java

parquet-cascading-2 and parquet-cascading-3, each (test jar also depends on parquet-cascading-23common's test jar):
  - src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
  - src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
  - src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
  - src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
  • the required maven plumbing to ensure the 4 "leaf" packages depend on the parquet-cascading-23common / -test jars.
    All consumers of parquet-cascading will have to pull two jars to work.

Does this look like what you had in mind?

@julienledem
Member

@cchepelov Looking for a good way to do this without making it too complicated. What does the diff look like between the Scheme classes?

@cchepelov
Author

Sure, here's one:

--- ./parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java  2015-10-21 13:29:53.725167935 +0200
+++ ./parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java 2015-10-21 14:40:37.830312750 +0200
@@ -101,7 +101,7 @@

   @SuppressWarnings("rawtypes")
   @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
+  public void sourceConfInit(FlowProcess<? extends JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {

     if (filterPredicate != null) {
@@ -114,7 +114,7 @@
  }

  @Override
- public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
+ public Fields retrieveSourceFields(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
     MessageType schema = readSchema(flowProcess, tap);
     SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());

@@ -123,7 +123,7 @@
     return getSourceFields();
   }

-  private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
+  private MessageType readSchema(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
     try {
       Hfs hfs;

@@ -144,7 +144,7 @@
     }
   }

-   private List<Footer> getFooters(FlowProcess<JobConf> flowProcess, Hfs hfs) throws IOException {
+   private List<Footer> getFooters(FlowProcess<? extends JobConf> flowProcess, Hfs hfs) throws IOException {
      JobConf jobConf = flowProcess.getConfigCopy();
      DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat();
      format.addInputPath(jobConf, hfs.getPath());
@@ -153,7 +153,7 @@

    @SuppressWarnings("unchecked")
   @Override
-  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
+  public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
       throws IOException {
     Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue();
     boolean hasNext = sc.getInput().next(null, value);
@@ -169,7 +169,7 @@

   @SuppressWarnings("rawtypes")
   @Override
-  public void sinkConfInit(FlowProcess<JobConf> fp,
+  public void sinkConfInit(FlowProcess<? extends JobConf> fp,
           Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
     DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
@@ -182,7 +182,7 @@
   }

   @Override
-  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
+  public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sink)
           throws IOException {
     TupleEntry tuple = sink.getOutgoingEntry();
     OutputCollector outputCollector = sink.getOutput();

(very similar things happen in the rest; it's really mostly mechanical)

@julienledem
Member

I wonder if we could work around duplicating this part by just having the raw type in the parameters:
For example:

public void sink(FlowProcess fp, SinkCall<Object[], OutputCollector> sink)

This would have warnings but should compile and work in both cases.
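
A minimal sketch of the raw-type idea, under stated assumptions: `Conf`, `Flow`, `SchemeV2` and `SchemeV3` are hypothetical stand-ins (not Cascading classes) for `JobConf`, `FlowProcess`, and the 2.x-style and 3.0-style base classes. The sketch uses a single-parameter method so that the overriding signature is exactly the erasure of the overridden one; methods with several generic parameters are not explored here.

```java
public class RawOverrideSketch {
    // Hypothetical stand-in for JobConf.
    public static class Conf {
        public String name() { return "conf"; }
    }

    // Hypothetical stand-in for a FlowProcess-like generic type.
    public interface Flow<C> {
        C getConfigCopy();
    }

    // Stand-in for a 2.x-style base class.
    public abstract static class SchemeV2 {
        public abstract String sink(Flow<Conf> flow);
    }

    // Stand-in for a 3.0-style base class.
    public abstract static class SchemeV3 {
        public abstract String sink(Flow<? extends Conf> flow);
    }

    // The method text below is identical in both subclasses: the raw
    // parameter type matches the erasure of either base signature.
    public static class AgainstV2 extends SchemeV2 {
        @Override
        @SuppressWarnings("rawtypes")
        public String sink(Flow flow) {
            // A raw Flow returns Object, so a cast is needed.
            return "sink:" + ((Conf) flow.getConfigCopy()).name();
        }
    }

    public static class AgainstV3 extends SchemeV3 {
        @Override
        @SuppressWarnings("rawtypes")
        public String sink(Flow flow) {
            return "sink:" + ((Conf) flow.getConfigCopy()).name();
        }
    }

    public static void main(String[] args) {
        Flow<Conf> flow = Conf::new;
        System.out.println(new AgainstV2().sink(flow));
        System.out.println(new AgainstV3().sink(flow));
    }
}
```

The cost is visible in the body: type information is lost and casts come back, and how such raw signatures look to Scala callers is a separate question that this sketch does not answer.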

@cchepelov
Author

Um, would it, including for Scala consumers? It's worth a try. Let me get back to you tomorrow.

On Dec 8, 2015, at 20:55, Julien Le Dem notifications@github.com wrote:

I wonder if we could work around duplicating this part by just having
the raw type in the parameters:
For example:

public void sink(FlowProcess fp, SinkCall<Object[], OutputCollector>
sink)

This would have warnings but should compile and work in both cases.



@cchepelov
Author

(got sidetracked; I will come back to this issue once I have an initial "scalding on cascading3" build up and running and ready to go into a "polish issues" phase)

@johnynek

-1 to the raw type suggestion in general. Scala can have a lot of issues with that.

@julienledem
Member

Hi @johnynek ! Nice to see you around :)
If you have suggestions to avoid the code duplication here, that would be helpful. I'm sure other libraries had to deal with the Cascading 3 incompatible changes.

@johnynek

This file should not be added, right? parquet-cascading3/.cache (I could not comment on a binary file).

@johnynek

@julienledem Hello, sir, I hope all is well. I don't know a good way to deal with the duplication without some templating and build system hacking. That said, I don't have a big issue with it, since I hope we can all move on to Cascading 3 soon. Also, there is not a ton of duplication, and the code is not high velocity, so I'd make the compromise.

:/ my idealism wanes?

@cchepelov
Author

cchepelov commented Jan 15, 2016 via email

@cchepelov force-pushed the try_cascading3 branch 2 times, most recently from 4cc15d0 to 87c570f (January 27, 2016 15:47)
@cchepelov
Author

Hi @julienledem ; apparently Maven and I found a way to agree on whether and how to share unchanged code between parquet-cascading and parquet-cascading3 / cc @johnynek

@johnynek

Looks good to me! Thanks for doing the work to minimize duplication!

@julienledem cool?

@julienledem
Member

Thanks @cchepelov it looks much better.
It looks like we could do a few things to get duplication to almost nothing.

Here is the diff for the remaining duplicated code:

$ diff parquet-cascading/src/main/java/org/apache/parquet/cascading/ parquet-cascading3/src/main/java/org/apache/parquet/cascading/
diff parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
60c60
<   public void sourceConfInit(FlowProcess<JobConf> fp,
---
>   public void sourceConfInit(FlowProcess<? extends JobConf> fp,
69c69
<   public void sinkConfInit(FlowProcess<JobConf> fp,
---
>   public void sinkConfInit(FlowProcess<? extends JobConf> fp,
diff parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
104c104
<   public void sourceConfInit(FlowProcess<JobConf> fp,
---
>   public void sourceConfInit(FlowProcess<? extends JobConf> fp,
117c117
<  public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
---
>  public Fields retrieveSourceFields(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
126c126
<   private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
---
>   private MessageType readSchema(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
147c147
<    private List<Footer> getFooters(FlowProcess<JobConf> flowProcess, Hfs hfs) throws IOException {
---
>    private List<Footer> getFooters(FlowProcess<? extends JobConf> flowProcess, Hfs hfs) throws IOException {
156c156
<   public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
---
>   public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
172c172
<   public void sinkConfInit(FlowProcess<JobConf> fp,
---
>   public void sinkConfInit(FlowProcess<? extends JobConf> fp,
185c185
<   public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
---
>   public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sink)
diff parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
141c141
<   public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
---
>   public void sourceConfInit(FlowProcess<? extends JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, JobConf jobConf) {
156c156
<   public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
---
>   public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
171c171
<   public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sc)
---
>   public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sc)
diff parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java parquet-cascading3/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
45,49d44
<   public String getName() {
<     return "cascading";
<   }
< 
<   @Override
  • In TupleWriteSupport, I'm not sure why you needed to remove getName(), but it seems this could be handled in a way that lets you move this class to the common tree and use inheritance to change the name (if necessary);
  • For the rest it seems only the methods' signatures change, so you could in theory just wrap a common implementation that does not extend Scheme but has all the implementation. If you'd rather not do that, then add a @Deprecated annotation on the cascading2 classes to show that we're going to delete them at some point.
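
The "wrap a common implementation" option could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in this pull request: `Conf`, `Flow`, `CommonSchemeLogic`, `SchemeForV2` and `SchemeForV3` are hypothetical names, and the real Scheme classes have more methods and parameters.

```java
public class DelegationSketch {
    // Hypothetical stand-in for JobConf.
    public static class Conf {
        private final String name;
        public Conf(String name) { this.name = name; }
        public String name() { return name; }
    }

    // Hypothetical stand-in for a FlowProcess-like generic type.
    public interface Flow<C> {
        C getConfigCopy();
    }

    // Version-independent logic. It takes only the configuration,
    // never the Flow type whose generic signature changed.
    public static final class CommonSchemeLogic {
        public String sourceConfInit(Conf conf) {
            return "source configured with " + conf.name();
        }
    }

    // Thin 2.x-style adapter: only the signature is version specific.
    public static class SchemeForV2 {
        private final CommonSchemeLogic logic = new CommonSchemeLogic();

        public String sourceConfInit(Flow<Conf> flow) {
            return logic.sourceConfInit(flow.getConfigCopy());
        }
    }

    // Thin 3.0-style adapter with the wildcard signature.
    public static class SchemeForV3 {
        private final CommonSchemeLogic logic = new CommonSchemeLogic();

        public String sourceConfInit(Flow<? extends Conf> flow) {
            return logic.sourceConfInit(flow.getConfigCopy());
        }
    }

    public static void main(String[] args) {
        Flow<Conf> flow = () -> new Conf("job");
        System.out.println(new SchemeForV2().sourceConfInit(flow));
        System.out.println(new SchemeForV3().sourceConfInit(flow));
    }
}
```

With this shape the shared class can live in the common module, and each version-specific module keeps only the one-line forwarding methods.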

@cchepelov
Author

diff parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java parquet-cascading3/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
45,49d44
<   public String getName() {
<     return "cascading";
<   }
< 
<   @Override

  • In TupleWriteSupport, I'm not sure why you needed to remove
    getName(), but it seems this could be handled in a way that you
    can move this class to the common tree and use Inheritance to
    change the name (if necessary);

Looks like an oversight on my part, I'll probably do just that, move
it back next to TupleReadSupport.java.

  • For the rest it seems only the methods' signatures change, so you
    could in theory just wrap a common implementation that does not
    extend Scheme but has all the implementation. If you'd rather not
    do that, then add a @Deprecated
    annotation on the cascading2 classes to show that we're going to
    delete them at some point.

Yes, it /might/ be possible to play with the Java Type system so that
the same source code compiles against both Cascading 2.x and Cascading
3.x dependencies… but then I must make sure the resulting jars are also
equivalent with respect to the Scala compiler's type system.

I'd rather go with the second route (stick @deprecated("This
implementation is made for Cascading 2.x, which is deprecated now that
Cascading 3.x is out. Support is in the parquet-cascading3 module") on
all of parquet-cascading's remaining unique classes), if that's fine
with you.
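
For reference, a minimal sketch of how such a notice is usually spelled in Java source: the `@Deprecated` annotation takes no message string, so the explanatory text goes into the Javadoc `@deprecated` tag. `LegacyScheme` is a hypothetical class name, not one of the parquet-cascading classes.

```java
public class DeprecationSketch {
    /**
     * Hypothetical Cascading 2.x-only class.
     *
     * @deprecated This implementation is made for Cascading 2.x;
     *             support for Cascading 3.x is in the parquet-cascading3 module.
     */
    @Deprecated
    public static class LegacyScheme {
    }

    // The annotation is retained at runtime, so it can be checked reflectively.
    public static boolean isDeprecated(Class<?> type) {
        return type.isAnnotationPresent(Deprecated.class);
    }

    public static void main(String[] args) {
        System.out.println(isDeprecated(LegacyScheme.class));
        System.out.println(isDeprecated(String.class));
    }
}
```

Code in other classes that references a type marked this way gets a deprecation warning from the Java compiler, which is the signal to users that the Cascading 2.x classes are scheduled for removal.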

@julienledem
Member

@cchepelov Yep that's fine. Thanks again.

@cchepelov
Author

Thanks for the feedback @julienledem !

@julienledem
Member

@cchepelov did you have a jira for this?
The pull request title needs to be prefixed with PARQUET-XXX:
PARQUET-XXX: Update for Cascading 3.0
where PARQUET-XXX is your jira id.
That's for tracking the changelog in git.

@cchepelov changed the title from "Update for Cascading 3.0" to "[PARQUET-480] Update for Cascading 3.0" (Jan 29, 2016)
@cchepelov changed the title from "[PARQUET-480] Update for Cascading 3.0" to "PARQUET-480: Update for Cascading 3.0" (Jan 29, 2016)
@cchepelov
Author

@julienledem aah, forgot that. Done.

@julienledem
Member

@cchepelov Could you rebase your branch? Thank you

@cchepelov
Author

@julienledem done!

@asfgit closed this in 5769479 on Feb 1, 2016
@julienledem
Member

Thank you @cchepelov !

piyushnarang pushed a commit to piyushnarang/parquet-mr that referenced this pull request Jun 15, 2016
The code in parquet-cascading is adapted to the API as of Cascading 2.5.3

Some incompatible changes were introduced in Cascading 3.0. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is about identical save for overloads which changed from requiring a Foo<JobConf> to requiring a Foo<? extends JobConf>

Author: Cyrille Chépélov (TP12) <cch@transparencyrights.com>

Closes apache#284 from cchepelov/try_cascading3 and squashes the following commits:

e7d1304 [Cyrille Chépélov (TP12)] Adding a @deprecated notice on parquet-cascading's remaining classes
05a417d [Cyrille Chépélov (TP12)] cascading2/3: share back TupleWriteSupport.java (accidentally unmerged)
7fff2d4 [Cyrille Chépélov (TP12)] cascading/cascading3: remove duplicates, push common files into parquet-cascading-common23
338a416 [Cyrille Chépélov (TP12)] Removing unwanted file (what?!) + .gitignoring this kind of files
d9f0455 [Cyrille Chépélov (TP12)] TupleEntry#get is now TupleEntry#getObject
a7f490a [Cyrille Chépélov (TP12)] Revert "Missing test conversion to Cascading 3.0"
cc8b870 [Cyrille Chépélov (TP12)] Missing test conversion to Cascading 3.0
2d73512 [Cyrille Chépélov (TP12)] conflicting values can come in one order or the other. Accept both.
33355d5 [Cyrille Chépélov (TP12)] Fix version mismatch (duh!)
7128639 [Cyrille Chépélov (TP12)] non-C locale can break tests implementation (decimal formats)
53aa2f9 [Cyrille Chépélov (TP12)] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes)
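
One of the commits above mentions that a non-C locale can break tests through decimal formats. The sketch below shows the general effect in plain Java; it is an illustration of the mechanism only, not the test code touched by that commit, and `LocaleSketch` and `formatWith` are hypothetical names.

```java
import java.util.Locale;

public class LocaleSketch {
    // Formats a value with two decimals using an explicit locale.
    public static String formatWith(Locale locale, double value) {
        return String.format(locale, "%.2f", value);
    }

    public static void main(String[] args) {
        // French conventions use a decimal comma.
        System.out.println(formatWith(Locale.FRANCE, 1.5)); // prints 1,50
        // The root locale gives a locale-neutral decimal point.
        System.out.println(formatWith(Locale.ROOT, 1.5));   // prints 1.50
    }
}
```

A test that compares formatted numbers against a hard-coded string is therefore more robust when it passes an explicit locale instead of relying on the machine default.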