Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -104,6 +105,16 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Intentionally no super call: the wrapped source contributes the base display
  // data itself via include() below.
  builder.add("source", source.getClass());
  builder.addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE);
  builder.addIfNotNull("maxReadTime", maxReadTime);
  builder.include(source);
}

private static class UnboundedToBoundedSourceAdapter<T>
extends BoundedSource<ValueWithRecordId<T>> {
private final UnboundedSource<T, ?> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -318,6 +319,22 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio
return sourceDelegate.producesSortedKeys(options);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // We explicitly do not register base-class data, instead we use the delegate inner source.
  builder
      .include(sourceDelegate)
      .add("source", sourceDelegate.getClass());

  if (channelFactory instanceof Enum) {
    // GZIP and BZIP are implemented as enum constants; their runtime classes are
    // anonymous subclasses, so display the stable name() value instead.
    // Fixed: use the wildcard-parameterized Enum<?> rather than the raw type Enum.
    builder.add("compressionMode", ((Enum<?>) channelFactory).name());
  } else {
    builder.add("compressionMode", channelFactory.getClass());
  }
}

/**
* Returns the delegate source's default output coder.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -396,6 +397,19 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);

  builder.addIfNotDefault("host", host, DEFAULT_HOST);
  builder.addIfNotNull("dataset", datasetId);
  builder.addIfNotNull("namespace", namespace);

  // The query is rendered only when one was supplied; toString() on null would NPE.
  if (query != null) {
    builder.add("query", query.toString());
  }
}

@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
Expand Down Expand Up @@ -597,6 +611,13 @@ public void validate(PipelineOptions options) {
public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
return new DatastoreWriteOperation(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Surface the write target; the host entry is suppressed when it is the default.
  builder.addIfNotDefault("host", host, DEFAULT_HOST);
  builder.addIfNotNull("dataset", datasetId);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
Expand Down Expand Up @@ -142,6 +143,30 @@ public void validate(PipelineOptions options) {}
@Override
public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);

  // Combine the output prefix, shard-naming template, and normalized extension
  // into a single human-readable pattern entry.
  builder.add("fileNamePattern",
      baseOutputFilename + fileNamingTemplate + getFileExtension(extension));
}

/**
 * Normalizes the user-supplied file extension for filename construction.
 *
 * <p>Returns the empty string when the extension is {@code null} or empty;
 * otherwise returns the extension with a leading {@code "."} prepended if it
 * does not already start with one.
 */
private static String getFileExtension(String usersExtension) {
  if (usersExtension == null || usersExtension.isEmpty()) {
    return "";
  }
  return usersExtension.startsWith(".") ? usersExtension : "." + usersExtension;
}

/**
* Abstract {@link Sink.WriteOperation} that manages the process of writing to a
* {@link FileBasedSink}.
Expand Down Expand Up @@ -363,21 +388,6 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
return destFilenames;
}

/**
 * Returns the file extension to be used. If the user did not request a file
 * extension then this method returns the empty string. Otherwise this method
 * adds a {@code "."} to the beginning of the users extension if one is not present.
 */
// NOTE(review): duplicate of the static getFileExtension helper defined earlier in
// this change — confirm only one copy survives the merge.
private String getFileExtension(String usersExtension) {
if (usersExtension == null || usersExtension.isEmpty()) {
return "";
}
if (usersExtension.startsWith(".")) {
return usersExtension;
}
return "." + usersExtension;
}

