Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** A read-only {@link FileSystem} implementation looking up resources using a ClassLoader. */
public class ClassLoaderFileSystem extends FileSystem<ClassLoaderFileSystem.ClassLoaderResourceId> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll want to do that too (and use the new artifact api).


public static final String SCHEMA = "classpath";
private static final String PREFIX = SCHEMA + "://";

ClassLoaderFileSystem() {}

@Override
protected List<MatchResult> match(List<String> specs) throws IOException {
throw new UnsupportedOperationException("Un-globbable filesystem.");
}

@Override
protected WritableByteChannel create(
ClassLoaderResourceId resourceId, CreateOptions createOptions) throws IOException {
throw new UnsupportedOperationException("Read-only filesystem.");
}

@Override
protected ReadableByteChannel open(ClassLoaderResourceId resourceId) throws IOException {
ClassLoader classLoader = getClass().getClassLoader();
InputStream inputStream =
classLoader.getResourceAsStream(resourceId.path.substring(PREFIX.length()));
if (inputStream == null) {

throw new IOException(
"Unable to load "
+ resourceId.path
+ " with "
+ classLoader
+ " URL "
+ classLoader.getResource(resourceId.path.substring(PREFIX.length())));
}
return Channels.newChannel(inputStream);
}

@Override
protected void copy(
List<ClassLoaderResourceId> srcResourceIds, List<ClassLoaderResourceId> destResourceIds)
throws IOException {
throw new UnsupportedOperationException("Read-only filesystem.");
}

@Override
protected void rename(
List<ClassLoaderResourceId> srcResourceIds, List<ClassLoaderResourceId> destResourceIds)
throws IOException {
throw new UnsupportedOperationException("Read-only filesystem.");
}

@Override
protected void delete(Collection<ClassLoaderResourceId> resourceIds) throws IOException {
throw new UnsupportedOperationException("Read-only filesystem.");
}

@Override
protected ClassLoaderResourceId matchNewResource(String path, boolean isDirectory) {
return new ClassLoaderResourceId(path);
}

@Override
protected String getScheme() {
return SCHEMA;
}

public static class ClassLoaderResourceId implements ResourceId {

private final String path;

private ClassLoaderResourceId(String path) {
checkArgument(path.startsWith(PREFIX), path);
this.path = path;
}

@Override
public ClassLoaderResourceId resolve(String other, ResolveOptions resolveOptions) {
if (other.startsWith(PREFIX)) {
return new ClassLoaderResourceId(other);
} else if (other.startsWith("/")) {
return new ClassLoaderResourceId(SCHEMA + ":/" + other);
} else {
return new ClassLoaderResourceId(path + "/" + other);
}
}

@Override
public ClassLoaderResourceId getCurrentDirectory() {
int ix = path.lastIndexOf('/');
if (ix <= PREFIX.length()) {
return new ClassLoaderResourceId(PREFIX);
} else {
return new ClassLoaderResourceId(path.substring(0, ix));
}
}

@Override
public String getScheme() {
return SCHEMA;
}

@Nullable
@Override
public String getFilename() {
return path;
}

@Override
public boolean isDirectory() {
return false;
}
}

