Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
.build(),
ImmutableMap.<String, Object>builder()
.put("key", "fn")
.put("label", "Transform Function")
.put("type", "JAVA_CLASS")
.put("value", fn1.getClass().getName())
.put("shortValue", fn1.getClass().getSimpleName())
Expand All @@ -955,6 +956,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
ImmutableMap.<String, Object>builder()
.put("key", "fn")
.put("label", "Transform Function")
.put("type", "JAVA_CLASS")
.put("value", fn2.getClass().getName())
.put("shortValue", fn2.getClass().getSimpleName())
Expand Down
28 changes: 19 additions & 9 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ public PCollection<T> apply(PInput input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("filePattern", filepattern))
.addIfNotDefault(DisplayData.item("validation", validate), true);
.addIfNotNull(DisplayData.item("filePattern", filepattern)
.withLabel("Input File Pattern"))
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"), true);
}

@Override
Expand Down Expand Up @@ -694,14 +696,22 @@ public PDone apply(PCollection<T> input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("schema", type))
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix))
.addIfNotDefault(
DisplayData.item("shardNameTemplate", shardTemplate),
.add(DisplayData.item("schema", type)
.withLabel("Record Schema"))
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
.withLabel("Output File Prefix"))
.addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
.withLabel("Output Shard Name Template"),
DEFAULT_SHARD_TEMPLATE)
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "")
.addIfNotDefault(DisplayData.item("numShards", numShards), 0)
.addIfNotDefault(DisplayData.item("validation", validate), true);
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
.withLabel("Output File Suffix"),
"")
.addIfNotDefault(DisplayData.item("numShards", numShards)
.withLabel("Maximum Output Shards"),
0)
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"),
true);
}

/**
Expand Down
43 changes: 29 additions & 14 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,13 +631,18 @@ public void populateDisplayData(DisplayData.Builder builder) {
TableReference table = getTable();

if (table != null) {
builder.add(DisplayData.item("table", toTableSpec(table)));
builder.add(DisplayData.item("table", toTableSpec(table))
.withLabel("Table"));
}

builder
.addIfNotNull(DisplayData.item("query", query))
.addIfNotNull(DisplayData.item("flattenResults", flattenResults))
.addIfNotDefault(DisplayData.item("validation", validate), true);
.addIfNotNull(DisplayData.item("query", query)
.withLabel("Query"))
.addIfNotNull(DisplayData.item("flattenResults", flattenResults)
.withLabel("Flatten Query Results"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could/should this link to the BigQuery documentation on flattenResults?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I like, though we don't have a precedent for linking to documentation. Should we make this the first or follow-up with a plan for other transforms?

.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"),
true);
}

/**
Expand Down Expand Up @@ -1753,17 +1758,23 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull(DisplayData.item("table", jsonTableRef))
.addIfNotNull(DisplayData.item("schema", jsonSchema));
.addIfNotNull(DisplayData.item("table", jsonTableRef)
.withLabel("Table Reference"))
.addIfNotNull(DisplayData.item("schema", jsonSchema)
.withLabel("Table Schema"));

if (tableRefFunction != null) {
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()));
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
.withLabel("Table Reference Function"));
}

builder
.add(DisplayData.item("createDisposition", createDisposition.toString()))
.add(DisplayData.item("writeDisposition", writeDisposition.toString()))
.addIfNotDefault(DisplayData.item("validation", validate), true);
.add(DisplayData.item("createDisposition", createDisposition.toString())
.withLabel("Table CreateDisposition"))
.add(DisplayData.item("writeDisposition", writeDisposition.toString())
.withLabel("Table WriteDisposition"))
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"), true);
}

/** Returns the create disposition. */
Expand Down Expand Up @@ -1855,8 +1866,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull(DisplayData.item("schema", jsonSchema))
.addIfNotNull(DisplayData.item("tableSpec", jsonTable));
.addIfNotNull(DisplayData.item("schema", jsonSchema)
.withLabel("Table Schema"))
.addIfNotNull(DisplayData.item("tableSpec", jsonTable)
.withLabel("Table Specification"));
}

private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
Expand Down Expand Up @@ -2096,7 +2109,8 @@ public void finishBundle(Context context) throws Exception {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema));
builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)
.withLabel("Table Schema"));
}

