Skip to content

Conversation

@aljoscha
Copy link
Contributor

@aljoscha aljoscha commented May 17, 2016

This is a cleanup version of #328, this time for real.

The interesting things are in FlinkPartialReduceFunction/FlinkReduceFunction, FlinkMergingPartialReduceFunction/FlinkMergingReduceFunction and FlinkMergingNonShuffleReduceFunction. The handle all windowing. What the ReduceFnRunner would do is implemented here but without any regard for triggers. All of these implement special cases of windowing: the first two are for general, non-merging windows, the second set is for doing a GroupByKey, the last one is for merging windows. In the last case we cannot do a pre-shuffle combine step because elements are not necessarily in the correct windows prior to the shuffle and merging of windows. We need to have the correct window for side-input access.

R: @kennknowles and @mxm for review

@aljoscha
Copy link
Contributor Author

@kennknowles You added the (new) TestFlinkPipelineRunner to the list of runners in FlinkRunnerRegistrar. Is this necessary for it to be available for the RunnableOnServiceTests. Problem is, that the java8 tests fail on travis with this:

java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.TestFlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]

there are, however, no changes in those parts of the code and it also doesn't fail on Jenkins (or my machine). The only thing I could think about is the registrar. Or maybe you have an idea about what's going on.

@aljoscha
Copy link
Contributor Author

Also, on the java 8 travis profile this is running too long and being canceled. It seems that the java 8 build is taking way longer to finish, even before these changes. Anyone have any idea why that is?

@kennknowles
Copy link
Member

As for the Java 8 examples issue: My guess is the case is that on Travis we use -DforkCount=0 all the system variables are carried from test to test and need to be reset explicitly always. Should try to fix this.

Separately, I am not sure about the Java 8 tests being slow. I will take a look.

@aljoscha
Copy link
Contributor Author

For examples, I think I fixed it in my latest commit: the java 8 examples pom did not have a Failsafe plugin section like this:

plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-failsafe-plugin</artifactId>
  <configuration>
    <useManifestOnlyJar>false</useManifestOnlyJar>
    <redirectTestOutputToFile>true</redirectTestOutputToFile>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>integration-test</goal>
        <goal>verify</goal>
      </goals>
      <configuration>
        <systemPropertyVariables>
          <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
        </systemPropertyVariables>
      </configuration>
    </execution>
  </executions>
</plugin>

I think in the maven version on travis the most recently active beamTestPipelineOptions are taken. With the current build order these happen to be from the Flink Runner tests.

I adde the section but I can't check whether it works because travis kills the tests before we reach that part.

For the java-8-taking-to-long part I have a hunch that it could be that Javadoc/src-package building is not correctly disabled on our combination of Maven and Java 8. I don't have anything concrete there, though.

@kennknowles
Copy link
Member

I think Jenkins needs a kick; looks like it gave up due to a conflict, even though I don't see one.

@aljoscha
Copy link
Contributor Author

Is there something I can do to make it rerun the tests or do I have to push a dummy commit?

@kennknowles
Copy link
Member

I think just a dummy commit. My favorite method is to rebase onto apache/master and force push to my fork.

@aljoscha aljoscha force-pushed the flink-windowed-value-batch-cleanup branch from 36291c7 to 9865be2 Compare May 19, 2016 05:41
@mxm
Copy link
Contributor

mxm commented May 19, 2016

I filed a JIRA issue some time ago but it's not possible according to Infra to rerun pull requests in Jenkins other than pushing a new version.

@aljoscha
Copy link
Contributor Author

Ah ok, If you look at all the recent PRs you'll notice that Jenkins fails for all of them with the same message. I wrote an email to the ML about this.

@aljoscha aljoscha force-pushed the flink-windowed-value-batch-cleanup branch from d689dd7 to 54496c3 Compare May 19, 2016 15:34
kennknowles and others added 14 commits May 19, 2016 17:35
This makes the runner available for selection by integration tests.
Today Flink batch supports only global windows. This is a situation we
intend our build to allow, eventually via JUnit category filtering.

For now all the test classes that use non-global windows are excluded
entirely via maven configuration. In the future, it should be on a
per-test-method basis.
Without it the RunnableOnService tests seem to not work
With this change we always use WindowedValue<T> for the underlying Flink
DataSets instead of just T. This allows us to support windowing as well.

This changes also a lot of other stuff enabled by the above:

 - Use WindowedValue throughout
 - Add proper translation for Window.into()
 - Make side inputs window aware
 - Make GroupByKey and Combine transformations window aware, this
   includes support for merging windows. GroupByKey is implemented as a
   Combine with a concatenating CombineFn, for simplicity