/** {@link AutoService} registrar for the {@link ClassLoaderFileSystem}. */
@AutoService(FileSystemRegistrar.class)
@Experimental(Experimental.Kind.FILESYSTEM)
public static class ClassLoaderFileSystemRegistrar implements FileSystemRegistrar {
@Override
public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
return ImmutableList.of(new ClassLoaderFileSystem());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;

import static java.nio.channels.Channels.newInputStream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ClassLoaderFileSystemTest {

private static final String SOME_CLASS =
"classpath://org/apache/beam/sdk/io/ClassLoaderFileSystem.class";

@Test
public void testOpen() throws IOException {
ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem();
ReadableByteChannel channel = filesystem.open(filesystem.matchNewResource(SOME_CLASS, false));
checkIsClass(channel);
}

@Test
public void testRegistrar() throws IOException {
ReadableByteChannel channel = FileSystems.open(FileSystems.matchNewResource(SOME_CLASS, false));
checkIsClass(channel);
}

@Test
public void testResolve() throws IOException {
ClassLoaderFileSystem filesystem = new ClassLoaderFileSystem();
ClassLoaderFileSystem.ClassLoaderResourceId original =
filesystem.matchNewResource(SOME_CLASS, false);
ClassLoaderFileSystem.ClassLoaderResourceId parent = original.getCurrentDirectory();
ClassLoaderFileSystem.ClassLoaderResourceId grandparent = parent.getCurrentDirectory();
assertEquals("classpath://org/apache/beam/sdk", grandparent.getFilename());
ClassLoaderFileSystem.ClassLoaderResourceId resource =
grandparent
.resolve("io", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve(
"ClassLoaderFileSystem.class", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
ReadableByteChannel channel = filesystem.open(resource);
checkIsClass(channel);
}

public void checkIsClass(ReadableByteChannel channel) throws IOException {
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
InputStream inputStream = newInputStream(channel);
byte[] magic = new byte[4];
inputStream.read(magic);
assertArrayEquals(magic, new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did I never know this? 😆

}
}
85 changes: 64 additions & 21 deletions sdks/python/apache_beam/runners/portability/abstract_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import absolute_import

import copy
import itertools
import json
import logging
Expand Down Expand Up @@ -46,7 +47,9 @@
from apache_beam.utils.timestamp import Timestamp

if TYPE_CHECKING:
from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports
# pylint: disable=ungrouped-imports
from typing import BinaryIO
from google.protobuf import struct_pb2
from apache_beam.portability.api import beam_runner_api_pb2

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -278,6 +281,27 @@ def to_runner_api(self):
state=self.state)


class JarArtifactManager(object):
def __init__(self, jar_path, root):
self._root = root
self._zipfile_handle = zipfile.ZipFile(jar_path, 'a')

def close(self):
self._zipfile_handle.close()

def file_writer(self, path):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a type annotation and/or comment on the return value here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# type: (str) -> Tuple[BinaryIO, str]

"""Given a relative path, returns an open handle that can be written to
and an reference that can later be used to read this file."""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
and an reference that can later be used to read this file."""
and a reference that can later be used to read this file."""

full_path = '%s/%s' % (self._root, path)
return self._zipfile_handle.open(
full_path, 'w', force_zip64=True), 'classpath://%s' % full_path

def zipfile_handle(self):
return self._zipfile_handle


class UberJarBeamJob(AbstractBeamJob):
"""Abstract baseclass for creating a Beam job. The resulting job will be
packaged and run in an executable uber jar."""
Expand All @@ -291,8 +315,6 @@ class UberJarBeamJob(AbstractBeamJob):
PIPELINE_PATH = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, "pipeline.json"])
PIPELINE_OPTIONS_PATH = '/'.join(
[PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])
ARTIFACT_MANIFEST_PATH = '/'.join(
[PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json'])
ARTIFACT_FOLDER = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'artifacts'])

def __init__(
Expand All @@ -313,27 +335,23 @@ def prepare(self):
with tempfile.NamedTemporaryFile(suffix='.jar') as tout:
self._jar = tout.name
shutil.copy(self._executable_jar, self._jar)
with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
with z.open(self.PIPELINE_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_proto).encode('utf-8'))
with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_options).encode('utf-8'))
with z.open(self.PIPELINE_MANIFEST, 'w') as fout:
fout.write(
json.dumps({
'defaultJobName': self.PIPELINE_NAME
}).encode('utf-8'))
self._start_artifact_service(self._jar, self._artifact_port)

def _start_artifact_service(self, jar, requested_port):
self._artifact_staging_service = artifact_service.ZipFileArtifactService(
jar, self.ARTIFACT_FOLDER)
self._artifact_manager = JarArtifactManager(self._jar, self.ARTIFACT_FOLDER)
self._artifact_staging_service = artifact_service.ArtifactStagingService(
self._artifact_manager.file_writer)
self._artifact_staging_service.register_job(
self._job_id,
{
env_id: env.dependencies
for (env_id,
env) in self._pipeline_proto.components.environments.items()
})
self._artifact_staging_server = grpc.server(futures.ThreadPoolExecutor())
port = self._artifact_staging_server.add_insecure_port(
'[::]:%s' % requested_port)
beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
self._artifact_staging_service, self._artifact_staging_server)
self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor(
url='localhost:%d' % port)
Expand All @@ -343,9 +361,34 @@ def _start_artifact_service(self, jar, requested_port):

def _stop_artifact_service(self):
self._artifact_staging_server.stop(1)
self._artifact_staging_service.close()
self._artifact_manifest_location = (
self._artifact_staging_service.retrieval_token(self._job_id))

# Update dependencies to point to staged files.
pipeline = copy.copy(self._pipeline_proto)
if any(env.dependencies
for env in pipeline.components.environments.values()):
for env_id, deps in self._artifact_staging_service.resolved_deps(
self._job_id).items():
# Slice assignment not supported for repeated fields.
env = self._pipeline_proto.components.environments[env_id]
del env.dependencies[:]
env.dependencies.extend(deps)

# Copy the pipeline definition and metadata into the jar.
z = self._artifact_manager.zipfile_handle()
with z.open(self.PIPELINE_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_proto).encode('utf-8'))
with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
fout.write(
json_format.MessageToJson(self._pipeline_options).encode('utf-8'))
with z.open(self.PIPELINE_MANIFEST, 'w') as fout:
fout.write(
json.dumps({
'defaultJobName': self.PIPELINE_NAME
}).encode('utf-8'))

# Closes the jar file.
self._artifact_manager.close()

def artifact_staging_endpoint(self):
return self._artifact_staging_endpoint
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import tempfile
import time
import urllib
import zipfile

import requests
from google.protobuf import json_format
Expand Down Expand Up @@ -146,12 +145,6 @@ def delete(self, path, **kwargs):

def run(self):
self._stop_artifact_service()
# Move the artifact manifest to the expected location.
with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
with z.open(self._artifact_manifest_location) as fin:
manifest_contents = fin.read()
with z.open(self.ARTIFACT_MANIFEST_PATH, 'w') as fout:
fout.write(manifest_contents)

# Upload the jar and start the job.
with open(self._jar, 'rb') as jar_file:
Expand Down
Loading