Conversation

@grzegorz8 commented Feb 1, 2018

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.


@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
  AppliedPTransform<?, ?, ?> appliedPTransform = node.toAppliedPTransform(pipeline);
Contributor

@tgroh Is there a better way of getting the AppliedPTransform from the node here?

Member

You can remove the pipeline field and call getPipeline(), which is available while traversing the pipeline if you're using PipelineVisitor.Defaults as the root of your visitor hierarchy. Otherwise, this is basically exactly how I'd do it.
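For illustration, a minimal sketch of what that suggestion could look like, assuming a visitor that extends Pipeline.PipelineVisitor.Defaults (the class name BoundednessVisitor is made up here):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;

// Hypothetical visitor: extending PipelineVisitor.Defaults should make getPipeline()
// available during traversal, so no separate pipeline field is needed.
class BoundednessVisitor extends Pipeline.PipelineVisitor.Defaults {
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    AppliedPTransform<?, ?, ?> appliedPTransform =
        node.toAppliedPTransform(getPipeline());
    // ... inspect appliedPTransform here ...
  }
}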

}
}

public static IsBounded.Enum isBounded(AppliedPTransform<?, ?, ?> transform) {
Contributor

@tgroh Is this a good place to put such a method or is there already an easier method for determining whether a PTransform/PCollection is unbounded?

Member

I think this is precisely sourceIsBounded for all well-formed ReadPayloads (one method up).

Contributor

Yep, I thought that as well, but it takes a ReadPayload, and when traversing the pipeline we don't know which transform is a read.

Another alternative would be:

try {
  rawSource = ReadTranslation.unboundedSourceFromTransform(
      (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
          node.toAppliedPTransform(getPipeline()));
  // we have an unbounded source
} catch (IOException e) {
  // no unbounded source
}

but I'm not sure I like the exceptions-as-control-flow thing.

Member

Ah, I see. If we don't know that it's a read transform, I'm generally opposed to routing it through ReadTranslation, and I also don't really like the use of exceptions as control flow.

Each PCollection contains its boundedness, so you can just inspect the outputs of the transform (which are labelled as PValues, but I believe they are all PCollections at this point; it may be worth the check regardless). They have the PCollection.isBounded() method, in both proto and Java forms.
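A minimal sketch of that check, assuming AppliedPTransform.getOutputs() exposes the output values and using the Java-side PCollection.isBounded() (the helper name hasUnboundedOutput is made up):

import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;

// Hypothetical helper: treat a transform as unbounded if any of its output
// PCollections reports IsBounded.UNBOUNDED.
static boolean hasUnboundedOutput(AppliedPTransform<?, ?, ?> transform) {
  for (PValue value : transform.getOutputs().values()) {
    if (value instanceof PCollection
        && ((PCollection<?>) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
      return true;
    }
  }
  return false;
}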

@aljoscha (Contributor) commented Feb 1, 2018

retest this please

@aljoscha (Contributor) commented Feb 1, 2018

retest this please

@aljoscha (Contributor) commented Feb 1, 2018

UnboundedSourceWrapperTest is failing in the Gradle build, but it's not failing when I run it in the IDE. I'll try running Gradle locally now.

@mingmxu commented Feb 2, 2018

I ran a test locally and it seems the error is still there; also, the test doesn't cover View.

@aljoscha (Contributor) commented Feb 5, 2018

@xumingmin Do you have that test as a nicely isolated thing that I could run so that we can debug this?

@mingmxu commented Feb 5, 2018

@aljoscha It should be a false alarm. I moved my test code (see below) to the class FlinkPipelineExecutionEnvironmentTest and it passes. In short, good to go from my side.

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(TestFlinkRunner.class);
options.setFlinkMaster("[auto]");

FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
Pipeline pipeline = Pipeline.create();

// Bounded side input produced by a View.
final PCollectionView<Integer> lkpPCollection =
    pipeline.apply("src2", Create.of(1)).apply(View.<Integer>asSingleton());

// Unbounded main input read from Kafka.
PCollection<String> mainPCollection =
    pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("teststream")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(Values.create());

// ParDo consuming the side input, followed by a windowed write.
mainPCollection
    .apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    if (Long.valueOf(c.element()) % 2 == c.sideInput(lkpPCollection)) {
                      c.output(c.element());
                    }
                  }
                })
            .withSideInputs(lkpPCollection))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path"));

flinkEnv.translate(flinkRunner, pipeline);

asfgit pushed a commit that referenced this pull request Feb 6, 2018
@aljoscha (Contributor) commented Feb 6, 2018

Thanks a lot for working on this and coming up with the better solution. 👍

I'm merging...

@aljoscha closed this Feb 6, 2018