Conversation

@kennknowles (Member) commented May 5, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

This is a sample configuration for now.

There are these kinds of failures in the tests right now:

  1. Since the batch runner only supports global windows, I've filtered those tests out. I added an UnsupportedOperationException to the windowing translator so I could distinguish them.
  2. Tests that have simple & supported pipelines succeed at building the pipeline, but somehow the graph is empty; I checked, and it seems the translators are not even being invoked. This is beyond my current scope of digging in.
  3. Some other tests fail with miscellaneous errors. Perhaps they use state, which results in a NullPointerException.
  4. Pretty much all of the tests require side inputs, since that is how PAssert works, so they cannot work in streaming mode.
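Point (1) amounts to a fail-fast guard in the translator. A minimal self-contained sketch of that pattern (class and method names here are hypothetical, not the actual Flink runner code):

```java
// Hypothetical sketch of the guard from point (1): reject anything but the
// global window with UnsupportedOperationException, so unsupported tests
// fail fast and can be distinguished and filtered out.
class WindowingTranslatorSketch {
    static void checkWindowingSupported(String windowFnName) {
        if (!"GlobalWindows".equals(windowFnName)) {
            throw new UnsupportedOperationException(
                "Flink batch currently supports only GlobalWindows, got: " + windowFnName);
        }
    }
}
```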

@kennknowles (Member Author)

R: @mxm @aljoscha

I'm thinking that it would be good to get the configuration in place and then work towards re-enabling the tests. We'll need to disable many more, but I first wanted to get feedback on whether I just missed something obvious that would get all the batch tests to run.

I think it still makes sense to pull the first two commits either way.

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
@kennknowles (Member Author):

@davorbonaci @jasonkuster @lukecwik my maven-fu might be lacking. I think I can remove the phase and goals here, since the execution has an id, and maybe lift some of the configuration, but most importantly, I could not get runnableOnServicePipelineOptions to do anything, instead having to set beamTestPipelineOptions. This seems mostly fine but please advise.

@lukecwik (Member) commented May 5, 2016

I think you want to drop the whole plugin configuration and either:

  • do as we did in the Dataflow runner pom: have a profile conditioned on runnableOnService, so Jenkins could be configured to build the Flink runner and dependent modules with a system property on the Jenkins command line that sets the runnableOnServicePipelineOptions as you listed before
  • always run the Jenkins tests as part of the regular integration-test run by hardcoding runnableOnServicePipelineOptions in this module and having a trivial configuration as part of build/plugins:

         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
           <executions>
             <execution>
               <id>runnable-on-service-tests</id>
             </execution>
           </executions>
         </plugin>

I don't know whether you need the executions block; I would think it would be inherited from the plugin configuration, but I could be wrong.

@kennknowles (Member Author):

We definitely need two executions for now, since we need to pass --streaming=true and --streaming=false. That actually seems to confuse Jenkins, since the test class is duplicated. It will probably also get confused by the direct runner and Spark runner tests.

For Flink and Spark, running against a local endpoint as a unit test (not really an integration test per se) is reasonable. We want to support both, so Jenkins can also run a real integration test against a cluster via overrides on the command line.
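For concreteness, a hedged sketch of the two-execution setup being described (the execution ids, the exact option strings, and the goal wiring are assumptions drawn from this thread, not the final pom):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <executions>
    <!-- one execution per mode; ids here are illustrative -->
    <execution>
      <id>runnable-on-service-tests-batch</id>
      <goals><goal>test</goal></goals>
      <configuration>
        <systemPropertyVariables>
          <beamTestPipelineOptions>
            ["--runner=TestFlinkPipelineRunner","--streaming=false"]
          </beamTestPipelineOptions>
        </systemPropertyVariables>
      </configuration>
    </execution>
    <execution>
      <id>runnable-on-service-tests-streaming</id>
      <goals><goal>test</goal></goals>
      <configuration>
        <systemPropertyVariables>
          <beamTestPipelineOptions>
            ["--runner=TestFlinkPipelineRunner","--streaming=true"]
          </beamTestPipelineOptions>
        </systemPropertyVariables>
      </configuration>
    </execution>
  </executions>
</plugin>
```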

@kennknowles kennknowles force-pushed the flink-integration branch from 750a49d to 9bf7ec0 Compare May 6, 2016 01:36
@aljoscha (Contributor) commented May 6, 2016

@kennknowles I created a PR against your branch. The problem was that the Flink Batch API does not allow dangling operators. I fixed it by keeping track of dangling operators and terminating them with a dummy sink before execution.

I also changed the parallel option of surefire to none; otherwise the JVM would be overwhelmed by all the Flink testing clusters. There are some failing tests, but these should now be proper failures caused by shortcomings in the Flink runner.
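The surefire change being described is roughly this fragment (shown in isolation; it goes inside the plugin's configuration):

```xml
<configuration>
  <parallel>none</parallel>
</configuration>
```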

@mxm (Contributor) commented May 6, 2016

@aljoscha I've had to disable parallel execution of the tests before. The Flink testing clusters are not lightweight; they basically start a real cluster environment inside a single JVM.

> Those tests that have simple & supported pipelines succeed at building the pipeline, but somehow the graph is empty.

Should be resolved with Aljoscha's fix. Thanks for the fix, Aljoscha.

@kennknowles Could we add Aljoscha's fix to the commits of this PR? Your changes look good otherwise. Thanks for adding the Window.Bound translator. It doesn't really fix windowing in batch when grouping by key, but at least the pipeline translator gives a meaningful error message.

> Pretty much all of the tests require side inputs since that is how PAssert works, so they cannot work in streaming mode.

I wonder if we could fix that by making PCollectionViews available locally such that they can be accessed by the tests. Probably better to spend the time on implementing proper side inputs :)

@aljoscha (Contributor) commented May 6, 2016

I added some more fixes to my PR. I got it down quite a bit but these are still failing (in addition to the tests that @kennknowles already disabled in the pom):

Failed tests:
  PAssertTest.testBasicMatcherFailure:174->runExpectingAssertionFailure:307 assertion should have failed
  PAssertTest.testContainsInAnyOrderFalse:286->runExpectingAssertionFailure:307 assertion should have failed
Tests in error:
  ParDoTest.testWindowingInStartAndFinishBundle:1380 » UnsupportedOperation In o...

The last one fails because of missing window support in Flink batch. The first two fail because of a pretty fundamental problem: Flink batch does not allow emission of null values from inputs or user operations (except in some corner cases). The PAssert tests rely on this behavior of Beam to work.

In one of the commits I removed the special TypeSerializer for VoidCoder, and also our special implementation of Create in favor of the CreateSource. Before, we got away with always intercepting the VoidCoder and wrapping it in a TypeSerializer that would emit a VoidValue instead of a proper null. This way, we could emit "null values" from our special implementation of Create. If a user fn reads these values, however, the behavior is not consistent with Beam semantics. I also had to disable our (weirdly named) MaybeEmptyTestITCase because of this.

@kennknowles kennknowles force-pushed the flink-integration branch from 1bc6957 to b98ee40 Compare May 6, 2016 17:36
@kennknowles (Member Author) commented May 6, 2016

Nice use of branch-to-branch PR :-)

I added a couple more tweaks to get things closer.

  1. I made the nulls in PAssert into ints. They are just dummy values, and there is no need to deal with null support just to get tests working.
  2. Allowed assertion errors to surface via TestFlinkPipelineRunner. My code there is definitely not a good idea, just a hack. In the other existing runners this happens a little differently: all invocations of user code are protected and wrapped in a UserCodeException, which then surfaces and is unwrapped to the original exception. Then PAssertTest just catches the original exception.
  3. Added the first bit of filtering based on the capability matrix, to exclude tests that require windowing that is not supported.

Incidentally, running this on my desktop I'm often getting "Insufficient number of network buffers: required 12, but only 11 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'." I haven't spent any time looking into this, but perhaps there is a default that should go into the pom?
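For reference, the knob that the error message names lives in the Flink configuration; a sketch (the value 4096 is an arbitrary illustration, not a tested default, and for the in-JVM test clusters it would be set programmatically rather than via a config file):

```yaml
# flink-conf.yaml sketch; 4096 is an illustrative value only
taskmanager.network.numberOfBuffers: 4096
```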

@aljoscha (Contributor) commented May 7, 2016

Hehe, yes 😃

Some of it is hacky, but I think we're getting there. I'll finally start working for real on side inputs for streaming next week; this should also be interesting.

I only got the "Insufficient number of network buffers" error before I put <parallel>none</parallel>, never after I changed it. Do you also see the tests slowly executing one after another?

We also have more failing tests now that PAssert seems to work properly. You are not working on fixing those, right? I would start going through them and figuring out what's going on, if that's alright with you. I would then create another PR against this PR.

@aljoscha (Contributor) commented May 9, 2016

I fixed another round of tests and opened a PR against your branch. Most of the failures were caused by Flink not supporting null values, so I replaced all of the dummy Create((Void) null) calls with Create(1).

The last set of failing tests is caused by Flink batch not using a WindowedValue underneath, as Flink streaming does, so everything that uses timestamps fails. I opened a separate Jira to track that: https://issues.apache.org/jira/browse/BEAM-270

Failed tests:
  CreateTest.testCreateTimestamped
Expected: iterable over ["b:2", "a:1", "c:3"] in any order
     but: Not matched: "a:1462795571142"
  WithTimestampsTest.withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed
Expected: iterable over [<KV{2147483647, 1970-01-25T20:31:22.647Z}>, <KV{1234, 1970-01-01T00:00:00.234Z}>, <KV{0, 1969-12-31T23:59:59.000Z}>, <KV{946684800000, 1999-12-31T23:59:59.000Z}>] in any order
     but: Not matched: <KV{946684800000, 2016-05-09T12:06:30.851Z}>
  WithTimestampsTest.withTimestampsShouldApplyTimestamps
Expected: iterable over [<KV{946684800000, 2000-01-01T00:00:00.000Z}>, <KV{2147483647, 1970-01-25T20:31:23.647Z}>, <KV{1234, 1970-01-01T00:00:01.234Z}>, <KV{0, 1970-01-01T00:00:00.000Z}>] in any order
     but: Not matched: <KV{0, 2016-05-09T12:06:30.431Z}>
Tests in error:
  CountingInputTest.testUnboundedInputTimestamps:147 » Runtime Pipeline executio...
  CountingSourceTest.testUnboundedSourceTimestamps:155 » Runtime Pipeline execut...

@kennknowles (Member Author)

I think we can solve this last one with another JUnit category for timestamp control, which is a row on the capability matrix, so it fits.

@aljoscha (Contributor) commented May 9, 2016

Yeah, I thought so as well.

@kennknowles (Member Author) commented May 9, 2016

One thing that we need to consider: null values are a language-specific concept. The language-agnostic conception is really "PCollection containing elements encoded with VoidCoder". In modern type systems, "PCollection of VoidCoder" would be interpreted as a collection of Unit or () since it is actually a type with a single inhabitant, not a type with zero inhabitants.

More broadly, as long as a user can provide a Coder<T> for their PCollection<T> the runner should mostly be agnostic of T, except for special types that primitive operations need to understand. In this narrow case, each SDK instantiates VoidCoder as appropriate to invoke a DoFn written in that language.

So it may be more appropriate to have the behavior you mentioned before, where the Flink runner notices the use of a root-level VoidCoder and boxes the values as needed to run on Flink, unboxing them when a DoFn is invoked. Is there a particular reason not to have that?
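The boxing idea can be sketched in a few lines. This is a toy standalone illustration (the name VoidValueBox is made up; the real runner would do this inside its TypeSerializer wrapping):

```java
// Toy sketch of the box/unbox idea: substitute a sentinel for null before
// elements reach Flink, and restore null just before invoking the DoFn.
class VoidValueBox {
    private static final Object VOID_VALUE = new Object();

    // called on the way into Flink, which rejects null elements
    static Object box(Object element) {
        return element == null ? VOID_VALUE : element;
    }

    // called just before handing the element to the user's DoFn
    static Object unbox(Object boxed) {
        return boxed == VOID_VALUE ? null : boxed;
    }
}
```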

@aljoscha (Contributor) commented May 9, 2016

There is no real problem preventing this, no. And you're right that the system should be able to handle anything the user can provide a Coder for. We didn't do it yet because it would require us to always do boxing/unboxing. I thought about another way we could do it: we could move the layer at which we use the Coder to decode/encode. Right now, the coder is wrapped in a TypeSerializer, which is more or less the Flink equivalent of a Coder. We could instead always transmit byte arrays, that is, Flink would only ever see a TypeSerializer<byte[]> and we would only decode at the last step before the user gets the value.

@aljoscha (Contributor)

I thought about it some more. The problem with null should go away once we switch to using WindowedValue everywhere: https://issues.apache.org/jira/browse/BEAM-270

@aljoscha (Contributor)

@kennknowles Are you planning to merge this soon or will we leave it open until the other stuff is fixed as well?

@kennknowles (Member Author)

@aljoscha I think we can start to get useful coverage from this before all the tests pass. I just want to use some combination of <exclude> and <excludedGroups> to get things green, and we can work from there in mainline. It will save merge trouble for sure.

So, considering the new ideas about VoidCoder, my thoughts are to just revert the switch to integers in those places where it might actually affect data size (like Combine.globally), add needed exclusions, and then merge it. Optionally, I could also revert PAssert, though it doesn't matter so much there.

What do you think?
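A sketch of how the exclusion combination could look in the surefire configuration; the excluded class and the category name below are placeholders for illustration, not the actual exclusion list:

```xml
<configuration>
  <!-- class-level excludes for whole test files -->
  <excludes>
    <exclude>**/ParDoTest.java</exclude>
  </excludes>
  <!-- JUnit-category-based excludes; the group name is a placeholder -->
  <excludedGroups>org.apache.beam.sdk.testing.UsesTimestampControl</excludedGroups>
</configuration>
```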

@aljoscha (Contributor)

Sounds good, you can also revert the VoidCoder fix in PAssert. I think I'm very close to fixing everything, including support for windows.

@kennknowles (Member Author)

Cool. I feel like rebasing this to remove some of the experiments would be nice. I don't want to conflict with your current work, so let me know if/when you think it would be safe for me to rebase, add exclusions, and merge.

@aljoscha (Contributor)

I think you can go ahead and rebase/clean up. I've got it working except for merging windows, but I still want to go over the code another time so I'll rebase on master then.

By the way, for the batch windowing I'm completely ignoring triggers. Is this also what the Dataflow runner does? This essentially does a shuffle by key and window and merges elements using a CombineFn. (Except for merging windows, where I can't shuffle by key and window, but only by key.)
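Triggers aside, the non-merging strategy above can be sketched standalone. This is a toy version (made-up class; Beam's KV and BoundedWindow types are reduced to strings) that just shuffles by the (key, window) pair and combines within each group:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy sketch: group by the composite (key, window) and fold each group
// with a combine function; no triggering is involved.
class BatchWindowedCombine {
    static Map<List<String>, Integer> combine(
            List<String[]> elements, // each entry: {key, window, value}
            BinaryOperator<Integer> combineFn) {
        Map<List<String>, Integer> result = new HashMap<>();
        for (String[] e : elements) {
            // the list (key, window) acts as the shuffle key
            result.merge(Arrays.asList(e[0], e[1]), Integer.parseInt(e[2]), combineFn);
        }
        return result;
    }
}
```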

@kennknowles (Member Author)

Yes, ignoring triggers in batch is fine. I have a design doc to share about that as soon as I can patch it up...

@kennknowles (Member Author)

@amitsela the changes to PAssert here actually cause some Spark runner failures. I cherry-picked just the PAssert changes to be sure, and it appears to be the same.

Given that this shouldn't actually be a semantic change, my thought is that there might be something going on with null values in the Spark runner as well (possibly causing a no-op PAssert?). It looks like some PCollectionView is not set when it is read as a side input in the PAssert, but after 5 minutes of looking around I don't have a clear idea of what I'd need to do. I hope you don't mind, but to unblock things I will disable these tests temporarily, as I think they were falsely passing before.

@aljoscha (Contributor)

I saw this as well in the Flink tests. Once I fixed the null value handling and side inputs, I saw a lot more failing tests. I think what we need is a test that only verifies that null values work, one that checks that a PAssert properly fails when it should, and one that checks that it succeeds when it should.

@kennknowles (Member Author)

The existing PAssertTest might have enough (though it could maybe have more), but it requires this PR (for Flink) and #294 (for Spark). Side note: we'd like to have a PAssert that can run without side input support, when possible, so runners can get signal earlier.

@kennknowles (Member Author)

If I take your prior comments as "LGTM" I will merge when Travis finishes up.

@aljoscha (Contributor)

Ah yes, I wasn't aware that we really need to see the actual text "LGTM". 😉

I also have #328, which fixes the remaining tests. It got a bit more complicated than I thought; especially getting CombineFnWithContext with windowed side inputs and side outputs to work was a bit of a PITA. I'll rebase that one once you merge this PR.

kennknowles and others added 22 commits May 12, 2016 19:45
This makes the runner available for selection by integration tests.
Today Flink batch supports only global windows. This is a situation we
intend our build to allow, eventually via JUnit category filtering.

For now all the test classes that use non-global windows are excluded
entirely via maven configuration. In the future, it should be on a
per-test-method basis.
We're now using a PerKeyCombineFnRunner for all interaction with the
CombineFn. This required adding a proper ProcessContext in
FlinkReduceFunction and FlinkPartialReduceFunction, along with adding
support for side inputs there.
This does not work because Flink Batch does not allow sending null
elements. This is a pretty deep thing and hard to fix.

In an earlier commit I removed the special TypeSerializer for VoidCoder.
Before, we got away by always intercepting the VoidCoder and wrapping it in a
TypeSerializer that would always emit a VoidValue instead of a proper
null. If a user fn reads this value, however, the behavior is not
consistent with Beam semantics.
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
The single null value is only used as a dummy, thus can also be an
integer. This makes it work with runners that don't support sending null
values.
@kennknowles (Member Author)

I believe this is superseded by #328.

@kennknowles kennknowles mentioned this pull request May 13, 2016
@kennknowles kennknowles deleted the flink-integration branch November 12, 2016 03:00
iemejia pushed a commit to iemejia/beam that referenced this pull request Jan 12, 2018
pl04351820 pushed a commit to pl04351820/beam that referenced this pull request Dec 20, 2023