From bf5f73522ccaae332da2278777c7ea6aa81bcdf8 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Sun, 7 Jul 2024 15:18:20 -0700 Subject: [PATCH 1/4] Stage PrismRunner implementation and dependencies --- .../org/apache/beam/runners/prism/PrismPipelineOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java index ec0f8beb620a..1b665603b8b6 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java @@ -35,7 +35,7 @@ public interface PrismPipelineOptions extends PortablePipelineOptions { void setPrismLocation(String prismLocation); @Description( - "Override the SDK's version for deriving the Github Release URLs for " + "Override the SDK\\'s version for deriving the Github Release URLs for " + "downloading a zipped prism binary, for the current platform. If " + "set to a Github Release page URL, then it will use that release page as a base when constructing the download URL.") String getPrismVersionOverride(); From 9f32e3d2459817c46b09f433db8291fa1cf9add6 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Sun, 7 Jul 2024 15:33:49 -0700 Subject: [PATCH 2/4] A Java support class executes the Prism binary --- .../beam/runners/prism/PrismExecutor.java | 199 ++++++++++++++++++ .../beam/runners/prism/PrismExecutorTest.java | 99 +++++++++ 2 files changed, 298 insertions(+) create mode 100644 runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java create mode 100644 runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java new file mode 100644 index 000000000000..16320e59ff8a --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PrismExecutor} builds and executes a {@link ProcessBuilder} for use by the {@link + * PrismRunner}. Prism is a {@link org.apache.beam.runners.portability.PortableRunner} maintained at + * sdks/go/cmd/prism. + */ +@AutoValue +abstract class PrismExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(PrismExecutor.class); + + protected @MonotonicNonNull Process process; + protected ExecutorService executorService = Executors.newSingleThreadExecutor(); + protected @MonotonicNonNull Future future = null; + + static Builder builder() { + return new AutoValue_PrismExecutor.Builder(); + } + + /** The command to execute the Prism binary. */ + abstract String getCommand(); + + /** + * Additional arguments to pass when invoking the Prism binary. Defaults to an {@link + * Collections#emptyList()}. + */ + abstract List getArguments(); + + /** Stops the execution of the {@link Process}, created as a result of {@link #execute}. */ + void stop() { + LOG.info("Stopping Prism..."); + if (future != null) { + future.cancel(true); + } + executorService.shutdown(); + try { + boolean ignored = executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + } + if (process == null) { + return; + } + if (!process.isAlive()) { + return; + } + process.destroy(); + try { + process.waitFor(); + } catch (InterruptedException ignored) { + } + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to STDOUT. + */ + void execute() throws IOException { + execute(createProcessBuilder().inheritIO()); + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the + * {@param outputStream}. + */ + void execute(OutputStream outputStream) throws IOException { + execute(createProcessBuilder().redirectErrorStream(true)); + this.future = + executorService.submit( + () -> { + try { + ByteStreams.copy(checkStateNotNull(process).getInputStream(), outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the + * {@param file}. + */ + void execute(File file) throws IOException { + execute( + createProcessBuilder() + .redirectErrorStream(true) + .redirectOutput(ProcessBuilder.Redirect.appendTo(file))); + } + + private void execute(ProcessBuilder processBuilder) throws IOException { + this.process = processBuilder.start(); + LOG.info( + "started {}, pid: {}", + String.join(" ", getCommandWithArguments()), + getPidOrNotAvailableWarning()); + } + + private List getCommandWithArguments() { + List commandWithArguments = new ArrayList<>(); + commandWithArguments.add(getCommand()); + commandWithArguments.addAll(getArguments()); + + return commandWithArguments; + } + + private ProcessBuilder createProcessBuilder() { + return new ProcessBuilder(getCommandWithArguments()); + } + + private String getPidOrNotAvailableWarning() { + Long pid = getPidIfAvailable(); + if (pid == null) { + return "(not available with Java environment)"; + } + + return String.valueOf(pid); + } + + private @Nullable Long getPidIfAvailable() { + if (process == null) { + return null; + } + Field field = getPidFieldIfAvailable(); + if (field == null) { + return null; + } + try { + return (Long) field.get(process); + } catch (IllegalAccessException ignored) { + } + return null; + } + + private static @Nullable Field getPidFieldIfAvailable() { + try { + Field pidField = Process.class.getDeclaredField("pid"); + pidField.setAccessible(true); + return pidField; + } catch (NoSuchFieldException ignored) { + return null; + } + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setCommand(String command); + + abstract Builder setArguments(List arguments); + + abstract Optional> getArguments(); + + abstract PrismExecutor autoBuild(); + + final PrismExecutor build() { + if (!getArguments().isPresent()) { + setArguments(Collections.emptyList()); + } + return autoBuild(); + } + } +} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java new file mode 100644 index 000000000000..315e585a0c5f --- /dev/null +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PrismExecutor}. */ +@RunWith(JUnit4.class) +public class PrismExecutorTest { + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule public TestName testName = new TestName(); + + @Test + public void executeThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + executor.execute(); + sleep(3000L); + executor.stop(); + } + + @Test + public void executeWithStreamRedirectThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + executor.execute(outputStream); + sleep(3000L); + executor.stop(); + String output = outputStream.toString(StandardCharsets.UTF_8.name()); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + } + + @Test + public void executeWithFileOutputThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + File log = temporaryFolder.newFile(testName.getMethodName()); + executor.execute(log); + sleep(3000L); + executor.stop(); + try (Stream stream = Files.lines(log.toPath(), StandardCharsets.UTF_8)) { + String output = stream.collect(Collectors.joining("\n")); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + } + } + + @Test + public void executeWithCustomArgumentsThenStop() throws IOException { + PrismExecutor executor = + underTest().setArguments(Collections.singletonList("-job_port=5555")).build(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + executor.execute(outputStream); + sleep(3000L); + executor.stop(); + String output = outputStream.toString(StandardCharsets.UTF_8.name()); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555"); + } + + private PrismExecutor.Builder underTest() { + return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } +} From 0f34d52e5e5a76e6b7a60edb20227152a3736632 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Mon, 8 Jul 2024 17:56:05 +0000 Subject: [PATCH 3/4] Sync with head --- .../org/apache/beam/runners/prism/PrismPipelineOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java index 1b665603b8b6..ec0f8beb620a 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java @@ -35,7 +35,7 @@ public interface PrismPipelineOptions extends PortablePipelineOptions { void setPrismLocation(String prismLocation); @Description( - "Override the SDK\\'s version for deriving the Github Release URLs for " + "Override the SDK's version for deriving the Github Release URLs for " + "downloading a zipped prism binary, for the current platform. If " + "set to a Github Release page URL, then it will use that release page as a base when constructing the download URL.") String getPrismVersionOverride(); From cba2d239a1e5e2fea4ebd0a11e9c992c402eff8e Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 9 Jul 2024 15:34:55 +0000 Subject: [PATCH 4/4] Remove pid --- .../beam/runners/prism/PrismExecutor.java | 41 +------------------ 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java index 16320e59ff8a..fba2eec99c5c 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -23,7 +23,6 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,7 +33,6 @@ import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,10 +124,7 @@ void execute(File file) throws IOException { private void execute(ProcessBuilder processBuilder) throws IOException { this.process = processBuilder.start(); - LOG.info( - "started {}, pid: {}", - String.join(" ", getCommandWithArguments()), - getPidOrNotAvailableWarning()); + LOG.info("started {}", String.join(" ", getCommandWithArguments())); } private List getCommandWithArguments() { @@ -144,40 +139,6 @@ private ProcessBuilder createProcessBuilder() { return new ProcessBuilder(getCommandWithArguments()); } - private String getPidOrNotAvailableWarning() { - Long pid = getPidIfAvailable(); - if (pid == null) { - return "(not available with Java environment)"; - } - - return String.valueOf(pid); - } - - private @Nullable Long getPidIfAvailable() { - if (process == null) { - return null; - } - Field field = getPidFieldIfAvailable(); - if (field == null) { - return null; - } - try { - return (Long) field.get(process); - } catch (IllegalAccessException ignored) { - } - return null; - } - - private static @Nullable Field getPidFieldIfAvailable() { - try { - Field pidField = Process.class.getDeclaredField("pid"); - pidField.setAccessible(true); - return pidField; - } catch (NoSuchFieldException ignored) { - return null; - } - } - @AutoValue.Builder abstract static class Builder {