Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ public void testSettingOfSdkPipelineOptions() throws IOException {
settings.put("numberOfWorkerHarnessThreads", 0);
settings.put("experiments", null);

assertEquals(ImmutableMap.of("options", settings),
job.getEnvironment().getSdkPipelineOptions());
Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
assertThat(sdkPipelineOptions, hasKey("options"));
assertEquals(settings, sdkPipelineOptions.get("options"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.options;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

import java.lang.reflect.Method;

/**
* For internal use. Specification for an option defined in a {@link PipelineOptions} interface.
*/
class PipelineOptionSpec {
private final Class<? extends PipelineOptions> clazz;
private final String name;
private final Method getter;

static PipelineOptionSpec of(Class<? extends PipelineOptions> clazz, String name, Method getter) {
return new PipelineOptionSpec(clazz, name, getter);
}

private PipelineOptionSpec(Class<? extends PipelineOptions> clazz, String name, Method getter) {
this.clazz = clazz;
this.name = name;
this.getter = getter;
}

/**
* The {@link PipelineOptions} interface which defines this {@link PipelineOptionSpec}.
*/
Class<? extends PipelineOptions> getDefiningInterface() {
return clazz;
}

/**
* Name of the property.
*/
String getName() {
return name;
}

/**
* The getter method for this property.
*/
Method getGetterMethod() {
return getter;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("definingInterface", getDefiningInterface())
.add("name", getName())
.add("getterMethod", getGetterMethod())
.toString();
}

@Override
public int hashCode() {
return Objects.hashCode(getDefiningInterface(), getName(), getGetterMethod());
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof PipelineOptionSpec)) {
return false;
}

PipelineOptionSpec that = (PipelineOptionSpec) obj;
return Objects.equal(this.getDefiningInterface(), that.getDefiningInterface())
&& Objects.equal(this.getName(), that.getName())
&& Objects.equal(this.getGetterMethod(), that.getGetterMethod());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;

import org.apache.beam.sdk.transforms.display.HasDisplayData;
import com.google.auto.service.AutoService;

import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -194,7 +194,7 @@
@JsonSerialize(using = Serializer.class)
@JsonDeserialize(using = Deserializer.class)
@ThreadSafe
public interface PipelineOptions {
public interface PipelineOptions extends HasDisplayData {
/**
* Transforms this object into an object of type {@code <T>} saving each property
* that has been manipulated. {@code <T>} must extend {@link PipelineOptions}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableListMultimap;
Expand All @@ -43,8 +43,11 @@
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.RowSortedTable;
import com.google.common.collect.Sets;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeBasedTable;
import com.google.common.collect.TreeMultimap;

import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -77,6 +80,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -444,6 +448,7 @@ Class<T> getProxyClass() {
@SuppressWarnings("rawtypes")
private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ClassLoader CLASS_LOADER;
private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;

/** Classes that are used as the boundary in the stack trace to find the callers class name. */
Expand Down Expand Up @@ -510,33 +515,22 @@ static ClassLoader findClassLoader() {
throw new ExceptionInInitializerError(e);
}

ClassLoader classLoader = findClassLoader();
CLASS_LOADER = findClassLoader();

// Store the list of all available pipeline runners.
ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
ImmutableMap.builder();
Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
pipelineRunnerRegistrars.addAll(
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, classLoader)));
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) {
for (Class<? extends PipelineRunner<?>> klass : registrar.getPipelineRunners()) {
builder.put(klass.getSimpleName(), klass);
}
}
SUPPORTED_PIPELINE_RUNNERS = builder.build();

// Load and register the list of all classes that extend PipelineOptions.
register(PipelineOptions.class);
Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
pipelineOptionsRegistrars.addAll(
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, classLoader)));
for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
register(klass);
}
}
initializeRegistry();
}

/**
Expand Down Expand Up @@ -565,6 +559,33 @@ public static synchronized void register(Class<? extends PipelineOptions> iface)
REGISTERED_OPTIONS.add(iface);
}

/**
* Resets the set of interfaces registered with this factory to the default state.
*
* @see PipelineOptionsFactory#register(Class)
*/
@VisibleForTesting
static synchronized void resetRegistry() {
REGISTERED_OPTIONS.clear();
initializeRegistry();
}

/**
* Load and register the list of all classes that extend PipelineOptions.
*/
private static void initializeRegistry() {
register(PipelineOptions.class);
Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
pipelineOptionsRegistrars.addAll(
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
register(klass);
}
}
}

