@@ -0,0 +1,23 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines a {@link com.google.cloud.dataflow.sdk.coders.Coder}
* for Protocol Buffers messages, {@code ProtoCoder}.
*
* @see com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder
*/
package com.google.cloud.dataflow.sdk.coders.protobuf;
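
A minimal usage sketch, assuming a hypothetical generated message class MyProto and an existing PCollection<MyProto> named protos (neither is part of this change):

// Explicitly attach the coder; ProtoCoder.of(Class) is the factory this
// package documents. MyProto and protos are illustrative assumptions.
protos.setCoder(ProtoCoder.of(MyProto.class));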
@@ -72,7 +72,7 @@
* <h3>Reading from Cloud Bigtable</h3>
*
* <p>The Bigtable source returns a set of rows from a single table, returning a
* {@code PCollection&lt;Row&gt;}.
* {@code PCollection<Row>}.
*
* <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
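A minimal sketch of such a source configuration; the project, cluster, zone, and table identifiers are placeholders:

// Connection information travels in a BigtableOptions builder; all ids below
// are placeholders, not values from this change.
BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder()
        .setProjectId("my-project")
        .setClusterId("my-cluster")
        .setZoneId("my-zone");

// Read the table into a PCollection<Row>.
PCollection<Row> rows =
    pipeline.apply(
        BigtableIO.read()
            .withBigtableOptions(optionsBuilder)
            .withTableId("my-table"));
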
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines transforms for reading and writing from Google Cloud Bigtable.
*
* @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
*/
package com.google.cloud.dataflow.sdk.io.bigtable;
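
On the write side, a sketch reusing the optionsBuilder placeholder from the read example above; BigtableIO.write() consumes KV pairs of a row key and the mutations to apply to that row:

// mutations is assumed to be produced upstream as a
// PCollection<KV<ByteString, Iterable<Mutation>>>.
mutations.apply(
    BigtableIO.write()
        .withBigtableOptions(optionsBuilder)
        .withTableId("my-table"));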
Expand Up @@ -137,7 +137,8 @@
*
* <p>{@link Default @Default} represents a set of annotations that can be used to annotate getter
* properties on {@link PipelineOptions} with information representing the default value to be
* returned if no value is specified.
* returned if no value is specified. Any default implementation (using the {@code default} keyword)
* is ignored.
*
* <p>{@link Hidden @Hidden} hides an option from being listed when {@code --help}
* is invoked via {@link PipelineOptionsFactory#fromArgs(String[])}.
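
A short sketch of both annotations on a hypothetical options interface:

public interface MyOptions extends PipelineOptions {
  // The annotation supplies the default; per the note above, a Java 8
  // default method body would be ignored.
  @Default.Integer(3)
  Integer getRetryCount();
  void setRetryCount(Integer value);

  // Excluded from the listing printed when --help is parsed.
  @Hidden
  String getInternalEndpoint();
  void setInternalEndpoint(String value);
}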
@@ -16,6 +16,8 @@

package com.google.cloud.dataflow.sdk.options;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.options.Validation.Required;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
@@ -443,13 +445,21 @@ Class<T> getProxyClass() {
private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;

/** Classes that are used as the boundary in the stack trace to find the callers class name. */
private static final Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
PipelineOptionsFactory.class.getName(),
Builder.class.getName());
private static final Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES =
ImmutableSet.of(PipelineOptionsFactory.class.getName(), Builder.class.getName());

/** Methods that are ignored when validating the proxy class. */
private static final Set<Method> IGNORED_METHODS;

/** A predicate that matches only methods that are not synthetic, per {@link Method#isSynthetic()}. */
private static final Predicate<Method> NOT_SYNTHETIC_PREDICATE =
new Predicate<Method>() {
@Override
public boolean apply(Method input) {
return !input.isSynthetic();
}
};
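
// For context: one common source of synthetic methods is the bridge method
// javac emits when a generic interface is implemented at a concrete type. A
// hypothetical illustration (these types are not part of this class):
//
//   interface Supplier<T> { T get(); }
//   class StringSupplier implements Supplier<String> {
//     @Override public String get() { return "value"; }
//     // javac also emits a synthetic bridge "Object get()" here;
//     // Method#isSynthetic() is true for it, so the predicate above
//     // filters it out of validation and help output.
//   }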

/** The set of options that have been registered and visible to the user. */
private static final Set<Class<? extends PipelineOptions>> REGISTERED_OPTIONS =
Sets.newConcurrentHashSet();
@@ -662,7 +672,9 @@ public static void printHelp(PrintStream out, Class<? extends PipelineOptions> i
Preconditions.checkNotNull(iface);
validateWellFormed(iface, REGISTERED_OPTIONS);

Iterable<Method> methods = ReflectHelpers.getClosureOfMethodsOnInterface(iface);
Iterable<Method> methods =
Iterables.filter(
ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
for (Method method : methods) {
// Process only methods that are not marked as hidden.
@@ -876,7 +888,8 @@ private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClas
throws IntrospectionException {
// The sorting is important to make this method stable.
SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
methods.addAll(Arrays.asList(beanClass.getMethods()));
methods.addAll(
Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
List<PropertyDescriptor> descriptors = Lists.newArrayList();

@@ -1017,8 +1030,9 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp
Class<?> klass) throws IntrospectionException {
Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
// Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
// Ignore synthetic methods
for (Method method : klass.getMethods()) {
if (Modifier.isStatic(method.getModifiers())) {
if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
methods.add(method);
}
}
@@ -1035,6 +1049,7 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp
// Verify that there are no methods with the same name with two different return types.
Iterable<Method> interfaceMethods = FluentIterable
.from(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.filter(NOT_SYNTHETIC_PREDICATE)
.toSortedSet(MethodComparator.INSTANCE);
SortedSetMultimap<Method, Method> methodNameToMethodMap =
TreeMultimap.create(MethodNameComparator.INSTANCE, MethodComparator.INSTANCE);
@@ -1059,10 +1074,13 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp

// Verify that there is no getter with a mixed @JsonIgnore annotation and verify
// that no setter has @JsonIgnore.
Iterable<Method> allInterfaceMethods = FluentIterable
.from(ReflectHelpers.getClosureOfMethodsOnInterfaces(validatedPipelineOptionsInterfaces))
.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.toSortedSet(MethodComparator.INSTANCE);
Iterable<Method> allInterfaceMethods =
FluentIterable.from(
ReflectHelpers.getClosureOfMethodsOnInterfaces(
validatedPipelineOptionsInterfaces))
.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface))
.filter(NOT_SYNTHETIC_PREDICATE)
.toSortedSet(MethodComparator.INSTANCE);
SortedSetMultimap<Method, Method> methodNameToAllMethodMap =
TreeMultimap.create(MethodNameComparator.INSTANCE, MethodComparator.INSTANCE);
for (Method method : allInterfaceMethods) {
@@ -1146,7 +1164,10 @@ private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOp

// Verify that no additional methods are on an interface that aren't a bean property.
SortedSet<Method> unknownMethods = new TreeSet<>(MethodComparator.INSTANCE);
unknownMethods.addAll(Sets.difference(Sets.newHashSet(klass.getMethods()), methods));
unknownMethods.addAll(
Sets.filter(
Sets.difference(Sets.newHashSet(klass.getMethods()), methods),
NOT_SYNTHETIC_PREDICATE));
Preconditions.checkArgument(unknownMethods.isEmpty(),
"Methods %s on [%s] do not conform to being bean properties.",
FluentIterable.from(unknownMethods).transform(ReflectHelpers.METHOD_FORMATTER),
@@ -1391,7 +1412,10 @@ private static ListMultimap<String, String> parseCommandLine(
* split up each string on ','.
*
* <p>We special case the "runner" option. It is mapped to the class of the {@link PipelineRunner}
* based off of the {@link PipelineRunner}s simple class name or fully qualified class name.
* based off of the {@link PipelineRunner PipelineRunner's} simple class name. If the provided
* runner name is not registered via a {@link PipelineRunnerRegistrar}, we attempt to obtain the
* class that the name represents using {@link Class#forName(String)} and use the resulting class
* if it subclasses {@link PipelineRunner}.
*
* <p>If strict parsing is disabled, unknown options or options that cannot be converted to
* the expected java type using an {@link ObjectMapper} will be ignored.
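
A hypothetical invocation exercising the fallback described above; com.example.MyCustomRunner stands in for any PipelineRunner subclass on the classpath that has no registrar:

// Resolved first against registered runner names, then via Class.forName.
String[] args = {"--runner=com.example.MyCustomRunner"};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();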
@@ -1442,10 +1466,26 @@ public boolean apply(@Nullable String input) {
JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
if ("runner".equals(entry.getKey())) {
String runner = Iterables.getOnlyElement(entry.getValue());
Preconditions.checkArgument(SUPPORTED_PIPELINE_RUNNERS.containsKey(runner),
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner, Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
} else {
try {
Class<?> runnerClass = Class.forName(runner);
checkArgument(
PipelineRunner.class.isAssignableFrom(runnerClass),
"Class '%s' does not implement PipelineRunner. Supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
convertedOptions.put("runner", runnerClass);
} catch (ClassNotFoundException e) {
String msg =
String.format(
"Unknown 'runner' specified '%s', supported pipeline runners %s",
runner,
Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
throw new IllegalArgumentException(msg, e);
}
}
} else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
|| returnType.getComponentType().isEnum()))
|| Collection.class.isAssignableFrom(returnType)) {
@@ -952,6 +952,9 @@ private <K, V> void groupByKeyHelper(
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
context.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});
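
The IS_MERGING_WINDOW_FN value is the negation of WindowFn#isNonMerging(); a sketch of what the flag evaluates to for two stock WindowFns:

// Fixed windows never merge, so the flag is false; session windows do merge.
WindowFn<?, ?> fixed = FixedWindows.of(Duration.standardMinutes(10));
WindowFn<?, ?> sessions = Sessions.withGapDuration(Duration.standardMinutes(10));
System.out.println(!fixed.isNonMerging());    // false
System.out.println(!sessions.isNonMerging()); // true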

@@ -15,10 +15,11 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
@@ -79,8 +80,7 @@ private <OutputT> TransformEvaluator<?> getTransformEvaluator(
@SuppressWarnings("unchecked")
private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
final InProcessEvaluationContext evaluationContext)
throws IOException {
final InProcessEvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -102,39 +102,51 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueu
return evaluatorQueue;
}

/**
* A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
* creates the {@link BoundedReader} and consumes all available input.
*
* <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
* each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
* may produce duplicate elements.
*/
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private final Reader<OutputT> reader;
private boolean contentsRemaining;

public BoundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext)
throws IOException {
InProcessEvaluationContext evaluationContext) {
this.transform = transform;
this.evaluationContext = evaluationContext;
reader =
transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
contentsRemaining = reader.start();
}

@Override
public void processElement(WindowedValue<Object> element) {}

@Override
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
try (final Reader<OutputT> reader =
transform
.getTransform()
.getSource()
.createReader(evaluationContext.getPipelineOptions())) {
contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
}
// No explicit reader.close() here: the try-with-resources statement closes
// the reader, and an extra call would close it twice.
return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
return StepTransformResult
.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
}
}
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;

/**
* A callback for completing a bundle of input.
*/
interface CompletionCallback {
/**
* Handle a successful result.
*/
void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);

/**
* Handle a result that terminated abnormally due to the provided {@link Throwable}.
*/
void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
}
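
A minimal sketch of an implementation, illustrating only the contract; a real callback in the runner would presumably also commit the result, whereas this one just logs:

class LoggingCompletionCallback implements CompletionCallback {
  @Override
  public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
    System.out.println("Completed bundle " + inputBundle);
  }

  @Override
  public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
    System.err.println("Bundle " + inputBundle + " failed: " + t);
  }
}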
@@ -15,7 +15,6 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

import java.util.Objects;