public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
Expand Down Expand Up @@ -2312,7 +2326,8 @@ public void populateDisplayData(DisplayData.Builder builder) {

builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec));
if (tableRefFunction != null) {
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()));
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
.withLabel("Table Reference Function"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ public String getKindString() {
public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.add(DisplayData.item("source", source.getClass()))
.addIfNotDefault(DisplayData.item("maxRecords", maxNumRecords), Long.MAX_VALUE)
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime))
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
.addIfNotDefault(DisplayData.item("maxRecords", maxNumRecords)
.withLabel("Maximum Read Records"), Long.MAX_VALUE)
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
.include(source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,17 @@ public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.include(sourceDelegate)
.add(DisplayData.item("source", sourceDelegate.getClass()));
.add(DisplayData.item("source", sourceDelegate.getClass())
.withLabel("Read Source"));

if (channelFactory instanceof Enum) {
// GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name()
// value instead
builder.add(DisplayData.item("compressionMode", ((Enum) channelFactory).name()));
builder.add(DisplayData.item("compressionMode", ((Enum) channelFactory).name())
.withLabel("Compression Mode"));
} else {
builder.add(DisplayData.item("compressionMode", channelFactory.getClass()));
builder.add(DisplayData.item("compressionMode", channelFactory.getClass())
.withLabel("Compression Mode"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public PCollection<Long> apply(PBegin begin) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("upTo", numElements));
builder.add(DisplayData.item("upTo", numElements)
.withLabel("Count Up To"));
}
}

Expand Down Expand Up @@ -233,14 +234,17 @@ public PCollection<Long> apply(PBegin begin) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.add(DisplayData.item("timestampFn", timestampFn.getClass()));
builder.add(DisplayData.item("timestampFn", timestampFn.getClass())
.withLabel("Timestamp Function"));

if (maxReadTime.isPresent()) {
builder.add(DisplayData.item("maxReadTime", maxReadTime.get()));
builder.add(DisplayData.item("maxReadTime", maxReadTime.get())
.withLabel("Maximum Read Time"));
}

if (maxNumRecords.isPresent()) {
builder.add(DisplayData.item("maxRecords", maxNumRecords.get()));
builder.add(DisplayData.item("maxRecords", maxNumRecords.get())
.withLabel("Maximum Read Records"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,16 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault(DisplayData.item("host", host), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId))
.addIfNotNull(DisplayData.item("namespace", namespace));
.addIfNotDefault(DisplayData.item("host", host)
.withLabel("Datastore Service"), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId)
.withLabel("Input Dataset"))
.addIfNotNull(DisplayData.item("namespace", namespace)
.withLabel("App Engine Namespace"));

if (query != null) {
builder.add(DisplayData.item("query", query.toString()));
builder.add(DisplayData.item("query", query.toString())
.withLabel("Query"));
}
}

Expand Down Expand Up @@ -606,8 +610,10 @@ public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault(DisplayData.item("host", host), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId));
.addIfNotDefault(DisplayData.item("host", host)
.withLabel("Datastore Service"), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId)
.withLabel("Output Dataset"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void populateDisplayData(DisplayData.Builder builder) {

String fileNamePattern = String.format("%s%s%s",
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLabel("File Name Pattern"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ private static long getEstimatedSizeOfFilesBySampling(
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec()));
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
.withLabel("File Pattern"));
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,12 @@ public boolean allowsDynamicSplitting() {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize), 1L)
.addIfNotDefault(DisplayData.item("startOffset", startOffset), 0L)
.addIfNotDefault(DisplayData.item("endOffset", endOffset), Long.MAX_VALUE);
.addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize)
.withLabel("Minimum Bundle Size"), 1L)
.addIfNotDefault(DisplayData.item("startOffset", startOffset)
.withLabel("Start Read Offset"), 0L)
.addIfNotDefault(DisplayData.item("endOffset", endOffset)
.withLabel("End Read Offset"), Long.MAX_VALUE);
}

/**
Expand Down
42 changes: 25 additions & 17 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@ private static void validatePubsubName(String name) {
}
}

/**
* Populate common {@link DisplayData} between Pubsub source and sink.
*/
private static void populateCommonDisplayData(DisplayData.Builder builder,
String timestampLabel, String idLabel, PubsubTopic topic) {
builder
.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
.withLabel("Timestamp Label Attribute"))
.addIfNotNull(DisplayData.item("idLabel", idLabel)
.withLabel("ID Label Attribute"));

if (topic != null) {
builder.add(DisplayData.item("topic", topic.asPath())
.withLabel("Pubsub Topic"));
}
}

/**
* Class representing a Cloud Pub/Sub Subscription.
*/
Expand Down Expand Up @@ -641,19 +658,17 @@ public PCollection<T> apply(PInput input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
populateCommonDisplayData(builder, timestampLabel, idLabel, topic);

builder
.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel))
.addIfNotNull(DisplayData.item("idLabel", idLabel))
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime))
.addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords), 0);

if (topic != null) {
builder.add(DisplayData.item("topic", topic.asPath()));
}
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
.addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
.withLabel("Maximum Read Records"), 0);

if (subscription != null) {
builder.add(DisplayData.item("subscription", subscription.asPath()));
builder.add(DisplayData.item("subscription", subscription.asPath())
.withLabel("Pubsub Subscription"));
}
}

Expand Down Expand Up @@ -953,14 +968,7 @@ public PDone apply(PCollection<T> input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel))
.addIfNotNull(DisplayData.item("idLabel", idLabel));

if (topic != null) {
builder.add(DisplayData.item("topic", topic.asPath()));
}
populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
}

@Override
Expand Down
6 changes: 4 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public String getKindString() {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("source", source.getClass()))
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
.include(source);
}

Expand Down Expand Up @@ -264,7 +265,8 @@ public String getKindString() {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("source", source.getClass()))
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
.include(source);
}
}
Expand Down
Loading