This removes Flink specific transformations for things that are handled
by builtin sources/sinks, among other things this:

 - Removes special translation for AvroIO.Read/Write and
   TextIO.Read/Write
 - Removes special support for Write.Bound, this was not working properly
   and is now handled by the Beam machinery that uses DoFns for this
 - Removes special translation for binary Co-Group, the code was still
   in there but was never used

With this change all RunnableOnService tests run on Flink Batch.
All of the stuff in the removed ITCases is covered (in more detail) by
the RunnableOnService tests.
this can be done using a Sysout in a ParDo
@aljoscha aljoscha force-pushed the flink-windowed-value-batch-cleanup branch from 54496c3 to 6c4af76 Compare May 19, 2016 15:35
@aljoscha
Copy link
Contributor Author

aljoscha commented May 19, 2016

@kennknowles that's what I always used to do but I thought we're discouraging it because it messes with github comments.

I finally found the reason why the Java 8 Examples tests were trying to run with the Flink runner. That POM doesn't have a surefire plugin section, so it does not explicitly set the beamTestPipelineOptions. On my machine that wasn't a problem but on travis the last active setting for beamTestPipelineOptions seems to be used. I copied this section from the Java Examples pom to make it work:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <beamTestPipelineOptions>
      </beamTestPipelineOptions>
    </systemPropertyVariables>
  </configuration>
</plugin> 

I verified that this is actually what happens by printing the beamTestPipelineOptions, in this log we can see that the wrong settings are used for the Java 8 Example tests: https://s3.amazonaws.com/archive.travis-ci.org/jobs/131433948/log.txt. (Search for [INFO] Building Apache Beam :: Examples :: Java 8 All 0.1.0-incubating-SNAPSHOT

Before, this wasn't a problem because there we're no other POMs that were setting this property.

@kennknowles
Copy link
Member

Amusingly enough, since the commits were completely unchanged, the reported status from my PR applies here.

@kennknowles
Copy link
Member

Yes. I wonder if we have another way to improve the hygiene of our use of systemPropertyVariables. This same issue has occurred a few times.

LGTM from me. This is great.

I didn't make line-by-line comments on the side input stuff, but the change I described in that doc on the mailing list will affect the translation of CreatePCollectionView (which will either go away or can be a private primitive) and the place where you call fromIterableInternal in the broadcast var initializer (which if I understand correctly, will be invoking a ViewFn. Basically the same thing). I'll touch base again when that gets closer.

@aljoscha
Copy link
Contributor Author

Thanks, I'll also be following the developments.

I was a nice trick with the copy-PR, I didn't know that the jenkins/travis status would then also apply to this PR. I guess it's because they have the same commit IDs. 😄

I'll merge, run a last time on travis and then commit.

@asfgit asfgit closed this in af8f593 May 20, 2016
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());

TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we remove the optimized CoGroupByKey translator? Is this within the scope of this PR?

@mxm
Copy link
Contributor

mxm commented May 20, 2016

Really great work @aljoscha! I would have liked to check out the changes in detail but didn't get to it before the merge. I'll leave a note next time to make this more explicit. Besides the great changes to the batch translation, there are some questionable cleanups that don't look like they should be part of this PR. Most prominently, the removal of the native translators and the test cases.

@aljoscha
Copy link
Contributor Author

The initial reason for the changes was to make all RunnableOnService tests work with the Flink Batch runner and make the behavior 100 % compliant with Beam. The Flink specific translations behave differently from the Beam internal sources/sinks, that's why I removed them They also didn't support all the features that the Beam-internal versions support. Removing them broke all test since the tests verify the Flink-specific behavior. All features are covered in more depth by the RunnableOnService tests, that's why I removed them.

@mxm
Copy link
Contributor

mxm commented May 20, 2016

Fair enough. The RunnableOnService tests seem to be very comprehensive. Removing the native translators also alleviates some maintenance work of the Flink Runner.

@aljoscha
Copy link
Contributor Author

Exactly. 😄

@aljoscha aljoscha deleted the flink-windowed-value-batch-cleanup branch June 19, 2016 09:39
dhalperi added a commit to dhalperi/beam that referenced this pull request Aug 23, 2016
iemejia pushed a commit to iemejia/beam that referenced this pull request Jan 12, 2018
pl04351820 pushed a commit to pl04351820/beam that referenced this pull request Dec 20, 2023
makes use of the updated plugin for generating DocFX YAMLs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants