Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ dependencies {
// For resolving external transform requests
runtime project(":sdks:java:io:kafka")
runtime library.java.kafka_clients
runtime project(":sdks:java:io:google-cloud-platform")
}

// NOTE: runShadow must be used in order to run the job server. The standard run
Expand Down
4 changes: 4 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ dependencies {
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
testCompile project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime")
// For testing Cross-language transforms
testCompile project(":runners:core-construction-java")
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
testCompile library.java.junit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.Clock;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
Expand All @@ -39,9 +42,11 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
Expand All @@ -53,6 +58,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -705,7 +711,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract static class Builder<T>
implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<T>> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topic);

abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory);
Expand Down Expand Up @@ -733,6 +740,85 @@ abstract static class Builder<T> {
abstract Builder<T> setClock(@Nullable Clock clock);

abstract Read<T> build();

@Override
public PTransform<PBegin, PCollection<T>> buildExternal(External.Configuration config) {
if (config.topic != null) {
StaticValueProvider<String> topic = StaticValueProvider.of(config.topic);
setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()));
}
if (config.subscription != null) {
StaticValueProvider<String> subscription = StaticValueProvider.of(config.subscription);
setSubscriptionProvider(
NestedValueProvider.of(subscription, new SubscriptionTranslator()));
}
if (config.idAttribute != null) {
setIdAttribute(config.idAttribute);
}
if (config.timestampAttribute != null) {
setTimestampAttribute(config.timestampAttribute);
}
setPubsubClientFactory(FACTORY);
setNeedsAttributes(config.needsAttributes);
Coder coder = ByteArrayCoder.of();
if (config.needsAttributes) {
SimpleFunction<PubsubMessage, T> parseFn =
(SimpleFunction<PubsubMessage, T>) new ParsePayloadAsPubsubMessageProto();
setParseFn(parseFn);
setCoder(coder);
} else {
setParseFn(new ParsePayloadUsingCoder<>(coder));
setCoder(coder);
}
setNeedsMessageId(false);
return build();
}
}

/** Exposes {@link PubSubIO.Read} as an external transform for cross-language usage. */
@Experimental
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {

public static final String URN = "beam:external:java:pubsub:read:v1";

@Override
public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
return ImmutableMap.of(URN, AutoValue_PubsubIO_Read.Builder.class);
}

/** Parameters class to expose the transform to an external SDK. */
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
@Nullable private String topic;
@Nullable private String subscription;
@Nullable private String idAttribute;
@Nullable private String timestampAttribute;
private boolean needsAttributes;

public void setTopic(@Nullable String topic) {
this.topic = topic;
}

public void setSubscription(@Nullable String subscription) {
this.subscription = subscription;
}

public void setIdLabel(@Nullable String idAttribute) {
this.idAttribute = idAttribute;
}

public void setTimestampAttribute(@Nullable String timestampAttribute) {
this.timestampAttribute = timestampAttribute;
}

public void setWithAttributes(Boolean needsAttributes) {
// we must use Boolean instead of boolean because the external payload system
// inspects the native type of each coder urn, and BooleanCoder wants Boolean.
this.needsAttributes = needsAttributes;
}
}
}

/**
Expand Down Expand Up @@ -955,7 +1041,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>
abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract static class Builder<T>
implements ExternalTransformBuilder<External.Configuration, PCollection<T>, PDone> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);

abstract Builder<T> setPubsubClientFactory(PubsubClient.PubsubClientFactory factory);
Expand All @@ -971,6 +1058,58 @@ abstract static class Builder<T> {
abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn);

abstract Write<T> build();

@Override
public PTransform<PCollection<T>, PDone> buildExternal(External.Configuration config) {
if (config.topic != null) {
StaticValueProvider<String> topic = StaticValueProvider.of(config.topic);
setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()));
}
if (config.idAttribute != null) {
setIdAttribute(config.idAttribute);
}
if (config.timestampAttribute != null) {
setTimestampAttribute(config.timestampAttribute);
}
SimpleFunction<T, PubsubMessage> parseFn =
(SimpleFunction<T, PubsubMessage>) new FormatPayloadFromPubsubMessageProto();
setFormatFn(parseFn);
return build();
}
}

/** Exposes {@link PubSubIO.Write} as an external transform for cross-language usage. */
@Experimental
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {

public static final String URN = "beam:external:java:pubsub:write:v1";

@Override
public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
return ImmutableMap.of(URN, AutoValue_PubsubIO_Write.Builder.class);
}

/** Parameters class to expose the transform to an external SDK. */
public static class Configuration {

// All byte arrays are UTF-8 encoded strings
private String topic;
@Nullable private String idAttribute;
@Nullable private String timestampAttribute;

public void setTopic(String topic) {
this.topic = topic;
}

public void setIdLabel(@Nullable String idAttribute) {
this.idAttribute = idAttribute;
}

public void setTimestampAttribute(@Nullable String timestampAttribute) {
this.timestampAttribute = timestampAttribute;
}
}
}

/**
Expand Down Expand Up @@ -1213,6 +1352,22 @@ public T apply(PubsubMessage input) {
}
}

private static class ParsePayloadAsPubsubMessageProto
extends SimpleFunction<PubsubMessage, byte[]> {
@Override
public byte[] apply(PubsubMessage input) {
Map<String, String> attributes = input.getAttributeMap();
com.google.pubsub.v1.PubsubMessage.Builder message =
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(input.getPayload()));
// TODO(BEAM-8085) this should not be null
if (attributes != null) {
message.putAllAttributes(attributes);
}
return message.build().toByteArray();
}
}

private static class FormatPayloadAsUtf8 extends SimpleFunction<String, PubsubMessage> {
@Override
public PubsubMessage apply(String input) {
Expand All @@ -1237,6 +1392,20 @@ public PubsubMessage apply(T input) {
}
}

private static class FormatPayloadFromPubsubMessageProto
extends SimpleFunction<byte[], PubsubMessage> {
@Override
public PubsubMessage apply(byte[] input) {
try {
com.google.pubsub.v1.PubsubMessage message =
com.google.pubsub.v1.PubsubMessage.parseFrom(input);
return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Could not decode Pubsub message", e);
}
}
}

private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> {
@Override
public PubsubMessage apply(PubsubMessage input) {
Expand Down
Loading