Skip to content
1 change: 1 addition & 0 deletions runners/samza/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dependencies {
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.jackson_dataformat_yaml
testCompile library.java.google_code_gson
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(project.path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
import org.apache.beam.runners.samza.translation.SamzaTransformOverrides;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
Expand Down Expand Up @@ -62,6 +63,7 @@
public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
private static final String BEAM_DOT_GRAPH = "beamDotGraph";
private static final String BEAM_JSON_GRAPH = "beamJsonGraph";

public static SamzaRunner fromOptions(PipelineOptions opts) {
final SamzaPipelineOptions samzaOptions =
Expand All @@ -82,11 +84,15 @@ private SamzaRunner(SamzaPipelineOptions options) {

public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
LOG.info("Portable pipeline to run:\n{}", dotGraph);
LOG.info("Portable pipeline to run DOT graph:\n{}", dotGraph);

final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
LOG.info("Portable pipeline to run JSON graph:\n{}", jsonGraph);

final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, options);
configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
configBuilder.put(BEAM_JSON_GRAPH, jsonGraph);

final Config config = configBuilder.build();
options.setConfigOverride(config);
Expand Down Expand Up @@ -121,19 +127,28 @@ public SamzaPipelineResult run(Pipeline pipeline) {
MetricsEnvironment.setMetricsSupported(true);

if (LOG.isDebugEnabled()) {
LOG.debug("Pre-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in dot format:\n{}",
PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in json format:\n{}",
PipelineJsonRenderer.toJsonString(pipeline));
}

pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());

final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);

final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);

final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
final ConfigBuilder configBuilder = new ConfigBuilder(options);

SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
configBuilder.put(BEAM_JSON_GRAPH, jsonGraph);

final Config config = configBuilder.build();
options.setConfigOverride(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.util;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A JSON renderer for BEAM {@link Pipeline} DAG. This can help us with visualization of the Beam
* DAG.
*/
@Experimental
public class PipelineJsonRenderer implements Pipeline.PipelineVisitor {

/**
* Interface to get I/O information for a Beam job. This will help add I/O information to the Beam
* DAG.
*/
@Experimental
public interface SamzaIOInfo {

/** Get I/O topic name and cluster. */
Optional<String> getIOInfo(TransformHierarchy.Node node);
}

/** A registrar for {@link SamzaIOInfo}. */
public interface SamzaIORegistrar {

SamzaIOInfo getSamzaIO();
}

private static final Logger LOG = LoggerFactory.getLogger(PipelineJsonRenderer.class);
private static final String OUTERMOST_NODE = "OuterMostNode";
@Nullable private static final SamzaIOInfo SAMZA_IO_INFO = loadSamzaIOInfo();

/**
* This method creates a JSON representation of the Beam pipeline.
*
* @param pipeline The beam pipeline
* @return JSON string representation of the pipeline
*/
public static String toJsonString(Pipeline pipeline) {
final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
pipeline.traverseTopologically(visitor);
return visitor.jsonBuilder.toString();
}

/**
* This method creates a JSON representation for Beam Portable Pipeline.
*
* @param pipeline The beam portable pipeline
* @return JSON string representation of the pipeline
*/
public static String toJsonString(RunnerApi.Pipeline pipeline) {
throw new UnsupportedOperationException("JSON DAG for portable pipeline is not supported yet.");
}

private final StringBuilder jsonBuilder = new StringBuilder();
private final StringBuilder graphLinks = new StringBuilder();
private final Map<PValue, String> valueToProducerNodeName = new HashMap<>();
private int indent;

private PipelineJsonRenderer() {}

@Nullable
private static SamzaIOInfo loadSamzaIOInfo() {
final Iterator<SamzaIORegistrar> beamIORegistrarIterator =
ServiceLoader.load(SamzaIORegistrar.class).iterator();
return beamIORegistrarIterator.hasNext()
? Iterators.getOnlyElement(beamIORegistrarIterator).getSamzaIO()
: null;
}

@Override
public void enterPipeline(Pipeline p) {
writeLine("{ \n \"RootNode\": [");
graphLinks.append(",\"graphLinks\": [");
enterBlock();
}

@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
String fullName = node.getFullName();
writeLine("{ \"fullName\":\"%s\",", assignNodeName(fullName));
if (node.getEnclosingNode() != null) {
String enclosingNodeName = node.getEnclosingNode().getFullName();
writeLine(" \"enclosingNode\":\"%s\",", assignNodeName(enclosingNodeName));
}

Optional<String> ioInfo = getIOInfo(node);
if (ioInfo.isPresent() && !ioInfo.get().isEmpty()) {
writeLine(" \"ioInfo\":\"%s\",", escapeString(ioInfo.get()));
}

writeLine(" \"ChildNodes\":[");
enterBlock();
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
exitBlock();
writeLine("]},");
}

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
String fullName = node.getFullName();
writeLine("{ \"fullName\":\"%s\",", escapeString(fullName));
String enclosingNodeName = node.getEnclosingNode().getFullName();
writeLine(" \"enclosingNode\":\"%s\"},", assignNodeName(enclosingNodeName));

node.getOutputs().values().forEach(x -> valueToProducerNodeName.put(x, fullName));
node.getInputs()
.forEach(
(key, value) -> {
final String producerName = valueToProducerNodeName.get(value);
graphLinks.append(
String.format("{\"from\":\"%s\"," + "\"to\":\"%s\"},", producerName, fullName));
});
}

@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {}

@Override
public void leavePipeline(Pipeline pipeline) {
exitBlock();
writeLine("]");
// delete the last comma
int lastIndex = graphLinks.length() - 1;
if (graphLinks.charAt(lastIndex) == ',') {
graphLinks.deleteCharAt(lastIndex);
}
graphLinks.append("]");
jsonBuilder.append(graphLinks);
jsonBuilder.append("}");
}

private void enterBlock() {
indent += 4;
}

private void exitBlock() {
indent -= 4;
}

private void writeLine(String format, Object... args) {
// Since we append a comma after every entry to the graph, we will need to remove that one extra
// comma towards the end of the JSON.
int secondLastCharIndex = jsonBuilder.length() - 2;
if (jsonBuilder.length() > 1
&& jsonBuilder.charAt(secondLastCharIndex) == ','
&& (format.startsWith("}") || format.startsWith("]"))) {
jsonBuilder.deleteCharAt(secondLastCharIndex);
}
if (indent != 0) {
jsonBuilder.append(String.format("%-" + indent + "s", ""));
}
jsonBuilder.append(String.format(format, args));
jsonBuilder.append("\n");
}

private static String escapeString(String x) {
return x.replace("\"", "\\\"");
}

private static String shortenTag(String tag) {
return tag.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1");
}

private String assignNodeName(String nodeName) {
return escapeString(nodeName.isEmpty() ? OUTERMOST_NODE : nodeName);
}

private Optional<String> getIOInfo(TransformHierarchy.Node node) {
if (SAMZA_IO_INFO == null) {
return Optional.empty();
}
return SAMZA_IO_INFO.getIOInfo(node);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.util;

import static org.junit.Assert.assertEquals;

import com.google.auto.service.AutoService;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

/** Tests for {@link org.apache.beam.runners.samza.util.PipelineJsonRenderer}. */
public class PipelineJsonRendererTest {

@Test
public void testEmptyPipeline() {
SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
options.setRunner(SamzaRunner.class);

Pipeline p = Pipeline.create(options);

String jsonDag =
"{ \"RootNode\": ["
+ " { \"fullName\":\"OuterMostNode\","
+ " \"ioInfo\":\"TestTopic\","
+ " \"ChildNodes\":[ ]}],\"graphLinks\": []"
+ "}";

System.out.println(PipelineJsonRenderer.toJsonString(p));
assertEquals(
JsonParser.parseString(jsonDag),
JsonParser.parseString(
PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), "")));
}

@Test
public void testCompositePipeline() throws IOException {
SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
options.setRunner(SamzaRunner.class);

Pipeline p = Pipeline.create(options);

p.apply(Create.timestamped(TimestampedValue.of(KV.of(1, 1), new Instant(1))))
.apply(Window.into(FixedWindows.of(Duration.millis(10))))
.apply(Sum.integersPerKey());

String jsonDagFileName = "src/test/resources/ExpectedDag.json";
String jsonDag =
new String(Files.readAllBytes(Paths.get(jsonDagFileName)), StandardCharsets.UTF_8);

assertEquals(
JsonParser.parseString(jsonDag),
JsonParser.parseString(
PipelineJsonRenderer.toJsonString(p).replaceAll(System.lineSeparator(), "")));
}

@AutoService(PipelineJsonRenderer.SamzaIORegistrar.class)
public static class Registrar implements PipelineJsonRenderer.SamzaIORegistrar {

@Override
public PipelineJsonRenderer.SamzaIOInfo getSamzaIO() {
return new PipelineJsonRenderer.SamzaIOInfo() {
@Override
public Optional<String> getIOInfo(TransformHierarchy.Node node) {
if (node.isRootNode()) {
return Optional.of("TestTopic");
}
return Optional.empty();
}
};
}
}
}
Loading