DataflowBigQueryIOTest.java (new file)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.io;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;

import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;

import com.google.api.services.bigquery.model.TableSchema;

import org.junit.Test;

import java.util.Set;

/**
 * Unit tests for Dataflow usage of {@link BigQueryIO} transforms.
 */
public class DataflowBigQueryIOTest {
  @Test
  public void testBatchSinkPrimitiveDisplayData() {
    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
    options.setStreaming(false);
    testSinkPrimitiveDisplayData(options);
  }

  @Test
  public void testStreamingSinkPrimitiveDisplayData() {
[Review comment from Contributor]
I think for JUnit4 it is preferred not to prefix with "test". So just:

    @Test
    public void streamingSinkPrimitiveDisplayData

etc.

[Reply from Contributor Author]
The convention is split in the Beam codebase, but most test cases use the "test" prefix: 1916 prefixed vs. 93 not, according to this script.

I'm going to follow the Beam convention. It makes sense to migrate if we're not following the JUnit4 convention, but we should do it across the codebase. Feel free to file a bug assigned to me if you feel it's worth the effort.

    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
    options.setStreaming(true);
    testSinkPrimitiveDisplayData(options);
  }

  private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) {
    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options);

    BigQueryIO.Write.Bound write = BigQueryIO.Write
        .to("project:dataset.table")
        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
        .withoutValidation();

    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
        displayData, hasItem(hasDisplayItem(hasKey("tableSpec"))));

    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
        displayData, hasItem(hasDisplayItem(hasKey("schema"))));
  }
}
DataflowTextIOTest.java
@@ -17,11 +17,21 @@
*/
package org.apache.beam.runners.dataflow.io;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;

import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;

import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
@@ -41,13 +51,13 @@
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Set;

/**
 * {@link DataflowPipelineRunner}-specific tests for TextIO Read and Write transforms.
 */
@RunWith(JUnit4.class)
public class DataflowTextIOTest {

  private TestDataflowPipelineOptions buildTestPipelineOptions() {
    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
@@ -116,4 +126,15 @@ public void testGoodWildcards() throws Exception {
  private void applyRead(Pipeline pipeline, String path) {
    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
  }

  @Test
  public void testPrimitiveDisplayData() {
[Review comment from Contributor]
Ditto on test naming.
    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();

    TextIO.Write.Bound<?> write = TextIO.Write.to("foobar");

    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
    assertThat("TextIO.Write should include the file prefix in its primitive display data",
        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
  }
}
DataflowCombineTest.java (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.transforms;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;

import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineTest;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;

import org.junit.Test;

import java.util.Set;

/**
 * Unit tests for Dataflow usage of {@link Combine} transforms.
 */
public class DataflowCombineTest {
  @Test
  public void testCombinePerKeyPrimitiveDisplayData() {
    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();

    CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
    PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine =
        Combine.perKey(combineFn);

    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));

    assertThat("Combine.perKey should include the combineFn in its primitive display data",
        displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
  }
}
DataflowDisplayDataEvaluator.java (new file)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.transforms;

import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.util.NoopCredentialFactory;
import org.apache.beam.sdk.util.NoopPathValidator;

import com.google.common.collect.Lists;

/**
 * Factory methods for creating {@link DisplayDataEvaluator} instances against the
 * {@link DataflowPipelineRunner}.
 */
public final class DataflowDisplayDataEvaluator {
  /** Do not instantiate. */
  private DataflowDisplayDataEvaluator() {}

  /**
   * Retrieve a set of default {@link DataflowPipelineOptions} which can be used to build
   * Dataflow pipelines for evaluating display data.
   */
  public static DataflowPipelineOptions getDefaultOptions() {
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

    options.setProject("foobar");
    options.setTempLocation("gs://bucket/tmpLocation");
    options.setFilesToStage(Lists.<String>newArrayList());

    options.as(DataflowPipelineDebugOptions.class).setPathValidatorClass(NoopPathValidator.class);
    options.as(GcpOptions.class).setCredentialFactoryClass(NoopCredentialFactory.class);

    return options;
  }

  /**
   * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
   * the {@link DataflowPipelineRunner}.
   */
  public static DisplayDataEvaluator create() {
    return create(getDefaultOptions());
  }

  /**
   * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
   * the {@link DataflowPipelineRunner} with the specified {@code options}.
   */
  public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
    return DisplayDataEvaluator.forRunner(DataflowPipelineRunner.class, options);
  }
}
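Putting the pieces together: the factory stubs out GCP path validation and credentials so a test can construct a Dataflow pipeline offline, then hands back an evaluator that translates a transform through the DataflowPipelineRunner and collects the display data of the resulting primitives. A minimal sketch of that flow, using the same APIs as the tests in this PR (the TextIO sink and output prefix are illustrative):

    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();

    // Any PTransform works here; a TextIO sink mirrors DataflowTextIOTest above.
    TextIO.Write.Bound<?> write = TextIO.Write.to("gs://illustrative-bucket/out");

    // Expands the composite via the runner and gathers DisplayData from each
    // primitive transform in the expansion.
    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);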
DataflowMapElementsTest.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.transforms;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;

import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;

import org.junit.Test;

import java.io.Serializable;
import java.util.Set;

/**
 * Unit tests for Dataflow usage of {@link MapElements} transforms.
 */
public class DataflowMapElementsTest implements Serializable {
  @Test
  public void testPrimitiveDisplayData() {
    SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
      @Override
      public Integer apply(Integer input) {
        return input;
      }
    };

    MapElements<?, ?> map = MapElements.via(mapFn);
    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();

    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map);
    assertThat("MapElements should include the mapFn in its primitive display data",
        displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
  }
}
BigQueryIO.java
@@ -1166,6 +1166,15 @@ public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
      return new BigQueryWriteOperation(this);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);

      builder
          .addIfNotNull(DisplayData.item("schema", jsonSchema))
          .addIfNotNull(DisplayData.item("tableSpec", jsonTable));
    }

    private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
      // The maximum number of retry load jobs.
      private static final int MAX_RETRY_LOAD_JOBS = 3;
@@ -1396,6 +1405,13 @@ public void finishBundle(Context context) throws Exception {
      uniqueIdsForTableRows.clear();
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema));
    }

    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
        throws IOException {
      TableReference tableReference = parseTableSpec(tableSpec);
@@ -1603,6 +1619,16 @@ public void processElement(ProcessContext context) throws IOException {
          new TableRowInfo(context.element(), uniqueId)));
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec));
      if (tableRefFunction != null) {
        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()));
      }
    }

    private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
      if (tableSpec != null) {
        return tableSpec;
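All three BigQueryIO hunks use the same hook: override populateDisplayData, call super to keep inherited items, then register fields, using addIfNotNull for optional fields that may be unset and a null-checked plain add when the item value (here, a class) must itself be non-null. A minimal sketch of the pattern on a hypothetical old-style DoFn (the class and field names are invented for illustration):

    import javax.annotation.Nullable;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.display.DisplayData;

    // Hypothetical DoFn, invented for illustration.
    class TagRowsFn extends DoFn<String, String> {
      @Nullable private final String tableSpec;  // optional: may legitimately be null

      TagRowsFn(@Nullable String tableSpec) {
        this.tableSpec = tableSpec;
      }

      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element());
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);  // keep items from superclasses
        // addIfNotNull skips registration when tableSpec is null, so unset
        // optional fields never surface as empty display items.
        builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec));
      }
    }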
OffsetBasedSource.java
@@ -207,7 +207,7 @@ public boolean allowsDynamicSplitting() {
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  builder
-     .add(DisplayData.item("minBundleSize", minBundleSize))
+     .addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize), 1L)
      .addIfNotDefault(DisplayData.item("startOffset", startOffset), 0L)
      .addIfNotDefault(DisplayData.item("endOffset", endOffset), Long.MAX_VALUE);
}
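The change swaps add for addIfNotDefault, so minBundleSize now appears in display data only when it differs from its default of 1, matching how startOffset and endOffset are already handled. A hedged sketch of the resulting behavior (MySource and its default are illustrative, not from the PR):

    import org.apache.beam.sdk.transforms.display.DisplayData;
    import org.apache.beam.sdk.transforms.display.HasDisplayData;

    // Illustrative component registering one conditional item.
    class MySource implements HasDisplayData {
      private final long minBundleSize;

      MySource(long minBundleSize) {
        this.minBundleSize = minBundleSize;
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        // Registered only when minBundleSize != 1L, the assumed default.
        builder.addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize), 1L);
      }
    }

    // DisplayData.from(new MySource(1L)) then carries no "minBundleSize" item,
    // while DisplayData.from(new MySource(64L)) carries one with value 64.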
Write.java
@@ -204,6 +204,11 @@ public void finishBundle(Context c) throws Exception {
            c.output(result);
          }
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
          Write.Bound.this.populateDisplayData(builder);
        }
      }).withSideInputs(writeOperationView))
      .setCoder(writeOperation.getWriterResultCoder());
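This final hunk covers the anonymous DoFn inside Write.Bound: instead of registering items of its own, it forwards to the enclosing composite via Write.Bound.this, so the primitive ParDo produced from the DoFn surfaces the sink's display data. A stripped-down sketch of the same delegation, assuming this SDK snapshot's PTransform.apply API (Outer and its display item are invented):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.display.DisplayData;
    import org.apache.beam.sdk.values.PCollection;

    // Invented composite mirroring the delegation in the hunk above.
    class Outer extends PTransform<PCollection<String>, PCollection<Void>> {
      @Override
      public PCollection<Void> apply(PCollection<String> input) {
        return input.apply(ParDo.of(new DoFn<String, Void>() {
          @Override
          public void processElement(ProcessContext c) {}

          @Override
          public void populateDisplayData(DisplayData.Builder builder) {
            // Delegate to the enclosing composite so the primitive ParDo
            // reports Outer's display data, as Write.Bound does above.
            Outer.this.populateDisplayData(builder);
          }
        }));
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("sinkName", "illustrative"));
      }
    }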