diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java new file mode 100644 index 000000000000..8437ccfdf5fa --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** A read-only {@link FileSystem} implementation looking up resources using a ClassLoader. */ +public class ClassLoaderFileSystem extends FileSystem { + + public static final String SCHEMA = "classpath"; + private static final String PREFIX = SCHEMA + "://"; + + ClassLoaderFileSystem() {} + + @Override + protected List match(List specs) throws IOException { + throw new UnsupportedOperationException("Un-globbable filesystem."); + } + + @Override + protected WritableByteChannel create( + ClassLoaderResourceId resourceId, CreateOptions createOptions) throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + InputStream inputStream = + classLoader.getResourceAsStream(resourceId.path.substring(PREFIX.length())); + if (inputStream == null) { + + throw new IOException( + "Unable to load " + + resourceId.path + + " with " + + classLoader + + " URL " + + classLoader.getResource(resourceId.path.substring(PREFIX.length()))); + } + return Channels.newChannel(inputStream); + } + + @Override + protected void copy( + List srcResourceIds, List destResourceIds) + throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected void rename( + List srcResourceIds, List destResourceIds) + throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected void delete(Collection resourceIds) throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected ClassLoaderResourceId matchNewResource(String path, boolean isDirectory) { + return new ClassLoaderResourceId(path); + } + + @Override + protected String getScheme() { + return SCHEMA; + } + + public static class ClassLoaderResourceId implements ResourceId { + + private final String path; + + private ClassLoaderResourceId(String path) { + checkArgument(path.startsWith(PREFIX), path); + this.path = path; + } + + @Override + public ClassLoaderResourceId resolve(String other, ResolveOptions resolveOptions) { + if (other.startsWith(PREFIX)) { + return new ClassLoaderResourceId(other); + } else if (other.startsWith("/")) { + return new ClassLoaderResourceId(SCHEMA + ":/" + other); + } else { + return new ClassLoaderResourceId(path + "/" + other); + } + } + + @Override + public ClassLoaderResourceId getCurrentDirectory() { + int ix = path.lastIndexOf('/'); + if (ix <= PREFIX.length()) { + return new ClassLoaderResourceId(PREFIX); + } else { + return new ClassLoaderResourceId(path.substring(0, ix)); + } + } + + @Override + public String getScheme() { + return SCHEMA; + } + + @Nullable + @Override + public String getFilename() { + return path; + } + + @Override + public boolean isDirectory() { + return false; + } + } + + /** {@link AutoService} registrar for the {@link ClassLoaderFileSystem}. */ + @AutoService(FileSystemRegistrar.class) + @Experimental(Experimental.Kind.FILESYSTEM) + public static class ClassLoaderFileSystemRegistrar implements FileSystemRegistrar { + @Override + public Iterable fromOptions(@Nullable PipelineOptions options) { + return ImmutableList.of(new ClassLoaderFileSystem()); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java new file mode 100644 index 000000000000..2667196fb338 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static java.nio.channels.Channels.newInputStream; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.ReadableByteChannel; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ClassLoaderFileSystemTest { + + private static final String SOME_CLASS = + "classpath://org/apache/beam/sdk/io/ClassLoaderFileSystem.class"; + + @Test + public void testOpen() throws IOException { + ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem(); + ReadableByteChannel channel = filesystem.open(filesystem.matchNewResource(SOME_CLASS, false)); + checkIsClass(channel); + } + + @Test + public void testRegistrar() throws IOException { + ReadableByteChannel channel = FileSystems.open(FileSystems.matchNewResource(SOME_CLASS, false)); + checkIsClass(channel); + } + + @Test + public void testResolve() throws IOException { + ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem(); + ClassLoaderFileSystem.ClassLoaderResourceId original = + filesystem.matchNewResource(SOME_CLASS, false); + ClassLoaderFileSystem.ClassLoaderResourceId parent = original.getCurrentDirectory(); + ClassLoaderFileSystem.ClassLoaderResourceId grandparent = parent.getCurrentDirectory(); + assertEquals("classpath://org/apache/beam/sdk", grandparent.getFilename()); + ClassLoaderFileSystem.ClassLoaderResourceId resource = + grandparent + .resolve("io", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve( + "ClassLoaderFileSystem.class", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); + ReadableByteChannel channel = filesystem.open(resource); + checkIsClass(channel); + } + + public void checkIsClass(ReadableByteChannel channel) throws IOException { + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); + InputStream inputStream = newInputStream(channel); + byte[] magic = new byte[4]; + inputStream.read(magic); + assertArrayEquals(magic, new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}); + } +} diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index d1bc313566fa..f1c26c71ae3d 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -18,6 +18,7 @@ from __future__ import absolute_import +import copy import itertools import json import logging @@ -46,7 +47,9 @@ from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: - from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports + # pylint: disable=ungrouped-imports + from typing import BinaryIO + from google.protobuf import struct_pb2 from apache_beam.portability.api import beam_runner_api_pb2 _LOGGER = logging.getLogger(__name__) @@ -278,6 +281,27 @@ def to_runner_api(self): state=self.state) +class JarArtifactManager(object): + def __init__(self, jar_path, root): + self._root = root + self._zipfile_handle = zipfile.ZipFile(jar_path, 'a') + + def close(self): + self._zipfile_handle.close() + + def file_writer(self, path): + # type: (str) -> Tuple[BinaryIO, str] + + """Given a relative path, returns an open handle that can be written to + and an reference that can later be used to read this file.""" + full_path = '%s/%s' % (self._root, path) + return self._zipfile_handle.open( + full_path, 'w', force_zip64=True), 'classpath://%s' % full_path + + def zipfile_handle(self): + return self._zipfile_handle + + class UberJarBeamJob(AbstractBeamJob): """Abstract baseclass for creating a Beam job. The resulting job will be packaged and run in an executable uber jar.""" @@ -291,8 +315,6 @@ class UberJarBeamJob(AbstractBeamJob): PIPELINE_PATH = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, "pipeline.json"]) PIPELINE_OPTIONS_PATH = '/'.join( [PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json']) - ARTIFACT_MANIFEST_PATH = '/'.join( - [PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json']) ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts']) def __init__( @@ -313,27 +335,23 @@ def prepare(self): with tempfile.NamedTemporaryFile(suffix='.jar') as tout: self._jar = tout.name shutil.copy(self._executable_jar, self._jar) - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self.PIPELINE_PATH, 'w') as fout: - fout.write( - json_format.MessageToJson(self._pipeline_proto).encode('utf-8')) - with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout: - fout.write( - json_format.MessageToJson(self._pipeline_options).encode('utf-8')) - with z.open(self.PIPELINE_MANIFEST, 'w') as fout: - fout.write( - json.dumps({ - 'defaultJobName': self.PIPELINE_NAME - }).encode('utf-8')) self._start_artifact_service(self._jar, self._artifact_port) def _start_artifact_service(self, jar, requested_port): - self._artifact_staging_service = artifact_service.ZipFileArtifactService( - jar, self.ARTIFACT_FOLDER) + self._artifact_manager = JarArtifactManager(self._jar, self.ARTIFACT_FOLDER) + self._artifact_staging_service = artifact_service.ArtifactStagingService( + self._artifact_manager.file_writer) + self._artifact_staging_service.register_job( + self._job_id, + { + env_id: env.dependencies + for (env_id, + env) in self._pipeline_proto.components.environments.items() + }) self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor()) port = self._artifact_staging_server.add_insecure_port( '[::]:%s' % requested_port) - beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server( + beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server( self._artifact_staging_service, self._artifact_staging_server) self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor( url='localhost:%d' % port) @@ -343,9 +361,34 @@ def _start_artifact_service(self, jar, requested_port): def _stop_artifact_service(self): self._artifact_staging_server.stop(1) - self._artifact_staging_service.close() - self._artifact_manifest_location = ( - self._artifact_staging_service.retrieval_token(self._job_id)) + + # Update dependencies to point to staged files. + pipeline = copy.copy(self._pipeline_proto) + if any(env.dependencies + for env in pipeline.components.environments.values()): + for env_id, deps in self._artifact_staging_service.resolved_deps( + self._job_id).items(): + # Slice assignment not supported for repeated fields. + env = self._pipeline_proto.components.environments[env_id] + del env.dependencies[:] + env.dependencies.extend(deps) + + # Copy the pipeline definition and metadata into the jar. + z = self._artifact_manager.zipfile_handle() + with z.open(self.PIPELINE_PATH, 'w') as fout: + fout.write( + json_format.MessageToJson(self._pipeline_proto).encode('utf-8')) + with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout: + fout.write( + json_format.MessageToJson(self._pipeline_options).encode('utf-8')) + with z.open(self.PIPELINE_MANIFEST, 'w') as fout: + fout.write( + json.dumps({ + 'defaultJobName': self.PIPELINE_NAME + }).encode('utf-8')) + + # Closes the jar file. + self._artifact_manager.close() def artifact_staging_endpoint(self): return self._artifact_staging_endpoint diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 77b6292cc8b7..4fa92cddc55a 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -27,7 +27,6 @@ import tempfile import time import urllib -import zipfile import requests from google.protobuf import json_format @@ -146,12 +145,6 @@ def delete(self, path, **kwargs): def run(self): self._stop_artifact_service() - # Move the artifact manifest to the expected location. - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self._artifact_manifest_location) as fin: - manifest_contents = fin.read() - with z.open(self.ARTIFACT_MANIFEST_PATH, 'w') as fout: - fout.write(manifest_contents) # Upload the jar and start the job. with open(self._jar, 'rb') as jar_file: diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_test.py b/sdks/python/apache_beam/runners/portability/local_job_service_test.py index f1b17ab7069a..ec164e0e8ffd 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service_test.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service_test.py @@ -22,10 +22,6 @@ import logging import unittest -import grpc - -from apache_beam.portability.api import beam_artifact_api_pb2 -from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.portability import local_job_service @@ -41,17 +37,6 @@ def __init__(self, job_service): def get_pipeline_options(self): return None - def stage(self, pipeline, artifact_staging_endpoint, staging_session_token): - channel = grpc.insecure_channel(artifact_staging_endpoint) - staging_stub = beam_artifact_api_pb2_grpc.LegacyArtifactStagingServiceStub( - channel) - manifest_response = staging_stub.CommitManifest( - beam_artifact_api_pb2.CommitManifestRequest( - staging_session_token=staging_session_token, - manifest=beam_artifact_api_pb2.Manifest())) - channel.close() - return manifest_response.retrieval_token - class LocalJobServerTest(unittest.TestCase): def test_end_to_end(self): diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index 7d3ada269948..252f70a72797 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -183,12 +183,6 @@ def _create_submission_request(self, jar, job_name): def run(self): self._stop_artifact_service() - # Move the artifact manifest to the expected location. - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self._artifact_manifest_location) as fin: - manifest_contents = fin.read() - with z.open(self.ARTIFACT_MANIFEST_PATH, 'w') as fout: - fout.write(manifest_contents) # Upload the jar and start the job. self._spark_submission_id = self.post( diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py index b08d5b8a903a..3ae25d4169f7 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py @@ -28,18 +28,16 @@ import zipfile import freezegun -import grpc import requests_mock from apache_beam.options import pipeline_options from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SparkRunnerOptions -from apache_beam.portability.api import beam_artifact_api_pb2 -from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.portability import spark_runner from apache_beam.runners.portability import spark_uber_jar_job_server +from apache_beam.runners.portability.local_job_service_test import TestJobServicePlan @contextlib.contextmanager @@ -135,17 +133,14 @@ def spark_submission_status_response(state): 'http://host:6066', options) # Prepare the job. - prepare_response = job_server.Prepare( - beam_job_api_pb2.PrepareJobRequest( - job_name='job', pipeline=beam_runner_api_pb2.Pipeline())) - channel = grpc.insecure_channel( - prepare_response.artifact_staging_endpoint.url) - retrieval_token = beam_artifact_api_pb2_grpc.LegacyArtifactStagingServiceStub( - channel).CommitManifest( - beam_artifact_api_pb2.CommitManifestRequest( - staging_session_token=prepare_response.staging_session_token, - manifest=beam_artifact_api_pb2.Manifest())).retrieval_token - channel.close() + plan = TestJobServicePlan(job_server) + + # Prepare the job. + prepare_response = plan.prepare(beam_runner_api_pb2.Pipeline()) + retrieval_token = plan.stage( + beam_runner_api_pb2.Pipeline(), + prepare_response.artifact_staging_endpoint.url, + prepare_response.staging_session_token) # Now actually run the job. http_mock.post(