diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 3edf73abb962..f7c875b1ea2c 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -75,6 +75,7 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.jackson_dataformat_yaml
+  testCompile library.java.google_code_gson
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
   validatesRunner project(project.path)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 42fbed0386dd..910f1aab4e85 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -34,6 +34,7 @@
 import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
 import org.apache.beam.runners.samza.translation.SamzaTransformOverrides;
 import org.apache.beam.runners.samza.translation.TranslationContext;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -62,6 +63,7 @@
 public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
   private static final String BEAM_DOT_GRAPH = "beamDotGraph";
+  private static final String BEAM_JSON_GRAPH = "beamJsonGraph";
 
   public static SamzaRunner fromOptions(PipelineOptions opts) {
     final SamzaPipelineOptions samzaOptions =
@@ -82,7 +84,8 @@ private SamzaRunner(SamzaPipelineOptions options) {
 
   public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
-    LOG.info("Portable pipeline to run:\n{}", dotGraph);
+    LOG.info("Portable pipeline to run DOT graph:\n{}", dotGraph);
+    // No JSON graph here: PipelineJsonRenderer does not support portable pipelines yet.
 
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
     SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, options);
@@ -121,7 +124,12 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     MetricsEnvironment.setMetricsSupported(true);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Pre-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline));
+      LOG.debug(
+          "Pre-processed Beam pipeline in dot format:\n{}",
+          PipelineDotRenderer.toDotString(pipeline));
+      LOG.debug(
+          "Pre-processed Beam pipeline in json format:\n{}",
+          PipelineJsonRenderer.toJsonString(pipeline));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
@@ -129,11 +137,15 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
+
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
 
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
     SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
     configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
+    configBuilder.put(BEAM_JSON_GRAPH, jsonGraph);
 
     final Config config = configBuilder.build();
     options.setConfigOverride(config);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java
