Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
7 changes: 7 additions & 0 deletions sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,13 @@
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines a {@link com.google.cloud.dataflow.sdk.coders.Coder}
* for Protocol Buffers messages, {@code ProtoCoder}.
*
* @see com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder
*/
package com.google.cloud.dataflow.sdk.coders.protobuf;
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
* <h3>Reading from Cloud Bigtable</h3>
*
* <p>The Bigtable source returns a set of rows from a single table, returning a
* {@code PCollection&lt;Row&gt;}.
* {@code PCollection<Row>}.
*
* <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines transforms for reading and writing from Google Cloud Bigtable.
*
* @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
*/
package com.google.cloud.dataflow.sdk.io.bigtable;
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
*
* <p>{@link Default @Default} represents a set of annotations that can be used to annotate getter
* properties on {@link PipelineOptions} with information representing the default value to be
* returned if no value is specified.
* returned if no value is specified. Any default implementation (using the {@code default} keyword)
* is ignored.
*
* <p>{@link Hidden @Hidden} hides an option from being listed when {@code --help}
* is invoked via {@link PipelineOptionsFactory#fromArgs(String[])}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.dataflow.sdk.options;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.options.Validation.Required;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
Expand Down Expand Up @@ -443,13 +445,21 @@ Class<T> getProxyClass() {
private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;

/** Classes that are used as the boundary in the stack trace to find the callers class name. */
private static final Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
PipelineOptionsFactory.class.getName(),
Builder.class.getName());
private static final Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES =
ImmutableSet.of(PipelineOptionsFactory.class.getName(), Builder.class.getName());

/** Methods that are ignored when validating the proxy class. */
private static final Set<Method> IGNORED_METHODS;

/** A predicate that checks if a method is synthetic via {@link Method#isSynthetic()}. */
private static final Predicate<Method> NOT_SYNTHETIC_PREDICATE =
new Predicate<Method>() {
@Override
public boolean apply(Method input) {
return !input.isSynthetic();
}
};

/** The set of options that have been registered and visible to the user. */
private static final Set<Class<? extends PipelineOptions>> REGISTERED_OPTIONS =
Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -662,7 +672,9 @@ public static void printHelp(PrintStream out, Class<? extends PipelineOptions> i
Preconditions.checkNotNull(iface);
validateWellFormed(iface, REGISTERED_OPTIONS);

Iterable<Method> methods = ReflectHelpers.getClosureOfMethodsOnInterface(iface);
Iterable<Method> methods =
Iterables.filter(
ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
for (Method method : methods) {
// Process only methods that are not marked as hidden.
Expand Down Expand Up @@ -876,7 +888,8 @@ private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClas
throws IntrospectionException {
// The sorting is important to make this method stable.
SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
methods.addAll(Arrays.asList(beanClass.getMethods()));
methods.addAll(
Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
List<PropertyDescriptor> descriptors = Lists.newArrayList();

Expand Down Expand Up @@ -1017,8 +1030,9 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp
Class<?> klass) throws IntrospectionException {
Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
// Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
// Ignore synthetic methods
for (Method method : klass.getMethods()) {
if (Modifier.isStatic(method.getModifiers())) {
if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
methods.add(method);
}
}
Expand All @@ -1035,6 +1049,7 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp
// Verify that there are no methods with the same name with two different return types.
Iterable<Method> interfaceMethods = FluentIterable
.from(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.filter(NOT_SYNTHETIC_PREDICATE)
.toSortedSet(MethodComparator.INSTANCE);
SortedSetMultimap<Method, Method> methodNameToMethodMap =
TreeMultimap.create(MethodNameComparator.INSTANCE, MethodComparator.INSTANCE);
Expand All @@ -1059,10 +1074,13 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp

// Verify that there is no getter with a mixed @JsonIgnore annotation and verify
// that no setter has @JsonIgnore.
Iterable<Method> allInterfaceMethods = FluentIterable
.from(ReflectHelpers.getClosureOfMethodsOnInterfaces(validatedPipelineOptionsInterfaces))
.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.toSortedSet(MethodComparator.INSTANCE);
Iterable<Method> allInterfaceMethods =
FluentIterable.from(
ReflectHelpers.getClosureOfMethodsOnInterfaces(
validatedPipelineOptionsInterfaces))
.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.filter(NOT_SYNTHETIC_PREDICATE)
.toSortedSet(MethodComparator.INSTANCE);
SortedSetMultimap<Method, Method> methodNameToAllMethodMap =
TreeMultimap.create(MethodNameComparator.INSTANCE, MethodComparator.INSTANCE);
for (Method method : allInterfaceMethods) {
Expand Down Expand Up @@ -1146,7 +1164,10 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp

// Verify that no additional methods are on an interface that aren't a bean property.
SortedSet<Method> unknownMethods = new TreeSet<>(MethodComparator.INSTANCE);
unknownMethods.addAll(Sets.difference(Sets.newHashSet(klass.getMethods()), methods));
unknownMethods.addAll(
Sets.filter(
Sets.difference(Sets.newHashSet(klass.getMethods()), methods),
NOT_SYNTHETIC_PREDICATE));
Preconditions.checkArgument(unknownMethods.isEmpty(),
"Methods %s on [%s] do not conform to being bean properties.",
FluentIterable.from(unknownMethods).transform(ReflectHelpers.METHOD_FORMATTER),
Expand Down Expand Up @@ -1391,7 +1412,10 @@ private static ListMultimap<String, String> parseCommandLine(
* split up each string on ','.
*
* <p>We special case the "runner" option. It is mapped to the class of the {@link PipelineRunner}
* based off of the {@link PipelineRunner}s simple class name or fully qualified class name.
* based off of the {@link PipelineRunner PipelineRunners} simple class name. If the provided
* runner name is not registered via a {@link PipelineRunnerRegistrar}, we attempt to obtain the
* class that the name represents using {@link Class#forName(String)} and use the result class if
* it subclasses {@link PipelineRunner}.
*
* <p>If strict parsing is enabled, unknown options or options that cannot be converted to
* the expected java type using an {@link ObjectMapper} will be ignored.
Expand Down Expand Up @@ -1442,10 +1466,26 @@ public boolean apply(@Nullable String input) {
JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
if ("runner".equals(entry.getKey())) {
String runner = Iterables.getOnlyElement(entry.getValue());
Preconditions.checkArgument(SUPPORTED_PIPELINE_RUNNERS.containsKey(runner),
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner, Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
} else {
try {
Class<?> runnerClass = Class.forName(runner);
checkArgument(
PipelineRunner.class.isAssignableFrom(runnerClass),
"Class '%s' does not implement PipelineRunner. Supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", runnerClass);
} catch (ClassNotFoundException e) {
String msg =
String.format(
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
throw new IllegalArgumentException(msg, e);
}
}
} else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
|| returnType.getComponentType().isEnum()))
|| Collection.class.isAssignableFrom(returnType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,9 @@ private <K, V> void groupByKeyHelper(
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
context.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
Expand Down Expand Up @@ -79,8 +80,7 @@ private <OutputT> TransformEvaluator<?> getTransformEvaluator(
@SuppressWarnings("unchecked")
private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
final InProcessEvaluationContext evaluationContext)
throws IOException {
final InProcessEvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
Expand All @@ -102,39 +102,51 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
return evaluatorQueue;
}

/**
* A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
* creates the {@link BoundedReader} and consumes all available input.
*
* <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
* each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
* may produce duplicate elements.
*/
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private final Reader<OutputT> reader;
private boolean contentsRemaining;

public BoundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext)
throws IOException {
InProcessEvaluationContext evaluationContext) {
this.transform = transform;
this.evaluationContext = evaluationContext;
reader =
transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
contentsRemaining = reader.start();
}

@Override
public void processElement(WindowedValue<Object> element) {}

@Override
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
try (final Reader<OutputT> reader =
transform
.getTransform()
.getSource()
.createReader(evaluationContext.getPipelineOptions());) {
contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
}
reader.close();
return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
return StepTransformResult
.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;

/**
* A callback for completing a bundle of input.
*/
interface CompletionCallback {
/**
* Handle a successful result.
*/
void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);

/**
* Handle a result that terminated abnormally due to the provided {@link Throwable}.
*/
void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

import java.util.Objects;
Expand Down
Loading