From bfa56299fb95582da9af998c4918f6fe0a2b4d2f Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Thu, 7 Apr 2016 11:06:51 -0700 Subject: [PATCH 1/6] Pub/sub client with gRPC impl --- pom.xml | 1 + sdks/java/core/pom.xml | 34 ++ .../cloud/dataflow/sdk/io/PubsubClient.java | 176 ++++++++ .../dataflow/sdk/io/PubsubGrpcClient.java | 391 ++++++++++++++++++ 4 files changed, 602 insertions(+) create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java diff --git a/pom.xml b/pom.xml index 591b67a9a747..243cff3cf5c0 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 1.7.7 v2-rev248-1.21.0 0.2.3 + 0.0.2 v2-rev6-1.21.0 v1b3-rev22-1.21.0 0.5.160222 diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 6595a214e4f8..97621362cbf8 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -389,6 +389,40 @@ 0.12.0 + + com.google.auth + google-auth-library-oauth2-http + 0.3.1 + + + + com.google.guava + guava-jdk5 + + + + + + io.netty + netty-handler + 4.1.0.Beta8 + + + + com.google.api.grpc + grpc-pubsub-v1 + ${pubsubgrpc.version} + + + + com.google.guava + guava-jdk5 + + + + com.google.cloud.bigtable bigtable-protos diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java new file mode 100644 index 000000000000..3f9cf74a5ea7 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.dataflow.sdk.io; + +import java.io.IOException; +import java.util.Collection; + +/** + * A helper interface for talking to Pubsub via an underlying transport. + */ +public interface PubsubClient extends AutoCloseable { + /** + * Gracefully close the underlying transport. + */ + @Override + void close(); + + /** + * A message to be sent to Pubsub. + */ + class OutgoingMessage { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + /** + * Timestamp for element (ms since epoch). + */ + public final long timestampMsSinceEpoch; + + public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { + this.elementBytes = elementBytes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + } + } + + /** + * A message received from Pubsub. + */ + class IncomingMessage { + /** + * Underlying (encoded) element. + */ + public final byte[] elementBytes; + + /** + * Timestamp for element (ms since epoch). Either Pubsub's processing time, + * or the custom timestamp associated with the message. + */ + public final long timestampMsSinceEpoch; + + /** + * Timestamp (in system time) at which we requested the message (ms since epoch). + */ + public final long requestTimeMsSinceEpoch; + + /** + * Id to pass back to Pubsub to acknowledge receipt of this message. + */ + public final String ackId; + + /** + * Id to pass to the runner to distinguish this message from all others. + */ + public final byte[] recordId; + + public IncomingMessage( + byte[] elementBytes, + long timestampMsSinceEpoch, + long requestTimeMsSinceEpoch, + String ackId, + byte[] recordId) { + this.elementBytes = elementBytes; + this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackId = ackId; + this.recordId = recordId; + } + } + + /** + * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages + * published. + * + * @throws IOException + */ + int publish(String topic, Iterable outgoingMessages) throws IOException; + + /** + * Request the next batch of up to {@code batchSize} messages from {@code subscription}. + * Return the received messages, or empty collection if none were available. Does not + * wait for messages to arrive. Returned messages will record heir request time + * as {@code requestTimeMsSinceEpoch}. + * + * @throws IOException + */ + Collection pull( + long requestTimeMsSinceEpoch, String subscription, int + batchSize) throws IOException; + + /** + * Acknowldege messages from {@code subscription} with {@code ackIds}. + * + * @throws IOException + */ + void acknowledge(String subscription, Iterable ackIds) throws IOException; + + /** + * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to + * be {@code deadlineSeconds} from now. + * + * @throws IOException + */ + void modifyAckDeadline(String subscription, Iterable ackIds, int deadlineSeconds) + throws IOException; + + /** + * Create {@code topic}. + * + * @throws IOException + */ + void createTopic(String topic) throws IOException; + + /* + * Delete {@code topic}. + * + * @throws IOException + */ + void deleteTopic(String topic) throws IOException; + + /** + * Return a list of topics for {@code project}. + * + * @throws IOException + */ + Collection listTopics(String project) throws IOException; + + /** + * Create {@code subscription} to {@code topic}. + * + * @throws IOException + */ + void createSubscription(String topic, String subscription, int ackDeadlineSeconds) throws + IOException; + + /** + * Delete {@code subscription}. + * + * @throws IOException + */ + void deleteSubscription(String subscription) throws IOException; + + /** + * Return a list of subscriptions for {@code topic} in {@code project}. + * + * @throws IOException + */ + Collection listSubscriptions(String project, String topic) throws IOException; +} diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java new file mode 100644 index 000000000000..2d75679f7846 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.dataflow.sdk.io; + +import com.google.api.client.util.DateTime; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.dataflow.sdk.options.GcpOptions; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import io.grpc.Channel; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.auth.ClientAuthInterceptor; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +/** + * A helper class for talking to Pubsub via grpc. + */ +public class PubsubGrpcClient implements PubsubClient { + private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; + private static final int PUBSUB_PORT = 443; + private static final List PUBSUB_SCOPES = + Collections.singletonList("https://www.googleapis.com/auth/pubsub"); + private static final int LIST_BATCH_SIZE = 1000; + + /** + * Timeout for grpc calls (in s). + */ + private static final int TIMEOUT_S = 15; + + /** + * Underlying netty channel, or {@literal null} if closed. + */ + @Nullable + private ManagedChannel publisherChannel; + + /** + * Credentials determined from options and environment. + */ + private final GoogleCredentials credentials; + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. + */ + @Nullable + private final String idLabel; + + /** + * Cached stubs, or null if not cached. + */ + @Nullable + private PublisherGrpc.PublisherBlockingStub cachedPublisherStub; + private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub; + + private PubsubGrpcClient( + @Nullable String timestampLabel, @Nullable String idLabel, + ManagedChannel publisherChannel, GoogleCredentials credentials) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.publisherChannel = publisherChannel; + this.credentials = credentials; + } + + /** + * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order + * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources + * construct since this class is {@link AutoCloseable}. If non-{@literal null}, use + * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within + * message metadata. + */ + public static PubsubGrpcClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, + GcpOptions options) throws IOException { + ManagedChannel channel = NettyChannelBuilder + .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the + // various command line options. It currently only supports the older + // com.google.api.client.auth.oauth2.Credentials. + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + return new PubsubGrpcClient(timestampLabel, idLabel, channel, credentials); + } + + /** + * Gracefully close the underlying netty channel. + */ + @Override + public void close() { + Preconditions.checkState(publisherChannel != null, "Client has already been closed"); + publisherChannel.shutdown(); + try { + publisherChannel.awaitTermination(TIMEOUT_S, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore. + Thread.currentThread().interrupt(); + } + publisherChannel = null; + cachedPublisherStub = null; + cachedSubscriberStub = null; + } + + /** + * Return channel with interceptor for returning credentials. + */ + private Channel newChannel() throws IOException { + Preconditions.checkState(publisherChannel != null, "PubsubGrpcClient has been closed"); + ClientAuthInterceptor interceptor = + new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor()); + return ClientInterceptors.intercept(publisherChannel, interceptor); + } + + /** + * Return a stub for making a publish request with a timeout. + */ + private PublisherGrpc.PublisherBlockingStub publisherStub() throws IOException { + if (cachedPublisherStub == null) { + cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel()); + } + return cachedPublisherStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS); + } + + /** + * Return a stub for making a subscribe request with a timeout. + */ + private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOException { + if (cachedSubscriberStub == null) { + cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel()); + } + return cachedSubscriberStub.withDeadlineAfter(TIMEOUT_S, TimeUnit.SECONDS); + } + + @Override + public int publish(String topic, Iterable outgoingMessages) throws IOException { + PublishRequest.Builder request = PublishRequest.newBuilder() + .setTopic(topic); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage.Builder message = + PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(outgoingMessage.elementBytes)); + + if (timestampLabel != null) { + message.getMutableAttributes() + .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null) { + message.getMutableAttributes() + .put(idLabel, + Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + } + + request.addMessages(message); + } + + PublishResponse response = publisherStub().publish(request.build()); + return response.getMessageIdsCount(); + } + + @Override + public Collection pull( + long requestTimeMsSinceEpoch, + String subscription, + int batchSize) throws IOException { + PullRequest request = PullRequest.newBuilder() + .setSubscription(subscription) + .setReturnImmediately(true) + .setMaxMessages(batchSize) + .build(); + PullResponse response = subscriberStub().pull(request); + if (response.getReceivedMessagesCount() == 0) { + return ImmutableList.of(); + } + List incomingMessages = new ArrayList<>(response.getReceivedMessagesCount()); + for (ReceivedMessage message : response.getReceivedMessagesList()) { + PubsubMessage pubsubMessage = message.getMessage(); + Map attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.getData().toByteArray(); + + // Timestamp. + // Start with Pubsub processing time. + Timestamp timestampProto = pubsubMessage.getPublishTime(); + long timestampMsSinceEpoch = timestampProto.getSeconds() + timestampProto.getNanos() / 1000L; + if (timestampLabel != null && attributes != null) { + String timestampString = attributes.get(timestampLabel); + if (timestampString != null && !timestampString.isEmpty()) { + try { + // Try parsing as milliseconds since epoch. Note there is no way to parse a + // string in RFC 3339 format here. + // Expected IllegalArgumentException if parsing fails; we use that to fall back + // to RFC 3339. + timestampMsSinceEpoch = Long.parseLong(timestampString); + } catch (IllegalArgumentException e1) { + try { + // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an + // IllegalArgumentException if parsing fails, and the caller should handle. + timestampMsSinceEpoch = DateTime.parseRfc3339(timestampString).getValue(); + } catch (IllegalArgumentException e2) { + // Fallback to Pubsub processing time. + } + } + } + // else: fallback to Pubsub processing time. + } + // else: fallback to Pubsub processing time. + + // Ack id. + String ackId = message.getAckId(); + Preconditions.checkState(ackId != null && !ackId.isEmpty()); + + // Record id, if any. + @Nullable byte[] recordId = null; + if (idLabel != null && attributes != null) { + String recordIdString = attributes.get(idLabel); + if (recordIdString != null && !recordIdString.isEmpty()) { + recordId = recordIdString.getBytes(); + } + } + if (recordId == null) { + recordId = pubsubMessage.getMessageId().getBytes(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + return incomingMessages; + } + + @Override + public void acknowledge(String subscription, Iterable ackIds) throws IOException { + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .build(); + subscriberStub().acknowledge(request); // ignore Empty result. + } + + @Override + public void modifyAckDeadline(String subscription, Iterable ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds) + .build(); + subscriberStub().modifyAckDeadline(request); // ignore Empty result. + } + + @Override + public void createTopic(String topic) throws IOException { + Topic request = Topic.newBuilder() + .setName(topic) + .build(); + publisherStub().createTopic(request); // ignore Topic result. + } + + @Override + public void deleteTopic(String topic) throws IOException { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic).build(); + publisherStub().deleteTopic(request); // ignore Empty result. + } + + @Override + public Collection listTopics(String project) throws IOException { + ListTopicsRequest.Builder request = + ListTopicsRequest.newBuilder() + .setProject(project) + .setPageSize(LIST_BATCH_SIZE); + ListTopicsResponse response = publisherStub().listTopics(request.build()); + if (response.getTopicsCount() == 0) { + return ImmutableList.of(); + } + List topics = new ArrayList<>(response.getTopicsCount()); + while (true) { + for (Topic topic : response.getTopicsList()) { + topics.add(topic.getName()); + } + if (response.getNextPageToken().isEmpty()) { + break; + } + request.setPageToken(response.getNextPageToken()); + response = publisherStub().listTopics(request.build()); + } + return topics; + } + + @Override + public void createSubscription(String topic, String subscription, int ackDeadlineSeconds) + throws IOException { + Subscription request = Subscription.newBuilder() + .setTopic(topic) + .setName(subscription) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + subscriberStub().createSubscription(request); // ignore Subscription result. + } + + @Override + public void deleteSubscription(String subscription) throws IOException { + DeleteSubscriptionRequest request = + DeleteSubscriptionRequest.newBuilder().setSubscription(subscription).build(); + subscriberStub().deleteSubscription(request); // ignore Empty result. + } + + @Override + public Collection listSubscriptions(String project, String topic) throws IOException { + ListSubscriptionsRequest.Builder request = + ListSubscriptionsRequest.newBuilder() + .setProject(project) + .setPageSize(LIST_BATCH_SIZE); + ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build()); + if (response.getSubscriptionsCount() == 0) { + return ImmutableList.of(); + } + List subscriptions = new ArrayList<>(response.getSubscriptionsCount()); + while (true) { + for (Subscription subscription : response.getSubscriptionsList()) { + if (subscription.getTopic().equals(topic)) { + subscriptions.add(subscription.getName()); + } + } + if (response.getNextPageToken().isEmpty()) { + break; + } + request.setPageToken(response.getNextPageToken()); + response = subscriberStub().listSubscriptions(request.build()); + } + return subscriptions; + } +} From b78c947540fb4278d40f29ef9bd48149927c5a3e Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Mon, 11 Apr 2016 15:08:24 -0700 Subject: [PATCH 2/6] Make subscription and topic 'path' objects instead of strings --- .../cloud/dataflow/sdk/io/PubsubClient.java | 102 +++++++++++++++--- .../dataflow/sdk/io/PubsubGrpcClient.java | 62 ++++++----- 2 files changed, 121 insertions(+), 43 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java index 3f9cf74a5ea7..c8bcfdd574c3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -26,10 +26,68 @@ */ public interface PubsubClient extends AutoCloseable { /** - * Gracefully close the underlying transport. + * Path representing a Pubsub subscription. */ - @Override - void close(); + class SubscriptionPath { + private final String path; + + public SubscriptionPath(String projectId, String subscriptionName) { + path = String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubscriptionPath that = (SubscriptionPath) o; + return path.equals(that.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + } + + /** + * Path representing a Pubsub topic. + */ + class TopicPath { + private final String path; + + public TopicPath(String projectId, String topicName) { + path = String.format("projects/%s/topics/%s", projectId, topicName); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicPath topicPath = (TopicPath) o; + return path.equals(topicPath.path); + } + + @Override + public int hashCode() { + return path.hashCode(); + } + } /** * A message to be sent to Pubsub. @@ -95,13 +153,20 @@ public IncomingMessage( } } + /** + * Gracefully close the underlying transport. + */ + @Override + void close(); + + /** * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages * published. * * @throws IOException */ - int publish(String topic, Iterable outgoingMessages) throws IOException; + int publish(TopicPath topic, Iterable outgoingMessages) throws IOException; /** * Request the next batch of up to {@code batchSize} messages from {@code subscription}. @@ -112,15 +177,15 @@ public IncomingMessage( * @throws IOException */ Collection pull( - long requestTimeMsSinceEpoch, String subscription, int - batchSize) throws IOException; + long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize) + throws IOException; /** * Acknowldege messages from {@code subscription} with {@code ackIds}. * * @throws IOException */ - void acknowledge(String subscription, Iterable ackIds) throws IOException; + void acknowledge(SubscriptionPath subscription, Iterable ackIds) throws IOException; /** * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to @@ -128,7 +193,9 @@ Collection pull( * * @throws IOException */ - void modifyAckDeadline(String subscription, Iterable ackIds, int deadlineSeconds) + void modifyAckDeadline( + SubscriptionPath subscription, Iterable ackIds, + int deadlineSeconds) throws IOException; /** @@ -136,41 +203,42 @@ void modifyAckDeadline(String subscription, Iterable ackIds, int deadlin * * @throws IOException */ - void createTopic(String topic) throws IOException; + void createTopic(TopicPath topic) throws IOException; /* * Delete {@code topic}. * * @throws IOException */ - void deleteTopic(String topic) throws IOException; + void deleteTopic(TopicPath topic) throws IOException; /** - * Return a list of topics for {@code project}. + * Return a list of topics for {@code projectId}. * * @throws IOException */ - Collection listTopics(String project) throws IOException; + Collection listTopics(String projectId) throws IOException; /** * Create {@code subscription} to {@code topic}. * * @throws IOException */ - void createSubscription(String topic, String subscription, int ackDeadlineSeconds) throws - IOException; + void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException; /** * Delete {@code subscription}. * * @throws IOException */ - void deleteSubscription(String subscription) throws IOException; + void deleteSubscription(SubscriptionPath subscription) throws IOException; /** - * Return a list of subscriptions for {@code topic} in {@code project}. + * Return a list of subscriptions for {@code topic} in {@code projectId}. * * @throws IOException */ - Collection listSubscriptions(String project, String topic) throws IOException; + Collection listSubscriptions(String projectId, TopicPath topic) throws IOException; } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java index 2d75679f7846..fe8d956183e6 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java @@ -121,7 +121,7 @@ private PubsubGrpcClient( /** * Construct a new Pubsub grpc client. It should be closed via {@link #close} in order * to ensure tidy cleanup of underlying netty resources. (Or use the try-with-resources - * construct since this class is {@link AutoCloseable}. If non-{@literal null}, use + * construct since this class is {@link AutoCloseable}). If non-{@literal null}, use * {@code timestampLabel} and {@code idLabel} to store custom timestamps/ids within * message metadata. */ @@ -189,9 +189,10 @@ private SubscriberGrpc.SubscriberBlockingStub subscriberStub() throws IOExceptio } @Override - public int publish(String topic, Iterable outgoingMessages) throws IOException { + public int publish(TopicPath topic, Iterable outgoingMessages) + throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder() - .setTopic(topic); + .setTopic(topic.getPath()); for (OutgoingMessage outgoingMessage : outgoingMessages) { PubsubMessage.Builder message = PubsubMessage.newBuilder() @@ -205,7 +206,7 @@ public int publish(String topic, Iterable outgoingMessages) thr if (idLabel != null) { message.getMutableAttributes() .put(idLabel, - Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); } request.addMessages(message); @@ -218,10 +219,10 @@ public int publish(String topic, Iterable outgoingMessages) thr @Override public Collection pull( long requestTimeMsSinceEpoch, - String subscription, + SubscriptionPath subscription, int batchSize) throws IOException { PullRequest request = PullRequest.newBuilder() - .setSubscription(subscription) + .setSubscription(subscription.getPath()) .setReturnImmediately(true) .setMaxMessages(batchSize) .build(); @@ -281,26 +282,29 @@ public Collection pull( } incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); + requestTimeMsSinceEpoch, ackId, recordId)); } return incomingMessages; } @Override - public void acknowledge(String subscription, Iterable ackIds) throws IOException { + public void acknowledge(SubscriptionPath subscription, Iterable ackIds) + throws IOException { AcknowledgeRequest request = AcknowledgeRequest.newBuilder() - .setSubscription(subscription) + .setSubscription(subscription.getPath()) .addAllAckIds(ackIds) .build(); subscriberStub().acknowledge(request); // ignore Empty result. } @Override - public void modifyAckDeadline(String subscription, Iterable ackIds, int deadlineSeconds) + public void modifyAckDeadline( + SubscriptionPath subscription, Iterable ackIds, int + deadlineSeconds) throws IOException { ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() - .setSubscription(subscription) + .setSubscription(subscription.getPath()) .addAllAckIds(ackIds) .setAckDeadlineSeconds(deadlineSeconds) .build(); @@ -308,24 +312,26 @@ public void modifyAckDeadline(String subscription, Iterable ackIds, int } @Override - public void createTopic(String topic) throws IOException { + public void createTopic(TopicPath topic) throws IOException { Topic request = Topic.newBuilder() - .setName(topic) + .setName(topic.getPath()) .build(); publisherStub().createTopic(request); // ignore Topic result. } @Override - public void deleteTopic(String topic) throws IOException { - DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic).build(); + public void deleteTopic(TopicPath topic) throws IOException { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder() + .setTopic(topic.getPath()) + .build(); publisherStub().deleteTopic(request); // ignore Empty result. } @Override - public Collection listTopics(String project) throws IOException { + public Collection listTopics(String projectId) throws IOException { ListTopicsRequest.Builder request = ListTopicsRequest.newBuilder() - .setProject(project) + .setProject(projectId) .setPageSize(LIST_BATCH_SIZE); ListTopicsResponse response = publisherStub().listTopics(request.build()); if (response.getTopicsCount() == 0) { @@ -346,28 +352,32 @@ public Collection listTopics(String project) throws IOException { } @Override - public void createSubscription(String topic, String subscription, int ackDeadlineSeconds) - throws IOException { + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { Subscription request = Subscription.newBuilder() - .setTopic(topic) - .setName(subscription) + .setTopic(topic.getPath()) + .setName(subscription.getPath()) .setAckDeadlineSeconds(ackDeadlineSeconds) .build(); subscriberStub().createSubscription(request); // ignore Subscription result. } @Override - public void deleteSubscription(String subscription) throws IOException { + public void deleteSubscription(SubscriptionPath subscription) throws IOException { DeleteSubscriptionRequest request = - DeleteSubscriptionRequest.newBuilder().setSubscription(subscription).build(); + DeleteSubscriptionRequest.newBuilder() + .setSubscription(subscription.getPath()) + .build(); subscriberStub().deleteSubscription(request); // ignore Empty result. } @Override - public Collection listSubscriptions(String project, String topic) throws IOException { + public Collection listSubscriptions(String projectId, TopicPath topic) + throws IOException { ListSubscriptionsRequest.Builder request = ListSubscriptionsRequest.newBuilder() - .setProject(project) + .setProject(projectId) .setPageSize(LIST_BATCH_SIZE); ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build()); if (response.getSubscriptionsCount() == 0) { @@ -376,7 +386,7 @@ public Collection listSubscriptions(String project, String topic) throws List subscriptions = new ArrayList<>(response.getSubscriptionsCount()); while (true) { for (Subscription subscription : response.getSubscriptionsList()) { - if (subscription.getTopic().equals(topic)) { + if (subscription.getTopic().equals(topic.getPath())) { subscriptions.add(subscription.getName()); } } From ecf057f6aa8757a59dded595655f50ff77ff2980 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Mon, 11 Apr 2016 15:39:28 -0700 Subject: [PATCH 3/6] Add ProjectPath. Return paths instead of strings. --- .../cloud/dataflow/sdk/io/PubsubClient.java | 65 ++++++++++++++++--- .../dataflow/sdk/io/PubsubGrpcClient.java | 16 ++--- 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java index c8bcfdd574c3..6f118c8ebbbc 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -25,14 +25,53 @@ * A helper interface for talking to Pubsub via an underlying transport. */ public interface PubsubClient extends AutoCloseable { + /** + * Path representing a cloud project id. + */ + class ProjectPath { + private final String path; + + public ProjectPath(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProjectPath that = (ProjectPath) o; + + return path.equals(that.path); + + } + + @Override + public int hashCode() { + return path.hashCode(); + } + + public static ProjectPath fromId(String projectId) { + return new ProjectPath(String.format("projects/%s", projectId)); + } + } + /** * Path representing a Pubsub subscription. */ class SubscriptionPath { private final String path; - public SubscriptionPath(String projectId, String subscriptionName) { - path = String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); + public SubscriptionPath(String path) { + this.path = path; } public String getPath() { @@ -55,6 +94,11 @@ public boolean equals(Object o) { public int hashCode() { return path.hashCode(); } + + public static SubscriptionPath fromName(String projectId, String subscriptionName) { + return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", + projectId, subscriptionName)); + } } /** @@ -63,8 +107,8 @@ public int hashCode() { class TopicPath { private final String path; - public TopicPath(String projectId, String topicName) { - path = String.format("projects/%s/topics/%s", projectId, topicName); + public TopicPath(String path) { + this.path = path; } public String getPath() { @@ -87,6 +131,10 @@ public boolean equals(Object o) { public int hashCode() { return path.hashCode(); } + + public static TopicPath fromName(String projectId, String topicName) { + return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); + } } /** @@ -213,11 +261,11 @@ void modifyAckDeadline( void deleteTopic(TopicPath topic) throws IOException; /** - * Return a list of topics for {@code projectId}. + * Return a list of topics for {@code project}. * * @throws IOException */ - Collection listTopics(String projectId) throws IOException; + Collection listTopics(ProjectPath project) throws IOException; /** * Create {@code subscription} to {@code topic}. @@ -236,9 +284,10 @@ void createSubscription( void deleteSubscription(SubscriptionPath subscription) throws IOException; /** - * Return a list of subscriptions for {@code topic} in {@code projectId}. + * Return a list of subscriptions for {@code topic} in {@code project}. * * @throws IOException */ - Collection listSubscriptions(String projectId, TopicPath topic) throws IOException; + Collection listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException; } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java index fe8d956183e6..6e347056eb8a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubGrpcClient.java @@ -328,19 +328,19 @@ public void deleteTopic(TopicPath topic) throws IOException { } @Override - public Collection listTopics(String projectId) throws IOException { + public Collection listTopics(ProjectPath project) throws IOException { ListTopicsRequest.Builder request = ListTopicsRequest.newBuilder() - .setProject(projectId) + .setProject(project.getPath()) .setPageSize(LIST_BATCH_SIZE); ListTopicsResponse response = publisherStub().listTopics(request.build()); if (response.getTopicsCount() == 0) { return ImmutableList.of(); } - List topics = new ArrayList<>(response.getTopicsCount()); + List topics = new ArrayList<>(response.getTopicsCount()); while (true) { for (Topic topic : response.getTopicsList()) { - topics.add(topic.getName()); + topics.add(new TopicPath(topic.getName())); } if (response.getNextPageToken().isEmpty()) { break; @@ -373,21 +373,21 @@ public void deleteSubscription(SubscriptionPath subscription) throws IOException } @Override - public Collection listSubscriptions(String projectId, TopicPath topic) + public Collection listSubscriptions(ProjectPath project, TopicPath topic) throws IOException { ListSubscriptionsRequest.Builder request = ListSubscriptionsRequest.newBuilder() - .setProject(projectId) + .setProject(project.getPath()) .setPageSize(LIST_BATCH_SIZE); ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build()); if (response.getSubscriptionsCount() == 0) { return ImmutableList.of(); } - List subscriptions = new ArrayList<>(response.getSubscriptionsCount()); + List subscriptions = new ArrayList<>(response.getSubscriptionsCount()); while (true) { for (Subscription subscription : response.getSubscriptionsList()) { if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(subscription.getName()); + subscriptions.add(new SubscriptionPath(subscription.getName())); } } if (response.getNextPageToken().isEmpty()) { From d040526c31a30856320bb9ffb4fe4a2ee0b9e2c9 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Mon, 11 Apr 2016 16:23:54 -0700 Subject: [PATCH 4/6] toString on path objects --- .../cloud/dataflow/sdk/io/PubsubClient.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java index 6f118c8ebbbc..6cafc7c8643d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -59,6 +59,11 @@ public int hashCode() { return path.hashCode(); } + @Override + public String toString() { + return path; + } + public static ProjectPath fromId(String projectId) { return new ProjectPath(String.format("projects/%s", projectId)); } @@ -95,6 +100,11 @@ public int hashCode() { return path.hashCode(); } + @Override + public String toString() { + return path; + } + public static SubscriptionPath fromName(String projectId, String subscriptionName) { return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", projectId, subscriptionName)); @@ -132,6 +142,11 @@ public int hashCode() { return path.hashCode(); } + @Override + public String toString() { + return path; + } + public static TopicPath fromName(String projectId, String topicName) { return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); } From 3d69f09eea80ca979a9df86e9deca2f09affc2a9 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Mon, 11 Apr 2016 17:46:56 -0700 Subject: [PATCH 5/6] Support v1Beta1 paths since Windmill still needs them --- .../google/cloud/dataflow/sdk/io/PubsubClient.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java index 6cafc7c8643d..f8e132381e29 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -18,6 +18,7 @@ package com.google.cloud.dataflow.sdk.io; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; import java.io.IOException; import java.util.Collection; @@ -83,6 +84,12 @@ public String getPath() { return path; } + public String getV1Beta1Path() { + String[] splits = path.split("/"); + Preconditions.checkState(splits.length == 4); + return String.format("/subscriptions/%s/%s", splits[1], splits[3]); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -125,6 +132,12 @@ public String getPath() { return path; } + public String getV1Beta1Path() { + String[] splits = path.split("/"); + Preconditions.checkState(splits.length == 4); + return String.format("/topics/%s/%s", splits[1], splits[3]); + } + @Override public boolean equals(Object o) { if (this == o) { From 06c37a0677c4ed72e7c4b3fefc317b7ed7b6ced4 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 12 Apr 2016 09:44:33 -0700 Subject: [PATCH 6/6] Paths must be serializable --- .../com/google/cloud/dataflow/sdk/io/PubsubClient.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java index f8e132381e29..e5b8a39d6498 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubClient.java @@ -20,6 +20,7 @@ import com.google.api.client.repackaged.com.google.common.base.Preconditions; import java.io.IOException; +import java.io.Serializable; import java.util.Collection; /** @@ -29,7 +30,7 @@ public interface PubsubClient extends AutoCloseable { /** * Path representing a cloud project id. */ - class ProjectPath { + class ProjectPath implements Serializable { private final String path; public ProjectPath(String path) { @@ -73,7 +74,7 @@ public static ProjectPath fromId(String projectId) { /** * Path representing a Pubsub subscription. */ - class SubscriptionPath { + class SubscriptionPath implements Serializable { private final String path; public SubscriptionPath(String path) { @@ -121,7 +122,7 @@ public static SubscriptionPath fromName(String projectId, String subscriptionNam /** * Path representing a Pubsub topic. */ - class TopicPath { + class TopicPath implements Serializable { private final String path; public TopicPath(String path) {