This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines a {@link com.google.cloud.dataflow.sdk.coders.Coder}
* for Protocol Buffers messages, {@code ProtoCoder}.
*
* @see com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder
*/
package com.google.cloud.dataflow.sdk.coders.protobuf;
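For context, here is a minimal usage sketch of the ProtoCoder that this package-info points at. It is not part of this diff: the helper class and method names are invented, and it assumes the usual ProtoCoder.of(Class) factory plus PCollection#setCoder.

// Illustrative sketch, not part of this change: attach a ProtoCoder to a
// PCollection of generated protobuf messages.
import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.protobuf.Message;

public class ProtoCoderUsage {
  // Sets a ProtoCoder built from the generated message class and returns the
  // same PCollection for further chaining.
  static <T extends Message> PCollection<T> withProtoCoder(
      PCollection<T> messages, Class<T> messageClass) {
    return messages.setCoder(ProtoCoder.of(messageClass));
  }
}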
@@ -72,7 +72,7 @@
* <h3>Reading from Cloud Bigtable</h3>
*
* <p>The Bigtable source returns a set of rows from a single table as a
* {@code PCollection&lt;Row&gt;}.
* {@code PCollection<Row>}.
*
* <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
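Alongside the javadoc excerpt above, a hedged sketch of what configuring such a source can look like. The identifiers "my-project" and "my-table" are invented placeholders, and the exact BigtableOptions.Builder setters for locating the cluster or instance depend on the bigtable client version on the classpath.

// Illustrative sketch, not part of this change: read rows from a Bigtable
// table into a PCollection<Row>.
import com.google.bigtable.v1.Row;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BigtableReadSketch {
  static PCollection<Row> readRows(Pipeline pipeline) {
    // Placeholder project id; add whatever cluster/instance settings your
    // client version requires.
    BigtableOptions.Builder optionsBuilder =
        new BigtableOptions.Builder().setProjectId("my-project");

    // The source needs the options (or builder) plus a table id and yields
    // one Row element per row read from the table.
    return pipeline.apply(
        BigtableIO.read()
            .withBigtableOptions(optionsBuilder)
            .withTableId("my-table"));
  }
}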
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* Defines transforms for reading from and writing to Google Cloud Bigtable.
*
* @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
*/
package com.google.cloud.dataflow.sdk.io.bigtable;
@@ -952,6 +952,9 @@ private <K, V> void groupByKeyHelper(
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
context.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});

@@ -15,10 +15,11 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
@@ -79,8 +80,7 @@ private <OutputT> TransformEvaluator<?> getTransformEvaluator(
@SuppressWarnings("unchecked")
private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
final InProcessEvaluationContext evaluationContext)
throws IOException {
final InProcessEvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -102,39 +102,51 @@ private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
return evaluatorQueue;
}

/**
* A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
* creates the {@link BoundedReader} and consumes all available input.
*
* <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
* each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
* may produce duplicate elements.
*/
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
private final Reader<OutputT> reader;
private boolean contentsRemaining;

public BoundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext)
throws IOException {
InProcessEvaluationContext evaluationContext) {
this.transform = transform;
this.evaluationContext = evaluationContext;
reader =
transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
contentsRemaining = reader.start();
}

@Override
public void processElement(WindowedValue<Object> element) {}

@Override
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
try (final Reader<OutputT> reader =
transform
.getTransform()
.getSource()
.createReader(evaluationContext.getPipelineOptions());) {
contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
contentsRemaining = reader.advance();
}
reader.close();
return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
return StepTransformResult
.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
.addOutput(output)
.build();
}
}
}
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;

/**
* A callback for completing a bundle of input.
*/
interface CompletionCallback {
/**
* Handle a successful result.
*/
void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);

/**
* Handle a result that terminated abnormally due to the provided {@link Throwable}.
*/
void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
}
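Since the interface is small, a sketch of an implementation may help make the contract concrete. The class below is illustrative only and not part of this change: it merely tallies successful and failed bundles, and it sits in the same package because the interface is package-private; the runner's real callbacks presumably hand the result back to the evaluation context instead.

// Illustrative sketch, not part of this change: a CompletionCallback that
// only counts outcomes.
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;

import java.util.concurrent.atomic.AtomicInteger;

class CountingCompletionCallback implements CompletionCallback {
  private final AtomicInteger successes = new AtomicInteger();
  private final AtomicInteger failures = new AtomicInteger();

  @Override
  public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
    // A production callback would consume the result here.
    successes.incrementAndGet();
  }

  @Override
  public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
    failures.incrementAndGet();
  }

  int successCount() {
    return successes.get();
  }

  int failureCount() {
    return failures.get();
  }
}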
@@ -15,7 +15,6 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

import java.util.Objects;