/**
* Removes temporary output files. Uses the temporary filename to find files to remove.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -257,6 +258,12 @@ private static long getEstimatedSizeOfFilesBySampling(
/ selectedFiles.size();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  // Surface the file glob / single-file spec this source reads from.
  String spec = getFileOrPatternSpec();
  builder.add("filePattern", spec);
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
final String file,
final long desiredBundleSizeBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
import com.google.cloud.dataflow.sdk.io.range.RangeTracker;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -198,6 +199,15 @@ public boolean allowsDynamicSplitting() {
return true;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  // Offsets are only interesting when they differ from the full-range defaults.
  builder.add("minBundleSize", minBundleSize);
  builder.addIfNotDefault("startOffset", startOffset, 0);
  builder.addIfNotDefault("endOffset", endOffset, Long.MAX_VALUE);
}

/**
* A {@link Source.Reader} that implements code common to readers of all
* {@link OffsetBasedSource}s.
Expand Down
15 changes: 15 additions & 0 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
Expand Down Expand Up @@ -143,6 +144,13 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Record the concrete source class, then let the source register its own details.
  builder.add("source", source.getClass());
  builder.include(source);
}

static {
registerDefaultTransformEvaluator();
}
Expand Down Expand Up @@ -249,5 +257,12 @@ public final PCollection<T> apply(PInput input) {
public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Record the concrete source class, then delegate to the source for its details.
  builder.add("source", source.getClass());
  builder.include(source);
}
}
}
13 changes: 12 additions & 1 deletion sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.io.Serializable;
Expand Down Expand Up @@ -118,7 +120,7 @@
* @param <T> the type that will be written to the Sink.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public abstract class Sink<T> implements Serializable {
public abstract class Sink<T> implements Serializable, HasDisplayData {
/**
* Ensures that the sink is valid and can be written to before the write operation begins. One
* should use {@link com.google.common.base.Preconditions} to implement this method.
Expand All @@ -130,6 +132,15 @@ public abstract class Sink<T> implements Serializable {
*/
public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);

/**
 * {@inheritDoc}
 *
 * <p>By default, does not register any display data. Implementors may override this method
 * to provide their own display metadata. Overrides need not call {@code super}, since this
 * base implementation registers nothing.
 */
@Override
public void populateDisplayData(DisplayData.Builder builder) {}

/**
* A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
*
Expand Down
13 changes: 12 additions & 1 deletion sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;

import org.joda.time.Instant;

Expand Down Expand Up @@ -51,7 +53,7 @@
* @param <T> Type of elements read by the source.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public abstract class Source<T> implements Serializable {
public abstract class Source<T> implements Serializable, HasDisplayData {
/**
* Checks that this source is valid, before it can be used in a pipeline.
*
Expand All @@ -65,6 +67,15 @@ public abstract class Source<T> implements Serializable {
*/
public abstract Coder<T> getDefaultOutputCoder();

/**
 * {@inheritDoc}
 *
 * <p>By default, does not register any display data. Implementors may override this method
 * to provide their own display metadata. Overrides need not call {@code super}, since this
 * base implementation registers nothing.
 */
@Override
public void populateDisplayData(DisplayData.Builder builder) {}

/**
* The interface that readers of custom input sources must implement.
*
Expand Down
8 changes: 8 additions & 0 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -73,6 +74,13 @@ public PDone apply(PCollection<T> input) {
return createWrite(input, sink.createWriteOperation(options));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  // Record the concrete sink class, then let the sink register its own details.
  builder.add("sink", sink.getClass());
  builder.include(sink);
}

/**
* Returns the {@link Sink} associated with this PTransform.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -219,6 +220,14 @@ public void validate(PipelineOptions options) {
public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
return new XmlWriteOperation<>(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  // Both values are optional; skip entries that were never configured.
  builder.addIfNotNull("rootElement", rootElementName);
  builder.addIfNotNull("recordClass", classToBind);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.Preconditions;

import org.codehaus.stax2.XMLInputFactory2;
Expand All @@ -34,7 +35,6 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
Expand Down Expand Up @@ -214,6 +214,15 @@ public void validate() {
recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  // All three settings are optional builder parameters; register only those set.
  builder.addIfNotNull("rootElement", rootElement);
  builder.addIfNotNull("recordElement", recordElement);
  builder.addIfNotNull("recordClass", recordClass);
}

@Override
public Coder<T> getDefaultOutputCoder() {
return JAXBCoder.of(recordClass);
Expand Down
Loading