/**
* Validates that the interface conforms to the following:
* <ul>
Expand Down Expand Up @@ -674,32 +695,20 @@ public static void printHelp(PrintStream out, Class<? extends PipelineOptions> i
Preconditions.checkNotNull(iface);
validateWellFormed(iface, REGISTERED_OPTIONS);

Iterable<Method> methods =
Iterables.filter(
ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
for (Method method : methods) {
// Process only methods that are not marked as hidden.
if (method.getAnnotation(Hidden.class) == null) {
ifaceToMethods.put(method.getDeclaringClass(), method);
}
Set<PipelineOptionSpec> properties =
PipelineOptionsReflector.getOptionSpecs(iface);

RowSortedTable<Class<?>, String, Method> ifacePropGetterTable = TreeBasedTable.create(
ClassNameComparator.INSTANCE, Ordering.natural());
for (PipelineOptionSpec prop : properties) {
ifacePropGetterTable.put(prop.getDefiningInterface(), prop.getName(), prop.getGetterMethod());
}
SortedSet<Class<?>> ifaces = new TreeSet<>(ClassNameComparator.INSTANCE);
// Keep interfaces that are not marked as hidden.
ifaces.addAll(Collections2.filter(ifaceToMethods.keySet(), new Predicate<Class<?>>() {
@Override
public boolean apply(Class<?> input) {
return input.getAnnotation(Hidden.class) == null;
}
}));
for (Class<?> currentIface : ifaces) {
Map<String, Method> propertyNamesToGetters =
getPropertyNamesToGetters(ifaceToMethods.get(currentIface));

// Don't output anything if there are no defined options
if (propertyNamesToGetters.isEmpty()) {
continue;
}
for (Map.Entry<Class<?>, Map<String, Method>> ifaceToPropertyMap :
ifacePropGetterTable.rowMap().entrySet()) {
Class<?> currentIface = ifaceToPropertyMap.getKey();
Map<String, Method> propertyNamesToGetters = ifaceToPropertyMap.getValue();

SortedSetMultimap<String, String> requiredGroupNameToProperties =
getRequiredGroupNamesToProperties(propertyNamesToGetters);

Expand Down Expand Up @@ -838,15 +847,21 @@ static List<PropertyDescriptor> getPropertyDescriptors(
* <p>TODO: Swap back to using Introspector once the proxy class issue with AppEngine is
* resolved.
*/
private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClass)
private static List<PropertyDescriptor> getPropertyDescriptors(
Class<? extends PipelineOptions> beanClass)
throws IntrospectionException {
// The sorting is important to make this method stable.
SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
methods.addAll(
Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
List<PropertyDescriptor> descriptors = Lists.newArrayList();

SortedMap<String, Method> propertyNamesToGetters = new TreeMap<>();
for (Map.Entry<String, Method> entry :
PipelineOptionsReflector.getPropertyNamesToGetters(methods).entries()) {
propertyNamesToGetters.put(entry.getKey(), entry.getValue());
}

List<PropertyDescriptor> descriptors = Lists.newArrayList();
List<TypeMismatch> mismatches = new ArrayList<>();
/*
* Add all the getter/setter pairs to the list of descriptors removing the getter once
Expand Down Expand Up @@ -918,28 +933,6 @@ private static void throwForTypeMismatches(List<TypeMismatch> mismatches) {
}
}

/**
* Returns a map of the property name to the getter method it represents.
* If there are duplicate methods with the same bean name, then it is indeterminate
* as to which method will be returned.
*/
private static SortedMap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
SortedMap<String, Method> propertyNamesToGetters = Maps.newTreeMap();
for (Method method : methods) {
String methodName = method.getName();
if ((!methodName.startsWith("get")
&& !methodName.startsWith("is"))
|| method.getParameterTypes().length != 0
|| method.getReturnType() == void.class) {
continue;
}
String propertyName = Introspector.decapitalize(
methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
propertyNamesToGetters.put(propertyName, method);
}
return propertyNamesToGetters;
}

/**
* Returns a map of required groups of arguments to the properties that satisfy the requirement.
*/
Expand Down Expand Up @@ -981,21 +974,22 @@ private static SortedSetMultimap<String, String> getRequiredGroupNamesToProperti
*/
private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOptions> iface,
Set<Class<? extends PipelineOptions>> validatedPipelineOptionsInterfaces,
Class<?> klass) throws IntrospectionException {
Class<? extends PipelineOptions> klass) throws IntrospectionException {
Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
// Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
// Ignore synthetic methods
for (Method method : klass.getMethods()) {
if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
methods.add(method);
}
}
// Ignore standard infrastructure methods on the generated class.
try {
methods.add(klass.getMethod("equals", Object.class));
methods.add(klass.getMethod("hashCode"));
methods.add(klass.getMethod("toString"));
methods.add(klass.getMethod("as", Class.class));
methods.add(klass.getMethod("cloneAs", Class.class));
methods.add(klass.getMethod("populateDisplayData", DisplayData.Builder.class));
} catch (NoSuchMethodException | SecurityException e) {
throw Throwables.propagate(e);
}
Expand Down
Loading