diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java deleted file mode 100644 index f92b480de1d8..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubClient.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkState; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; - -/** - * A helper interface for talking to Pubsub via an underlying transport. - */ -public interface PubsubClient extends AutoCloseable { - /** - * Path representing a cloud project id. - */ - class ProjectPath implements Serializable { - private final String path; - - public ProjectPath(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ProjectPath that = (ProjectPath) o; - - return path.equals(that.path); - - } - - @Override - public int hashCode() { - return path.hashCode(); - } - - @Override - public String toString() { - return path; - } - - public static ProjectPath fromId(String projectId) { - return new ProjectPath(String.format("projects/%s", projectId)); - } - } - - /** - * Path representing a Pubsub subscription. - */ - class SubscriptionPath implements Serializable { - private final String path; - - public SubscriptionPath(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - public String getV1Beta1Path() { - String[] splits = path.split("/"); - checkState(splits.length == 4); - return String.format("/subscriptions/%s/%s", splits[1], splits[3]); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SubscriptionPath that = (SubscriptionPath) o; - return path.equals(that.path); - } - - @Override - public int hashCode() { - return path.hashCode(); - } - - @Override - public String toString() { - return path; - } - - public static SubscriptionPath fromName(String projectId, String subscriptionName) { - return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", - projectId, subscriptionName)); - } - } - - /** - * Path representing a Pubsub topic. - */ - class TopicPath implements Serializable { - private final String path; - - public TopicPath(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - public String getV1Beta1Path() { - String[] splits = path.split("/"); - checkState(splits.length == 4); - return String.format("/topics/%s/%s", splits[1], splits[3]); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TopicPath topicPath = (TopicPath) o; - return path.equals(topicPath.path); - } - - @Override - public int hashCode() { - return path.hashCode(); - } - - @Override - public String toString() { - return path; - } - - public static TopicPath fromName(String projectId, String topicName) { - return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); - } - } - - /** - * A message to be sent to Pubsub. - */ - class OutgoingMessage { - /** - * Underlying (encoded) element. - */ - public final byte[] elementBytes; - - /** - * Timestamp for element (ms since epoch). - */ - public final long timestampMsSinceEpoch; - - public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { - this.elementBytes = elementBytes; - this.timestampMsSinceEpoch = timestampMsSinceEpoch; - } - } - - /** - * A message received from Pubsub. - */ - class IncomingMessage { - /** - * Underlying (encoded) element. - */ - public final byte[] elementBytes; - - /** - * Timestamp for element (ms since epoch). Either Pubsub's processing time, - * or the custom timestamp associated with the message. - */ - public final long timestampMsSinceEpoch; - - /** - * Timestamp (in system time) at which we requested the message (ms since epoch). - */ - public final long requestTimeMsSinceEpoch; - - /** - * Id to pass back to Pubsub to acknowledge receipt of this message. - */ - public final String ackId; - - /** - * Id to pass to the runner to distinguish this message from all others. - */ - public final byte[] recordId; - - public IncomingMessage( - byte[] elementBytes, - long timestampMsSinceEpoch, - long requestTimeMsSinceEpoch, - String ackId, - byte[] recordId) { - this.elementBytes = elementBytes; - this.timestampMsSinceEpoch = timestampMsSinceEpoch; - this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; - this.ackId = ackId; - this.recordId = recordId; - } - } - - /** - * Gracefully close the underlying transport. - */ - @Override - void close(); - - - /** - * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages - * published. - * - * @throws IOException - */ - int publish(TopicPath topic, Iterable outgoingMessages) throws IOException; - - /** - * Request the next batch of up to {@code batchSize} messages from {@code subscription}. - * Return the received messages, or empty collection if none were available. Does not - * wait for messages to arrive. Returned messages will record heir request time - * as {@code requestTimeMsSinceEpoch}. - * - * @throws IOException - */ - Collection pull( - long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize) - throws IOException; - - /** - * Acknowldege messages from {@code subscription} with {@code ackIds}. - * - * @throws IOException - */ - void acknowledge(SubscriptionPath subscription, Iterable ackIds) throws IOException; - - /** - * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to - * be {@code deadlineSeconds} from now. - * - * @throws IOException - */ - void modifyAckDeadline( - SubscriptionPath subscription, Iterable ackIds, - int deadlineSeconds) - throws IOException; - - /** - * Create {@code topic}. - * - * @throws IOException - */ - void createTopic(TopicPath topic) throws IOException; - - /* - * Delete {@code topic}. - * - * @throws IOException - */ - void deleteTopic(TopicPath topic) throws IOException; - - /** - * Return a list of topics for {@code project}. - * - * @throws IOException - */ - Collection listTopics(ProjectPath project) throws IOException; - - /** - * Create {@code subscription} to {@code topic}. - * - * @throws IOException - */ - void createSubscription( - TopicPath topic, SubscriptionPath subscription, - int ackDeadlineSeconds) throws IOException; - - /** - * Delete {@code subscription}. - * - * @throws IOException - */ - void deleteSubscription(SubscriptionPath subscription) throws IOException; - - /** - * Return a list of subscriptions for {@code topic} in {@code project}. - * - * @throws IOException - */ - Collection listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException; -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 46464614221a..fa867c24bd22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -17,8 +17,7 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -33,25 +32,18 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.util.PubsubApiaryClient; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; -import com.google.api.client.util.Clock; -import com.google.api.client.util.DateTime; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.AcknowledgeRequest; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.api.services.pubsub.model.Subscription; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Strings; import org.joda.time.Duration; import org.joda.time.Instant; @@ -61,13 +53,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; - import javax.annotation.Nullable; /** @@ -82,6 +71,9 @@ public class PubsubIO { private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); + /** Factory for creating pubsub client to manage transport. */ + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY; + /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */ public static final Coder DEFAULT_PUBSUB_CODER = StringUtf8Coder.of(); @@ -142,48 +134,6 @@ private static void validatePubsubName(String name) { } } - /** - * Returns the {@link Instant} that corresponds to the timestamp in the supplied - * {@link PubsubMessage} under the specified {@code ink label}. See - * {@link PubsubIO.Read#timestampLabel(String)} for details about how these messages are - * parsed. - * - *

The {@link Clock} parameter is used to virtualize time for testing. - * - * @throws IllegalArgumentException if the timestamp label is provided, but there is no - * corresponding attribute in the message or the value provided is not a valid timestamp - * string. - * @see PubsubIO.Read#timestampLabel(String) - */ - @VisibleForTesting - protected static Instant assignMessageTimestamp( - PubsubMessage message, @Nullable String label, Clock clock) { - if (label == null) { - return new Instant(clock.currentTimeMillis()); - } - - // Extract message attributes, defaulting to empty map if null. - Map attributes = firstNonNull( - message.getAttributes(), ImmutableMap.of()); - - String timestampStr = attributes.get(label); - checkArgument(timestampStr != null && !timestampStr.isEmpty(), - "PubSub message is missing a timestamp in label: %s", label); - - long millisSinceEpoch; - try { - // Try parsing as milliseconds since epoch. Note there is no way to parse a string in - // RFC 3339 format here. - // Expected IllegalArgumentException if parsing fails; we use that to fall back to RFC 3339. - millisSinceEpoch = Long.parseLong(timestampStr); - } catch (IllegalArgumentException e) { - // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an IllegalArgumentException - // if parsing fails, and the caller should handle. - millisSinceEpoch = DateTime.parseRfc3339(timestampStr).getValue(); - } - return new Instant(millisSinceEpoch); - } - /** * Class representing a Cloud Pub/Sub Subscription. */ @@ -679,8 +629,8 @@ public PCollection apply(PInput input) { if (boundedOutput) { return input.getPipeline().begin() - .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) - .apply(ParDo.of(new PubsubReader())).setCoder(coder); + .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) + .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder); } else { return PCollection.createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) @@ -740,86 +690,94 @@ public Duration getMaxReadTime() { return maxReadTime; } - private class PubsubReader extends DoFn { + /** + * Default reader when Pubsub subscription has some form of upper bound. + *

TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming + * PubsubUnboundedSource. + *

NOTE: This is not the implementation used when running on the Google Dataflow hosted + * service. + */ + private class PubsubBoundedReader extends DoFn { private static final int DEFAULT_PULL_SIZE = 100; + private static final int ACK_TIMEOUT_SEC = 60; @Override public void processElement(ProcessContext c) throws IOException { - Pubsub pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) - .build(); - - String subscription; - if (getSubscription() == null) { - String topic = getTopic().asPath(); - String[] split = topic.split("/"); - subscription = - "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_" - + new Random().nextLong(); - Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic); - try { - pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); - } catch (Exception e) { - throw new RuntimeException("Failed to create subscription: ", e); + try (PubsubClient pubsubClient = + FACTORY.newClient(timestampLabel, idLabel, + c.getPipelineOptions().as(PubsubOptions.class))) { + + PubsubClient.SubscriptionPath subscriptionPath; + if (getSubscription() == null) { + // Create a randomized subscription derived from the topic name. + String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong(); + // The subscription will be registered under this pipeline's project if we know it. + // Otherwise we'll fall back to the topic's project. + // Note that they don't need to be the same. + String project = c.getPipelineOptions().as(PubsubOptions.class).getProject(); + if (Strings.isNullOrEmpty(project)) { + project = getTopic().project; + } + subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription); + TopicPath topicPath = + PubsubClient.topicPathFromName(getTopic().project, getTopic().topic); + try { + pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC); + } catch (Exception e) { + throw new RuntimeException("Failed to create subscription: ", e); + } + } else { + subscriptionPath = + PubsubClient.subscriptionPathFromName(getSubscription().project, + getSubscription().subscription); } - } else { - subscription = getSubscription().asPath(); - } - Instant endTime = (getMaxReadTime() == null) - ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime()); + Instant endTime = (getMaxReadTime() == null) + ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime()); - List messages = new ArrayList<>(); + List messages = new ArrayList<>(); - Throwable finallyBlockException = null; - try { - while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords()) - && Instant.now().isBefore(endTime)) { - PullRequest pullRequest = new PullRequest().setReturnImmediately(false); - if (getMaxNumRecords() > 0) { - pullRequest.setMaxMessages(getMaxNumRecords() - messages.size()); - } else { - pullRequest.setMaxMessages(DEFAULT_PULL_SIZE); - } + Throwable finallyBlockException = null; + try { + while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords()) + && Instant.now().isBefore(endTime)) { + int batchSize = DEFAULT_PULL_SIZE; + if (getMaxNumRecords() > 0) { + batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size()); + } - PullResponse pullResponse = - pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute(); - List ackIds = new ArrayList<>(); - if (pullResponse.getReceivedMessages() != null) { - for (ReceivedMessage received : pullResponse.getReceivedMessages()) { - messages.add(received.getMessage()); - ackIds.add(received.getAckId()); + List batchMessages = + pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize, + false); + List ackIds = new ArrayList<>(); + for (IncomingMessage message : batchMessages) { + messages.add(message); + ackIds.add(message.ackId); + } + if (ackIds.size() != 0) { + pubsubClient.acknowledge(subscriptionPath, ackIds); } } - - if (ackIds.size() != 0) { - AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds); - pubsubClient.projects() - .subscriptions() - .acknowledge(subscription, ackRequest) - .execute(); + } catch (IOException e) { + throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e); + } finally { + if (getSubscription() == null) { + try { + pubsubClient.deleteSubscription(subscriptionPath); + } catch (Exception e) { + finallyBlockException = e; + } } } - } catch (IOException e) { - throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e); - } finally { - if (getTopic() != null) { - try { - pubsubClient.projects().subscriptions().delete(subscription).execute(); - } catch (IOException e) { - finallyBlockException = new RuntimeException("Failed to delete subscription: ", e); - LOG.error("Failed to delete subscription: ", e); - } + if (finallyBlockException != null) { + throw new RuntimeException("Failed to delete subscription: ", finallyBlockException); } - } - if (finallyBlockException != null) { - throw new RuntimeException(finallyBlockException); - } - for (PubsubMessage message : messages) { - c.outputWithTimestamp( - CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()), - assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM)); + for (IncomingMessage message : messages) { + c.outputWithTimestamp( + CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes), + new Instant(message.timestampMsSinceEpoch)); + } } } } @@ -1026,31 +984,28 @@ public Coder getCoder() { return coder; } + /** + * Writer to Pubsub which batches messages. + *

NOTE: This is not the implementation used when running on the Google Dataflow hosted + * service. + */ private class PubsubWriter extends DoFn { private static final int MAX_PUBLISH_BATCH_SIZE = 100; - private transient List output; - private transient Pubsub pubsubClient; + private transient List output; + private transient PubsubClient pubsubClient; @Override - public void startBundle(Context c) { + public void startBundle(Context c) throws IOException { this.output = new ArrayList<>(); - this.pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) - .build(); + this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel, + c.getPipelineOptions().as(PubsubOptions.class)); } @Override public void processElement(ProcessContext c) throws IOException { - PubsubMessage message = - new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element())); - if (getTimestampLabel() != null) { - Map attributes = message.getAttributes(); - if (attributes == null) { - attributes = new HashMap<>(); - message.setAttributes(attributes); - } - attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis())); - } + OutgoingMessage message = + new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()), + c.timestamp().getMillis()); output.add(message); if (output.size() >= MAX_PUBLISH_BATCH_SIZE) { @@ -1063,13 +1018,16 @@ public void finishBundle(Context c) throws IOException { if (!output.isEmpty()) { publish(); } + output = null; + pubsubClient.close(); + pubsubClient = null; } private void publish() throws IOException { - PublishRequest publishRequest = new PublishRequest().setMessages(output); - pubsubClient.projects().topics() - .publish(getTopic().asPath(), publishRequest) - .execute(); + int n = pubsubClient.publish( + PubsubClient.topicPathFromName(getTopic().project, getTopic().topic), + output); + checkState(n == output.size()); output.clear(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java new file mode 100644 index 000000000000..f0a90968f510 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.options.PubsubOptions; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.Pubsub.Builder; +import com.google.api.services.pubsub.model.AcknowledgeRequest; +import com.google.api.services.pubsub.model.ListSubscriptionsResponse; +import com.google.api.services.pubsub.model.ListTopicsResponse; +import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.api.services.pubsub.model.Subscription; +import com.google.api.services.pubsub.model.Topic; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.annotation.Nullable; + +/** + * A Pubsub client using Apiary. + */ +public class PubsubApiaryClient extends PubsubClient { + + public static final PubsubClientFactory FACTORY = new PubsubClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + Pubsub pubsub = new Builder( + Transport.getTransport(), + Transport.getJsonFactory(), + new ChainingHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setRootUrl(options.getPubsubRootUrl()) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) + .build(); + return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); + } + }; + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. + */ + @Nullable + private final String idLabel; + + /** + * Underlying Apiary client. + */ + private Pubsub pubsub; + + @VisibleForTesting + PubsubApiaryClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + Pubsub pubsub) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.pubsub = pubsub; + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public int publish(TopicPath topic, List outgoingMessages) + throws IOException { + List pubsubMessages = new ArrayList<>(outgoingMessages.size()); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); + + Map attributes = pubsubMessage.getAttributes(); + if ((timestampLabel != null || idLabel != null) && attributes == null) { + attributes = new TreeMap<>(); + pubsubMessage.setAttributes(attributes); + } + + if (timestampLabel != null) { + attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null) { + attributes.put(idLabel, + Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + } + + pubsubMessages.add(pubsubMessage); + } + PublishRequest request = new PublishRequest().setMessages(pubsubMessages); + PublishResponse response = pubsub.projects() + .topics() + .publish(topic.getPath(), request) + .execute(); + return response.getMessageIds().size(); + } + + @Override + public List pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) throws IOException { + PullRequest request = new PullRequest() + .setReturnImmediately(returnImmediately) + .setMaxMessages(batchSize); + PullResponse response = pubsub.projects() + .subscriptions() + .pull(subscription.getPath(), request) + .execute(); + if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { + return ImmutableList.of(); + } + List incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); + for (ReceivedMessage message : response.getReceivedMessages()) { + PubsubMessage pubsubMessage = message.getMessage(); + @Nullable Map attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.decodeData(); + + // Timestamp. + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); + + // Ack id. + String ackId = message.getAckId(); + checkState(!Strings.isNullOrEmpty(ackId)); + + // Record id, if any. + @Nullable byte[] recordId = null; + if (idLabel != null && attributes != null) { + String recordIdString = attributes.get(idLabel); + if (!Strings.isNullOrEmpty(recordIdString)) { + recordId = recordIdString.getBytes(); + } + } + if (recordId == null) { + recordId = pubsubMessage.getMessageId().getBytes(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + + return incomingMessages; + } + + @Override + public void acknowledge(SubscriptionPath subscription, List ackIds) throws IOException { + AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); + pubsub.projects() + .subscriptions() + .acknowledge(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + new ModifyAckDeadlineRequest().setAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds); + pubsub.projects() + .subscriptions() + .modifyAckDeadline(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .create(topic.getPath(), new Topic()) + .execute(); // ignore Topic result. + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .delete(topic.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List listTopics(ProjectPath project) throws IOException { + ListTopicsResponse response = pubsub.projects() + .topics() + .list(project.getPath()) + .execute(); + if (response.getTopics() == null || response.getTopics().isEmpty()) { + return ImmutableList.of(); + } + List topics = new ArrayList<>(response.getTopics().size()); + for (Topic topic : response.getTopics()) { + topics.add(topicPathFromPath(topic.getName())); + } + return topics; + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { + Subscription request = new Subscription() + .setTopic(topic.getPath()) + .setAckDeadlineSeconds(ackDeadlineSeconds); + pubsub.projects() + .subscriptions() + .create(subscription.getPath(), request) + .execute(); // ignore Subscription result. + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + pubsub.projects() + .subscriptions() + .delete(subscription.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException { + ListSubscriptionsResponse response = pubsub.projects() + .subscriptions() + .list(project.getPath()) + .execute(); + if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { + return ImmutableList.of(); + } + List subscriptions = new ArrayList<>(response.getSubscriptions().size()); + for (Subscription subscription : response.getSubscriptions()) { + if (subscription.getTopic().equals(topic.getPath())) { + subscriptions.add(subscriptionPathFromPath(subscription.getName())); + } + } + return subscriptions; + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); + return response.getAckDeadlineSeconds(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java new file mode 100644 index 000000000000..a44329d9f1c6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.options.PubsubOptions; + +import com.google.api.client.util.DateTime; +import com.google.common.base.Objects; +import com.google.common.base.Strings; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * An (abstract) helper class for talking to Pubsub via an underlying transport. + */ +public abstract class PubsubClient implements Closeable { + /** + * Factory for creating clients. + */ + public interface PubsubClientFactory extends Serializable { + /** + * Construct a new Pubsub client. It should be closed via {@link #close} in order + * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources + * construct). Uses {@code options} to derive pubsub endpoints and application credentials. + * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom + * timestamps/ids within message metadata. + */ + PubsubClient newClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + PubsubOptions options) throws IOException; + } + + /** + * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}. + * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException} + * if timestamp cannot be recognized. + */ + @Nullable + private static Long asMsSinceEpoch(@Nullable String timestamp) { + if (Strings.isNullOrEmpty(timestamp)) { + return null; + } + try { + // Try parsing as milliseconds since epoch. Note there is no way to parse a + // string in RFC 3339 format here. + // Expected IllegalArgumentException if parsing fails; we use that to fall back + // to RFC 3339. + return Long.parseLong(timestamp); + } catch (IllegalArgumentException e1) { + // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an + // IllegalArgumentException if parsing fails, and the caller should handle. + return DateTime.parseRfc3339(timestamp).getValue(); + } + } + + /** + * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code + * attributes} and {@code pubsubTimestamp}. + *

If {@code timestampLabel} is non-{@literal null} then the message attributes must contain + * that label, and the value of that label will be taken as the timestamp. + * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code + * pubsubTimestamp}. Throw {@link IllegalArgumentException} if the timestamp cannot be + * recognized as a ms-since-unix-epoch or RFC3339 time. + * + * @throws IllegalArgumentException + */ + protected static long extractTimestamp( + @Nullable String timestampLabel, + @Nullable String pubsubTimestamp, + @Nullable Map attributes) { + Long timestampMsSinceEpoch; + if (Strings.isNullOrEmpty(timestampLabel)) { + timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret PubSub publish timestamp: %s", + pubsubTimestamp); + } else { + String value = attributes == null ? null : attributes.get(timestampLabel); + checkArgument(value != null, + "PubSub message is missing a value for timestamp label %s", + timestampLabel); + timestampMsSinceEpoch = asMsSinceEpoch(value); + checkArgument(timestampMsSinceEpoch != null, + "Cannot interpret value of label %s as timestamp: %s", + timestampLabel, value); + } + return timestampMsSinceEpoch; + } + + /** + * Path representing a cloud project id. + */ + public static class ProjectPath implements Serializable { + private final String path; + + ProjectPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProjectPath that = (ProjectPath) o; + + return path.equals(that.path); + + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + @Override + public String toString() { + return path; + } + } + + public static ProjectPath projectPathFromPath(String path) { + return new ProjectPath(path); + } + + public static ProjectPath projectPathFromId(String projectId) { + return new ProjectPath(String.format("projects/%s", projectId)); + } + + /** + * Path representing a Pubsub subscription. + */ + public static class SubscriptionPath implements Serializable { + private final String path; + + SubscriptionPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public String getV1Beta1Path() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed subscription path %s", path); + return String.format("/subscriptions/%s/%s", splits[1], splits[3]); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionPath that = (SubscriptionPath) o; + return path.equals(that.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + @Override + public String toString() { + return path; + } + } + + public static SubscriptionPath subscriptionPathFromPath(String path) { + return new SubscriptionPath(path); + } + + public static SubscriptionPath subscriptionPathFromName( + String projectId, String subscriptionName) { + return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", + projectId, subscriptionName)); + } + + /** + * Path representing a Pubsub topic. + */ + public static class TopicPath implements Serializable { + private final String path; + + TopicPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public String getV1Beta1Path() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return String.format("/topics/%s/%s", splits[1], splits[3]); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicPath topicPath = (TopicPath) o; + return path.equals(topicPath.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + @Override + public String toString() { + return path; + } + } + + public static TopicPath topicPathFromPath(String path) { + return new TopicPath(path); + } + + public static TopicPath topicPathFromName(String projectId, String topicName) { + return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); + } + + /** + * A message to be sent to Pubsub. + *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + public static class OutgoingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + /** + * Timestamp for element (ms since epoch). + */ + public final long timestampMsSinceEpoch; + + public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { + this.elementBytes = elementBytes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutgoingMessage that = (OutgoingMessage) o; + + if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) { + return false; + } + return Arrays.equals(elementBytes, that.elementBytes); + + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch); + } + } + + /** + * A message received from Pubsub. + *

NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. + * Java serialization is never used for non-test clients. + */ + public static class IncomingMessage implements Serializable { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + /** + * Timestamp for element (ms since epoch). Either Pubsub's processing time, + * or the custom timestamp associated with the message. + */ + public final long timestampMsSinceEpoch; + + /** + * Timestamp (in system time) at which we requested the message (ms since epoch). + */ + public final long requestTimeMsSinceEpoch; + + /** + * Id to pass back to Pubsub to acknowledge receipt of this message. + */ + public final String ackId; + + /** + * Id to pass to the runner to distinguish this message from all others. + */ + public final byte[] recordId; + + public IncomingMessage( + byte[] elementBytes, + long timestampMsSinceEpoch, + long requestTimeMsSinceEpoch, + String ackId, + byte[] recordId) { + this.elementBytes = elementBytes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackId = ackId; + this.recordId = recordId; + } + + public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { + return new IncomingMessage(elementBytes, timestampMsSinceEpoch, requestTimeMsSinceEpoch, + ackId, recordId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncomingMessage that = (IncomingMessage) o; + + if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) { + return false; + } + if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) { + return false; + } + if (!Arrays.equals(elementBytes, that.elementBytes)) { + return false; + } + if (!ackId.equals(that.ackId)) { + return false; + } + return Arrays.equals(recordId, that.recordId); + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, + requestTimeMsSinceEpoch, + ackId, Arrays.hashCode(recordId)); + } + } + + /** + * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages + * published. + * + * @throws IOException + */ + public abstract int publish(TopicPath topic, List outgoingMessages) + throws IOException; + + /** + * Request the next batch of up to {@code batchSize} messages from {@code subscription}. + * Return the received messages, or empty collection if none were available. Does not + * wait for messages to arrive if {@code returnImmediately} is {@literal true}. + * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}. + * + * @throws IOException + */ + public abstract List pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) + throws IOException; + + /** + * Acknowldege messages from {@code subscription} with {@code ackIds}. + * + * @throws IOException + */ + public abstract void acknowledge(SubscriptionPath subscription, List ackIds) + throws IOException; + + /** + * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to + * be {@code deadlineSeconds} from now. + * + * @throws IOException + */ + public abstract void modifyAckDeadline( + SubscriptionPath subscription, List ackIds, + int deadlineSeconds) throws IOException; + + /** + * Create {@code topic}. + * + * @throws IOException + */ + public abstract void createTopic(TopicPath topic) throws IOException; + + /* + * Delete {@code topic}. + * + * @throws IOException + */ + public abstract void deleteTopic(TopicPath topic) throws IOException; + + /** + * Return a list of topics for {@code project}. + * + * @throws IOException + */ + public abstract List listTopics(ProjectPath project) throws IOException; + + /** + * Create {@code subscription} to {@code topic}. + * + * @throws IOException + */ + public abstract void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; + + /** + * Delete {@code subscription}. + * + * @throws IOException + */ + public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException; + + /** + * Return a list of subscriptions for {@code topic} in {@code project}. + * + * @throws IOException + */ + public abstract List listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException; + + /** + * Return the ack deadline, in seconds, for {@code subscription}. + * + * @throws IOException + */ + public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java similarity index 68% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index 66fb61fb6335..b3c1b8f525bc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -16,13 +16,15 @@ * limitations under the License. */ -package org.apache.beam.sdk.io; +package org.apache.beam.sdk.util; -import org.apache.beam.sdk.options.GcpOptions; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.options.PubsubOptions; -import com.google.api.client.util.DateTime; import com.google.auth.oauth2.GoogleCredentials; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; @@ -30,6 +32,7 @@ import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.ListSubscriptionsRequest; import com.google.pubsub.v1.ListSubscriptionsResponse; import com.google.pubsub.v1.ListTopicsRequest; @@ -38,11 +41,13 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; @@ -56,7 +61,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -68,17 +72,44 @@ /** * A helper class for talking to Pubsub via grpc. */ -public class PubsubGrpcClient implements PubsubClient { +public class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; private static final int PUBSUB_PORT = 443; private static final List PUBSUB_SCOPES = Collections.singletonList("https://www.googleapis.com/auth/pubsub"); private static final int LIST_BATCH_SIZE = 1000; + private static final int DEFAULT_TIMEOUT_S = 15; + + public static final PubsubClientFactory FACTORY = + new PubsubClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + ManagedChannel channel = NettyChannelBuilder + .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the + // various command line options. It currently only supports the older + // com.google.api.client.auth.oauth2.Credentials. + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + return new PubsubGrpcClient(timestampLabel, + idLabel, + DEFAULT_TIMEOUT_S, + channel, + credentials, + null /* publisher stub */, + null /* subscriber stub */); + } + }; + /** * Timeout for grpc calls (in s). */ - private static final int TIMEOUT_S = 15; + private final int timeoutSec; /** * Underlying netty channel, or {@literal null} if closed. @@ -104,6 +135,7 @@ public class PubsubGrpcClient implements PubsubClient { @Nullable private final String idLabel; + /** * Cached stubs, or null if not cached. */ @@ -111,35 +143,22 @@ public class PubsubGrpcClient implements PubsubClient { private PublisherGrpc.PublisherBlockingStub cachedPublisherStub; private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub; - private PubsubGrpcClient( - @Nullable String timestampLabel, @Nullable String idLabel, - ManagedChannel publisherChannel, GoogleCredentials credentials) { + @VisibleForTesting + PubsubGrpcClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + int timeoutSec, + ManagedChannel publisherChannel, + GoogleCredentials credentials, + PublisherGrpc.PublisherBlockingStub cachedPublisherStub, + SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub) { this.timestampLabel = timestampLabel; this.idLabel = idLabel; + this.timeoutSec = timeoutSec; this.publisherChannel = publisherChannel; this.credentials = credentials; - } - - /** - * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order - * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources - * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use - * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within - * message metadata. - */ - public static PubsubGrpcClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, - GcpOptions options) throws IOException { - ManagedChannel channel = NettyChannelBuilder - .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .build(); - // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the - // various command line options. It currently only supports the older - // com.google.api.client.auth.oauth2.Credentials. - GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); - return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials); + this.cachedPublisherStub = cachedPublisherStub; + this.cachedSubscriberStub = cachedSubscriberStub; } /** @@ -147,24 +166,34 @@ public static PubsubGrpcClient newClient( */ @Override public void close() { - Preconditions.checkState(publisherChannel != null, "Client has already been closed"); - publisherChannel.shutdown(); - try { - publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Ignore. - Thread.currentThread().interrupt(); + if (publisherChannel == null) { + // Already closed. + return; } - publisherChannel = null; + // Can gc the underlying stubs. cachedPublisherStub = null; cachedSubscriberStub = null; + // Mark the client as having been closed before going further + // in case we have an exception from the channel. + ManagedChannel publisherChannel = this.publisherChannel; + this.publisherChannel = null; + // Gracefully shutdown the channel. + publisherChannel.shutdown(); + if (timeoutSec > 0) { + try { + publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore. + Thread.currentThread().interrupt(); + } + } } /** * Return channel with interceptor for returning credentials. */ private Channel newChannel() throws IOException { - Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); + checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); ClientAuthInterceptor interceptor = new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()); return ClientInterceptors.intercept(publisherChannel, interceptor); @@ -173,25 +202,33 @@ private Channel newChannel() throws IOException { /** * Return a stub for making a publish request with a timeout. */ - private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException { + private PublisherBlockingStub publisherStub() throws IOException { if (cachedPublisherStub == null) { cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel()); } - return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS); + if (timeoutSec > 0) { + return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); + } else { + return cachedPublisherStub; + } } /** * Return a stub for making a subscribe request with a timeout. */ - private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException { + private SubscriberBlockingStub subscriberStub() throws IOException { if (cachedSubscriberStub == null) { cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel()); } - return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS); + if (timeoutSec > 0) { + return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); + } else { + return cachedSubscriberStub; + } } @Override - public int publish(TopicPath topic, Iterable outgoingMessages) + public int publish(TopicPath topic, List outgoingMessages) throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder() .setTopic(topic.getPath()); @@ -208,7 +245,7 @@ public int publish(TopicPath topic, Iterable outgoingMessages) if (idLabel != null) { message.getMutableAttributes() .put(idLabel, - Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); } request.addMessages(message); @@ -219,13 +256,14 @@ public int publish(TopicPath topic, Iterable outgoingMessages) } @Override - public Collection pull( + public List pull( long requestTimeMsSinceEpoch, SubscriptionPath subscription, - int batchSize) throws IOException { + int batchSize, + boolean returnImmediately) throws IOException { PullRequest request = PullRequest.newBuilder() .setSubscription(subscription.getPath()) - .setReturnImmediately(true) + .setReturnImmediately(returnImmediately) .setMaxMessages(batchSize) .build(); PullResponse response = subscriberStub().pull(request); @@ -235,41 +273,24 @@ public Collection pull( List incomingMessages = new ArrayList<>(response.getReceivedMessagesCount()); for (ReceivedMessage message : response.getReceivedMessagesList()) { PubsubMessage pubsubMessage = message.getMessage(); - Map attributes = pubsubMessage.getAttributes(); + @Nullable Map attributes = pubsubMessage.getAttributes(); // Payload. byte[] elementBytes = pubsubMessage.getData().toByteArray(); // Timestamp. - // Start with Pubsub processing time. + String pubsubTimestampString = null; Timestamp timestampProto = pubsubMessage.getPublishTime(); - long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L; - if (timestampLabel != null && attributes != null) { - String timestampString = attributes.get(timestampLabel); - if (timestampString != null && !timestampString.isEmpty()) { - try { - // Try parsing as milliseconds since epoch. Note there is no way to parse a - // string in RFC 3339 format here. - // Expected IllegalArgumentException if parsing fails; we use that to fall back - // to RFC 3339. - timestampMsSinceEpoch = Long.parseLong(timestampString); - } catch (IllegalArgumentException e1) { - try { - // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an - // IllegalArgumentException if parsing fails, and the caller should handle. - timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue(); - } catch (IllegalArgumentException e2) { - // Fallback to Pubsub processing time. - } - } - } - // else: fallback to Pubsub processing time. + if (timestampProto != null) { + pubsubTimestampString = String.valueOf(timestampProto.getSeconds() + + timestampProto.getNanos() / 1000L); } - // else: fallback to Pubsub processing time. + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, pubsubTimestampString, attributes); // Ack id. String ackId = message.getAckId(); - Preconditions.checkState(ackId != null && !ackId.isEmpty()); + checkState(!Strings.isNullOrEmpty(ackId)); // Record id, if any. @Nullable byte[] recordId = null; @@ -284,13 +305,13 @@ public Collection pull( } incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); + requestTimeMsSinceEpoch, ackId, recordId)); } return incomingMessages; } @Override - public void acknowledge(SubscriptionPath subscription, Iterable ackIds) + public void acknowledge(SubscriptionPath subscription, List ackIds) throws IOException { AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(subscription.getPath()) @@ -301,8 +322,7 @@ public void acknowledge(SubscriptionPath subscription, Iterable ackIds) @Override public void modifyAckDeadline( - SubscriptionPath subscription, Iterable ackIds, int - deadlineSeconds) + SubscriptionPath subscription, List ackIds, int deadlineSeconds) throws IOException { ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() @@ -330,7 +350,7 @@ public void deleteTopic(TopicPath topic) throws IOException { } @Override - public Collection listTopics(ProjectPath project) throws IOException { + public List listTopics(ProjectPath project) throws IOException { ListTopicsRequest.Builder request = ListTopicsRequest.newBuilder() .setProject(project.getPath()) @@ -342,7 +362,7 @@ public Collection listTopics(ProjectPath project) throws IOException List topics = new ArrayList<>(response.getTopicsCount()); while (true) { for (Topic topic : response.getTopicsList()) { - topics.add(new TopicPath(topic.getName())); + topics.add(topicPathFromPath(topic.getName())); } if (response.getNextPageToken().isEmpty()) { break; @@ -375,7 +395,7 @@ public void deleteSubscription(SubscriptionPath subscription) throws IOException } @Override - public Collection listSubscriptions(ProjectPath project, TopicPath topic) + public List listSubscriptions(ProjectPath project, TopicPath topic) throws IOException { ListSubscriptionsRequest.Builder request = ListSubscriptionsRequest.newBuilder() @@ -389,7 +409,7 @@ public Collection listSubscriptions(ProjectPath project, Topic while (true) { for (Subscription subscription : response.getSubscriptionsList()) { if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(new SubscriptionPath(subscription.getName())); + subscriptions.add(subscriptionPathFromPath(subscription.getName())); } } if (response.getNextPageToken().isEmpty()) { @@ -400,4 +420,14 @@ public Collection listSubscriptions(ProjectPath project, Topic } return subscriptions; } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder() + .setSubscription(subscription.getPath()) + .build(); + Subscription response = subscriberStub().getSubscription(request); + return response.getAckDeadlineSeconds(); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java new file mode 100644 index 000000000000..4a47c3070776 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.options.PubsubOptions; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for + * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} + * methods. + */ +public class PubsubTestClient extends PubsubClient { + public static PubsubClientFactory createFactoryForPublish( + final TopicPath expectedTopic, + final Set expectedOutgoingMessages) { + return new PubsubClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null); + } + }; + } + + public static PubsubClientFactory createFactoryForPull( + @Nullable final SubscriptionPath expectedSubscription, + final int ackTimeoutSec, + @Nullable final List expectedIncomingMessages) { + return new PubsubClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec, + null, expectedIncomingMessages); + } + }; + } + + /** + * Only publish calls for this topic are allowed. + */ + @Nullable + private TopicPath expectedTopic; + /** + * Only pull calls for this subscription are allowed. + */ + @Nullable + private SubscriptionPath expectedSubscription; + + /** + * Timeout to simulate. + */ + private int ackTimeoutSec; + + /** + * Messages yet to seen in a {@link #publish} call. + */ + @Nullable + private Set remainingExpectedOutgoingMessages; + + /** + * Messages waiting to be received by a {@link #pull} call. + */ + @Nullable + private List remainingPendingIncomingMessages; + + /** + * Messages which have been returned from a {@link #pull} call and + * not yet ACKed by an {@link #acknowledge} call. + */ + private Map pendingAckIncommingMessages; + + /** + * When above messages are due to have their ACK deadlines expire. + */ + private Map ackDeadline; + + /** + * Current time. + */ + private long nowMsSinceEpoch; + + @VisibleForTesting + PubsubTestClient( + @Nullable TopicPath expectedTopic, + @Nullable SubscriptionPath expectedSubscription, + int ackTimeoutSec, + @Nullable Set expectedOutgoingMessages, + @Nullable List expectedIncomingMessages) { + this.expectedTopic = expectedTopic; + this.expectedSubscription = expectedSubscription; + this.ackTimeoutSec = ackTimeoutSec; + + this.remainingExpectedOutgoingMessages = expectedOutgoingMessages; + this.remainingPendingIncomingMessages = expectedIncomingMessages; + + this.pendingAckIncommingMessages = new HashMap<>(); + this.ackDeadline = new HashMap<>(); + this.nowMsSinceEpoch = Long.MIN_VALUE; + } + + /** + * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate Pubsub expiring + * outstanding ACKs. + */ + public void advanceTo(long newNowMsSinceEpoch) { + checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch, + "Cannot advance time backwards from %d to %d", nowMsSinceEpoch, + newNowMsSinceEpoch); + nowMsSinceEpoch = newNowMsSinceEpoch; + // Any messages who's ACKs timed out are available for re-pulling. + Iterator> deadlineItr = ackDeadline.entrySet().iterator(); + while (deadlineItr.hasNext()) { + Map.Entry entry = deadlineItr.next(); + if (entry.getValue() <= nowMsSinceEpoch) { + remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey())); + deadlineItr.remove(); + } + } + } + + @Override + public void close() { + if (remainingExpectedOutgoingMessages != null) { + checkState(this.remainingExpectedOutgoingMessages.isEmpty(), + "Failed to pull %d messages", this.remainingExpectedOutgoingMessages.size()); + remainingExpectedOutgoingMessages = null; + } + if (remainingPendingIncomingMessages != null) { + checkState(remainingPendingIncomingMessages.isEmpty(), + "Failed to publish %d messages", remainingPendingIncomingMessages.size()); + checkState(pendingAckIncommingMessages.isEmpty(), + "Failed to ACK %d messages", pendingAckIncommingMessages.size()); + checkState(ackDeadline.isEmpty(), + "Failed to ACK %d messages", ackDeadline.size()); + remainingPendingIncomingMessages = null; + pendingAckIncommingMessages = null; + ackDeadline = null; + } + } + + @Override + public int publish( + TopicPath topic, List outgoingMessages) throws IOException { + checkNotNull(expectedTopic, "Missing expected topic"); + checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages"); + checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic, + expectedTopic); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage), + "Unexpeced outgoing message %s", outgoingMessage); + } + return outgoingMessages.size(); + } + + @Override + public List pull( + long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize, + boolean returnImmediately) throws IOException { + checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch, + "Simulated time %d does not match requset time %d", nowMsSinceEpoch, + requestTimeMsSinceEpoch); + checkNotNull(expectedSubscription, "Missing expected subscription"); + checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); + checkState(subscription.equals(expectedSubscription), + "Subscription %s does not match expected %s", subscription, expectedSubscription); + checkState(returnImmediately, "PubsubTestClient only supports returning immediately"); + + List incomingMessages = new ArrayList<>(); + Iterator pendItr = remainingPendingIncomingMessages.iterator(); + while (pendItr.hasNext()) { + IncomingMessage incomingMessage = pendItr.next(); + pendItr.remove(); + IncomingMessage incomingMessageWithRequestTime = + incomingMessage.withRequestTime(requestTimeMsSinceEpoch); + incomingMessages.add(incomingMessageWithRequestTime); + pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId, + incomingMessageWithRequestTime); + ackDeadline.put(incomingMessageWithRequestTime.ackId, + requestTimeMsSinceEpoch + ackTimeoutSec * 1000); + if (incomingMessages.size() >= batchSize) { + break; + } + } + return incomingMessages; + } + + @Override + public void acknowledge( + SubscriptionPath subscription, + List ackIds) throws IOException { + checkNotNull(expectedSubscription, "Missing expected subscription"); + checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); + checkState(subscription.equals(expectedSubscription), + "Subscription %s does not match expected %s", subscription, expectedSubscription); + + for (String ackId : ackIds) { + checkState(ackDeadline.remove(ackId) != null, + "No message with ACK id %s is outstanding", ackId); + checkState(pendingAckIncommingMessages.remove(ackId) != null, + "No message with ACK id %s is outstanding", ackId); + } + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List ackIds, int deadlineSeconds) throws IOException { + checkNotNull(expectedSubscription, "Missing expected subscription"); + checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); + checkState(subscription.equals(expectedSubscription), + "Subscription %s does not match expected %s", subscription, expectedSubscription); + + for (String ackId : ackIds) { + checkState(ackDeadline.remove(ackId) != null, + "No message with ACK id %s is outstanding", ackId); + checkState(pendingAckIncommingMessages.containsKey(ackId), + "No message with ACK id %s is outstanding", ackId); + ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000); + } + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List listTopics(ProjectPath project) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List listSubscriptions( + ProjectPath project, TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + return ackTimeoutSec; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 9082ce375b7f..6daecdb6ce60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -105,7 +105,11 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { /** * Returns a Pubsub client builder using the specified {@link PubsubOptions}. + * + * @deprecated Use an appropriate + * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory} */ + @Deprecated public static Pubsub.Builder newPubsubClient(PubsubOptions options) { return new Pubsub.Builder(getTransport(), getJsonFactory(), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index 1e5bf5157b05..eaf452d8bafa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -24,22 +24,13 @@ import org.apache.beam.sdk.transforms.display.DisplayData; -import com.google.api.client.testing.http.FixedClock; -import com.google.api.client.util.Clock; -import com.google.api.services.pubsub.model.PubsubMessage; - import org.joda.time.Duration; -import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.HashMap; - -import javax.annotation.Nullable; - /** * Tests for PubsubIO Read and Write transforms. */ @@ -90,154 +81,6 @@ public void testTopicValidationTooLong() throws Exception { .toString()); } - /** - * Helper function that creates a {@link PubsubMessage} with the given timestamp registered as - * an attribute with the specified label. - * - *

If {@code label} is {@code null}, then the attributes are {@code null}. - * - *

Else, if {@code timestamp} is {@code null}, then attributes are present but have no key for - * the label. - */ - private static PubsubMessage messageWithTimestamp( - @Nullable String label, @Nullable String timestamp) { - PubsubMessage message = new PubsubMessage(); - if (label == null) { - message.setAttributes(null); - return message; - } - - message.setAttributes(new HashMap()); - - if (timestamp == null) { - return message; - } - - message.getAttributes().put(label, timestamp); - return message; - } - - /** - * Helper function that parses the given string to a timestamp through the PubSubIO plumbing. - */ - private static Instant parseTimestamp(@Nullable String timestamp) { - PubsubMessage message = messageWithTimestamp("mylabel", timestamp); - return PubsubIO.assignMessageTimestamp(message, "mylabel", Clock.SYSTEM); - } - - @Test - public void noTimestampLabelReturnsNow() { - final long time = 987654321L; - Instant timestamp = PubsubIO.assignMessageTimestamp( - messageWithTimestamp(null, null), null, new FixedClock(time)); - - assertEquals(new Instant(time), timestamp); - } - - @Test - public void timestampLabelWithNullAttributesThrowsError() { - PubsubMessage message = messageWithTimestamp(null, null); - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel"); - - PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM); - } - - @Test - public void timestampLabelSetWithMissingAttributeThrowsError() { - PubsubMessage message = messageWithTimestamp("notMyLabel", "ignored"); - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel"); - - PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM); - } - - @Test - public void timestampLabelParsesMillisecondsSinceEpoch() { - Long millis = 1446162101123L; - assertEquals(new Instant(millis), parseTimestamp(millis.toString())); - } - - @Test - public void timestampLabelParsesRfc3339Seconds() { - String rfc3339 = "2015-10-29T23:41:41Z"; - assertEquals(Instant.parse(rfc3339), parseTimestamp(rfc3339)); - } - - @Test - public void timestampLabelParsesRfc3339Tenths() { - String rfc3339tenths = "2015-10-29T23:41:41.1Z"; - assertEquals(Instant.parse(rfc3339tenths), parseTimestamp(rfc3339tenths)); - } - - @Test - public void timestampLabelParsesRfc3339Hundredths() { - String rfc3339hundredths = "2015-10-29T23:41:41.12Z"; - assertEquals(Instant.parse(rfc3339hundredths), parseTimestamp(rfc3339hundredths)); - } - - @Test - public void timestampLabelParsesRfc3339Millis() { - String rfc3339millis = "2015-10-29T23:41:41.123Z"; - assertEquals(Instant.parse(rfc3339millis), parseTimestamp(rfc3339millis)); - } - - @Test - public void timestampLabelParsesRfc3339Micros() { - String rfc3339micros = "2015-10-29T23:41:41.123456Z"; - assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros)); - // Note: micros part 456/1000 is dropped. - assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros)); - } - - @Test - public void timestampLabelParsesRfc3339MicrosRounding() { - String rfc3339micros = "2015-10-29T23:41:41.123999Z"; - assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros)); - // Note: micros part 999/1000 is dropped, not rounded up. - assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros)); - } - - @Test - public void timestampLabelWithInvalidFormatThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("not-a-timestamp"); - } - - @Test - public void timestampLabelWithInvalidFormat2ThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("null"); - } - - @Test - public void timestampLabelWithInvalidFormat3ThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("2015-10"); - } - - @Test - public void timestampLabelParsesRfc3339WithSmallYear() { - // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted - // This is therefore a "small year" until this difference is reconciled. - String rfc3339SmallYear = "1582-10-15T01:23:45.123Z"; - assertEquals(Instant.parse(rfc3339SmallYear), parseTimestamp(rfc3339SmallYear)); - } - - @Test - public void timestampLabelParsesRfc3339WithLargeYear() { - // Year 9999 in range. - String rfc3339LargeYear = "9999-10-29T23:41:41.123999Z"; - assertEquals(Instant.parse(rfc3339LargeYear), parseTimestamp(rfc3339LargeYear)); - } - - @Test - public void timestampLabelRfc3339WithTooLargeYearThrowsError() { - thrown.expect(NumberFormatException.class); - // Year 10000 out of range. - parseTimestamp("10000-10-29T23:41:41.123999Z"); - } - @Test public void testReadDisplayData() { String topic = "projects/project/topics/topic"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java new file mode 100644 index 000000000000..40c31fb5ac03 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubApiaryClient. + */ +public class PubsubApiaryClientTest { + private Pubsub mockPubsub; + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String CUSTOM_ID = + Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockPubsub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + new PullRequest().setReturnImmediately(true).setMaxMessages(10); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .setMessageId(MESSAGE_ID) + .encodeData(DATA.getBytes()) + .setPublishTime(String.valueOf(PUB_TIME)) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)); + ReceivedMessage expectedReceivedMessage = + new ReceivedMessage().setMessage(expectedPubsubMessage) + .setAckId(ACK_ID); + PullResponse expectedResponse = + new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); + Mockito.when(mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, expectedRequest) + .execute()) + .thenReturn(expectedResponse); + List acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when(mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute()) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java new file mode 100644 index 000000000000..22508572e5a0 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.ProjectPath; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.common.collect.ImmutableMap; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Map; + +/** + * Tests for helper classes and methods in PubsubClient. + */ +public class PubsubClientTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + // + // Timestamp handling + // + + private long parse(String timestamp) { + Map map = ImmutableMap.of("myLabel", timestamp); + return PubsubClient.extractTimestamp("myLabel", null, map); + } + + private void roundTripRfc339(String timestamp) { + assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp)); + } + + private void truncatedRfc339(String timestamp, String truncatedTimestmap) { + assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp)); + } + + @Test + public void noTimestampLabelReturnsPubsubPublish() { + final long time = 987654321L; + long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null); + assertEquals(time, timestamp); + } + + @Test + public void noTimestampLabelAndInvalidPubsubPublishThrowsError() { + thrown.expect(NumberFormatException.class); + PubsubClient.extractTimestamp(null, "not-a-date", null); + } + + @Test + public void timestampLabelWithNullAttributesThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + PubsubClient.extractTimestamp("myLabel", null, null); + } + + @Test + public void timestampLabelSetWithMissingAttributeThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + Map map = ImmutableMap.of("otherLabel", "whatever"); + PubsubClient.extractTimestamp("myLabel", null, map); + } + + @Test + public void timestampLabelParsesMillisecondsSinceEpoch() { + long time = 1446162101123L; + Map map = ImmutableMap.of("myLabel", String.valueOf(time)); + long timestamp = PubsubClient.extractTimestamp("myLabel", null, map); + assertEquals(time, timestamp); + } + + @Test + public void timestampLabelParsesRfc3339Seconds() { + roundTripRfc339("2015-10-29T23:41:41Z"); + } + + @Test + public void timestampLabelParsesRfc3339Tenths() { + roundTripRfc339("2015-10-29T23:41:41.1Z"); + } + + @Test + public void timestampLabelParsesRfc3339Hundredths() { + roundTripRfc339("2015-10-29T23:41:41.12Z"); + } + + @Test + public void timestampLabelParsesRfc3339Millis() { + roundTripRfc339("2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339Micros() { + // Note: micros part 456/1000 is dropped. + truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339MicrosRounding() { + // Note: micros part 999/1000 is dropped, not rounded up. + truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelWithInvalidFormatThrowsError() { + thrown.expect(NumberFormatException.class); + parse("not-a-timestamp"); + } + + @Test + public void timestampLabelWithInvalidFormat2ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("null"); + } + + @Test + public void timestampLabelWithInvalidFormat3ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("2015-10"); + } + + @Test + public void timestampLabelParsesRfc3339WithSmallYear() { + // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted + // This is therefore a "small year" until this difference is reconciled. + roundTripRfc339("1582-10-15T01:23:45.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339WithLargeYear() { + // Year 9999 in range. + roundTripRfc339("9999-10-29T23:41:41.123999Z"); + } + + @Test + public void timestampLabelRfc3339WithTooLargeYearThrowsError() { + thrown.expect(NumberFormatException.class); + // Year 10000 out of range. + parse("10000-10-29T23:41:41.123999Z"); + } + + // + // Paths + // + + @Test + public void projectPathFromIdWellFormed() { + ProjectPath path = PubsubClient.projectPathFromId("test"); + assertEquals("projects/test", path.getPath()); + } + + @Test + public void subscriptionPathFromNameWellFormed() { + SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); + assertEquals("projects/test/subscriptions/something", path.getPath()); + assertEquals("/subscriptions/test/something", path.getV1Beta1Path()); + } + + @Test + public void topicPathFromNameWellFormed() { + TopicPath path = PubsubClient.topicPathFromName("test", "something"); + assertEquals("projects/test/topics/something", path.getPath()); + assertEquals("/topics/test/something", path.getV1Beta1Path()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java new file mode 100644 index 000000000000..189049c07ea4 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriberGrpc; + +import io.grpc.ManagedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubGrpcClient. + */ +public class PubsubGrpcClientTest { + private ManagedChannel mockChannel; + private GoogleCredentials mockCredentials; + private PublisherGrpc.PublisherBlockingStub mockPublisherStub; + private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub; + + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String CUSTOM_ID = + Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockChannel = Mockito.mock(ManagedChannel.class); + mockCredentials = Mockito.mock(GoogleCredentials.class); + mockPublisherStub = + Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); + mockSubscriberStub = + Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel, + mockCredentials, mockPublisherStub, mockSubscriberStub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockChannel = null; + mockCredentials = null; + mockPublisherStub = null; + mockSubscriberStub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + PullRequest.newBuilder() + .setSubscription(expectedSubscription) + .setReturnImmediately(true) + .setMaxMessages(10) + .build(); + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(PUB_TIME / 1000) + .setNanos((int) (PUB_TIME % 1000) * 1000) + .build(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setMessageId(MESSAGE_ID) + .setData( + ByteString.copyFrom(DATA.getBytes())) + .setPublishTime(timestamp) + .putAllAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, + String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)) + .build(); + ReceivedMessage expectedReceivedMessage = + ReceivedMessage.newBuilder() + .setMessage(expectedPubsubMessage) + .setAckId(ACK_ID) + .build(); + PullResponse expectedResponse = + PullResponse.newBuilder() + .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage)) + .build(); + Mockito.when(mockSubscriberStub.pull(expectedRequest)) + .thenReturn(expectedResponse); + List acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(DATA.getBytes())) + .putAllAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)) + .build(); + PublishRequest expectedRequest = + PublishRequest.newBuilder() + .setTopic(expectedTopic) + .addAllMessages( + ImmutableList.of(expectedPubsubMessage)) + .build(); + PublishResponse expectedResponse = + PublishResponse.newBuilder() + .addAllMessageIds(ImmutableList.of(MESSAGE_ID)) + .build(); + Mockito.when(mockPublisherStub.publish(expectedRequest)) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java new file mode 100644 index 000000000000..7d8513b725b1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubTestClient. + */ +public class PubsubTestClientTest { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long MESSAGE_TIME = 6789L; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String ACK_ID = "testAckId"; + private static final int ACK_TIMEOUT_S = 60; + + @Test + public void pullOneMessage() throws IOException { + IncomingMessage expectedIncomingMessage = + new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes()); + try (PubsubTestClient client = + new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null, + Lists.newArrayList(expectedIncomingMessage))) { + long now = REQ_TIME; + client.advanceTo(now); + List incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage, incomingMessages.get(0)); + // Timeout on ACK. + now += (ACK_TIMEOUT_S + 10) * 1000; + client.advanceTo(now); + incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); + now += 10 * 1000; + client.advanceTo(now); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Timeout on extended ACK + now += 30 * 1000; + client.advanceTo(now); + incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Ack + now += 15 * 1000; + client.advanceTo(now); + client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); + } + } + + @Test + public void publishOneMessage() throws IOException { + OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + try (PubsubTestClient client = + new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S, + Sets.newHashSet(expectedOutgoingMessage), null)) { + client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); + } + } +}