new file mode 100644
index 000000000000..cc53764d7414
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JSON renderer for a Beam {@link Pipeline} DAG, intended to help visualize the DAG.
+ *
+ * <p>The rendered document has two top-level members: {@code "RootNode"}, the transform hierarchy
+ * as nested {@code "ChildNodes"} arrays, and {@code "graphLinks"}, the producer-to-consumer edges
+ * between primitive transforms.
+ */
+@Experimental
+public class PipelineJsonRenderer implements Pipeline.PipelineVisitor {
+
+  /**
+   * Interface to get I/O information for a Beam job. This will help add I/O information to the Beam
+   * DAG.
+   */
+  @Experimental
+  public interface SamzaIOInfo {
+
+    /** Get I/O topic name and cluster. */
+    Optional<String> getIOInfo(TransformHierarchy.Node node);
+  }
+
+  /** A registrar for {@link SamzaIOInfo}. */
+  public interface SamzaIORegistrar {
+
+    SamzaIOInfo getSamzaIO();
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class);
+  private static final String OUTERMOST_NODE = "OuterMostNode";
+  @Nullable private static final SamzaIOInfo SAMZA_IO_INFO = loadSamzaIOInfo();
+
+  /**
+   * This method creates a JSON representation of the Beam pipeline.
+   *
+   * @param pipeline The beam pipeline
+   * @return JSON string representation of the pipeline
+   */
+  public static String toJsonString(Pipeline pipeline) {
+    final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
+    pipeline.traverseTopologically(visitor);
+    return visitor.jsonBuilder.toString();
+  }
+
+  /**
+   * This method creates a JSON representation for Beam Portable Pipeline.
+   *
+   * @param pipeline The beam portable pipeline
+   * @return JSON string representation of the pipeline
+   * @throws UnsupportedOperationException always; portable pipelines cannot be rendered yet
+   */
+  public static String toJsonString(RunnerApi.Pipeline pipeline) {
+    throw new UnsupportedOperationException("JSON DAG for portable pipeline is not supported yet.");
+  }
+
+  private final StringBuilder jsonBuilder = new StringBuilder();
+  private final StringBuilder graphLinks = new StringBuilder();
+  private final Map<PValue, String> valueToProducerNodeName = new HashMap<>();
+  private int indent;
+
+  private PipelineJsonRenderer() {}
+
+  /**
+   * Loads the {@link SamzaIOInfo} published through {@link ServiceLoader}, if any.
+   *
+   * @return the registered {@link SamzaIOInfo}, or {@code null} when no registrar is on the
+   *     classpath
+   * @throws IllegalArgumentException if more than one {@link SamzaIORegistrar} is registered
+   */
+  @Nullable
+  private static SamzaIOInfo loadSamzaIOInfo() {
+    final Iterator<SamzaIORegistrar> beamIORegistrarIterator =
+        ServiceLoader.load(SamzaIORegistrar.class).iterator();
+    return beamIORegistrarIterator.hasNext()
+        ? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO()
+        : null;
+  }
+
+  @Override
+  public void enterPipeline(Pipeline p) {
+    writeLine("{ \n \"RootNode\": [");
+    graphLinks.append(",\"graphLinks\": [");
+    enterBlock();
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    String fullName = node.getFullName();
+    writeLine("{ \"fullName\":\"%s\",", assignNodeName(fullName));
+    if (node.getEnclosingNode() != null) {
+      String enclosingNodeName = node.getEnclosingNode().getFullName();
+      writeLine(" \"enclosingNode\":\"%s\",", assignNodeName(enclosingNodeName));
+    }
+
+    getIOInfo(node)
+        .filter(ioInfo -> !ioInfo.isEmpty())
+        .ifPresent(ioInfo -> writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo)));
+
+    writeLine(" \"ChildNodes\":[");
+    enterBlock();
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    exitBlock();
+    writeLine("]},");
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    final String fullName = escapeString(node.getFullName());
+    writeLine("{ \"fullName\":\"%s\",", fullName);
+    String enclosingNodeName = node.getEnclosingNode().getFullName();
+    writeLine(" \"enclosingNode\":\"%s\"},", assignNodeName(enclosingNodeName));
+
+    // Names are stored already escaped so that they can be written into graph links verbatim.
+    node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName));
+    node.getInputs()
+        .forEach(
+            (key, value) -> {
+              final String producerName = valueToProducerNodeName.get(value);
+              if (producerName == null) {
+                // Without a known producer the edge would be rendered as coming from "null".
+                LOG.warn("No producer found for input {} of {}; skipping link", key, fullName);
+                return;
+              }
+              graphLinks.append(
+                  String.format("{\"from\":\"%s\"," + "\"to\":\"%s\"},", producerName, fullName));
+            });
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+
+  @Override
+  public void leavePipeline(Pipeline pipeline) {
+    exitBlock();
+    writeLine("]");
+    // delete the last comma
+    int lastIndex = graphLinks.length() - 1;
+    if (graphLinks.charAt(lastIndex) == ',') {
+      graphLinks.deleteCharAt(lastIndex);
+    }
+    graphLinks.append("]");
+    jsonBuilder.append(graphLinks);
+    jsonBuilder.append("}");
+  }
+
+  private void enterBlock() {
+    indent += 4;
+  }
+
+  private void exitBlock() {
+    indent -= 4;
+  }
+
+  private void writeLine(String format, Object... args) {
+    // Since we append a comma after every entry to the graph, we will need to remove that one extra
+    // comma towards the end of the JSON.
+    int secondLastCharIndex = jsonBuilder.length() - 2;
+    if (jsonBuilder.length() > 1
+        && jsonBuilder.charAt(secondLastCharIndex) == ','
+        && (format.startsWith("}") || format.startsWith("]"))) {
+      jsonBuilder.deleteCharAt(secondLastCharIndex);
+    }
+    if (indent != 0) {
+      jsonBuilder.append(String.format("%-" + indent + "s", ""));
+    }
+    jsonBuilder.append(String.format(format, args));
+    jsonBuilder.append("\n");
+  }
+
+  /**
+   * Escapes {@code x} so that it can be embedded between double quotes in a JSON document.
+   *
+   * <p>Backslashes, double quotes and control characters are escaped; everything else is copied
+   * unchanged.
+   */
+  private static String escapeString(String x) {
+    final StringBuilder escaped = new StringBuilder(x.length());
+    for (int i = 0; i < x.length(); i++) {
+      final char c = x.charAt(i);
+      switch (c) {
+        case '"':
+          escaped.append("\\\"");
+          break;
+        case '\\':
+          escaped.append("\\\\");
+          break;
+        case '\n':
+          escaped.append("\\n");
+          break;
+        case '\r':
+          escaped.append("\\r");
+          break;
+        case '\t':
+          escaped.append("\\t");
+          break;
+        default:
+          if (c < 0x20) {
+            escaped.append(String.format("\\u%04x", (int) c));
+          } else {
+            escaped.append(c);
+          }
+      }
+    }
+    return escaped.toString();
+  }
+
+  private String assignNodeName(String nodeName) {
+    return escapeString(nodeName.isEmpty() ? OUTERMOST_NODE : nodeName);
+  }
+
+  private Optional<String> getIOInfo(TransformHierarchy.Node node) {
+    if (SAMZA_IO_INFO == null) {
+      return Optional.empty();
+    }
+    return SAMZA_IO_INFO.getIOInfo(node);
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java
new file mode 100644
index 000000000000..f428fa633035
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/PipelineJsonRendererTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.auto.service.AutoService;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Optional;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for {@link org.apache.beam.runners.samza.util.PipelineJsonRenderer}. */
+public class PipelineJsonRendererTest {
+
+  @Test
+  public void testEmptyPipeline() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    String jsonDag =
+        "{ \"RootNode\": ["
+            + " { \"fullName\":\"OuterMostNode\","
+            + " \"ioInfo\":\"TestTopic\","
+            + " \"ChildNodes\":[ ]}],\"graphLinks\": []"
+            + "}";
+
+    assertEquals(
+        JsonParser.parseString(jsonDag),
+        JsonParser.parseString(PipelineJsonRenderer.toJsonString(p)));
+  }
+
+  @Test
+  public void testCompositePipeline() throws IOException {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1))))
+        .apply(Window.into(FixedWindows.of(Duration.millis(10))))
+        .apply(Sum.integersPerKey());
+
+    String jsonDagFileName = "src/test/resources/ExpectedDag.json";
+    String jsonDag =
+        new String(Files.readAllBytes(Paths.get(jsonDagFileName)), StandardCharsets.UTF_8);
+
+    assertEquals(
+        JsonParser.parseString(jsonDag),
+        JsonParser.parseString(PipelineJsonRenderer.toJsonString(p)));
+  }
+
+  @Test
+  public void testNodeNamesAreEscaped() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+    p.apply("Say \"hi\" \\ there", Create.of(1, 2, 3));
+
+    // Both the quotes and the backslash must reach the output as JSON escape sequences.
+    String json = PipelineJsonRenderer.toJsonString(p);
+    assertTrue(json.contains("\"fullName\":\"Say \\\"hi\\\" \\\\ there\""));
+  }
+
+  @AutoService(PipelineJsonRenderer.SamzaIORegistrar.class)
+  public static class Registrar implements PipelineJsonRenderer.SamzaIORegistrar {
+
+    @Override
+    public PipelineJsonRenderer.SamzaIOInfo getSamzaIO() {
+      return new PipelineJsonRenderer.SamzaIOInfo() {
+        @Override
+        public Optional<String> getIOInfo(TransformHierarchy.Node node) {
+          if (node.isRootNode()) {
+            return Optional.of("TestTopic");
+          }
+          return Optional.empty();
+        }
+      };
+    }
+  }
+}
diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json
new
file mode 100644 index 000000000000..8f45ec8252ee --- /dev/null +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -0,0 +1,68 @@ +{ + "RootNode": [ + { "fullName":"OuterMostNode", + "ioInfo":"TestTopic", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues", + "enclosingNode":"OuterMostNode", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/Create.Values", + "enclosingNode":"Create.TimestampedValues", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)"}, + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)"} + ]}, + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)", + "enclosingNode":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)"} + ]} + ]} + ]}, + { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)", + "enclosingNode":"Create.TimestampedValues", + "ChildNodes":[ + { "fullName":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", + "enclosingNode":"Create.TimestampedValues/ParDo(ConvertTimestamps)"} + ]} + ]}, + { 
"fullName":"Window.Into()", + "enclosingNode":"OuterMostNode", + "ChildNodes":[ + { "fullName":"Window.Into()/Window.Assign", + "enclosingNode":"Window.Into()"} + ]}, + { "fullName":"Combine.perKey(SumInteger)", + "enclosingNode":"OuterMostNode", + "ChildNodes":[ + { "fullName":"Combine.perKey(SumInteger)/GroupByKey", + "enclosingNode":"Combine.perKey(SumInteger)"}, + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues", + "enclosingNode":"Combine.perKey(SumInteger)", + "ChildNodes":[ + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)", + "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues", + "ChildNodes":[ + { "fullName":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "enclosingNode":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)"} + ]} + ]} + ]} + ]} + ] +,"graphLinks": [ + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/Impulse","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource)","to":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)"}, + {"from":"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)","to":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)"}, + {"from":"Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)","to":"Window.Into()/Window.Assign"}, + {"from":"Window.Into()/Window.Assign","to":"Combine.perKey(SumInteger)/GroupByKey"}, + {"from":"Combine.perKey(SumInteger)/GroupByKey","to":"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)"}]} \ No newline at end of file