From 8982c63a2217026728dd52e37d1df14d5d298fb1 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Thu, 3 Jun 2021 17:39:14 -0700 Subject: [PATCH 01/12] Add interface to access I/O topic information for a Samza Beam job --- .../org/apache/beam/runners/samza/BeamIO.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java new file mode 100644 index 000000000000..93414c46c322 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.runners.TransformHierarchy; + +/** + * Interface to get I/O information for a Beam job. This will help add I/O information to the Beam + * DAG. + */ +@Experimental +public interface BeamIO { + + /** Get I/O topic name and cluster. */ + String getIOInfo(TransformHierarchy.Node node); + + /** A registrar for {@link BeamIO}. */ + interface BeamIORegistrar { + BeamIO getBeamIO(); + } +} From 341d589ff45a69fd401d3962016f6f04197de02b Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Thu, 3 Jun 2021 19:06:00 -0700 Subject: [PATCH 02/12] Add JSON config for Beam DAG visualization --- .../samza/{BeamIO.java => SamzaIOInfo.java} | 6 +- .../beam/runners/samza/SamzaRunner.java | 12 +- .../samza/renderer/PipelineJsonRenderer.java | 196 ++++++++++++++++++ .../runners/samza/renderer/package-info.java | 20 ++ .../renderer/PipelineJsonRendererTest.java | 81 ++++++++ .../samza/src/test/resources/ExpectedDag.json | 97 +++++++++ 6 files changed, 408 insertions(+), 4 deletions(-) rename runners/samza/src/main/java/org/apache/beam/runners/samza/{BeamIO.java => SamzaIOInfo.java} (91%) create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java create mode 100644 runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java create mode 100644 runners/samza/src/test/resources/ExpectedDag.json diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java similarity index 91% rename from runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java rename to runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java index 93414c46c322..fdb2a3bed70a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/BeamIO.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java @@ -25,13 +25,13 @@ * DAG. */ @Experimental -public interface BeamIO { +public interface SamzaIOInfo { /** Get I/O topic name and cluster. */ String getIOInfo(TransformHierarchy.Node node); - /** A registrar for {@link BeamIO}. */ + /** A registrar for {@link SamzaIOInfo}. */ interface BeamIORegistrar { - BeamIO getBeamIO(); + SamzaIOInfo getBeamIO(); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 42fbed0386dd..3aede25c0aef 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.runners.samza.renderer.PipelineJsonRenderer; import org.apache.beam.runners.samza.translation.ConfigBuilder; import org.apache.beam.runners.samza.translation.PViewToIdMapper; import org.apache.beam.runners.samza.translation.PortableTranslationContext; @@ -62,6 +63,7 @@ public class SamzaRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class); private static final String BEAM_DOT_GRAPH = "beamDotGraph"; + private static final String BEAM_JSON_GRAPH = "beamJsonGraph"; public static SamzaRunner fromOptions(PipelineOptions opts) { final SamzaPipelineOptions samzaOptions = @@ -82,11 +84,15 @@ private SamzaRunner(SamzaPipelineOptions options) { public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { final String dotGraph = PipelineDotRenderer.toDotString(pipeline); - LOG.info("Portable pipeline to run:\n{}", dotGraph); + LOG.info("Portable pipeline to run dot graph:\n{}", dotGraph); + + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); + LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, options); configBuilder.put(BEAM_DOT_GRAPH, dotGraph); + configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); options.setConfigOverride(config); @@ -129,11 +135,15 @@ public SamzaPipelineResult run(Pipeline pipeline) { final String dotGraph = PipelineDotRenderer.toDotString(pipeline); LOG.info("Beam pipeline DOT graph:\n{}", dotGraph); + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); + LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); + final Map idMap = PViewToIdMapper.buildIdMap(pipeline); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); configBuilder.put(BEAM_DOT_GRAPH, dotGraph); + configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); options.setConfigOverride(config); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java new file mode 100644 index 000000000000..f06dd1eb8991 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.renderer; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.samza.SamzaIOInfo; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JSON renderer for BEAM {@link Pipeline} DAG. This can help us with visualization of the Beam + * DAG. + */ +@Experimental +public class PipelineJsonRenderer implements Pipeline.PipelineVisitor { + private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class); + private static final String OUTERMOST_NODE = "OuterMostNode"; + + public static String toJsonString(Pipeline pipeline) { + final PipelineJsonRenderer visitor = new PipelineJsonRenderer(); + visitor.begin(); + pipeline.traverseTopologically(visitor); + visitor.end(); + return visitor.jsonBuilder.toString(); + } + + public static String toJsonString(RunnerApi.Pipeline pipeline) { + return null; + } + + private final StringBuilder jsonBuilder = new StringBuilder(); + private final StringBuilder graphLinks = new StringBuilder(); + private final Map nodeToId = new HashMap<>(); + private final Map valueToProducerNodeName = new HashMap<>(); + private int indent; + private int nextNodeId; + + private PipelineJsonRenderer() {} + + @Override + public void enterPipeline(Pipeline p) {} + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + String fullName = node.getFullName(); + writeLine( + "{ \"fullName\":\"%s\",", escapeString(fullName.isEmpty() ? OUTERMOST_NODE : fullName)); + writeLine( + " \"shortName\":\"%s\",", + escapeString(fullName.isEmpty() ? OUTERMOST_NODE : node.getTransform().getName())); + writeLine(" \"id\":\"%s\",", escapeString(fullName.isEmpty() ? OUTERMOST_NODE : fullName)); + if (!fullName.isEmpty()) { + String enclosingNodeName = node.getEnclosingNode().getFullName(); + writeLine( + " \"enclosingNode\":\"%s\",", + escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); + } + + String ioInfo = getIOTopicInfo(node); + if (!ioInfo.isEmpty()) { + writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo)); + } + + writeLine(" \"ChildNode\":["); + enterBlock(); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) { + exitBlock(); + writeLine("]},"); + } + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + String fullName = node.getFullName(); + writeLine("{ \"fullName\":\"%s\",", escapeString(fullName)); + writeLine(" \"shortName\":\"%s\",", escapeString(node.getTransform().getName())); + String enclosingNodeName = node.getEnclosingNode().getFullName(); + writeLine( + " \"enclosingNode\":\"%s\",", + escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); + writeLine(" \"id\":\"%s\"},", escapeString(fullName)); + + node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName)); + node.getInputs() + .forEach( + (key, value) -> { + final String producerName = valueToProducerNodeName.get(value); + String style = "solid"; + if (node.getTransform().getAdditionalInputs().containsKey(key)) { + style = "dashed"; + } + graphLinks.append( + String.format( + "{\"from\":\"%s\"," + "\"to\":\"%s\"," + "\"hashId\":\"%s\"},", + producerName, fullName, escapeString(shortenTag(key.getId())))); + }); + } + + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) {} + + @Override + public void leavePipeline(Pipeline pipeline) {} + + private void begin() { + writeLine("{ \n \"RootNode\": ["); + graphLinks.append(",\"graphLinks\": ["); + enterBlock(); + } + + private void end() { + exitBlock(); + writeLine("]"); + // delete the last comma + int lastIndex = graphLinks.length() - 1; + if (graphLinks.charAt(lastIndex) == ',') { + graphLinks.deleteCharAt(lastIndex); + } + graphLinks.append("]"); + jsonBuilder.append(graphLinks); + jsonBuilder.append("}"); + } + + private void enterBlock() { + indent += 4; + } + + private void exitBlock() { + indent -= 4; + } + + private void writeLine(String format, Object... args) { + int secondLastCharIndex = jsonBuilder.length() - 2; + if (jsonBuilder.length() > 1 + && jsonBuilder.charAt(secondLastCharIndex) == ',' + && (format.startsWith("}") || format.startsWith("]"))) { + jsonBuilder.deleteCharAt(secondLastCharIndex); + } + if (indent != 0) { + jsonBuilder.append(String.format("%-" + indent + "s", "")); + } + jsonBuilder.append(String.format(format, args)); + jsonBuilder.append("\n"); + } + + private static String escapeString(String x) { + return x.replace("\"", "\\\""); + } + + private static String shortenTag(String tag) { + return tag.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1"); + } + + private String getIOTopicInfo(TransformHierarchy.Node node) { + final SamzaIOInfo samzaIOInfo; + final Iterator beamIORegistrarIterator = + ServiceLoader.load(SamzaIOInfo.BeamIORegistrar.class).iterator(); + samzaIOInfo = + beamIORegistrarIterator.hasNext() + ? Iterators.getOnlyElement(beamIORegistrarIterator).getBeamIO() + : null; + + String nodeInfo = ""; + if (samzaIOInfo != null) { + nodeInfo = samzaIOInfo.getIOInfo(node); + } + return nodeInfo; + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java new file mode 100644 index 000000000000..abaf895cbec3 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Internal implementation of the Beam runner for Apache Samza. */ +package org.apache.beam.runners.samza.renderer; diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java new file mode 100644 index 000000000000..1322f4f3fdec --- /dev/null +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.renderer; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PipelineJsonRenderer}. */ +@RunWith(JUnit4.class) +public class PipelineJsonRendererTest { + + static { + System.setProperty("beamUseDummyRunner", Boolean.TRUE.toString()); + } + + @Rule public final transient TestPipeline p = TestPipeline.create(); + + @Test + public void testEmptyPipeline() { + System.out.println(PipelineJsonRenderer.toJsonString(p)); + assertEquals( + "{ \"RootNode\": [" + + " { \"fullName\":\"OuterMostNode\"," + + " \"shortName\":\"OuterMostNode\"," + + " \"id\":\"OuterMostNode\"," + + " \"ChildNode\":[ ]}],\"graphLinks\": []" + + "}", + PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), "")); + } + + @Test + public void testCompositePipeline() throws IOException { + + p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1)))) + .apply(Window.into(FixedWindows.of(Duration.millis(10)))) + .apply(Sum.integersPerKey()); + + String jsonDagFileName = "src/test/resources/ExpectedDag.json"; + String jsonDag = + new String(Files.readAllBytes(Paths.get(jsonDagFileName)), StandardCharsets.UTF_8); + + assertEquals( + jsonDag.replaceAll("\\s+", "").replaceAll(",\"hashId\":\"[^\"]*\"", ""), + PipelineJsonRenderer.toJsonString(p) + .replaceAll(System.lineSeparator(), "") + .replaceAll("\\s+", "") + .replaceAll(",\"hashId\":\"[^\"]*\"", "")); + } +} diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json new file mode 100644 index 000000000000..ed3d25e29001 --- /dev/null +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -0,0 +1,97 @@ +{ + "RootNode": [ + { "fullName":"OuterMostNode", + "shortName":"OuterMostNode", + "id":"OuterMostNode", + "ChildNode":[ + { "fullName":"Create.TimestampedValues", + "shortName":"Create.TimestampedValues", + "id":"Create.TimestampedValues", + "enclosingNode":"OuterMostNode", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/Create.Values", + "shortName":"Create.Values", + "id":"Create.TimestampedValues/Create.Values", + "enclosingNode":"Create.TimestampedValues", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "shortName":"Read(CreateSource)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse", + "shortName":"Impulse", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse"}, + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", + "shortName":"ParDo(OutputSingleSource)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)", + "shortName":"ParMultiDo(OutputSingleSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)"} + ]}, + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", + "shortName":"ParDo(BoundedSourceAsSDFWrapper)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)", + "shortName":"ParMultiDo(BoundedSourceAsSDFWrapper)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", + "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)"} + ]} + ]} + ]}, + { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)", + "shortName":"ParDo(ConvertTimestamps)", + "id":"Create.TimestampedValues/ParDo(ConvertTimestamps)", + "enclosingNode":"Create.TimestampedValues", + "ChildNode":[ + { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", + "shortName":"ParMultiDo(ConvertTimestamps)", + "enclosingNode":"Create.TimestampedValues/ParDo(ConvertTimestamps)", + "id":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)"} + ]} + ]}, + { "fullName":"Window.Into()", + "shortName":"Window.Into()", + "id":"Window.Into()", + "enclosingNode":"OuterMostNode", + "ChildNode":[ + { "fullName":"Window.Into()/Window.Assign", + "shortName":"Window.Assign", + "enclosingNode":"Window.Into()", + "id":"Window.Into()/Window.Assign"} + ]}, + { "fullName":"Combine.perKey(SumInteger)", + "shortName":"Combine.perKey(SumInteger)", + "id":"Combine.perKey(SumInteger)", + "enclosingNode":"OuterMostNode", + "ChildNode":[ + { "fullName":"Combine.perKey(SumInteger)/GroupByKey", + "shortName":"GroupByKey", + "enclosingNode":"Combine.perKey(SumInteger)", + "id":"Combine.perKey(SumInteger)/GroupByKey"}, + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues", + "shortName":"Combine.GroupedValues", + "id":"Combine.perKey(SumInteger)/Combine.GroupedValues", + "enclosingNode":"Combine.perKey(SumInteger)", + "ChildNode":[ + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", + "shortName":"ParDo(Anonymous)", + "id":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", + "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues", + "ChildNode":[ + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "shortName":"ParMultiDo(Anonymous)", + "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", + "id":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"} + ]} + ]} + ]} + ]} + ] +,"graphLinks": [{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","hashId":"402#bb20b45fd4d95138"},{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","hashId":"402#3d93cb799b3970be"},{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","hashId":"402#a32dc9f64f1df03a"},{"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign","hashId":"402#8ce970b71df42503"},{"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey","hashId":"402#98f8ba3dc812a76d"},{"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)","hashId":"402#554dcd2b40b5f04b"}]} From bd4901a0ef6cd878c5ddad661152d25a9443b6ec Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 8 Jun 2021 09:20:55 -0700 Subject: [PATCH 03/12] Address review --- .../org/apache/beam/runners/samza/SamzaRunner.java | 12 +----------- .../samza/renderer/PipelineJsonRendererTest.java | 3 --- runners/samza/src/test/resources/ExpectedDag.json | 8 +++++++- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 3aede25c0aef..57f12baedaef 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -24,7 +24,6 @@ import java.util.ServiceLoader; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; -import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.samza.renderer.PipelineJsonRenderer; @@ -62,7 +61,6 @@ }) public class SamzaRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class); - private static final String BEAM_DOT_GRAPH = "beamDotGraph"; private static final String BEAM_JSON_GRAPH = "beamJsonGraph"; public static SamzaRunner fromOptions(PipelineOptions opts) { @@ -83,15 +81,11 @@ private SamzaRunner(SamzaPipelineOptions options) { } public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { - final String dotGraph = PipelineDotRenderer.toDotString(pipeline); - LOG.info("Portable pipeline to run dot graph:\n{}", dotGraph); - final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, options); - configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); @@ -127,14 +121,11 @@ public SamzaPipelineResult run(Pipeline pipeline) { MetricsEnvironment.setMetricsSupported(true); if (LOG.isDebugEnabled()) { - LOG.debug("Pre-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline)); + LOG.debug("Pre-processed Beam pipeline:\n{}", PipelineJsonRenderer.toJsonString(pipeline)); } pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); - final String dotGraph = PipelineDotRenderer.toDotString(pipeline); - LOG.info("Beam pipeline DOT graph:\n{}", dotGraph); - final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); @@ -142,7 +133,6 @@ public SamzaPipelineResult run(Pipeline pipeline) { final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); - configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index 1322f4f3fdec..5f304a40296c 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -34,11 +34,8 @@ import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; /** Tests for {@link PipelineJsonRenderer}. */ -@RunWith(JUnit4.class) public class PipelineJsonRendererTest { static { diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index ed3d25e29001..75f5b3d6b7a5 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -94,4 +94,10 @@ ]} ]} ] -,"graphLinks": [{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","hashId":"402#bb20b45fd4d95138"},{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","hashId":"402#3d93cb799b3970be"},{"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","hashId":"402#a32dc9f64f1df03a"},{"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign","hashId":"402#8ce970b71df42503"},{"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey","hashId":"402#98f8ba3dc812a76d"},{"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)","hashId":"402#554dcd2b40b5f04b"}]} +,"graphLinks": [ + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","hashId":"402#bb20b45fd4d95138"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","hashId":"402#3d93cb799b3970be"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","hashId":"402#a32dc9f64f1df03a"}, + {"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign","hashId":"402#8ce970b71df42503"}, + {"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey","hashId":"402#98f8ba3dc812a76d"}, + {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)","hashId":"402#554dcd2b40b5f04b"}]} From b042fc0eb08e088fe2b7959efdb1c56e8de2a9a7 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 15 Jun 2021 16:21:34 -0700 Subject: [PATCH 04/12] Address review --- runners/samza/build.gradle | 1 + .../samza/renderer/PipelineJsonRenderer.java | 4 +-- .../renderer/PipelineJsonRendererTest.java | 30 +++++++++++-------- .../samza/src/test/resources/ExpectedDag.json | 12 ++++---- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 3edf73abb962..f7c875b1ea2c 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -75,6 +75,7 @@ dependencies { testCompile library.java.junit testCompile library.java.mockito_core testCompile library.java.jackson_dataformat_yaml + testCompile library.java.google_code_gson validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: ":runners:core-java", configuration: "testRuntime") validatesRunner project(project.path) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index f06dd1eb8991..14778fc9c7fb 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -117,9 +117,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { style = "dashed"; } graphLinks.append( - String.format( - "{\"from\":\"%s\"," + "\"to\":\"%s\"," + "\"hashId\":\"%s\"},", - producerName, fullName, escapeString(shortenTag(key.getId())))); + String.format("{\"from\":\"%s\"," + "\"to\":\"%s\"},", producerName, fullName)); }); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index 5f304a40296c..dbf716c9c199 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -19,11 +19,15 @@ import static org.junit.Assert.assertEquals; +import com.google.gson.JsonParser; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.runners.samza.SamzaPipelineOptions; +import org.apache.beam.runners.samza.SamzaRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -32,20 +36,18 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Rule; import org.junit.Test; /** Tests for {@link PipelineJsonRenderer}. */ public class PipelineJsonRendererTest { - static { - System.setProperty("beamUseDummyRunner", Boolean.TRUE.toString()); - } - - @Rule public final transient TestPipeline p = TestPipeline.create(); - @Test public void testEmptyPipeline() { + SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); + options.setRunner(SamzaRunner.class); + + Pipeline p = Pipeline.create(options); + System.out.println(PipelineJsonRenderer.toJsonString(p)); assertEquals( "{ \"RootNode\": [" @@ -59,6 +61,10 @@ public void testEmptyPipeline() { @Test public void testCompositePipeline() throws IOException { + SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); + options.setRunner(SamzaRunner.class); + + Pipeline p = Pipeline.create(options); p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1)))) .apply(Window.into(FixedWindows.of(Duration.millis(10)))) @@ -69,10 +75,8 @@ public void testCompositePipeline() throws IOException { new String(Files.readAllBytes(Paths.get(jsonDagFileName)), StandardCharsets.UTF_8); assertEquals( - jsonDag.replaceAll("\\s+", "").replaceAll(",\"hashId\":\"[^\"]*\"", ""), - PipelineJsonRenderer.toJsonString(p) - .replaceAll(System.lineSeparator(), "") - .replaceAll("\\s+", "") - .replaceAll(",\"hashId\":\"[^\"]*\"", "")); + JsonParser.parseString(jsonDag), + JsonParser.parseString( + PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), ""))); } } diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index 75f5b3d6b7a5..75e957e2196e 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -95,9 +95,9 @@ ]} ] ,"graphLinks": [ - {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","hashId":"402#bb20b45fd4d95138"}, - {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","hashId":"402#3d93cb799b3970be"}, - {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","hashId":"402#a32dc9f64f1df03a"}, - {"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign","hashId":"402#8ce970b71df42503"}, - {"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey","hashId":"402#98f8ba3dc812a76d"}, - {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)","hashId":"402#554dcd2b40b5f04b"}]} + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)"}, + {"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign"}, + {"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey"}, + {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"}]} From 0578f9c8e6f3a49ff0eab0adc295b96b83704929 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 3 Aug 2021 14:54:14 -0700 Subject: [PATCH 05/12] Remove `shortName` and `id` --- .../samza/renderer/PipelineJsonRenderer.java | 6 --- .../renderer/PipelineJsonRendererTest.java | 2 - .../samza/src/test/resources/ExpectedDag.json | 52 +++---------------- 3 files changed, 8 insertions(+), 52 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index 14778fc9c7fb..91fc1a413d83 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -69,10 +69,6 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { String fullName = node.getFullName(); writeLine( "{ \"fullName\":\"%s\",", escapeString(fullName.isEmpty() ? OUTERMOST_NODE : fullName)); - writeLine( - " \"shortName\":\"%s\",", - escapeString(fullName.isEmpty() ? OUTERMOST_NODE : node.getTransform().getName())); - writeLine(" \"id\":\"%s\",", escapeString(fullName.isEmpty() ? OUTERMOST_NODE : fullName)); if (!fullName.isEmpty()) { String enclosingNodeName = node.getEnclosingNode().getFullName(); writeLine( @@ -100,12 +96,10 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { public void visitPrimitiveTransform(TransformHierarchy.Node node) { String fullName = node.getFullName(); writeLine("{ \"fullName\":\"%s\",", escapeString(fullName)); - writeLine(" \"shortName\":\"%s\",", escapeString(node.getTransform().getName())); String enclosingNodeName = node.getEnclosingNode().getFullName(); writeLine( " \"enclosingNode\":\"%s\",", escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); - writeLine(" \"id\":\"%s\"},", escapeString(fullName)); node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName)); node.getInputs() diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index dbf716c9c199..56d52f89fb33 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -52,8 +52,6 @@ public void testEmptyPipeline() { assertEquals( "{ \"RootNode\": [" + " { \"fullName\":\"OuterMostNode\"," - + " \"shortName\":\"OuterMostNode\"," - + " \"id\":\"OuterMostNode\"," + " \"ChildNode\":[ ]}],\"graphLinks\": []" + "}", PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), "")); diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index 75e957e2196e..22fc89015a94 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -1,94 +1,58 @@ { "RootNode": [ { "fullName":"OuterMostNode", - "shortName":"OuterMostNode", - "id":"OuterMostNode", "ChildNode":[ { "fullName":"Create.TimestampedValues", - "shortName":"Create.TimestampedValues", - "id":"Create.TimestampedValues", "enclosingNode":"OuterMostNode", "ChildNode":[ { "fullName":"Create.TimestampedValues/Create.Values", - "shortName":"Create.Values", - "id":"Create.TimestampedValues/Create.Values", "enclosingNode":"Create.TimestampedValues", "ChildNode":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)", - "shortName":"Read(CreateSource)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)", "enclosingNode":"Create.TimestampedValues/Create.Values", "ChildNode":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse", - "shortName":"Impulse", - "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse"}, + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)"}, { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", - "shortName":"ParDo(OutputSingleSource)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", "ChildNode":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)", - "shortName":"ParMultiDo(OutputSingleSource)", - "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)"} + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)"} ]}, { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", - "shortName":"ParDo(BoundedSourceAsSDFWrapper)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", "ChildNode":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)", - "shortName":"ParMultiDo(BoundedSourceAsSDFWrapper)", - "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", - "id":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)"} + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)"} ]} ]} ]}, { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)", - "shortName":"ParDo(ConvertTimestamps)", - "id":"Create.TimestampedValues/ParDo(ConvertTimestamps)", "enclosingNode":"Create.TimestampedValues", "ChildNode":[ { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", - "shortName":"ParMultiDo(ConvertTimestamps)", - "enclosingNode":"Create.TimestampedValues/ParDo(ConvertTimestamps)", - "id":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)"} + "enclosingNode":"Create.TimestampedValues/ParDo(ConvertTimestamps)"} ]} ]}, { "fullName":"Window.Into()", - "shortName":"Window.Into()", - "id":"Window.Into()", "enclosingNode":"OuterMostNode", "ChildNode":[ { "fullName":"Window.Into()/Window.Assign", - "shortName":"Window.Assign", - "enclosingNode":"Window.Into()", - "id":"Window.Into()/Window.Assign"} + "enclosingNode":"Window.Into()"} ]}, { "fullName":"Combine.perKey(SumInteger)", - "shortName":"Combine.perKey(SumInteger)", - "id":"Combine.perKey(SumInteger)", "enclosingNode":"OuterMostNode", "ChildNode":[ { "fullName":"Combine.perKey(SumInteger)/GroupByKey", - "shortName":"GroupByKey", - "enclosingNode":"Combine.perKey(SumInteger)", - "id":"Combine.perKey(SumInteger)/GroupByKey"}, + "enclosingNode":"Combine.perKey(SumInteger)"}, { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues", - "shortName":"Combine.GroupedValues", - "id":"Combine.perKey(SumInteger)/Combine.GroupedValues", "enclosingNode":"Combine.perKey(SumInteger)", "ChildNode":[ { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", - "shortName":"ParDo(Anonymous)", - "id":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues", "ChildNode":[ { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "shortName":"ParMultiDo(Anonymous)", - "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", - "id":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"} + "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)"} ]} ]} ]} @@ -100,4 +64,4 @@ {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)"}, {"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign"}, {"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey"}, - {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"}]} + {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"}]} \ No newline at end of file From 9b3421e45e485afc61ab1a6383b78225d6b1e16b Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 3 Aug 2021 16:40:00 -0700 Subject: [PATCH 06/12] Remove checker framework errors --- .../java/org/apache/beam/runners/samza/SamzaRunner.java | 2 +- .../runners/samza/renderer/PipelineJsonRenderer.java | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 57f12baedaef..181bd7553806 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -81,7 +81,7 @@ private SamzaRunner(SamzaPipelineOptions options) { } public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { - final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline).toString(); LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index 91fc1a413d83..e18bc36b4b6d 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.samza.SamzaIOInfo; @@ -48,8 +49,8 @@ public static String toJsonString(Pipeline pipeline) { return visitor.jsonBuilder.toString(); } - public static String toJsonString(RunnerApi.Pipeline pipeline) { - return null; + public static Optional toJsonString(RunnerApi.Pipeline pipeline) { + return Optional.empty(); } private final StringBuilder jsonBuilder = new StringBuilder(); @@ -106,10 +107,6 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { .forEach( (key, value) -> { final String producerName = valueToProducerNodeName.get(value); - String style = "solid"; - if (node.getTransform().getAdditionalInputs().containsKey(key)) { - style = "dashed"; - } graphLinks.append( String.format("{\"from\":\"%s\"," + "\"to\":\"%s\"},", producerName, fullName)); }); From 3b0a69d069f76eca7fe0f525de1595861bfbf00f Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Wed, 4 Aug 2021 19:47:07 -0700 Subject: [PATCH 07/12] Fix json --- .../beam/runners/samza/renderer/PipelineJsonRenderer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index e18bc36b4b6d..d4ecb8a60db3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -99,7 +99,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { writeLine("{ \"fullName\":\"%s\",", escapeString(fullName)); String enclosingNodeName = node.getEnclosingNode().getFullName(); writeLine( - " \"enclosingNode\":\"%s\",", + " \"enclosingNode\":\"%s\"},", escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName)); From bd060f0fa57268fe5e2b30297c29cff5f08e3b4e Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Mon, 9 Aug 2021 15:39:11 -0700 Subject: [PATCH 08/12] Address review --- .../beam/runners/samza/SamzaIOInfo.java | 4 +- .../beam/runners/samza/SamzaRunner.java | 7 +++- .../samza/renderer/PipelineJsonRenderer.java | 39 +++++++++---------- .../renderer/PipelineJsonRendererTest.java | 32 ++++++++++++--- .../samza/src/test/resources/ExpectedDag.json | 23 +++++------ 5 files changed, 65 insertions(+), 40 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java index fdb2a3bed70a..b3e3049c885e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java @@ -31,7 +31,7 @@ public interface SamzaIOInfo { String getIOInfo(TransformHierarchy.Node node); /** A registrar for {@link SamzaIOInfo}. */ - interface BeamIORegistrar { - SamzaIOInfo getBeamIO(); + interface SamzaIORegistrar { + SamzaIOInfo getSamzaIO(); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 181bd7553806..f17167b7738c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; @@ -81,7 +82,11 @@ private SamzaRunner(SamzaPipelineOptions options) { } public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { - final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline).toString(); + Optional jsonGraphOpt = PipelineJsonRenderer.toJsonString(pipeline); + String jsonGraph = ""; + if (jsonGraphOpt.isPresent()) { + jsonGraph = jsonGraphOpt.get(); + } LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index d4ecb8a60db3..1e2aaf9c0217 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -40,6 +40,8 @@ public class PipelineJsonRenderer implements Pipeline.PipelineVisitor { private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class); private static final String OUTERMOST_NODE = "OuterMostNode"; + private final Iterator beamIORegistrarIterator = + ServiceLoader.load(SamzaIOInfo.SamzaIORegistrar.class).iterator(); public static String toJsonString(Pipeline pipeline) { final PipelineJsonRenderer visitor = new PipelineJsonRenderer(); @@ -49,7 +51,7 @@ public static String toJsonString(Pipeline pipeline) { return visitor.jsonBuilder.toString(); } - public static Optional toJsonString(RunnerApi.Pipeline pipeline) { + public static Optional toJsonString(RunnerApi.Pipeline pipeline) { return Optional.empty(); } @@ -68,21 +70,16 @@ public void enterPipeline(Pipeline p) {} @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { String fullName = node.getFullName(); - writeLine( - "{ \"fullName\":\"%s\",", escapeString(fullName.isEmpty() ? OUTERMOST_NODE : fullName)); - if (!fullName.isEmpty()) { + writeLine("{ \"fullName\":\"%s\",", assignNodeName(fullName)); + if (node.getEnclosingNode() != null) { String enclosingNodeName = node.getEnclosingNode().getFullName(); - writeLine( - " \"enclosingNode\":\"%s\",", - escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); + writeLine(" \"enclosingNode\":\"%s\",", assignNodeName(enclosingNodeName)); } - String ioInfo = getIOTopicInfo(node); - if (!ioInfo.isEmpty()) { - writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo)); - } + Optional ioInfo = getIOTopicInfo(node); + ioInfo.ifPresent(s -> writeLine(" \"ioInfo\":\"%s\",", escapeString(s))); - writeLine(" \"ChildNode\":["); + writeLine(" \"ChildNodes\":["); enterBlock(); return CompositeBehavior.ENTER_TRANSFORM; } @@ -98,9 +95,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { String fullName = node.getFullName(); writeLine("{ \"fullName\":\"%s\",", escapeString(fullName)); String enclosingNodeName = node.getEnclosingNode().getFullName(); - writeLine( - " \"enclosingNode\":\"%s\"},", - escapeString(enclosingNodeName.isEmpty() ? OUTERMOST_NODE : enclosingNodeName)); + writeLine(" \"enclosingNode\":\"%s\"},", assignNodeName(enclosingNodeName)); node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName)); node.getInputs() @@ -167,18 +162,20 @@ private static String shortenTag(String tag) { return tag.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1"); } - private String getIOTopicInfo(TransformHierarchy.Node node) { + private String assignNodeName(String nodeName) { + return escapeString(nodeName.isEmpty() ? OUTERMOST_NODE : nodeName); + } + + private Optional getIOTopicInfo(TransformHierarchy.Node node) { final SamzaIOInfo samzaIOInfo; - final Iterator beamIORegistrarIterator = - ServiceLoader.load(SamzaIOInfo.BeamIORegistrar.class).iterator(); samzaIOInfo = beamIORegistrarIterator.hasNext() - ? Iterators.getOnlyElement(beamIORegistrarIterator).getBeamIO() + ? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO() : null; - String nodeInfo = ""; + Optional nodeInfo = Optional.empty(); if (samzaIOInfo != null) { - nodeInfo = samzaIOInfo.getIOInfo(node); + nodeInfo = Optional.of(samzaIOInfo.getIOInfo(node)); } return nodeInfo; } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index 56d52f89fb33..b9bce6a4fce1 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -19,15 +19,18 @@ import static org.junit.Assert.assertEquals; +import com.google.auto.service.AutoService; import com.google.gson.JsonParser; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import org.apache.beam.runners.samza.SamzaIOInfo; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -48,13 +51,18 @@ public void testEmptyPipeline() { Pipeline p = Pipeline.create(options); - System.out.println(PipelineJsonRenderer.toJsonString(p)); - assertEquals( + String jsonDag = "{ \"RootNode\": [" + " { \"fullName\":\"OuterMostNode\"," - + " \"ChildNode\":[ ]}],\"graphLinks\": []" - + "}", - PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), "")); + + " \"ioInfo\":\"TestTopic\"," + + " \"ChildNodes\":[ ]}],\"graphLinks\": []" + + "}"; + + System.out.println(PipelineJsonRenderer.toJsonString(p)); + assertEquals( + JsonParser.parseString(jsonDag), + JsonParser.parseString( + PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), ""))); } @Test @@ -77,4 +85,18 @@ public void testCompositePipeline() throws IOException { JsonParser.parseString( PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), ""))); } + + @AutoService(SamzaIOInfo.SamzaIORegistrar.class) + public static class Registrar implements SamzaIOInfo.SamzaIORegistrar { + + @Override + public SamzaIOInfo getSamzaIO() { + return new SamzaIOInfo() { + @Override + public String getIOInfo(TransformHierarchy.Node node) { + return "TestTopic"; + } + }; + } + } } diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index 22fc89015a94..8f45ec8252ee 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -1,27 +1,28 @@ { "RootNode": [ { "fullName":"OuterMostNode", - "ChildNode":[ + "ioInfo":"TestTopic", + "ChildNodes":[ { "fullName":"Create.TimestampedValues", "enclosingNode":"OuterMostNode", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/Create.Values", "enclosingNode":"Create.TimestampedValues", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)", "enclosingNode":"Create.TimestampedValues/Create.Values", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)"}, { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)"} ]}, { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)", "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)"} ]} @@ -29,28 +30,28 @@ ]}, { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)", "enclosingNode":"Create.TimestampedValues", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", "enclosingNode":"Create.TimestampedValues/ParDo(ConvertTimestamps)"} ]} ]}, { "fullName":"Window.Into()", "enclosingNode":"OuterMostNode", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Window.Into()/Window.Assign", "enclosingNode":"Window.Into()"} ]}, { "fullName":"Combine.perKey(SumInteger)", "enclosingNode":"OuterMostNode", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Combine.perKey(SumInteger)/GroupByKey", "enclosingNode":"Combine.perKey(SumInteger)"}, { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues", "enclosingNode":"Combine.perKey(SumInteger)", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues", - "ChildNode":[ + "ChildNodes":[ { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)"} ]} From c0354afa2bd7f33d1e6a69fb9ec58deb157b8605 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 10 Aug 2021 11:03:14 -0700 Subject: [PATCH 09/12] Address review --- .../beam/runners/samza/SamzaRunner.java | 7 +--- .../samza/renderer/PipelineJsonRenderer.java | 34 +++++++++++-------- .../renderer/PipelineJsonRendererTest.java | 5 ++- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index f17167b7738c..57f12baedaef 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Optional; import java.util.ServiceLoader; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; @@ -82,11 +81,7 @@ private SamzaRunner(SamzaPipelineOptions options) { } public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { - Optional jsonGraphOpt = PipelineJsonRenderer.toJsonString(pipeline); - String jsonGraph = ""; - if (jsonGraphOpt.isPresent()) { - jsonGraph = jsonGraphOpt.get(); - } + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index 1e2aaf9c0217..160cd968002e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.samza.SamzaIOInfo; import org.apache.beam.sdk.Pipeline; @@ -40,8 +41,7 @@ public class PipelineJsonRenderer implements Pipeline.PipelineVisitor { private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class); private static final String OUTERMOST_NODE = "OuterMostNode"; - private final Iterator beamIORegistrarIterator = - ServiceLoader.load(SamzaIOInfo.SamzaIORegistrar.class).iterator(); + @Nullable private static final SamzaIOInfo SAMZA_IO_INFO = loadSamzaIOInfo(); public static String toJsonString(Pipeline pipeline) { final PipelineJsonRenderer visitor = new PipelineJsonRenderer(); @@ -51,8 +51,8 @@ public static String toJsonString(Pipeline pipeline) { return visitor.jsonBuilder.toString(); } - public static Optional toJsonString(RunnerApi.Pipeline pipeline) { - return Optional.empty(); + public static String toJsonString(RunnerApi.Pipeline pipeline) { + return ""; } private final StringBuilder jsonBuilder = new StringBuilder(); @@ -64,6 +64,15 @@ public static Optional toJsonString(RunnerApi.Pipeline pipeline) { private PipelineJsonRenderer() {} + @Nullable + private static SamzaIOInfo loadSamzaIOInfo() { + final Iterator beamIORegistrarIterator = + ServiceLoader.load(SamzaIOInfo.SamzaIORegistrar.class).iterator(); + return beamIORegistrarIterator.hasNext() + ? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO() + : null; + } + @Override public void enterPipeline(Pipeline p) {} @@ -77,7 +86,9 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { } Optional ioInfo = getIOTopicInfo(node); - ioInfo.ifPresent(s -> writeLine(" \"ioInfo\":\"%s\",", escapeString(s))); + if (ioInfo.isPresent() && !ioInfo.get().isEmpty()) { + writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo.get())); + } writeLine(" \"ChildNodes\":["); enterBlock(); @@ -167,16 +178,9 @@ private String assignNodeName(String nodeName) { } private Optional getIOTopicInfo(TransformHierarchy.Node node) { - final SamzaIOInfo samzaIOInfo; - samzaIOInfo = - beamIORegistrarIterator.hasNext() - ? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO() - : null; - - Optional nodeInfo = Optional.empty(); - if (samzaIOInfo != null) { - nodeInfo = Optional.of(samzaIOInfo.getIOInfo(node)); + if (SAMZA_IO_INFO == null) { + return Optional.empty(); } - return nodeInfo; + return Optional.of(SAMZA_IO_INFO.getIOInfo(node)); } } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index b9bce6a4fce1..b19917f78a48 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -94,7 +94,10 @@ public SamzaIOInfo getSamzaIO() { return new SamzaIOInfo() { @Override public String getIOInfo(TransformHierarchy.Node node) { - return "TestTopic"; + if (node.isRootNode()) { + return "TestTopic"; + } + return ""; } }; } From 0fc4a06b8870750740c881d5fd231dd2e7d13cf2 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Tue, 10 Aug 2021 11:27:49 -0700 Subject: [PATCH 10/12] Address review --- .../java/org/apache/beam/runners/samza/SamzaIOInfo.java | 3 ++- .../beam/runners/samza/renderer/PipelineJsonRenderer.java | 2 +- .../runners/samza/renderer/PipelineJsonRendererTest.java | 7 ++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java index b3e3049c885e..0332f97e2b5d 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.samza; +import java.util.Optional; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -28,7 +29,7 @@ public interface SamzaIOInfo { /** Get I/O topic name and cluster. */ - String getIOInfo(TransformHierarchy.Node node); + Optional getIOInfo(TransformHierarchy.Node node); /** A registrar for {@link SamzaIOInfo}. */ interface SamzaIORegistrar { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java index 160cd968002e..af40dd2e0fd3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java @@ -181,6 +181,6 @@ private Optional getIOTopicInfo(TransformHierarchy.Node node) { if (SAMZA_IO_INFO == null) { return Optional.empty(); } - return Optional.of(SAMZA_IO_INFO.getIOInfo(node)); + return SAMZA_IO_INFO.getIOInfo(node); } } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java index b19917f78a48..bf772dd5d84a 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java @@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Optional; import org.apache.beam.runners.samza.SamzaIOInfo; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.SamzaRunner; @@ -93,11 +94,11 @@ public static class Registrar implements SamzaIOInfo.SamzaIORegistrar { public SamzaIOInfo getSamzaIO() { return new SamzaIOInfo() { @Override - public String getIOInfo(TransformHierarchy.Node node) { + public Optional getIOInfo(TransformHierarchy.Node node) { if (node.isRootNode()) { - return "TestTopic"; + return Optional.of("TestTopic"); } - return ""; + return Optional.empty(); } }; } From 0631185152c2273fff7be69f931eba2dada620c0 Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Thu, 12 Aug 2021 10:42:25 -0700 Subject: [PATCH 11/12] Address review --- .../beam/runners/samza/SamzaIOInfo.java | 38 ----------- .../beam/runners/samza/SamzaRunner.java | 21 +++++- .../runners/samza/renderer/package-info.java | 20 ------ .../PipelineJsonRenderer.java | 65 +++++++++++++------ .../PipelineJsonRendererTest.java | 13 ++-- 5 files changed, 68 insertions(+), 89 deletions(-) delete mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java delete mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java rename runners/samza/src/main/java/org/apache/beam/runners/samza/{renderer => util}/PipelineJsonRenderer.java (80%) rename runners/samza/src/test/java/org/apache/beam/runners/samza/{renderer => util}/PipelineJsonRendererTest.java (90%) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java deleted file mode 100644 index 0332f97e2b5d..000000000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaIOInfo.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza; - -import java.util.Optional; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.runners.TransformHierarchy; - -/** - * Interface to get I/O information for a Beam job. This will help add I/O information to the Beam - * DAG. - */ -@Experimental -public interface SamzaIOInfo { - - /** Get I/O topic name and cluster. */ - Optional getIOInfo(TransformHierarchy.Node node); - - /** A registrar for {@link SamzaIOInfo}. */ - interface SamzaIORegistrar { - SamzaIOInfo getSamzaIO(); - } -} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 57f12baedaef..910f1aab4e85 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -24,9 +24,9 @@ import java.util.ServiceLoader; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; -import org.apache.beam.runners.samza.renderer.PipelineJsonRenderer; import org.apache.beam.runners.samza.translation.ConfigBuilder; import org.apache.beam.runners.samza.translation.PViewToIdMapper; import org.apache.beam.runners.samza.translation.PortableTranslationContext; @@ -34,6 +34,7 @@ import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator; import org.apache.beam.runners.samza.translation.SamzaTransformOverrides; import org.apache.beam.runners.samza.translation.TranslationContext; +import org.apache.beam.runners.samza.util.PipelineJsonRenderer; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.metrics.MetricsEnvironment; @@ -61,6 +62,7 @@ }) public class SamzaRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class); + private static final String BEAM_DOT_GRAPH = "beamDotGraph"; private static final String BEAM_JSON_GRAPH = "beamJsonGraph"; public static SamzaRunner fromOptions(PipelineOptions opts) { @@ -81,11 +83,15 @@ private SamzaRunner(SamzaPipelineOptions options) { } public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { + final String dotGraph = PipelineDotRenderer.toDotString(pipeline); + LOG.info("Portable pipeline to run DOT graph:\n{}", dotGraph); + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); - LOG.info("Portable pipeline to run json graph:\n{}", jsonGraph); + LOG.info("Portable pipeline to run JSON graph:\n{}", jsonGraph); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, options); + configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); @@ -121,11 +127,19 @@ public SamzaPipelineResult run(Pipeline pipeline) { MetricsEnvironment.setMetricsSupported(true); if (LOG.isDebugEnabled()) { - LOG.debug("Pre-processed Beam pipeline:\n{}", PipelineJsonRenderer.toJsonString(pipeline)); + LOG.debug( + "Pre-processed Beam pipeline in dot format:\n{}", + PipelineDotRenderer.toDotString(pipeline)); + LOG.debug( + "Pre-processed Beam pipeline in json format:\n{}", + PipelineJsonRenderer.toJsonString(pipeline)); } pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); + final String dotGraph = PipelineDotRenderer.toDotString(pipeline); + LOG.info("Beam pipeline DOT graph:\n{}", dotGraph); + final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline); LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph); @@ -133,6 +147,7 @@ public SamzaPipelineResult run(Pipeline pipeline) { final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder); + configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); final Config config = configBuilder.build(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java deleted file mode 100644 index abaf895cbec3..000000000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Internal implementation of the Beam runner for Apache Samza. */ -package org.apache.beam.runners.samza.renderer; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java similarity index 80% rename from runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java rename to runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java index af40dd2e0fd3..6c7f9d5442a4 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/renderer/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.samza.renderer; +package org.apache.beam.runners.samza.util; import java.util.HashMap; import java.util.Iterator; @@ -24,7 +24,6 @@ import java.util.ServiceLoader; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.samza.SamzaIOInfo; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -39,42 +38,72 @@ */ @Experimental public class PipelineJsonRenderer implements Pipeline.PipelineVisitor { + + /** + * Interface to get I/O information for a Beam job. This will help add I/O information to the Beam + * DAG. + */ + @Experimental + public interface SamzaIOInfo { + + /** Get I/O topic name and cluster. */ + Optional getIOInfo(TransformHierarchy.Node node); + } + + /** A registrar for {@link SamzaIOInfo}. */ + interface SamzaIORegistrar { + + SamzaIOInfo getSamzaIO(); + } + private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class); private static final String OUTERMOST_NODE = "OuterMostNode"; @Nullable private static final SamzaIOInfo SAMZA_IO_INFO = loadSamzaIOInfo(); + /** + * This method creates a JSON representation of the Beam pipeline. + * + * @param pipeline The beam pipeline + * @return JSON string representation of the pipeline + */ public static String toJsonString(Pipeline pipeline) { final PipelineJsonRenderer visitor = new PipelineJsonRenderer(); - visitor.begin(); pipeline.traverseTopologically(visitor); - visitor.end(); return visitor.jsonBuilder.toString(); } + /** + * This method creates a JSON representation for Beam Portable Pipeline. + * + * @param pipeline The beam portable pipeline + * @return JSON string representation of the pipeline + */ public static String toJsonString(RunnerApi.Pipeline pipeline) { - return ""; + throw new UnsupportedOperationException("JSON DAG for portable pipeline is not supported yet."); } private final StringBuilder jsonBuilder = new StringBuilder(); private final StringBuilder graphLinks = new StringBuilder(); - private final Map nodeToId = new HashMap<>(); private final Map valueToProducerNodeName = new HashMap<>(); private int indent; - private int nextNodeId; private PipelineJsonRenderer() {} @Nullable private static SamzaIOInfo loadSamzaIOInfo() { - final Iterator beamIORegistrarIterator = - ServiceLoader.load(SamzaIOInfo.SamzaIORegistrar.class).iterator(); + final Iterator beamIORegistrarIterator = + ServiceLoader.load(SamzaIORegistrar.class).iterator(); return beamIORegistrarIterator.hasNext() ? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO() : null; } @Override - public void enterPipeline(Pipeline p) {} + public void enterPipeline(Pipeline p) { + writeLine("{ \n \"RootNode\": ["); + graphLinks.append(",\"graphLinks\": ["); + enterBlock(); + } @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { @@ -85,7 +114,7 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { writeLine(" \"enclosingNode\":\"%s\",", assignNodeName(enclosingNodeName)); } - Optional ioInfo = getIOTopicInfo(node); + Optional ioInfo = getIOInfo(node); if (ioInfo.isPresent() && !ioInfo.get().isEmpty()) { writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo.get())); } @@ -122,15 +151,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { public void visitValue(PValue value, TransformHierarchy.Node producer) {} @Override - public void leavePipeline(Pipeline pipeline) {} - - private void begin() { - writeLine("{ \n \"RootNode\": ["); - graphLinks.append(",\"graphLinks\": ["); - enterBlock(); - } - - private void end() { + public void leavePipeline(Pipeline pipeline) { exitBlock(); writeLine("]"); // delete the last comma @@ -152,6 +173,8 @@ private void exitBlock() { } private void writeLine(String format, Object... args) { + // Since we append a comma after every entry to the graph, we will need to remove that one extra + // comma towards the end of the JSON. int secondLastCharIndex = jsonBuilder.length() - 2; if (jsonBuilder.length() > 1 && jsonBuilder.charAt(secondLastCharIndex) == ',' @@ -177,7 +200,7 @@ private String assignNodeName(String nodeName) { return escapeString(nodeName.isEmpty() ? OUTERMOST_NODE : nodeName); } - private Optional getIOTopicInfo(TransformHierarchy.Node node) { + private Optional getIOInfo(TransformHierarchy.Node node) { if (SAMZA_IO_INFO == null) { return Optional.empty(); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java similarity index 90% rename from runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java rename to runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java index bf772dd5d84a..f428fa633035 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/renderer/PipelineJsonRendererTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.samza.renderer; +package org.apache.beam.runners.samza.util; import static org.junit.Assert.assertEquals; @@ -26,7 +26,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Optional; -import org.apache.beam.runners.samza.SamzaIOInfo; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.sdk.Pipeline; @@ -42,7 +41,7 @@ import org.joda.time.Instant; import org.junit.Test; -/** Tests for {@link PipelineJsonRenderer}. */ +/** Tests for {@link org.apache.beam.runners.samza.util.PipelineJsonRenderer}. */ public class PipelineJsonRendererTest { @Test @@ -87,12 +86,12 @@ public void testCompositePipeline() throws IOException { PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), ""))); } - @AutoService(SamzaIOInfo.SamzaIORegistrar.class) - public static class Registrar implements SamzaIOInfo.SamzaIORegistrar { + @AutoService(PipelineJsonRenderer.SamzaIORegistrar.class) + public static class Registrar implements PipelineJsonRenderer.SamzaIORegistrar { @Override - public SamzaIOInfo getSamzaIO() { - return new SamzaIOInfo() { + public PipelineJsonRenderer.SamzaIOInfo getSamzaIO() { + return new PipelineJsonRenderer.SamzaIOInfo() { @Override public Optional getIOInfo(TransformHierarchy.Node node) { if (node.isRootNode()) { From b91a607c6c2efb9f0f9b7767ee6469923ba84dbf Mon Sep 17 00:00:00 2001 From: Pawas Chhokra Date: Thu, 12 Aug 2021 10:52:30 -0700 Subject: [PATCH 12/12] Make interface public --- .../apache/beam/runners/samza/util/PipelineJsonRenderer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java index 6c7f9d5442a4..cc53764d7414 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java @@ -51,7 +51,7 @@ public interface SamzaIOInfo { } /** A registrar for {@link SamzaIOInfo}. */ - interface SamzaIORegistrar { + public interface SamzaIORegistrar { SamzaIOInfo getSamzaIO(); }