diff --git a/pom.xml b/pom.xml
index 3cae1a45d65a..0ad044f56df7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <!-- surrounding version properties: 1.7.7, v2-rev248-1.21.0, 0.2.3 -->
+    <pubsubgrpc.version>0.0.2</pubsubgrpc.version>
     <!-- surrounding version properties: v2-rev6-1.21.0, v1b3-rev22-1.21.0, 0.5.160222 -->
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6595a214e4f8..97621362cbf8 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -389,6 +389,40 @@
       <version>0.12.0</version>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.auth</groupId>
+      <artifactId>google-auth-library-oauth2-http</artifactId>
+      <version>0.3.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+      <version>4.1.0.Beta8</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>grpc-pubsub-v1</artifactId>
+      <version>${pubsubgrpc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>com.google.cloud.bigtable</groupId>
       <artifactId>bigtable-protos</artifactId>
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
new file mode 100644
index 000000000000..4c79cb8ade7c
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A helper interface for talking to pub/sub via an underlying transport.
+ */
+public interface PubsubClient extends AutoCloseable {
+ /**
+ * Gracefully close the underlying transport.
+ */
+ @Override
+ void close();
+
+ /**
+ * A message to be sent to pub/sub.
+ */
+ class OutgoingMessage {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch).
+ */
+ public final long timestampMsSinceEpoch;
+
+ public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ }
+ }
+
+ /**
+ * A message received from pub/sub.
+ */
+ class IncomingMessage {
+ /**
+ * Underlying (encoded) element.
+ */
+ public final byte[] elementBytes;
+
+ /**
+ * Timestamp for element (ms since epoch). Either pub/sub's processing time,
+ * or the custom timestamp associated with the message.
+ */
+ public final long timestampMsSinceEpoch;
+
+ /**
+ * Timestamp (in system time) at which we requested the message (ms since epoch).
+ */
+ public final long requestTimeMsSinceEpoch;
+
+ /**
+ * Id to pass back to pub/sub to acknowledge receipt of this message.
+ */
+ public final String ackId;
+
+ /**
+ * Id to pass to the runner to distinguish this message from all others.
+ */
+ public final byte[] recordId;
+
+ public IncomingMessage(
+ byte[] elementBytes,
+ long timestampMsSinceEpoch,
+ long requestTimeMsSinceEpoch,
+ String ackId,
+ byte[] recordId) {
+ this.elementBytes = elementBytes;
+ this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+ this.ackId = ackId;
+ this.recordId = recordId;
+ }
+ }
+
+ /**
+ * Publish {@code outgoingMessages} to pub/sub {@code topic}. Return number of messages
+ * published.
+ *
+ * @throws IOException
+ */
+ int publish(String topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException;
+
+ /**
+ * Request the next batch of up to {@code batchSize} messages from {@code subscription}.
+ * Return the received messages, or empty collection if none were available. Does not
+ * wait for messages to arrive. Returned messages will record their request time
+ * as {@code requestTimeMsSinceEpoch}.
+ *
+ * @throws IOException
+ */
+ Collection<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch, String subscription, int batchSize) throws IOException;
+
+ /**
+ * Acknowledge messages from {@code subscription} with {@code ackIds}.
+ *
+ * @throws IOException
+ */
+ void acknowledge(String subscription, Iterable<String> ackIds) throws IOException;
+
+ /**
+ * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to
+ * be {@code deadlineSeconds} from now.
+ *
+ * @throws IOException
+ */
+ void modifyAckDeadline(String subscription, Iterable<String> ackIds, int deadlineSeconds)
+ throws IOException;
+
+ /**
+ * Create {@code topic}.
+ *
+ * @throws IOException
+ */
+ void createTopic(String topic) throws IOException;
+
+ /**
+ * Delete {@code topic}.
+ *
+ * @throws IOException
+ */
+ void deleteTopic(String topic) throws IOException;
+
+ /**
+ * Return a list of topics for {@code project}.
+ *
+ * @throws IOException
+ */
+ Collection<String> listTopics(String project) throws IOException;
+
+ /**
+ * Create {@code subscription} to {@code topic}.
+ *
+ * @throws IOException
+ */
+ void createSubscription(String topic, String subscription, int ackDeadlineSeconds) throws
+ IOException;
+
+ /**
+ * Delete {@code subscription}.
+ *
+ * @throws IOException
+ */
+ void deleteSubscription(String subscription) throws IOException;
+
+ /**
+ * Return a list of subscriptions for {@code topic} in {@code project}.
+ *
+ * @throws IOException
+ */
+ Collection<String> listSubscriptions(String project, String topic) throws IOException;
+}
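
To make the contract above concrete, here is a minimal sketch (not part of this change) of how a caller might pull one batch from a subscription and acknowledge whatever it processed. The subscription name and batch size are placeholders; only the methods and nested types declared in PubsubClient above, plus standard java.util/java.io imports, are assumed.

    // Illustrative only: pull up to 100 messages once, hand them off, then ack them.
    static void drainOnce(PubsubClient client, String subscription) throws IOException {
      long requestTimeMsSinceEpoch = System.currentTimeMillis();
      Collection<PubsubClient.IncomingMessage> messages =
          client.pull(requestTimeMsSinceEpoch, subscription, 100);
      List<String> ackIds = new ArrayList<>();
      for (PubsubClient.IncomingMessage message : messages) {
        // ... decode message.elementBytes and use message.timestampMsSinceEpoch here ...
        ackIds.add(message.ackId);
      }
      if (!ackIds.isEmpty()) {
        client.acknowledge(subscription, ackIds);
      }
    }

Since pull does not wait for messages to arrive, a real caller would loop, and would extend deadlines via modifyAckDeadline for anything it cannot acknowledge before the ack deadline passes.
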
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
new file mode 100644
index 000000000000..ce947bf7dfd7
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.api.client.util.DateTime;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeleteSubscriptionRequest;
+import com.google.pubsub.v1.DeleteTopicRequest;
+import com.google.pubsub.v1.ListSubscriptionsRequest;
+import com.google.pubsub.v1.ListSubscriptionsResponse;
+import com.google.pubsub.v1.ListTopicsRequest;
+import com.google.pubsub.v1.ListTopicsResponse;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
+import com.google.pubsub.v1.PublishRequest;
+import com.google.pubsub.v1.PublishResponse;
+import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.ClientAuthInterceptor;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A helper class for talking to pub/sub via gRPC.
+ */
+public class PubsubGrpcClient implements PubsubClient {
+ private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
+ private static final int PUBSUB_PORT = 443;
+ private static final List<String> PUBSUB_SCOPES =
+ Collections.singletonList("https://www.googleapis.com/auth/pubsub");
+ private static final int LIST_BATCH_SIZE = 1000;
+
+ /**
+ * Timeout for grpc calls (in s).
+ */
+ private static final int TIMEOUT_S = 15;
+
+ /**
+ * Underlying netty channel, or null if closed.
+ */
+ @Nullable
+ private ManagedChannel publisherChannel;
+
+ /**
+ * Credentials determined from options and environment.
+ */
+ private final GoogleCredentials credentials;
+
+ /**
+ * Label to use for custom timestamps, or {@literal null} if the pub/sub publish time should
+ * be used instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Label to use for custom ids, or {@literal null} if pub/sub provided ids should be used.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Cached stubs, or null if not cached.
+ */
+ @Nullable
+ private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
+ @Nullable
+ private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
+
+ private PubsubGrpcClient(
+ @Nullable String timestampLabel, @Nullable String idLabel,
+ ManagedChannel publisherChannel, GoogleCredentials credentials) {
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ this.publisherChannel = publisherChannel;
+ this.credentials = credentials;
+ }
+
+ /**
+ * Construct a new pub/sub grpc client. It should be closed via {@link #close} in order
+ * to ensure tidy cleanup of underlying netty resources. If non-{@literal null}, use
+ * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within
+ * message metadata.
+ */
+ public static PubsubGrpcClient newClient(
+ @Nullable String timestampLabel, @Nullable String idLabel,
+ GcpOptions options) throws IOException {
+ ManagedChannel channel = NettyChannelBuilder
+ .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+ .build();
+ // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the
+ // various command line options. It currently only supports the older
+ // com.google.api.client.auth.oauth2.Credentials.
+ GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
+ return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials);
+ }
+
+ /**
+ * Gracefully close the underlying netty channel.
+ */
+ @Override
+ public void close() {
+ Preconditions.checkState(publisherChannel != null, "Client has already been closed");
+ publisherChannel.shutdown();
+ try {
+ publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore.
+ Thread.currentThread().interrupt();
+ }
+ publisherChannel = null;
+ cachedPublisherStub = null;
+ cachedSubscriberStub = null;
+ }
+
+ /**
+ * Return channel with interceptor for returning credentials.
+ */
+ private Channel newChannel() throws IOException {
+ Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
+ ClientAuthInterceptor interceptor =
+ new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
+ return ClientInterceptors.intercept(publisherChannel, interceptor);
+ }
+
+ /**
+ * Return a stub for making a publish request with a timeout.
+ */
+ private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException {
+ if (cachedPublisherStub == null) {
+ cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
+ }
+ return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Return a stub for making a subscribe request with a timeout.
+ */
+ private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException {
+ if (cachedSubscriberStub == null) {
+ cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
+ }
+ return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int publish(String topic, Iterable<OutgoingMessage> outgoingMessages) throws IOException {
+ PublishRequest.Builder request = PublishRequest.newBuilder()
+ .setTopic(topic);
+ for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ PubsubMessage.Builder message =
+ PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
+
+ if (timestampLabel != null) {
+ message.getMutableAttributes()
+ .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+ }
+
+ if (idLabel != null) {
+ message.getMutableAttributes()
+ .put(idLabel,
+ Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+ }
+
+ request.addMessages(message);
+ }
+
+ PublishResponse response = publisherStub().publish(request.build());
+ return response.getMessageIdsCount();
+ }
+
+ @Override
+ public Collection<IncomingMessage> pull(
+ long requestTimeMsSinceEpoch,
+ String subscription,
+ int batchSize) throws IOException {
+ PullRequest request = PullRequest.newBuilder()
+ .setSubscription(subscription)
+ .setReturnImmediately(true)
+ .setMaxMessages(batchSize)
+ .build();
+ PullResponse response = subscriberStub().pull(request);
+ if (response.getReceivedMessagesCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
+ for (ReceivedMessage message : response.getReceivedMessagesList()) {
+ PubsubMessage pubsubMessage = message.getMessage();
+ Map<String, String> attributes = pubsubMessage.getAttributes();
+
+ // Payload.
+ byte[] elementBytes = pubsubMessage.getData().toByteArray();
+
+ // Timestamp.
+ // Start with pub/sub processing time.
+ Timestamp timestampProto = pubsubMessage.getPublishTime();
+ long timestampMsSinceEpoch =
+ timestampProto.getSeconds() * 1000L + timestampProto.getNanos() / 1000000L;
+ if (timestampLabel != null && attributes != null) {
+ String timestampString = attributes.get(timestampLabel);
+ if (timestampString != null && !timestampString.isEmpty()) {
+ try {
+ // Try parsing as milliseconds since epoch. Note there is no way to parse a
+ // string in RFC 3339 format here.
+ // Expected IllegalArgumentException if parsing fails; we use that to fall back
+ // to RFC 3339.
+ timestampMsSinceEpoch = Long.parseLong(timestampString);
+ } catch (IllegalArgumentException e1) {
+ try {
+ // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an
+ // IllegalArgumentException if parsing fails, and the caller should handle.
+ timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue();
+ } catch (IllegalArgumentException e2) {
+ // Fallback to pub/sub processing time.
+ }
+ }
+ }
+ // else: fallback to pub/sub processing time.
+ }
+ // else: fallback to pub/sub processing time.
+
+ // Ack id.
+ String ackId = message.getAckId();
+ Preconditions.checkState(ackId != null && !ackId.isEmpty());
+
+ // Record id, if any.
+ @Nullable byte[] recordId = null;
+ if (idLabel != null && attributes != null) {
+ String recordIdString = attributes.get(idLabel);
+ if (recordIdString != null && !recordIdString.isEmpty()) {
+ recordId = recordIdString.getBytes();
+ }
+ }
+ if (recordId == null) {
+ recordId = pubsubMessage.getMessageId().getBytes();
+ }
+
+ incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+ requestTimeMsSinceEpoch, ackId, recordId));
+ }
+ return incomingMessages;
+ }
+
+ @Override
+ public void acknowledge(String subscription, Iterable<String> ackIds) throws IOException {
+ AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
+ .setSubscription(subscription)
+ .addAllAckIds(ackIds)
+ .build();
+ subscriberStub().acknowledge(request); // ignore Empty result.
+ }
+
+ @Override
+ public void modifyAckDeadline(String subscription, Iterable<String> ackIds, int deadlineSeconds)
+ throws IOException {
+ ModifyAckDeadlineRequest request =
+ ModifyAckDeadlineRequest.newBuilder()
+ .setSubscription(subscription)
+ .addAllAckIds(ackIds)
+ .setAckDeadlineSeconds(deadlineSeconds)
+ .build();
+ subscriberStub().modifyAckDeadline(request); // ignore Empty result.
+ }
+
+ @Override
+ public void createTopic(String topic) throws IOException {
+ Topic request = Topic.newBuilder()
+ .setName(topic)
+ .build();
+ publisherStub().createTopic(request); // ignore Topic result.
+ }
+
+ @Override
+ public void deleteTopic(String topic) throws IOException {
+ DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic).build();
+ publisherStub().deleteTopic(request); // ignore Empty result.
+ }
+
+ @Override
+ public Collection<String> listTopics(String project) throws IOException {
+ ListTopicsRequest.Builder request =
+ ListTopicsRequest.newBuilder()
+ .setProject(project)
+ .setPageSize(LIST_BATCH_SIZE);
+ ListTopicsResponse response = publisherStub().listTopics(request.build());
+ if (response.getTopicsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<String> topics = new ArrayList<>(response.getTopicsCount());
+ while (true) {
+ for (Topic topic : response.getTopicsList()) {
+ topics.add(topic.getName());
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = publisherStub().listTopics(request.build());
+ }
+ return topics;
+ }
+
+ @Override
+ public void createSubscription(String topic, String subscription, int ackDeadlineSeconds)
+ throws IOException {
+ Subscription request = Subscription.newBuilder()
+ .setTopic(topic)
+ .setName(subscription)
+ .setAckDeadlineSeconds(ackDeadlineSeconds)
+ .build();
+ subscriberStub().createSubscription(request); // ignore Subscription result.
+ }
+
+ @Override
+ public void deleteSubscription(String subscription) throws IOException {
+ DeleteSubscriptionRequest request =
+ DeleteSubscriptionRequest.newBuilder().setSubscription(subscription).build();
+ subscriberStub().deleteSubscription(request); // ignore Empty result.
+ }
+
+ @Override
+ public Collection<String> listSubscriptions(String project, String topic) throws IOException {
+ ListSubscriptionsRequest.Builder request =
+ ListSubscriptionsRequest.newBuilder()
+ .setProject(project)
+ .setPageSize(LIST_BATCH_SIZE);
+ ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
+ if (response.getSubscriptionsCount() == 0) {
+ return ImmutableList.of();
+ }
+ List<String> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
+ while (true) {
+ for (Subscription subscription : response.getSubscriptionsList()) {
+ if (subscription.getTopic().equals(topic)) {
+ subscriptions.add(subscription.getName());
+ }
+ }
+ if (response.getNextPageToken().isEmpty()) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = subscriberStub().listSubscriptions(request.build());
+ }
+ return subscriptions;
+ }
+}
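
For orientation, here is a minimal sketch of how this client is expected to be created and used; the project and topic names are placeholders and the options wiring is simplified, with only newClient, publish, and close as defined above assumed.

    // Illustrative only: build a client from default application credentials, publish one
    // message, and close the underlying channel. Topic names are full resource paths.
    GcpOptions options = PipelineOptionsFactory.create().as(GcpOptions.class);
    try (PubsubGrpcClient client = PubsubGrpcClient.newClient(null, null, options)) {
      client.publish("projects/my-project/topics/my-topic",
          ImmutableList.of(new PubsubClient.OutgoingMessage(
              "hello".getBytes(), System.currentTimeMillis())));
    }

Passing null for timestampLabel and idLabel attaches no custom attributes, so readers fall back to the pub/sub publish time and message id.
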
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java
new file mode 100644
index 000000000000..e90d7bbc9903
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterFirst;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages to pub/sub.
+ *
+ *
+ * <ul>
+ * <li>The underlying implementation is just a {@link DoFn} which publishes as a side effect.
+ * <li>We use gRPC for its speed and low memory overhead.
+ * <li>We try to send messages in batches while also limiting send latency.
+ * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
+ * <li>Though some background threads are used by the underlying netty system all actual pub/sub
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * to execute concurrently and thus hide latency.
+ * </ul>
+ */
+public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
+ private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);
+
+ /**
+ * Maximum number of messages per publish.
+ */
+ private static final int PUBLISH_BATCH_SIZE = 1000;
+
+ /**
+ * Maximum size of a publish batch, in bytes.
+ */
+ private static final long PUBLISH_BATCH_BYTES = 400000;
+
+ /**
+ * Longest delay between receiving a message and pushing it to pub/sub.
+ */
+ private static final Duration MAX_LATENCY = Duration.standardSeconds(2);
+
+ /**
+ * Period of samples for stats.
+ */
+ private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+ /**
+ * Period of updates for stats.
+ */
+ private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+ /**
+ * How frequently to log stats.
+ */
+ private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
+
+ /**
+ * Additional sharding so that we can hide publish message latency.
+ */
+ private static final int SCALE_OUT = 4;
+
+ public static final Coder<PubsubClient.OutgoingMessage> CODER =
+ new AtomicCoder<PubsubClient.OutgoingMessage>() {
+ @Override
+ public void encode(
+ PubsubClient.OutgoingMessage value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
+ BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
+ }
+
+ @Override
+ public PubsubClient.OutgoingMessage decode(
+ InputStream inStream, Context context) throws CoderException, IOException {
+ byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
+ long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
+ return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch);
+ }
+ };
+
+ // ================================================================================
+ // ShardFn
+ // ================================================================================
+
+ /**
+ * Convert elements to messages and shard them.
+ */
+ private class ShardFn extends DoFn<T, KV<Integer, PubsubClient.OutgoingMessage>> {
+ /**
+ * Number of cores available for publishing.
+ */
+ private final int numCores;
+
+ private final Aggregator<Long, Long> elementCounter =
+ createAggregator("elements", new Sum.SumLongFn());
+
+ public ShardFn(int numCores) {
+ this.numCores = numCores;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ elementCounter.addValue(1L);
+ byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+ long timestampMsSinceEpoch = c.timestamp().getMillis();
+ c.output(KV.of(ThreadLocalRandom.current().nextInt(numCores * SCALE_OUT),
+ new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
+ }
+ }
+
+ // ================================================================================
+ // WriterFn
+ // ================================================================================
+
+ /**
+ * Publish messages to pub/sub in batches.
+ */
+ private class WriterFn
+ extends DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void> {
+
+ /**
+ * Client on which to talk to pub/sub. Null until created by {@link #startBundle}.
+ */
+ @Nullable
+ private transient PubsubClient pubsubClient;
+
+ private final Aggregator<Long, Long> batchCounter =
+ createAggregator("batches", new Sum.SumLongFn());
+ private final Aggregator<Long, Long> elementCounter =
+ createAggregator("elements", new Sum.SumLongFn());
+ private final Aggregator<Long, Long> byteCounter =
+ createAggregator("bytes", new Sum.SumLongFn());
+
+ /**
+ * BLOCKING
+ * Send {@code messages} as a batch to pub/sub.
+ */
+ private void publishBatch(List<PubsubClient.OutgoingMessage> messages, int bytes)
+ throws IOException {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ int n = pubsubClient.publish(topic, messages);
+ Preconditions.checkState(n == messages.size());
+ batchCounter.addValue(1L);
+ elementCounter.addValue((long) messages.size());
+ byteCounter.addValue((long) bytes);
+ }
+
+ @Override
+ public void startBundle(Context c) throws Exception {
+ Preconditions.checkState(pubsubClient == null);
+ pubsubClient = PubsubGrpcClient.newClient(timestampLabel, idLabel,
+ c.getPipelineOptions().as(GcpOptions.class));
+ super.startBundle(c);
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ List<PubsubClient.OutgoingMessage> pubsubMessages = new ArrayList<>(PUBLISH_BATCH_SIZE);
+ int bytes = 0;
+ for (PubsubClient.OutgoingMessage message : c.element().getValue()) {
+ if (!pubsubMessages.isEmpty()
+ && bytes + message.elementBytes.length > PUBLISH_BATCH_BYTES) {
+ // Break large (in bytes) batches into smaller ones.
+ // (We've already broken by batch size using the trigger below, though that may
+ // run slightly over the actual PUBLISH_BATCH_SIZE.)
+ // BLOCKS until published.
+ publishBatch(pubsubMessages, bytes);
+ pubsubMessages.clear();
+ bytes = 0;
+ }
+ pubsubMessages.add(message);
+ bytes += message.elementBytes.length;
+ }
+ if (!pubsubMessages.isEmpty()) {
+ // BLOCKS until published.
+ publishBatch(pubsubMessages, bytes);
+ }
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ pubsubClient.close();
+ pubsubClient = null;
+ super.finishBundle(c);
+ }
+ }
+
+
+ // ================================================================================
+ // PubsubUnboundedSink
+ // ================================================================================
+
+ /**
+ * Number of cores available for publishing.
+ */
+ private final int numCores;
+
+ /**
+ * Pub/sub topic to publish to.
+ */
+ private final String topic;
+
+ /**
+ * Coder for elements. Elements are effectively double-encoded: first to a byte array
+ * using this coder, then to a base-64 string to conform to pub/sub's payload
+ * conventions.
+ */
+ private final Coder<T> elementCoder;
+
+ /**
+ * Pub/sub metadata field holding timestamp of each element, or {@literal null} if should use
+ * pub/sub message publish timestamp instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Pub/sub metadata field holding id for each element, or {@literal null} if need to generate
+ * a unique id ourselves.
+ * CAUTION: Currently ignored.
+ */
+ @Nullable
+ private final String idLabel;
+
+ public PubsubUnboundedSink(
+ int numCores,
+ String topic,
+ Coder<T> elementCoder,
+ String timestampLabel,
+ String idLabel) {
+ this.numCores = numCores;
+ this.topic = topic;
+ this.elementCoder = elementCoder;
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ }
+
+ @Override
+ public PDone apply(PCollection<T> input) {
+ String label = "PubsubSink(" + topic.replaceFirst(".*/", "") + ")";
+ input.apply(Window.<T>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(AfterPane.elementCountAtLeast(PUBLISH_BATCH_SIZE),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(MAX_LATENCY))))
+ .discardingFiredPanes()
+ .withAllowedLateness(Duration.ZERO))
+ .apply(ParDo.named(label + ".Shard").of(new ShardFn(numCores)))
+ .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+ .apply(GroupByKey.<Integer, PubsubClient.OutgoingMessage>create())
+ .apply(ParDo.named(label + ".Writer").of(new WriterFn()));
+ return PDone.in(input.getPipeline());
+ }
+}
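
A minimal sketch of wiring the sink into a pipeline, assuming an upstream unbounded PCollection of strings named messages; the core count and topic path are placeholders rather than recommendations from this change.

    // Illustrative only: construct the sink and apply it to an existing PCollection<String>.
    PubsubUnboundedSink<String> sink = new PubsubUnboundedSink<>(
        4,                                       // numCores available for publishing
        "projects/my-project/topics/my-topic",   // full topic path (placeholder)
        StringUtf8Coder.of(),                    // coder used to encode each element
        null,                                    // timestampLabel: use pub/sub publish time
        null);                                   // idLabel: no custom record ids
    messages.apply(sink);

Because ShardFn keys each element with ThreadLocalRandom.current().nextInt(numCores * SCALE_OUT), numCores mainly controls how much publish parallelism the GroupByKey exposes to the WriterFn.
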
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java
new file mode 100644
index 000000000000..52fec011b7ab
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.api.client.util.Preconditions;
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.ListCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.BucketingFunction;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.MovingFunction;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages from pub/sub.
+ *
+ *
+ * <ul>
+ * <li>The underlying implementation is an {@link UnboundedSource} which receives messages
+ * in batches and hands them out one at a time.
+ * <li>We use gRPC for its speed and low memory overhead.
+ * <li>The watermark (either in pub/sub processing time or custom timestamp time) is estimated
+ * by keeping track of the minimum of the last minute's worth of messages. This assumes pub/sub
+ * delivers the oldest (in pub/sub processing time) available message at least once a minute,
+ * and that custom timestamps are 'mostly' monotonic with pub/sub processing time. Unfortunately
+ * both of those assumptions are currently false. Thus the estimated watermark may get ahead of
+ * the 'true' watermark and cause some messages to be late.
+ * <li>Checkpoints are used both to ACK received messages back to pub/sub (so that they may
+ * be retired on the pub/sub end), and to NACK already consumed messages should a checkpoint
+ * need to be restored (so that pub/sub will resend those messages promptly).
+ * <li>The subscription must already exist.
+ * <li>The subscription should have an ACK timeout of 60 seconds.
+ * <li>We log vital stats every 30 seconds.
+ * <li>Though some background threads are used by the underlying netty system all actual
+ * pub/sub calls are blocking. We rely on the underlying runner to allow multiple
+ * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide
+ * latency.
+ * </ul>
+ */
+public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
+
+ /**
+ * Coder for checkpoints.
+ */
+ private static final CheckpointCoder<?> CHECKPOINT_CODER = new CheckpointCoder<>();
+
+ /**
+ * Maximum number of messages per pull.
+ */
+ private static final int PULL_BATCH_SIZE = 1000;
+
+ /**
+ * Maximum number of ack ids per ack or ack extension.
+ */
+ private static final int ACK_BATCH_SIZE = 2000;
+
+ /**
+ * Maximum number of messages in flight.
+ */
+ private static final int MAX_IN_FLIGHT = 20000;
+
+ /**
+ * Ack timeout for initial get.
+ */
+ private static final Duration ACK_TIMEOUT = Duration.standardSeconds(60);
+
+ /**
+ * Duration by which to extend acks when they are near timeout.
+ */
+ private static final Duration ACK_EXTENSION = Duration.standardSeconds(30);
+
+ /**
+ * How close we can get to a deadline before we need to extend it.
+ */
+ private static final Duration ACK_SAFETY = Duration.standardSeconds(15);
+
+ /**
+ * How close we can get to a deadline before we need to consider it passed.
+ */
+ private static final Duration ACK_TOO_LATE = Duration.standardSeconds(5);
+
+ /**
+ * Period of samples to determine watermark and other stats.
+ */
+ private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+ /**
+ * Period of updates to determine watermark and other stats.
+ */
+ private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+ /**
+ * How frequently to log stats.
+ */
+ private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
+
+ /**
+ * Minimum number of unread messages required before considering updating watermark.
+ */
+ private static final int MIN_WATERMARK_MESSAGES = 10;
+
+ /**
+ * Minimum number of SAMPLE_UPDATE periods over which unread messages should be spread
+ * before considering updating watermark.
+ */
+ private static final int MIN_WATERMARK_SPREAD = 2;
+
+ /**
+ * Additional sharding so that we can hide read message latency.
+ */
+ private static final int SCALE_OUT = 4;
+
+ private static final Combine.BinaryCombineLongFn MIN =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return Math.min(left, right);
+ }
+
+ @Override
+ public long identity() {
+ return Long.MAX_VALUE;
+ }
+ };
+
+ private static final Combine.BinaryCombineLongFn MAX =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return Math.max(left, right);
+ }
+
+ @Override
+ public long identity() {
+ return Long.MIN_VALUE;
+ }
+ };
+
+ private static final Combine.BinaryCombineLongFn SUM =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return left + right;
+ }
+
+ @Override
+ public long identity() {
+ return 0;
+ }
+ };
+
+
+ // ================================================================================
+ // Checkpoint
+ // ================================================================================
+
+ /**
+ * Tracks which messages have been durably committed and thus can now be acked, and which
+ * messages have been read but not yet committed and thus should be nacked if we need to
+ * restore.
+ */
+ public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+ /**
+ * If the checkpoint is for persisting: the reader whose snapshotted state we are persisting.
+ * If the checkpoint is for restoring: initially {@literal null}, then explicitly set.
+ * Not persisted in durable checkpoint.
+ * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called
+ * the 'true' active reader may have changed.
+ */
+ @Nullable
+ private Reader<T> reader;
+
+ /**
+ * If the checkpoint is for persisting: The ack ids of messages which have been passed
+ * downstream since the last checkpoint.
+ * If the checkpoint is for restoring: {@literal null}.
+ * Not persisted in durable checkpoint.
+ */
+ @Nullable
+ private final List<String> safeToAckIds;
+
+ /**
+ * If the checkpoint is for persisting: The ack ids of messages which have been received
+ * from pub/sub but not yet passed downstream at the time of the snapshot.
+ * If the checkpoint is for restoring: Same, but recovered from durable storage.
+ */
+ private final List<String> notYetReadIds;
+
+ public Checkpoint(
+ @Nullable Reader<T> reader, @Nullable List<String> safeToAckIds,
+ List<String> notYetReadIds) {
+ this.reader = reader;
+ this.safeToAckIds = safeToAckIds;
+ this.notYetReadIds = notYetReadIds;
+ }
+
+ public void setReader(Reader<T> reader) {
+ Preconditions.checkState(this.reader == null, "Cannot setReader on persisting checkpoint");
+ this.reader = reader;
+ }
+
+ /**
+ * BLOCKING
+ * All messages which have been passed downstream have now been durably committed.
+ * We can ack them upstream.
+ */
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ Preconditions.checkState(reader != null && safeToAckIds != null,
+ "Cannot finalize a restored checkpoint");
+ // Even if the 'true' active reader has changed since the checkpoint was taken we are
+ // fine:
+ // - The underlying pub/sub topic will not have changed, so the following acks will still
+ // go to the right place.
+ // - We'll delete the ack ids from the reader's in-flight state, but that only affects
+ // flow control and stats, neither of which are relevant anymore.
+ try {
+ int n = safeToAckIds.size();
+ List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
+ for (String ackId : safeToAckIds) {
+ batchSafeToAckIds.add(ackId);
+ if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) {
+ reader.ackBatch(batchSafeToAckIds);
+ n -= batchSafeToAckIds.size();
+ // CAUTION: Don't reuse the same list since ackBatch holds on to it.
+ batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
+ }
+ }
+ if (!batchSafeToAckIds.isEmpty()) {
+ reader.ackBatch(batchSafeToAckIds);
+ }
+ } finally {
+ Preconditions.checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0);
+ }
+ }
+
+ /**
+ * BLOCKING
+ * Nack all messages which have been read from pub/sub but not passed downstream.
+ * This way pub/sub will send them again promptly.
+ */
+ public void nackAll() throws IOException {
+ Preconditions.checkState(reader != null, "Reader was not set");
+ List<String> batchYetToAckIds =
+ new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
+ for (String ackId : notYetReadIds) {
+ batchYetToAckIds.add(ackId);
+ if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+ batchYetToAckIds.clear();
+ }
+ }
+ if (!batchYetToAckIds.isEmpty()) {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+ }
+ }
+ }
+
+ /**
+ * The coder for our checkpoints.
+ */
+ private static class CheckpointCoder<T> extends AtomicCoder<Checkpoint<T>> {
+ private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
+
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws IOException {
+ LIST_CODER.encode(value.notYetReadIds, outStream, context);
+ }
+
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context) throws IOException {
+ List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
+ return new Checkpoint<>(null, null, notYetReadIds);
+ }
+ }
+
+ // ================================================================================
+ // Reader
+ // ================================================================================
+
+ /**
+ * A reader which keeps track of which messages have been received from pub/sub
+ * but not yet consumed downstream and/or acked back to pub/sub.
+ */
+ private static class Reader<T> extends UnboundedSource.UnboundedReader<T> {
+ /**
+ * For access to topic and checkpointCoder.
+ */
+ private final Source<T> outer;
+
+ /**
+ * Client on which to talk to pub/sub. Null if closed.
+ */
+ @Nullable
+ private PubsubGrpcClient pubsubClient;
+
+ /**
+ * Ack ids of messages we have delivered downstream but not yet acked.
+ */
+ private List<String> safeToAckIds;
+
+ /**
+ * Messages we have received from pub/sub and not yet delivered downstream.
+ * We preserve their order.
+ */
+ private final Queue<PubsubClient.IncomingMessage> notYetRead;
+
+ /**
+ * Map from ack ids of messages we have received from pub/sub but not yet acked to their
+ * current deadline. Ordered from earliest to latest deadline.
+ */
+ private final LinkedHashMap<String, Instant> inFlight;
+
+ /**
+ * Batches of successfully acked ids which need to be pruned from the above.
+ * CAUTION: Accessed by both reader and checkpointing threads.
+ */
+ private final Queue<List<String>> ackedIds;
+
+ /**
+ * Byte size of undecoded elements in {@link #notYetRead}.
+ */
+ private long notYetReadBytes;
+
+ /**
+ * Bucketed map from received time (as system time, ms since epoch) to message
+ * timestamps (ms since epoch) of all received but not-yet read messages.
+ * Used to estimate watermark.
+ */
+ private BucketingFunction minUnreadTimestampMsSinceEpoch;
+
+ /**
+ * Minimum of timestamps (ms since epoch) of all recently read messages.
+ * Used to estimate watermark.
+ */
+ private MovingFunction minReadTimestampMsSinceEpoch;
+
+ /**
+ * System time (ms since epoch) we last received a message from pub/sub, or -1 if
+ * not yet received any messages.
+ */
+ private long lastReceivedMsSinceEpoch;
+
+ /**
+ * The last reported watermark (ms since epoch), or beginning of time if none yet reported.
+ */
+ private long lastWatermarkMsSinceEpoch;
+
+ /**
+ * The current message, or {@literal null} if none.
+ */
+ @Nullable
+ private PubsubClient.IncomingMessage current;
+
+ /**
+ * Stats only: System time (ms since epoch) we last logged stats, or -1 if never.
+ */
+ private long lastLogTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Total number of messages received.
+ */
+ private long numReceived;
+
+ /**
+ * Stats only: Number of messages which have recently been received.
+ */
+ private MovingFunction numReceivedRecently;
+
+ /**
+ * Stats only: Number of messages which have recently had their deadline extended.
+ */
+ private MovingFunction numExtendedDeadlines;
+
+ /**
+ * Stats only: Number of messages which have recently had their deadline extended even
+ * though it may be too late to do so.
+ */
+ private MovingFunction numLateDeadlines;
+
+
+ /**
+ * Stats only: Number of messages which have recently been acked.
+ */
+ private MovingFunction numAcked;
+
+ /**
+ * Stats only: Number of messages which have recently been nacked.
+ */
+ private MovingFunction numNacked;
+
+ /**
+ * Stats only: Number of message bytes which have recently been read by downstream consumer.
+ */
+ private MovingFunction numReadBytes;
+
+ /**
+ * Stats only: Minimum of timestamp (ms since epoch) of all recently received messages.
+ * Used to estimate timestamp skew. Does not contribute to watermark estimator.
+ */
+ private MovingFunction minReceivedTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Maximum of timestamp (ms since epoch) of all recently received messages.
+ * Used to estimate timestamp skew.
+ */
+ private MovingFunction maxReceivedTimestampMsSinceEpoch;
+
+ /**
+ * Stats only: Minimum of recent estimated watermarks (ms since epoch).
+ */
+ private MovingFunction minWatermarkMsSinceEpoch;
+
+ /**
+ * Stats only: Maximum of recent estimated watermarks (ms since epoch).
+ */
+ private MovingFunction maxWatermarkMsSinceEpoch;
+
+ /**
+ * Stats only: Number of messages with timestamps strictly behind the estimated watermark
+ * at the time they are received. These may be considered 'late' by downstream computations.
+ */
+ private MovingFunction numLateMessages;
+
+ /**
+ * Stats only: Current number of checkpoints in flight.
+ * CAUTION: Accessed by both checkpointing and reader threads.
+ */
+ private AtomicInteger numInFlightCheckpoints;
+
+ /**
+ * Stats only: Maximum number of checkpoints in flight at any time.
+ */
+ private int maxInFlightCheckpoints;
+
+ private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
+ return new MovingFunction(SAMPLE_PERIOD.getMillis(),
+ SAMPLE_UPDATE.getMillis(),
+ MIN_WATERMARK_SPREAD,
+ MIN_WATERMARK_MESSAGES,
+ function);
+ }
+
+ /**
+ * Construct a reader.
+ */
+ public Reader(GcpOptions options, Source<T> outer) throws IOException,
+ GeneralSecurityException {
+ this.outer = outer;
+ pubsubClient = PubsubGrpcClient.newClient(outer.outer.timestampLabel,
+ outer.outer.idLabel,
+ options);
+ safeToAckIds = new ArrayList<>();
+ notYetRead = new ArrayDeque<>();
+ inFlight = new LinkedHashMap<>();
+ ackedIds = new ConcurrentLinkedQueue<>();
+ notYetReadBytes = 0;
+ minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(),
+ MIN_WATERMARK_SPREAD,
+ MIN_WATERMARK_MESSAGES,
+ MIN);
+ minReadTimestampMsSinceEpoch = newFun(MIN);
+ lastReceivedMsSinceEpoch = -1;
+ lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ current = null;
+ lastLogTimestampMsSinceEpoch = -1;
+ numReceived = 0L;
+ numReceivedRecently = newFun(SUM);
+ numExtendedDeadlines = newFun(SUM);
+ numLateDeadlines = newFun(SUM);
+ numAcked = newFun(SUM);
+ numNacked = newFun(SUM);
+ numReadBytes = newFun(SUM);
+ minReceivedTimestampMsSinceEpoch = newFun(MIN);
+ maxReceivedTimestampMsSinceEpoch = newFun(MAX);
+ minWatermarkMsSinceEpoch = newFun(MIN);
+ maxWatermarkMsSinceEpoch = newFun(MAX);
+ numLateMessages = newFun(SUM);
+ numInFlightCheckpoints = new AtomicInteger();
+ maxInFlightCheckpoints = 0;
+ }
+
+ /**
+ * BLOCKING
+ * Ack receipt of messages from pub/sub with the given {@code ackIds}.
+ * CAUTION: May be invoked from a separate checkpointing thread.
+ * CAUTION: Retains {@code ackIds}.
+ */
+ public void ackBatch(List<String> ackIds) throws IOException {
+ pubsubClient.acknowledge(outer.outer.subscription, ackIds);
+ ackedIds.add(ackIds);
+ }
+
+ /**
+ * BLOCKING
+ * 'Nack' (ie request deadline extension of 0) receipt of messages from pub/sub
+ * with the given {@code ackIds}. Does not retain {@code ackIds}.
+ */
+ public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
+ pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0);
+ numNacked.add(nowMsSinceEpoch, ackIds.size());
+ }
+
+ /**
+ * BLOCKING
+ * Extend the processing deadline for messages from pub/sub with the given {@code ackIds}.
+ * Does not retain {@code ackIds}.
+ */
+ private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
+ pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds,
+ (int) ACK_EXTENSION.getStandardSeconds());
+ numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
+ }
+
+ /**
+ * Messages which have been acked (via the checkpoint finalize) are no longer in flight.
+ * This is only used for flow control and stats.
+ */
+ private void retire() {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ while (true) {
+ List<String> ackIds = ackedIds.poll();
+ if (ackIds == null) {
+ return;
+ }
+ numAcked.add(nowMsSinceEpoch, ackIds.size());
+ for (String ackId : ackIds) {
+ inFlight.remove(ackId);
+ }
+ }
+ }
+
+ /**
+ * BLOCKING
+ * Extend deadline for all messages which need it.
+ *
+ * CAUTION: If extensions can't keep up with wallclock then we'll never return.
+ */
+ private void extend() throws IOException {
+ while (true) {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ List<Map.Entry<String, Instant>> toBeExtended = new ArrayList<>();
+ for (Map.Entry<String, Instant> entry : inFlight.entrySet()) {
+ long deadlineMsSinceEpoch = entry.getValue().getMillis();
+ if (deadlineMsSinceEpoch < nowMsSinceEpoch + ACK_TOO_LATE.getMillis()) {
+ // This message may have already expired, in which case it will (eventually) be
+ // made available on a future pull request.
+ // If the message ends up being committed then a future resend will be ignored
+ // downstream and acked as usual.
+ numLateDeadlines.add(nowMsSinceEpoch, 1);
+ }
+ if (deadlineMsSinceEpoch > nowMsSinceEpoch + ACK_SAFETY.getMillis()) {
+ // No later messages need extending.
+ break;
+ }
+ toBeExtended.add(entry);
+ if (toBeExtended.size() >= ACK_BATCH_SIZE) {
+ // Enough for one batch.
+ break;
+ }
+ }
+ if (toBeExtended.isEmpty()) {
+ // No messages need to be extended.
+ return;
+ }
+ List<String> extensionAckIds = new ArrayList<>(toBeExtended.size());
+ long newDeadlineMsSinceEpoch = nowMsSinceEpoch + ACK_EXTENSION.getMillis();
+ for (Map.Entry<String, Instant> entry : toBeExtended) {
+ inFlight.remove(entry.getKey());
+ inFlight.put(entry.getKey(), new Instant(newDeadlineMsSinceEpoch));
+ extensionAckIds.add(entry.getKey());
+ }
+ // BLOCKs until extended.
+ extendBatch(nowMsSinceEpoch, extensionAckIds);
+ }
+ }
+
+ /**
+ * BLOCKING
+ * Fetch another batch of messages from pub/sub.
+ */
+ private void pull() throws IOException {
+ if (inFlight.size() >= MAX_IN_FLIGHT) {
+ // Wait for checkpoint to be finalized before pulling anymore.
+ // There may be lag while checkpoints are persisted and the finalizeCheckpoint method
+ // is invoked. By limiting the in-flight messages we can ensure we don't end up consuming
+ // messages faster than we can checkpoint them.
+ return;
+ }
+
+ long requestTimeMsSinceEpoch = System.currentTimeMillis();
+ long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ACK_TIMEOUT.getMillis();
+
+ // Pull the next batch.
+ // BLOCKs until received.
+ Collection<PubsubClient.IncomingMessage> receivedMessages =
+ pubsubClient.pull(requestTimeMsSinceEpoch,
+ outer.outer.subscription,
+ PULL_BATCH_SIZE);
+ if (receivedMessages.isEmpty()) {
+ // Nothing available yet. Try again later.
+ return;
+ }
+
+ lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch;
+
+ // Capture the received messages.
+ for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) {
+ notYetRead.add(incomingMessage);
+ notYetReadBytes += incomingMessage.elementBytes.length;
+ inFlight.put(incomingMessage.ackId, new Instant(deadlineMsSinceEpoch));
+ numReceived++;
+ numReceivedRecently.add(requestTimeMsSinceEpoch, 1L);
+ minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch,
+ incomingMessage.timestampMsSinceEpoch);
+ }
+ }
+
+ /**
+ * Log stats if time to do so.
+ */
+ private void stats() {
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ if (lastLogTimestampMsSinceEpoch < 0) {
+ lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
+ return;
+ }
+ long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch;
+ if (deltaMs < LOG_PERIOD.getMillis()) {
+ return;
+ }
+
+ String messageSkew = "unknown";
+ long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+ long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+ if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) {
+ messageSkew = (maxTimestamp - minTimestamp) + "ms";
+ }
+
+ String watermarkSkew = "unknown";
+ long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
+ long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
+ if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) {
+ watermarkSkew = (maxWatermark - minWatermark) + "ms";
+ }
+
+ LOG.warn("Pubsub {} has "
+ + "{} received messages, "
+ + "{} current unread messages, "
+ + "{} current unread bytes, "
+ + "{} current in-flight msgs, "
+ + "{} current in-flight checkpoints, "
+ + "{} max in-flight checkpoints, "
+ + "{}B/s recent read, "
+ + "{} recent received, "
+ + "{} recent extended, "
+ + "{} recent late extended, "
+ + "{} recent acked, "
+ + "{} recent nacked, "
+ + "{} recent message timestamp skew, "
+ + "{} recent watermark skew, "
+ + "{} recent late messages, "
+ + "{} last reported watermark",
+ outer.outer.subscription,
+ numReceived,
+ notYetRead.size(),
+ notYetReadBytes,
+ inFlight.size(),
+ numInFlightCheckpoints.get(),
+ maxInFlightCheckpoints,
+ numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
+ numReceivedRecently.get(nowMsSinceEpoch),
+ numExtendedDeadlines.get(nowMsSinceEpoch),
+ numLateDeadlines.get(nowMsSinceEpoch),
+ numAcked.get(nowMsSinceEpoch),
+ numNacked.get(nowMsSinceEpoch),
+ messageSkew,
+ watermarkSkew,
+ numLateMessages.get(nowMsSinceEpoch),
+ new Instant(lastWatermarkMsSinceEpoch));
+
+ lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ /**
+ * BLOCKING
+ * Return {@literal true} if a pub/sub message is available, {@literal false} if
+ * none is available at this time or we are over-subscribed. May BLOCK while extending
+ * acks or fetching available messages. Will not block waiting for messages.
+ */
+ @Override
+ public boolean advance() throws IOException {
+ // Emit stats.
+ stats();
+
+ if (current != null) {
+ // Current is consumed. It can no longer contribute to holding back the watermark.
+ minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
+ current = null;
+ }
+
+ // Retire state associated with acked messages.
+ retire();
+
+ // Extend all pressing deadlines.
+ // Will BLOCK until done.
+ // If the system is pulling messages only to let them sit in a downstream queue then
+ // this will have the effect of slowing down the pull rate.
+ // However, if the system is genuinely taking longer to process each message then
+ // the work to extend acks would be better done in the background.
+ extend();
+
+ if (notYetRead.isEmpty()) {
+ // Pull another batch.
+ // Will BLOCK until fetch returns, but will not block until a message is available.
+ pull();
+ }
+
+ // Take one message from queue.
+ current = notYetRead.poll();
+ if (current == null) {
+ // Try again later.
+ return false;
+ }
+ notYetReadBytes -= current.elementBytes.length;
+ Preconditions.checkState(notYetReadBytes >= 0);
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+ minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+ if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
+ numLateMessages.add(nowMsSinceEpoch, 1L);
+ }
+
+ // Current message will be persisted by the next checkpoint so it is now safe to ack.
+ safeToAckIds.add(current.ackId);
+ return true;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ try {
+ return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+ } catch (CoderException e) {
+ throw new RuntimeException("Unable to decode element from pub/sub message: ", e);
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ return new Instant(current.timestampMsSinceEpoch);
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ return current.recordId;
+ }
+
+ @Override
+ public void close() throws IOException {
+ pubsubClient.close();
+ pubsubClient = null;
+ }
+
+ @Override
+ public Source<T> getCurrentSource() {
+ return outer;
+ }
+
+ @Override
+ public Instant getWatermark() {
+ // NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible
+ // for aggregating all reported watermarks and ensuring the aggregate is latched.
+ // If we attempted to latch locally then a temporary starvation of one reader could cause
+ // its estimated watermark to fast-forward to current system time. Then, when the reader
+ // resumed receiving messages, it would be unable to move its watermark back to track them.
+ // By letting the underlying runner latch we avoid any problems due to localized starvation.
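+ // Hypothetical illustration (not from any particular run): a locally-latched reader that
+ // stalls might fast-forward its watermark to, say, 12:00:05; messages timestamped 12:00:01
+ // arriving afterwards could then never pull that watermark back, whereas the runner-side
+ // aggregate watermark is still held back by the other readers.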
+ long nowMsSinceEpoch = System.currentTimeMillis();
+ long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+ long unreadMin = minUnreadTimestampMsSinceEpoch.get();
+ if (readMin == Long.MAX_VALUE &&
+ unreadMin == Long.MAX_VALUE &&
+ lastReceivedMsSinceEpoch >= 0 &&
+ nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
+ // We don't currently have any unread messages pending, we have not had any messages
+ // read for a while, and we have not received any new messages from pub/sub for a while.
+ // Advance watermark to current time.
+ // TODO: Estimate a timestamp lag.
+ lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
+ } else if (minReadTimestampMsSinceEpoch.isSignificant() ||
+ minUnreadTimestampMsSinceEpoch.isSignificant()) {
+ // Take minimum of the timestamps in all unread messages and recently read messages.
+ lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
+ }
+ // else: We're not confident enough to estimate a new watermark. Stick with the old one.
+ minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+ maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+ return new Instant(lastWatermarkMsSinceEpoch);
+ }
+
+ @Override
+ public UnboundedSource.CheckpointMark getCheckpointMark() {
+ int cur = numInFlightCheckpoints.incrementAndGet();
+ maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur);
+ // The checkpoint will either be finalized or we'll rollback to an earlier
+ // checkpoint. Thus we can hand off these ack ids to the checkpoint.
+ List<String> snapshotSafeToAckIds = safeToAckIds;
+ safeToAckIds = new ArrayList<>();
+ List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size());
+ for (PubsubClient.IncomingMessage incomingMessage : notYetRead) {
+ snapshotNotYetReadIds.add(incomingMessage.ackId);
+ }
+ return new Checkpoint<>(this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+ }
+
+ @Override
+ public long getSplitBacklogBytes() {
+ return notYetReadBytes;
+ }
+ }
+
+ // ================================================================================
+ // Source
+ // ================================================================================
+
+ private static class Source<T> extends UnboundedSource<T, Checkpoint<T>> {
+ PubsubUnboundedSource<T> outer;
+
+ public Source(PubsubUnboundedSource<T> outer) {
+ this.outer = outer;
+ }
+
+ @Override
+ public List<Source<T>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ List<Source<T>> result = new ArrayList<>(desiredNumSplits);
+ for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
+ // Since the source is immutable and pub/sub automatically shards, we simply
+ // replicate ourselves SCALE_OUT times for each requested split.
+ result.add(this);
+ }
+ return result;
+ }
+
+ @Override
+ public UnboundedReader<T> createReader(
+ PipelineOptions options, @Nullable Checkpoint<T> checkpoint) {
+ PubsubUnboundedSource.Reader<T> reader;
+ try {
+ reader =
+ new PubsubUnboundedSource.Reader<>(options.as(GcpOptions.class), this);
+ } catch (GeneralSecurityException | IOException e) {
+ throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e);
+ }
+ if (checkpoint != null) {
+ // Nack all messages we may have lost.
+ try {
+ checkpoint.setReader(reader);
+ // Will BLOCK until nacked.
+ checkpoint.nackAll();
+ } catch (IOException e) {
+ LOG.error("Pubsub {} cannot have {} lost messages nacked, ignoring: {}",
+ outer.subscription, checkpoint.notYetReadIds.size(), e);
+ }
+ }
+ return reader;
+ }
+
+ @Nullable
+ @Override
+ public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+ @SuppressWarnings("unchecked") CheckpointCoder<T> typedCoder =
+ (CheckpointCoder<T>) CHECKPOINT_CODER;
+ return typedCoder;
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return outer.elementCoder;
+ }
+
+ @Override
+ public void validate() {
+ // Nothing to validate.
+ }
+
+ @Override
+ public boolean requiresDeduping() {
+ // We cannot prevent re-offering already read messages after a restore from checkpoint.
+ return true;
+ }
+ }
+
+ // ================================================================================
+ // StatsFn
+ // ================================================================================
+
+ private static class StatsFn<T> extends DoFn<T, T> {
+ private final Aggregator<Long, Long> elementCounter =
+ createAggregator("elements", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ elementCounter.addValue(1L);
+ c.output(c.element());
+ }
+ }
+
+ // ================================================================================
+ // PubsubUnboundedSource
+ // ================================================================================
+
+ /**
+ * Subscription to read from.
+ */
+ private final String subscription;
+
+ /**
+ * Coder for elements. Elements are effectively double-encoded: first to a byte array
+ * using this coder, then to a base-64 string to conform to pub/sub's payload
+ * conventions.
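+ *
+ * <p>Illustrative sketch of that double encoding (the wire format is owned by the pub/sub
+ * transport; the snippet below is an illustration under that assumption, not the transport code):
+ * <pre>{@code
+ * byte[] bytes = CoderUtils.encodeToByteArray(elementCoder, element);
+ * String payload = BaseEncoding.base64().encode(bytes);
+ * }</pre>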
+ */
+ private final Coder<T> elementCoder;
+
+ /**
+ * Pub/sub metadata field holding the timestamp of each element, or {@literal null} if the
+ * pub/sub message publish timestamp should be used instead.
+ */
+ @Nullable
+ private final String timestampLabel;
+
+ /**
+ * Pub/sub metadata field holding the id of each element, or {@literal null} if we need to
+ * generate a unique id ourselves.
+ */
+ @Nullable
+ private final String idLabel;
+
+ /**
+ * Construct an unbounded source to consume from the pub/sub {@code subscription}.
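+ *
+ * <p>Illustrative usage (the subscription name and coder below are examples only):
+ * <pre>{@code
+ * PCollection<String> messages = pipeline.apply(
+ *     new PubsubUnboundedSource<String>(
+ *         "projects/my-project/subscriptions/my-subscription",
+ *         StringUtf8Coder.of(), null, null));
+ * }</pre>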
+ */
+ public PubsubUnboundedSource(
+ String subscription, Coder<T> elementCoder, @Nullable String timestampLabel,
+ @Nullable String idLabel) {
+ this.subscription = Preconditions.checkNotNull(subscription);
+ this.elementCoder = Preconditions.checkNotNull(elementCoder);
+ this.timestampLabel = timestampLabel;
+ this.idLabel = idLabel;
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ String label = "PubsubSource(" + subscription.replaceFirst(".*/", "") + ")";
+ return input.getPipeline().begin()
+ .apply(Read.from(new Source<T>(this)))
+ .apply(ParDo.named(label).of(new StatsFn<T>()));
+ }
+}
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
index 67b7f1067adc..096888a5f110 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
@@ -110,8 +110,21 @@ public interface CheckpointMark {
* <p>For example, this could be sending acknowledgement requests to an external
* data source such as Pub/Sub.
*
- * <p>This may be called from any thread, potentially at the same time as calls to the
- * {@code UnboundedReader} that created it.
+ * <p>Note that:
+ * <ul>
+ * <li>This finalize method may be called from any thread, potentially at the same time as
+ * calls to the {@link UnboundedReader} it was created from.
+ * <li>Checkpoints will be finalized in the same order they are created from the
+ * {@link UnboundedReader}.
+ * <li>It is possible for multiple checkpoints created from the same {@link UnboundedReader}
+ * to be 'in-flight' (ie not yet finalized) simultaneously.
+ * <li>If this call throws an exception then the entire checkpoint will be abandoned and the
+ * reader restarted from an earlier, successfully-finalized checkpoint.
+ * <li>The system ensures that if a checkpoint fails for any reason then no later checkpoint
+ * is allowed to be finalized without the reader first being restarted.
+ * <li>It is not safe to assume the {@link UnboundedReader} from which this checkpoint was
+ * created still exists at the time this method is called.
+ * </ul>
*/
void finalizeCheckpoint() throws IOException;
}
@@ -205,10 +218,10 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
/**
* Returns a {@link CheckpointMark} representing the progress of this {@code UnboundedReader}.
*
- * <p>The elements read up until this is called will be processed together as a bundle. Once
- * the result of this processing has been durably committed,
- * {@link CheckpointMark#finalizeCheckpoint} will be called on the {@link CheckpointMark}
- * object.
+ * <p>All elements read up until this method is called will be processed together as a bundle.
+ * Once the result of processing those elements and the returned checkpoint have been durably
+ * committed, {@link CheckpointMark#finalizeCheckpoint} will be called on the
+ * returned {@link CheckpointMark} object.
*
* <p>The returned object should not be modified.
*
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BucketingFunction.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BucketingFunction.java
new file mode 100644
index 000000000000..56bec8db6fdb
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BucketingFunction.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keep track of the minimum/maximum/sum of a set of timestamped long values.
+ * For efficiency, bucket values by their timestamp.
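+ *
+ * <p>Illustrative usage, assuming {@code SUM} is a summing {@link Combine.BinaryCombineLongFn}
+ * (as in {@code BucketingFunctionTest}):
+ * <pre>{@code
+ * BucketingFunction f = new BucketingFunction(10, 2, 10, SUM);
+ * f.add(5, 3);   // falls in bucket [0, 10)
+ * f.add(12, 4);  // falls in bucket [10, 20)
+ * f.get();       // 7
+ * f.remove(5);   // bucket [0, 10) is now empty and dropped
+ * }</pre>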
+ */
+public class BucketingFunction {
+ private static class Bucket {
+ private int numSamples;
+ private long combinedValue;
+
+ public Bucket(BucketingFunction outer) {
+ numSamples = 0;
+ combinedValue = outer.function.identity();
+ }
+
+ public void add(BucketingFunction outer, long value) {
+ combinedValue = outer.function.apply(combinedValue, value);
+ numSamples++;
+ }
+
+ public boolean remove() {
+ numSamples--;
+ Preconditions.checkState(numSamples >= 0);
+ return numSamples == 0;
+ }
+
+ public long get() {
+ return combinedValue;
+ }
+ }
+
+ /**
+ * How large a time interval to fit within each bucket.
+ */
+ private final long bucketWidthMs;
+
+ /**
+ * How many buckets are considered 'significant'?
+ */
+ private final int numSignificantBuckets;
+
+ /**
+ * How many samples are considered 'significant'?
+ */
+ private final int numSignificantSamples;
+
+ /**
+ * Function for combining sample values.
+ */
+ private final Combine.BinaryCombineLongFn function;
+
+ /**
+ * Active buckets.
+ */
+ private final Map<Long, Bucket> buckets;
+
+ public BucketingFunction(
+ long bucketWidthMs,
+ int numSignificantBuckets,
+ int numSignificantSamples,
+ Combine.BinaryCombineLongFn function) {
+ this.bucketWidthMs = bucketWidthMs;
+ this.numSignificantBuckets = numSignificantBuckets;
+ this.numSignificantSamples = numSignificantSamples;
+ this.function = function;
+ this.buckets = new HashMap<>();
+ }
+
+ /**
+ * Return the bucket key corresponding to {@code timeMsSinceEpoch}.
+ */
+ private long key(long timeMsSinceEpoch) {
+ return timeMsSinceEpoch - (timeMsSinceEpoch % bucketWidthMs);
+ }
+
+ /**
+ * Add one sample of {@code value} (to bucket) at {@code timeMsSinceEpoch}.
+ */
+ public void add(long timeMsSinceEpoch, long value) {
+ long key = key(timeMsSinceEpoch);
+ Bucket bucket = buckets.get(key);
+ if (bucket == null) {
+ bucket = new Bucket(this);
+ buckets.put(key, bucket);
+ }
+ bucket.add(this, value);
+ }
+
+ /**
+ * Remove one sample (from bucket) at {@code timeMsSinceEpoch}.
+ */
+ public void remove(long timeMsSinceEpoch) {
+ long key = key(timeMsSinceEpoch);
+ Bucket bucket = buckets.get(key);
+ if (bucket == null) {
+ return;
+ }
+ if (bucket.remove()) {
+ buckets.remove(key);
+ }
+ }
+
+ /**
+ * Return the (bucketized) combined value of all samples.
+ */
+ public long get() {
+ long result = function.identity();
+ for (Bucket bucket : buckets.values()) {
+ result = function.apply(result, bucket.get());
+ }
+ return result;
+ }
+
+ /**
+ * Is the current result 'significant'? I.e., is it drawn from enough buckets
+ * or from enough samples?
+ */
+ public boolean isSignificant() {
+ if (buckets.size() >= numSignificantBuckets) {
+ return true;
+ }
+ int totalSamples = 0;
+ for (Bucket bucket : buckets.values()) {
+ totalSamples += bucket.numSamples;
+ }
+ return totalSamples >= numSignificantSamples;
+ }
+}
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MovingFunction.java
new file mode 100644
index 000000000000..22827e62512a
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MovingFunction.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+
+/**
+ * Keep track of the moving minimum/maximum/sum of sampled long values. The minimum/maximum/sum
+ * is over at most the last {@link #samplePeriodMs}, and is updated every
+ * {@link #sampleUpdateMs}.
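+ *
+ * <p>Illustrative usage, assuming {@code SUM} is a summing {@link Combine.BinaryCombineLongFn}
+ * (as in {@code MovingFunctionTest}):
+ * <pre>{@code
+ * MovingFunction f = new MovingFunction(100, 10, 2, 10, SUM);
+ * f.add(0, 1);
+ * f.add(50, 1);
+ * f.get(99);   // 2: both samples fall within the last 100ms
+ * f.get(149);  // 1: the sample at t=0 has aged out
+ * }</pre>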
+ */
+public class MovingFunction {
+ /**
+ * How far back to retain samples, in ms.
+ */
+ private final long samplePeriodMs;
+
+ /**
+ * How frequently to update the moving function, in ms.
+ */
+ private final long sampleUpdateMs;
+
+ /**
+ * How many buckets are considered 'significant'?
+ */
+ private final int numSignificantBuckets;
+
+ /**
+ * How many samples are considered 'significant'?
+ */
+ private final int numSignificantSamples;
+
+ /**
+ * Function for combining sample values.
+ */
+ private final Combine.BinaryCombineLongFn function;
+
+ /**
+ * Minimum/maximum/sum of all values per bucket.
+ */
+ private final long[] buckets;
+
+ /**
+ * How many samples have been added to each bucket.
+ */
+ private final int[] numSamples;
+
+ /**
+ * Time of start of current bucket.
+ */
+ private long currentMsSinceEpoch;
+
+ /**
+ * Index of bucket corresponding to above timestamp, or -1 if no entries.
+ */
+ private int currentIndex;
+
+ public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
+ int numSignificantBuckets, int numSignificantSamples,
+ Combine.BinaryCombineLongFn function) {
+ this.samplePeriodMs = samplePeriodMs;
+ this.sampleUpdateMs = sampleUpdateMs;
+ this.numSignificantBuckets = numSignificantBuckets;
+ this.numSignificantSamples = numSignificantSamples;
+ this.function = function;
+ int n = (int) (samplePeriodMs / sampleUpdateMs);
+ buckets = new long[n];
+ Arrays.fill(buckets, function.identity());
+ numSamples = new int[n];
+ Arrays.fill(numSamples, 0);
+ currentMsSinceEpoch = -1;
+ currentIndex = -1;
+ }
+
+ /**
+ * Flush stale values.
+ */
+ private void flush(long nowMsSinceEpoch) {
+ Preconditions.checkArgument(nowMsSinceEpoch >= 0);
+ if (currentIndex < 0) {
+ currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
+ currentIndex = 0;
+ }
+ Preconditions.checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch);
+ int newBuckets =
+ Math.min((int) ((nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs),
+ buckets.length);
+ while (newBuckets > 0) {
+ currentIndex = (currentIndex + 1) % buckets.length;
+ buckets[currentIndex] = function.identity();
+ numSamples[currentIndex] = 0;
+ newBuckets--;
+ currentMsSinceEpoch += sampleUpdateMs;
+ }
+ }
+
+ /**
+ * Add {@code value} at {@code nowMsSinceEpoch}.
+ */
+ public void add(long nowMsSinceEpoch, long value) {
+ flush(nowMsSinceEpoch);
+ buckets[currentIndex] = function.apply(buckets[currentIndex], value);
+ numSamples[currentIndex]++;
+ }
+
+ /**
+ * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs}
+ * of {@code nowMsSinceEpoch}.
+ */
+ public long get(long nowMsSinceEpoch) {
+ flush(nowMsSinceEpoch);
+ long result = function.identity();
+ for (int i = 0; i < buckets.length; i++) {
+ result = function.apply(result, buckets[i]);
+ }
+ return result;
+ }
+
+ /**
+ * Is the current result 'significant'? I.e., is it drawn from enough buckets
+ * or from enough samples?
+ */
+ public boolean isSignificant() {
+ int totalSamples = 0;
+ int activeBuckets = 0;
+ for (int i = 0; i < buckets.length; i++) {
+ totalSamples += numSamples[i];
+ if (numSamples[i] > 0) {
+ activeBuckets++;
+ }
+ }
+ return activeBuckets >= numSignificantBuckets || totalSamples >= numSignificantSamples;
+ }
+}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BucketingFunctionTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BucketingFunctionTest.java
new file mode 100644
index 000000000000..f5e52e059ec1
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BucketingFunctionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link BucketingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class BucketingFunctionTest {
+
+ private static final long BUCKET_WIDTH = 10;
+ private static final int SIGNIFICANT_BUCKETS = 2;
+ private static final int SIGNIFICANT_SAMPLES = 10;
+
+ private static final Combine.BinaryCombineLongFn SUM =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return left + right;
+ }
+
+ @Override
+ public long identity() {
+ return 0;
+ }
+ };
+
+ private BucketingFunction newFunc() {
+ return new BucketingFunction(
+ BUCKET_WIDTH, SIGNIFICANT_BUCKETS, SIGNIFICANT_SAMPLES, SUM);
+ }
+
+ @Test
+ public void significantSamples() {
+ BucketingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) {
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ }
+ f.add(0, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void significantBuckets() {
+ BucketingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ f.add(BUCKET_WIDTH, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void sum() {
+ BucketingFunction f = newFunc();
+ for (int i = 0; i < 100; i++) {
+ f.add(i, i);
+ assertEquals(((i + 1) * i) / 2, f.get());
+ }
+ }
+
+ @Test
+ public void movingSum() {
+ BucketingFunction f = newFunc();
+ int lost = 0;
+ for (int i = 0; i < 200; i++) {
+ f.add(i, 1);
+ if (i >= 100) {
+ f.remove(i - 100);
+ if (i % BUCKET_WIDTH == BUCKET_WIDTH - 1) {
+ lost += BUCKET_WIDTH;
+ }
+ }
+ assertEquals(i + 1 - lost, f.get());
+ }
+ }
+}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MovingFunctionTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MovingFunctionTest.java
new file mode 100644
index 000000000000..2ca8b7cc7c6d
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/MovingFunctionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link MovingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class MovingFunctionTest {
+
+ private static final long SAMPLE_PERIOD = 100;
+ private static final long SAMPLE_UPDATE = 10;
+ private static final int SIGNIFICANT_BUCKETS = 2;
+ private static final int SIGNIFICANT_SAMPLES = 10;
+
+ private static final Combine.BinaryCombineLongFn SUM =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long apply(long left, long right) {
+ return left + right;
+ }
+
+ @Override
+ public long identity() {
+ return 0;
+ }
+ };
+
+ private MovingFunction newFunc() {
+ return new MovingFunction(
+ SAMPLE_PERIOD, SAMPLE_UPDATE, SIGNIFICANT_BUCKETS, SIGNIFICANT_SAMPLES, SUM);
+ }
+
+ @Test
+ public void significantSamples() {
+ MovingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) {
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ }
+ f.add(0, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void significantBuckets() {
+ MovingFunction f = newFunc();
+ assertFalse(f.isSignificant());
+ f.add(0, 0);
+ assertFalse(f.isSignificant());
+ f.add(SAMPLE_UPDATE, 0);
+ assertTrue(f.isSignificant());
+ }
+
+ @Test
+ public void sum() {
+ MovingFunction f = newFunc();
+ for (int i = 0; i < SAMPLE_PERIOD; i++) {
+ f.add(i, i);
+ assertEquals(((i + 1) * i) / 2, f.get(i));
+ }
+ }
+
+ @Test
+ public void movingSum() {
+ MovingFunction f = newFunc();
+ int lost = 0;
+ for (int i = 0; i < SAMPLE_PERIOD * 2; i++) {
+ f.add(i, 1);
+ if (i >= SAMPLE_PERIOD) {
+ if (i % SAMPLE_UPDATE == 0) {
+ lost += SAMPLE_UPDATE;
+ }
+ }
+ assertEquals(i + 1 - lost, f.get(i));
+ }
+ }
+
+ @Test
+ public void jumpingSum() {
+ MovingFunction f = newFunc();
+ f.add(0, 1);
+ f.add(SAMPLE_PERIOD - 1, 1);
+ assertEquals(2, f.get(SAMPLE_PERIOD - 1));
+ assertEquals(1, f.get(SAMPLE_PERIOD + 3 * SAMPLE_UPDATE));
+ assertEquals(0, f.get(SAMPLE_PERIOD * 2));
+ }
+}