Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {

@Override
public void populateDisplayData(DisplayData.Builder builder) {
fn.populateDisplayData(builder);
builder.include(fn);
}

private void readObject(java.io.ObjectInputStream in)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ private static SideInputReader makeSideInputReader(
private static void populateDisplayData(
DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
builder
.include(fn, fnClass)
.include(fn)
.add(DisplayData.item("fn", fnClass));
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
Expand All @@ -39,6 +37,7 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
Expand All @@ -52,7 +51,7 @@
* <p>Components specify their display data by implementing the {@link HasDisplayData}
* interface.
*/
public class DisplayData {
public class DisplayData implements Serializable {
private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item<?>>newHashMap());
private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime();

Expand Down Expand Up @@ -144,7 +143,7 @@ public String toString() {
return builder.toString();
}

private static String namespaceOf(ClassForDisplay clazz) {
private static String namespaceOf(Class<?> clazz) {
return clazz.getName();
}

Expand All @@ -154,31 +153,42 @@ private static String namespaceOf(ClassForDisplay clazz) {
*/
public interface Builder {
/**
* Register display data from the specified subcomponent.
* Register display data from the specified subcomponent. For example, a {@link PTransform}
* which delegates to a user-provided function can implement {@link HasDisplayData} on the
* function and include it from the {@link PTransform}:
*
* @see #include(HasDisplayData, String)
* <pre><code>{@literal @Override}
* public void populateDisplayData(DisplayData.Builder builder) {
* super.populateDisplayData(builder);
*
* builder
* .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn
* .include(userFn); // To allow the userFn to register additional display data
* }
* </code></pre>
*
* Using {@code include(subcomponent)} will associate each of the registered items with the
* namespace of the {@code subcomponent} being registered. To register display data in the
* current namespace, such as from a base class implementation, use
* {@code subcomponent.populateDisplayData(builder)} instead.
*
* @see HasDisplayData#populateDisplayData(DisplayData.Builder)
*/
Builder include(HasDisplayData subComponent);

/**
* Register display data from the specified subcomponent, using the specified namespace.
* Register display data from the specified subcomponent, overriding the namespace of
* subcomponent display items with the specified namespace.
*
* @see #include(HasDisplayData, String)
* @see #include(HasDisplayData)
*/
Builder include(HasDisplayData subComponent, Class<?> namespace);

/**
* Register display data from the specified subcomponent, using the specified namespace.
* Register display data from the specified subcomponent, overriding the namespace of
* subcomponent display items with the specified namespace.
*
* @see #include(HasDisplayData, String)
*/
Builder include(HasDisplayData subComponent, ClassForDisplay namespace);

/**
* Register display data from the specified subcomponent, using the specified namespace.
*
* <p>For example, a {@link ParDo} transform includes display data from the encapsulated
* {@link DoFn}.
* @see #include(HasDisplayData)
*/
Builder include(HasDisplayData subComponent, String namespace);

Expand Down Expand Up @@ -206,7 +216,7 @@ public interface Builder {
* within {@link HasDisplayData#populateDisplayData} implementations.
*/
@AutoValue
public abstract static class Item<T> {
public abstract static class Item<T> implements Serializable {

/**
* The namespace for the display item. The namespace defaults to the component which
Expand Down Expand Up @@ -292,12 +302,6 @@ private static <T> Item<T> create(String key, Type type, @Nullable T value) {
*/
public Item<T> withNamespace(Class<?> namespace) {
checkNotNull(namespace, "namespace argument cannot be null");
return withNamespace(ClassForDisplay.of(namespace));
}

/** @see #withNamespace(Class) */
private Item<T> withNamespace(ClassForDisplay namespace) {
checkNotNull(namespace, "namesapce argument cannot be null");
return withNamespace(namespaceOf(namespace));
}

Expand Down Expand Up @@ -377,7 +381,7 @@ public static class Identifier {
private final String ns;
private final String key;

public static Identifier of(ClassForDisplay namespace, String key) {
public static Identifier of(Class<?> namespace, String key) {
return of(namespaceOf(namespace), key);
}

Expand Down Expand Up @@ -470,12 +474,7 @@ FormattedItemValue format(Object value) {
JAVA_CLASS {
@Override
FormattedItemValue format(Object value) {
if (value instanceof Class<?>) {
ClassForDisplay classForDisplay = ClassForDisplay.of((Class<?>) value);
return format(classForDisplay);
}

ClassForDisplay clazz = checkType(value, ClassForDisplay.class, JAVA_CLASS);
Class<?> clazz = checkType(value, Class.class, JAVA_CLASS);
return new FormattedItemValue(clazz.getName(), clazz.getSimpleName());
}
};
Expand Down Expand Up @@ -525,7 +524,7 @@ private static Type tryInferFrom(@Nullable Object value) {
return TIMESTAMP;
} else if (value instanceof Duration) {
return DURATION;
} else if (value instanceof Class<?> || value instanceof ClassForDisplay) {
} else if (value instanceof Class<?>) {
return JAVA_CLASS;
} else if (value instanceof String) {
return STRING;
Expand Down Expand Up @@ -587,12 +586,6 @@ public Builder include(HasDisplayData subComponent) {

@Override
public Builder include(HasDisplayData subComponent, Class<?> namespace) {
checkNotNull(namespace, "Input namespace override cannot be null");
return include(subComponent, ClassForDisplay.of(namespace));
}

@Override
public Builder include(HasDisplayData subComponent, ClassForDisplay namespace) {
checkNotNull(namespace, "Input namespace override cannot be null");
return include(subComponent, namespaceOf(namespace));
}
Expand Down Expand Up @@ -720,13 +713,6 @@ public static <T> Item<Class<T>> item(String key, @Nullable Class<T> value) {
return item(key, Type.JAVA_CLASS, value);
}

/**
* Create a display item for the specified key and class value.
*/
public static Item<ClassForDisplay> item(String key, @Nullable ClassForDisplay value) {
return item(key, Type.JAVA_CLASS, value);
}

/**
* Create a display item for the specified key, type, and value. This method should be used
* if the type of the input value can only be determined at runtime. Otherwise,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,52 @@
*/
package org.apache.beam.sdk.transforms.display;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;

/**
* Marker interface for {@link PTransform PTransforms} and components used within
* {@link PTransform PTransforms} to specify display data to be used within UIs and diagnostic
* tools.
* Marker interface for {@link PTransform PTransforms} and components to specify display data used
* within UIs and diagnostic tools.
*
* <p>Display data is registered by overriding
* {@link #populateDisplayData(DisplayData.Builder)} in a component which implements
* {@code HasDisplayData}. Display data is available for {@link PipelineOptions} and
* {@link PTransform} implementations.
*
* <pre><code>{@literal @Override}
* public void populateDisplayData(DisplayData.Builder builder) {
* super.populateDisplayData(builder);
*
* builder
* .include(subComponent)
* .add(DisplayData.item("minFilter", 42))
* .addIfNotDefault(DisplayData.item("useTransactions", this.txn), false)
* .add(DisplayData.item("topic", "projects/myproject/topics/mytopic")
* .withLabel("Pub/Sub Topic"))
* .add(DisplayData.item("serviceInstance", "myservice.com/fizzbang")
* .withLinkUrl("http://www.myservice.com/fizzbang"));
* }
* </code></pre>
*
* <p>Display data is optional and may be collected during pipeline construction. It should
* only be used to informational purposes. Tools and components should not assume that display data
* only be used for informational purposes. Tools and components should not assume that display data
* will always be collected, or that collected display data will always be displayed.
*
* @see #populateDisplayData(DisplayData.Builder)
*/
public interface HasDisplayData {
/**
* Register display data for the given transform or component. Metadata can be registered
* directly on the provided builder, as well as via included sub-components.
* Register display data for the given transform or component.
*
* <pre>
* {@code
* @Override
* public void populateDisplayData(DisplayData.Builder builder) {
* builder
* .include(subComponent)
* .add(DisplayData.item("minFilter", 42))
* .addIfNotDefault(DisplayData.item("useTransactions", this.txn), false)
* .add(DisplayData.item("topic", "projects/myproject/topics/mytopic")
* .withLabel("Pub/Sub Topic"))
* .add(DisplayData.item("serviceInstance", "myservice.com/fizzbang")
* .withLinkUrl("http://www.myservice.com/fizzbang"));
* }
* }
* </pre>
* <p>{@code populateDisplayData(DisplayData.Builder)} is invoked by Pipeline runners to collect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for this block?

* display data via {@link DisplayData#from(HasDisplayData)}. Implementations may call
* {@code super.populateDisplayData(builder)} in order to register display data in the current
* namespace, but should otherwise use {@code subcomponent.populateDisplayData(builder)} to use
* the namespace of the subcomponent.
*
* @param builder The builder to populate with display data.
*
* @see HasDisplayData
*/
void populateDisplayData(DisplayData.Builder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;

import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -714,6 +715,21 @@ public void populateDisplayData(DisplayData.Builder builder) {
assertThat(displayData, includesDisplayDataFrom(combineFn));
}

@Test
public void testDisplayDataForWrappedFn() {
UniqueInts combineFn = new UniqueInts() {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
}
};
Combine.PerKey<?, ?, ?> combine = Combine.perKey(combineFn);
DisplayData displayData = DisplayData.from(combine);

assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass()));
assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass())));
}

////////////////////////////////////////////////////////////////////////////
// Test classes, for different kinds of combining fns.

Expand Down
Loading