From e7371d56e7163a0832e1d44ba8deddd0135b9819 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 1 May 2020 12:09:56 -0700 Subject: [PATCH 01/10] [BEAM-9577] Artifact v2 support for uber jars. --- .../beam/sdk/io/ClassLoaderFileSystem.java | 154 ++++++++++++++++++ .../sdk/io/ClassLoaderFileSystemTest.java | 57 +++++++ .../portability/abstract_job_service.py | 75 ++++++--- .../portability/flink_uber_jar_job_server.py | 6 - 4 files changed, 266 insertions(+), 26 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java new file mode 100644 index 000000000000..8d09fe4231d5 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; + +/** A read-only {@link FileSystem} implementation looking up resources using a ClassLoader. */ +public class ClassLoaderFileSystem extends FileSystem { + + public static final String SCHEMA = "classpath"; + private static final String PREFIX = SCHEMA + "://"; + + ClassLoaderFileSystem() {} + + @Override + protected List match(List specs) throws IOException { + throw new UnsupportedOperationException("Un-globable filesystem."); + } + + @Override + protected WritableByteChannel create( + ClassLoaderResourceId resourceId, CreateOptions createOptions) throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + InputStream inputStream = + classLoader.getResourceAsStream(resourceId.path.substring(PREFIX.length())); + if (inputStream == null) { + throw new IOException("Unable to load " + resourceId.path + " with " + classLoader); + } + return Channels.newChannel(inputStream); + } + + @Override + protected void copy( + List srcResourceIds, List destResourceIds) + throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected void rename( + List srcResourceIds, List destResourceIds) + throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected void delete(Collection resourceIds) throws IOException { + throw new UnsupportedOperationException("Read-only filesystem."); + } + + @Override + protected ClassLoaderResourceId matchNewResource(String path, boolean isDirectory) { + return new ClassLoaderResourceId(path); + } + + @Override + protected String getScheme() { + return SCHEMA; + } + + public static class ClassLoaderResourceId implements ResourceId { + + private final String path; + + private ClassLoaderResourceId(String path) { + checkArgument(path.startsWith(PREFIX)); + this.path = path; + } + + @Override + public ResourceId resolve(String other, ResolveOptions resolveOptions) { + if (other.startsWith(PREFIX)) { + return new ClassLoaderResourceId(other); + } else if (other.startsWith("/")) { + return new ClassLoaderResourceId(SCHEMA + ":/" + other); + } else { + return new ClassLoaderResourceId(path + "/" + other); + } + } + + @Override + public ResourceId getCurrentDirectory() { + int ix = path.lastIndexOf('/'); + if (ix <= PREFIX.length()) { + return new ClassLoaderResourceId(PREFIX); + } else { + return new ClassLoaderResourceId(path.substring(0, ix)); + } + } + + @Override + public String getScheme() { + return SCHEMA; + } + + @Nullable + @Override + public String getFilename() { + return path; + } + + @Override + public boolean isDirectory() { + return false; + } + } + + /** {@link AutoService} registrar for the {@link ClassLoaderFileSystem}. */ + @AutoService(FileSystemRegistrar.class) + @Experimental(Experimental.Kind.FILESYSTEM) + public static class ClassLoaderFileSystemRegistrar implements FileSystemRegistrar { + @Override + public Iterable fromOptions(@Nullable PipelineOptions options) { + return ImmutableList.of(new ClassLoaderFileSystem()); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java new file mode 100644 index 000000000000..f5943ce434f4 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static java.nio.channels.Channels.newInputStream; +import static org.junit.Assert.assertArrayEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.ReadableByteChannel; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ClassLoaderFileSystemTest { + + private static final String SOME_CLASS = + "classloader://org/apache/beam/sdk/io/ClassLoaderFilesystem.class"; + + @Test + public void testOpen() throws IOException { + ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem(); + ReadableByteChannel channel = filesystem.open(filesystem.matchNewResource(SOME_CLASS, false)); + checkIsClass(channel); + } + + @Test + public void testRegistrar() throws IOException { + ReadableByteChannel channel = FileSystems.open(FileSystems.matchNewResource(SOME_CLASS, false)); + checkIsClass(channel); + } + + public void checkIsClass(ReadableByteChannel channel) throws IOException { + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); + InputStream inputStream = newInputStream(channel); + byte[] magic = new byte[4]; + inputStream.read(magic); + assertArrayEquals(magic, new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}); + } +} diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index d1bc313566fa..6827b9760491 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -18,6 +18,7 @@ from __future__ import absolute_import +import copy import itertools import json import logging @@ -278,6 +279,23 @@ def to_runner_api(self): state=self.state) +class JarArtifactManager(object): + def __init__(self, jar_path, root): + self._root = root + self._zipfile_handle = zipfile.ZipFile(jar_path, 'a') + + def close(self): + self._zipfile_handle.close() + + def file_writer(self, path): + full_path = '%s/%s' % (self._root, path) + return self._zipfile_handle.open( + full_path, 'w', force_zip64=True), 'classpath://%s' % full_path + + def zipfile_handle(self): + return self._zipfile_handle + + class UberJarBeamJob(AbstractBeamJob): """Abstract baseclass for creating a Beam job. The resulting job will be packaged and run in an executable uber jar.""" @@ -291,8 +309,6 @@ class UberJarBeamJob(AbstractBeamJob): PIPELINE_PATH = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, "pipeline.json"]) PIPELINE_OPTIONS_PATH = '/'.join( [PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json']) - ARTIFACT_MANIFEST_PATH = '/'.join( - [PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json']) ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts']) def __init__( @@ -313,39 +329,58 @@ def prepare(self): with tempfile.NamedTemporaryFile(suffix='.jar') as tout: self._jar = tout.name shutil.copy(self._executable_jar, self._jar) - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self.PIPELINE_PATH, 'w') as fout: - fout.write( - json_format.MessageToJson(self._pipeline_proto).encode('utf-8')) - with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout: - fout.write( - json_format.MessageToJson(self._pipeline_options).encode('utf-8')) - with z.open(self.PIPELINE_MANIFEST, 'w') as fout: - fout.write( - json.dumps({ - 'defaultJobName': self.PIPELINE_NAME - }).encode('utf-8')) self._start_artifact_service(self._jar, self._artifact_port) def _start_artifact_service(self, jar, requested_port): - self._artifact_staging_service = artifact_service.ZipFileArtifactService( - jar, self.ARTIFACT_FOLDER) + self._artifact_manager = JarArtifactManager(self._jar, self.ARTIFACT_FOLDER) + self._artifact_staging_service = artifact_service.ArtifactStagingService( + self._artifact_manager.file_writer) + self._artifact_staging_service.register_job( + self._job_id, + {env_id: env.dependencies + for (env_id, env) in self._pipeline_proto.components.environments.items()}) self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor()) port = self._artifact_staging_server.add_insecure_port( '[::]:%s' % requested_port) - beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server( + beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server( self._artifact_staging_service, self._artifact_staging_server) self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor( url='localhost:%d' % port) self._artifact_staging_server.start() _LOGGER.info('Artifact server started on port %s', port) + _LOGGER.error('Artifact server started on port %s', port) return port def _stop_artifact_service(self): self._artifact_staging_server.stop(1) - self._artifact_staging_service.close() - self._artifact_manifest_location = ( - self._artifact_staging_service.retrieval_token(self._job_id)) + self._artifact_manifest_location = None + + # Update dependencies to point to staged files. + pipeline = copy.copy(self._pipeline_proto) + if any( + env.dependencies for env in pipeline.components.environments.values()): + for env_id, deps in self._artifact_staging_service.resolved_deps(self._job_id).items(): + # Slice assignment not supported for repeated fields. + env = self._pipeline_proto.components.environments[env_id] + del env.dependencies[:] + env.dependencies.extend(deps) + + # Copy the pipeline definition and metadata into the jar. + z = self._artifact_manager.zipfile_handle() + with z.open(self.PIPELINE_PATH, 'w') as fout: + fout.write( + json_format.MessageToJson(self._pipeline_proto).encode('utf-8')) + with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout: + fout.write( + json_format.MessageToJson(self._pipeline_options).encode('utf-8')) + with z.open(self.PIPELINE_MANIFEST, 'w') as fout: + fout.write( + json.dumps({ + 'defaultJobName': self.PIPELINE_NAME + }).encode('utf-8')) + + # Closes the jar file. + self._artifact_manager.close() def artifact_staging_endpoint(self): return self._artifact_staging_endpoint diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 77b6292cc8b7..660496c60f1e 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -146,12 +146,6 @@ def delete(self, path, **kwargs): def run(self): self._stop_artifact_service() - # Move the artifact manifest to the expected location. - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self._artifact_manifest_location) as fin: - manifest_contents = fin.read() - with z.open(self.ARTIFACT_MANIFEST_PATH, 'w') as fout: - fout.write(manifest_contents) # Upload the jar and start the job. with open(self._jar, 'rb') as jar_file: From 8a3c14022b3dc4c1cd06a84e89eec6cd0afe0bba Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 14 May 2020 13:00:38 -0700 Subject: [PATCH 02/10] lint, refactor fix --- .../org/apache/beam/sdk/io/ClassLoaderFileSystem.java | 2 +- .../apache/beam/sdk/io/ClassLoaderFileSystemTest.java | 2 +- .../runners/portability/abstract_job_service.py | 11 +++++++---- .../runners/portability/flink_uber_jar_job_server.py | 1 - 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java index 8d09fe4231d5..e98121dbe428 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -100,7 +100,7 @@ public static class ClassLoaderResourceId implements ResourceId { private final String path; private ClassLoaderResourceId(String path) { - checkArgument(path.startsWith(PREFIX)); + checkArgument(path.startsWith(PREFIX), path); this.path = path; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java index f5943ce434f4..8be1a913fe56 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java @@ -32,7 +32,7 @@ public class ClassLoaderFileSystemTest { private static final String SOME_CLASS = - "classloader://org/apache/beam/sdk/io/ClassLoaderFilesystem.class"; + "classpath://org/apache/beam/sdk/io/ClassLoaderFilesystem.class"; @Test public void testOpen() throws IOException { diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index 6827b9760491..7e4becd77f55 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -337,8 +337,11 @@ def _start_artifact_service(self, jar, requested_port): self._artifact_manager.file_writer) self._artifact_staging_service.register_job( self._job_id, - {env_id: env.dependencies - for (env_id, env) in self._pipeline_proto.components.environments.items()}) + { + env_id: env.dependencies + for (env_id, + env) in self._pipeline_proto.components.environments.items() + }) self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor()) port = self._artifact_staging_server.add_insecure_port( '[::]:%s' % requested_port) @@ -357,8 +360,8 @@ def _stop_artifact_service(self): # Update dependencies to point to staged files. pipeline = copy.copy(self._pipeline_proto) - if any( - env.dependencies for env in pipeline.components.environments.values()): + if any(env.dependencies + for env in pipeline.components.environments.values()): for env_id, deps in self._artifact_staging_service.resolved_deps(self._job_id).items(): # Slice assignment not supported for repeated fields. env = self._pipeline_proto.components.environments[env_id] diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py index 660496c60f1e..4fa92cddc55a 100644 --- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py @@ -27,7 +27,6 @@ import tempfile import time import urllib -import zipfile import requests from google.protobuf import json_format From 3a75ae86e70defaa01609f12822254ce300a79c4 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 14 May 2020 13:02:41 -0700 Subject: [PATCH 03/10] fix spark like flink --- .../runners/portability/spark_uber_jar_job_server.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py index 7d3ada269948..252f70a72797 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server.py @@ -183,12 +183,6 @@ def _create_submission_request(self, jar, job_name): def run(self): self._stop_artifact_service() - # Move the artifact manifest to the expected location. - with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z: - with z.open(self._artifact_manifest_location) as fin: - manifest_contents = fin.read() - with z.open(self.ARTIFACT_MANIFEST_PATH, 'w') as fout: - fout.write(manifest_contents) # Upload the jar and start the job. self._spark_submission_id = self.post( From 4edfdaa14ef89aa7774067b85786f7a4cf11fb77 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 14 May 2020 14:27:26 -0700 Subject: [PATCH 04/10] lint, attempt to use system class loader to make jenkins happy --- .../java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java | 3 +-- .../apache_beam/runners/portability/abstract_job_service.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java index e98121dbe428..aa079215e225 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -57,9 +57,8 @@ protected WritableByteChannel create( @Override protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOException { - ClassLoader classLoader = getClass().getClassLoader(); InputStream inputStream = - classLoader.getResourceAsStream(resourceId.path.substring(PREFIX.length())); + ClassLoader.getSystemResourceAsStream(resourceId.path.substring(PREFIX.length())); if (inputStream == null) { throw new IOException("Unable to load " + resourceId.path + " with " + classLoader); } diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index 7e4becd77f55..b1d41d532bf2 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -362,7 +362,8 @@ def _stop_artifact_service(self): pipeline = copy.copy(self._pipeline_proto) if any(env.dependencies for env in pipeline.components.environments.values()): - for env_id, deps in self._artifact_staging_service.resolved_deps(self._job_id).items(): + for env_id, deps in self._artifact_staging_service.resolved_deps( + self._job_id).items(): # Slice assignment not supported for repeated fields. env = self._pipeline_proto.components.environments[env_id] del env.dependencies[:] From 00ecfeb6829defaef5231bfe0068b99ed4f07ff5 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 14 May 2020 15:18:59 -0700 Subject: [PATCH 05/10] fix build break, more tests --- .../beam/sdk/io/ClassLoaderFileSystem.java | 8 ++++---- .../sdk/io/ClassLoaderFileSystemTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java index aa079215e225..dc3aeafc954a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -46,7 +46,7 @@ public class ClassLoaderFileSystem extends FileSystem match(List specs) throws IOException { - throw new UnsupportedOperationException("Un-globable filesystem."); + throw new UnsupportedOperationException("Un-globbable filesystem."); } @Override @@ -60,7 +60,7 @@ protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOEx InputStream inputStream = ClassLoader.getSystemResourceAsStream(resourceId.path.substring(PREFIX.length())); if (inputStream == null) { - throw new IOException("Unable to load " + resourceId.path + " with " + classLoader); + throw new IOException("Unable to load " + resourceId.path); } return Channels.newChannel(inputStream); } @@ -104,7 +104,7 @@ private ClassLoaderResourceId(String path) { } @Override - public ResourceId resolve(String other, ResolveOptions resolveOptions) { + public ClassLoaderResourceId resolve(String other, ResolveOptions resolveOptions) { if (other.startsWith(PREFIX)) { return new ClassLoaderResourceId(other); } else if (other.startsWith("/")) { @@ -115,7 +115,7 @@ public ResourceId resolve(String other, ResolveOptions resolveOptions) { } @Override - public ResourceId getCurrentDirectory() { + public ClassLoaderResourceId getCurrentDirectory() { int ix = path.lastIndexOf('/'); if (ix <= PREFIX.length()) { return new ClassLoaderResourceId(PREFIX); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java index 8be1a913fe56..4bd84a5c307d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java @@ -19,10 +19,12 @@ import static java.nio.channels.Channels.newInputStream; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.io.InputStream; import java.nio.channels.ReadableByteChannel; +import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +49,23 @@ public void testRegistrar() throws IOException { checkIsClass(channel); } + @Test + public void testResolve() throws IOException { + ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem(); + ClassLoaderFileSystem.ClassLoaderResourceId original = + filesystem.matchNewResource(SOME_CLASS, false); + ClassLoaderFileSystem.ClassLoaderResourceId parent = original.getCurrentDirectory(); + ClassLoaderFileSystem.ClassLoaderResourceId grandparent = parent.getCurrentDirectory(); + assertEquals("classpath://org/apache/beam/sdk", grandparent.getFilename()); + ClassLoaderFileSystem.ClassLoaderResourceId resource = + grandparent + .resolve("io", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve( + "ClassLoaderFilesystem.class", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); + ReadableByteChannel channel = filesystem.open(resource); + checkIsClass(channel); + } + public void checkIsClass(ReadableByteChannel channel) throws IOException { FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); InputStream inputStream = newInputStream(channel); From c428a1aedeaec049e56e0b958c97e64ea29735c2 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 14 May 2020 16:24:20 -0700 Subject: [PATCH 06/10] more debugging --- .../apache/beam/sdk/io/ClassLoaderFileSystem.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java index dc3aeafc954a..8437ccfdf5fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -57,10 +57,18 @@ protected WritableByteChannel create( @Override protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); InputStream inputStream = - ClassLoader.getSystemResourceAsStream(resourceId.path.substring(PREFIX.length())); + classLoader.getResourceAsStream(resourceId.path.substring(PREFIX.length())); if (inputStream == null) { - throw new IOException("Unable to load " + resourceId.path); + + throw new IOException( + "Unable to load " + + resourceId.path + + " with " + + classLoader + + " URL " + + classLoader.getResource(resourceId.path.substring(PREFIX.length()))); } return Channels.newChannel(inputStream); } From 582caf271cd7cddbed1cecce1b898c295d07fcf2 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 15 May 2020 14:36:04 -0700 Subject: [PATCH 07/10] case insensitive filesystems hide bugs! --- .../org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java index 4bd84a5c307d..2667196fb338 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ClassLoaderFileSystemTest.java @@ -34,7 +34,7 @@ public class ClassLoaderFileSystemTest { private static final String SOME_CLASS = - "classpath://org/apache/beam/sdk/io/ClassLoaderFilesystem.class"; + "classpath://org/apache/beam/sdk/io/ClassLoaderFileSystem.class"; @Test public void testOpen() throws IOException { @@ -61,7 +61,7 @@ public void testResolve() throws IOException { grandparent .resolve("io", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY) .resolve( - "ClassLoaderFilesystem.class", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); + "ClassLoaderFileSystem.class", ResolveOptions.StandardResolveOptions.RESOLVE_FILE); ReadableByteChannel channel = filesystem.open(resource); checkIsClass(channel); } From 122b1598593d417b79d61457991fe6fce7f31744 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 15 May 2020 14:48:10 -0700 Subject: [PATCH 08/10] fixup: reviewer comments --- .../runners/portability/abstract_job_service.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index b1d41d532bf2..bd68a6cfe83c 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -47,7 +47,8 @@ from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: - from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports + from typing import BinaryIO # pylint: disable=ungrouped-imports + from google.protobuf import struct_pb2 from apache_beam.portability.api import beam_runner_api_pb2 _LOGGER = logging.getLogger(__name__) @@ -288,6 +289,9 @@ def close(self): self._zipfile_handle.close() def file_writer(self, path): + # type: (str) -> Tuple[BinaryIO, str] + """Given a relative path, returns an open handle that can be written to + and an reference that can later be used to read this file.""" full_path = '%s/%s' % (self._root, path) return self._zipfile_handle.open( full_path, 'w', force_zip64=True), 'classpath://%s' % full_path @@ -351,12 +355,10 @@ def _start_artifact_service(self, jar, requested_port): url='localhost:%d' % port) self._artifact_staging_server.start() _LOGGER.info('Artifact server started on port %s', port) - _LOGGER.error('Artifact server started on port %s', port) return port def _stop_artifact_service(self): self._artifact_staging_server.stop(1) - self._artifact_manifest_location = None # Update dependencies to point to staged files. pipeline = copy.copy(self._pipeline_proto) From 2839770cb4e7590fb935b22b1706ff039a59520f Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 18 May 2020 13:20:19 -0700 Subject: [PATCH 09/10] Fix lint, formatting, test mocks. --- .../portability/abstract_job_service.py | 4 +++- .../portability/local_job_service_test.py | 11 ----------- .../spark_uber_jar_job_server_test.py | 19 ++++++++----------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/abstract_job_service.py b/sdks/python/apache_beam/runners/portability/abstract_job_service.py index bd68a6cfe83c..f1c26c71ae3d 100644 --- a/sdks/python/apache_beam/runners/portability/abstract_job_service.py +++ b/sdks/python/apache_beam/runners/portability/abstract_job_service.py @@ -47,7 +47,8 @@ from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: - from typing import BinaryIO # pylint: disable=ungrouped-imports + # pylint: disable=ungrouped-imports + from typing import BinaryIO from google.protobuf import struct_pb2 from apache_beam.portability.api import beam_runner_api_pb2 @@ -290,6 +291,7 @@ def close(self): def file_writer(self, path): # type: (str) -> Tuple[BinaryIO, str] + """Given a relative path, returns an open handle that can be written to and an reference that can later be used to read this file.""" full_path = '%s/%s' % (self._root, path) diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_test.py b/sdks/python/apache_beam/runners/portability/local_job_service_test.py index f1b17ab7069a..be19f247d851 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service_test.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service_test.py @@ -41,17 +41,6 @@ def __init__(self, job_service): def get_pipeline_options(self): return None - def stage(self, pipeline, artifact_staging_endpoint, staging_session_token): - channel = grpc.insecure_channel(artifact_staging_endpoint) - staging_stub = beam_artifact_api_pb2_grpc.LegacyArtifactStagingServiceStub( - channel) - manifest_response = staging_stub.CommitManifest( - beam_artifact_api_pb2.CommitManifestRequest( - staging_session_token=staging_session_token, - manifest=beam_artifact_api_pb2.Manifest())) - channel.close() - return manifest_response.retrieval_token - class LocalJobServerTest(unittest.TestCase): def test_end_to_end(self): diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py index b08d5b8a903a..7b4cc7352f27 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py @@ -135,17 +135,14 @@ def spark_submission_status_response(state): 'http://host:6066', options) # Prepare the job. - prepare_response = job_server.Prepare( - beam_job_api_pb2.PrepareJobRequest( - job_name='job', pipeline=beam_runner_api_pb2.Pipeline())) - channel = grpc.insecure_channel( - prepare_response.artifact_staging_endpoint.url) - retrieval_token = beam_artifact_api_pb2_grpc.LegacyArtifactStagingServiceStub( - channel).CommitManifest( - beam_artifact_api_pb2.CommitManifestRequest( - staging_session_token=prepare_response.staging_session_token, - manifest=beam_artifact_api_pb2.Manifest())).retrieval_token - channel.close() + plan = TestJobServicePlan(job_server) + + # Prepare the job. + prepare_response = plan.prepare(beam_runner_api_pb2.Pipeline()) + retrieval_token = plan.stage( + beam_runner_api_pb2.Pipeline(), + prepare_response.artifact_staging_endpoint.url, + prepare_response.staging_session_token) # Now actually run the job. http_mock.post( From 8a4132f35f92f165f272609776a58d2bf0e1c466 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 18 May 2020 14:00:50 -0700 Subject: [PATCH 10/10] lint import fixes --- .../apache_beam/runners/portability/local_job_service_test.py | 4 ---- .../runners/portability/spark_uber_jar_job_server_test.py | 4 +--- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_test.py b/sdks/python/apache_beam/runners/portability/local_job_service_test.py index be19f247d851..ec164e0e8ffd 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service_test.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service_test.py @@ -22,10 +22,6 @@ import logging import unittest -import grpc - -from apache_beam.portability.api import beam_artifact_api_pb2 -from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.portability import local_job_service diff --git a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py index 7b4cc7352f27..3ae25d4169f7 100644 --- a/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_uber_jar_job_server_test.py @@ -28,18 +28,16 @@ import zipfile import freezegun -import grpc import requests_mock from apache_beam.options import pipeline_options from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SparkRunnerOptions -from apache_beam.portability.api import beam_artifact_api_pb2 -from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.portability import spark_runner from apache_beam.runners.portability import spark_uber_jar_job_server +from apache_beam.runners.portability.local_job_service_test import TestJobServicePlan @contextlib.contextmanager