diff --git a/google-cloud-examples/README.md b/google-cloud-examples/README.md index c4c9b04aa491..2366416fbca4 100644 --- a/google-cloud-examples/README.md +++ b/google-cloud-examples/README.md @@ -128,7 +128,6 @@ To run examples from your command line: target/appassembler/bin/PubSubExample create topic test-topic target/appassembler/bin/PubSubExample create subscription test-topic test-subscription target/appassembler/bin/PubSubExample publish test-topic message1 message2 - target/appassembler/bin/PubSubExample pull sync test-subscription 2 ``` * Here's an example run of `ResourceManagerExample`. diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java index d79e4e9f8afd..ad79647d787e 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java @@ -504,102 +504,6 @@ public String params() { } } - /** - * This class demonstrates how to acknowledge Pub/Sub messages for a subscription. - * - * @see Receiving - * pull messages - */ - private static class AckMessagesAction extends MessagesAction { - @Override - public void run(Tuple> params) throws Exception { - SubscriptionName subscriptionName = params.x(); - List ackIds = params.y(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - subscriptionAdminClient.acknowledge(subscriptionName, ackIds); - } - System.out.printf("Acked %d messages for subscription %s%n", ackIds.size(), subscriptionName); - } - } - - /** - * This class demonstrates how to "nack" Pub/Sub messages for a subscription. This action - * corresponds to setting the acknowledge deadline to 0. - * - * @see Message - * acknowledgement deadline - */ - private static class NackMessagesAction extends MessagesAction { - @Override - public void run(Tuple> params) throws Exception { - SubscriptionName subscriptionName = params.x(); - List ackIds = params.y(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - subscriptionAdminClient.modifyAckDeadline(subscriptionName, ackIds, 0); - } - System.out.printf("Nacked %d messages for subscription %s%n", ackIds.size(), subscriptionName); - } - } - - /** - * This class demonstrates how modify the acknowledge deadline for messages in a Pub/Sub - * subscription. - * - * @see Message - * acknowledgement deadline - */ - private static class ModifyAckDeadlineAction - extends PubSubAction>> { - - static class SubscriptionAndDeadline { - - private final SubscriptionName subscription; - private final int deadlineMillis; - - private SubscriptionAndDeadline(String subscription, int deadlineMillis) { - this.subscription = SubscriptionName.create(projectId, subscription); - this.deadlineMillis = deadlineMillis; - } - - SubscriptionName subscriptionName() { - return subscription; - } - - int deadlineMillis() { - return deadlineMillis; - } - } - - @Override - public void run(Tuple> params) - throws Exception { - SubscriptionName subscriptionName = params.x().subscriptionName(); - int deadline = params.x().deadlineMillis(); - List ackIds = params.y(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - subscriptionAdminClient.modifyAckDeadline(subscriptionName, ackIds, deadline); - } - System.out.printf("Ack deadline set to %d seconds for %d messages in subscription %s%n", deadline, - ackIds.size(), subscriptionName); - } - - @Override - Tuple> parse(String... args) throws Exception { - if (args.length < 3) { - throw new IllegalArgumentException("Missing required subscription, deadline and ack IDs"); - } - String subscription = args[0]; - int deadline = Integer.parseInt(args[1]); - return Tuple.of(new SubscriptionAndDeadline(subscription, deadline), - Arrays.asList(Arrays.copyOfRange(args, 2, args.length))); - } - - @Override - public String params() { - return " +"; - } - } - /** * This class demonstrates how to asynchronously pull messages from a Pub/Sub pull subscription. * Messages are pulled until a timeout is reached. @@ -667,53 +571,6 @@ public String params() { } } - /** - * This class demonstrates how to synchronously pull messages from a Pub/Sub pull subscription. - * No more than the requested number of messages are pulled. Possibly less messages are pulled. - * - * @see Receiving - * pull messages - */ - private static class PullSyncAction extends PubSubAction> { - @Override - public void run(Tuple params) throws Exception { - SubscriptionName subscriptionName = params.x(); - Integer maxMessages = params.y(); - AtomicInteger messageCount = new AtomicInteger(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - PullResponse response = subscriptionAdminClient.pull(subscriptionName, true, maxMessages); - for (ReceivedMessage message : response.getReceivedMessagesList()) { - // do something with message, then ack or nack - System.out.printf("Received message \"%s\"%n", message); - subscriptionAdminClient.acknowledge( - subscriptionName, Collections.singletonList(message.getAckId())); - messageCount.incrementAndGet(); - } - } - System.out.printf("Pulled %d messages from subscription %s%n", messageCount, subscriptionName); - } - - @Override - Tuple parse(String... args) throws Exception { - String message; - if (args.length == 2) { - SubscriptionName subscription = SubscriptionName.create(projectId, args[0]); - int maxMessages = Integer.parseInt(args[1]); - return Tuple.of(subscription, maxMessages); - } else if (args.length > 2) { - message = "Too many arguments."; - } else { - message = "Missing required subscription name"; - } - throw new IllegalArgumentException(message); - } - - @Override - public String params() { - return " "; - } - } - private abstract static class GetPolicyAction extends PubSubAction { @Override String parse(String... args) throws Exception { @@ -927,7 +784,6 @@ public void run(Tuple> param) throws Exception { DELETE_ACTIONS.put("topic", new DeleteTopicAction()); DELETE_ACTIONS.put("subscription", new DeleteSubscriptionAction()); PULL_ACTIONS.put("async", new PullAsyncAction()); - PULL_ACTIONS.put("sync", new PullSyncAction()); GET_IAM_ACTIONS.put("topic", new GetTopicPolicyAction()); GET_IAM_ACTIONS.put("subscription", new GetSubscriptionPolicyAction()); REPLACE_IAM_ACTIONS.put("topic", new AddIdentityTopicAction()); @@ -944,9 +800,6 @@ public void run(Tuple> param) throws Exception { ACTIONS.put("test-permissions", new ParentAction(TEST_IAM_ACTIONS)); ACTIONS.put("publish", new PublishMessagesAction()); ACTIONS.put("replace-push-config", new ReplacePushConfigAction()); - ACTIONS.put("ack", new AckMessagesAction()); - ACTIONS.put("nack", new NackMessagesAction()); - ACTIONS.put("modify-ack-deadline", new ModifyAckDeadlineAction()); } private static void printUsage() { diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionAdminClientSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionAdminClientSnippets.java index 9af97c4e4d2f..83e0b52afb2a 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionAdminClientSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriptionAdminClientSnippets.java @@ -67,22 +67,6 @@ public Subscription createSubscription(String topic, String subscriptionId) thro } } - /** Example of pulling a maximum number of messages from a subscription. */ - public PullResponse pull(String subscriptionId) throws Exception { - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - // [START pull] - SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId); - PullResponse response = subscriptionAdminClient.pull(subscriptionName, true, 100); - for (ReceivedMessage message : response.getReceivedMessagesList()) { - // do something with message, then ack or nack - subscriptionAdminClient.acknowledge( - subscriptionName, Collections.singletonList(message.getAckId())); - } - // [END pull] - return response; - } - } - /** Example of replacing the push configuration of a subscription, setting the push endpoint. */ public void replacePushConfig(String subscriptionId, String endpoint) throws Exception { try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java new file mode 100644 index 000000000000..eeded46f5e04 --- /dev/null +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java @@ -0,0 +1,127 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.examples.pubsub.snippets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.api.gax.core.ApiFutureCallback; +import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.SettableApiFuture; +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.spi.v1.Publisher; +import com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.spi.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class ITPubSubSnippets { + + private static final String NAME_SUFFIX = UUID.randomUUID().toString(); + + @Rule public Timeout globalTimeout = Timeout.seconds(300); + + private static String formatForTest(String resourceName) { + return resourceName + "-" + NAME_SUFFIX; + } + + @Test + public void testPublisherSubscriber() throws Exception { + TopicName topicName = + TopicName.create(ServiceOptions.getDefaultProjectId(), formatForTest("test-topic")); + SubscriptionName subscriptionName = + SubscriptionName.create( + ServiceOptions.getDefaultProjectId(), formatForTest("test-subscription")); + try (TopicAdminClient publisherClient = TopicAdminClient.create(); + SubscriptionAdminClient subscriberClient = SubscriptionAdminClient.create()) { + publisherClient.createTopic(topicName); + subscriberClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 0); + + testPublisherSubscriberHelper(topicName, subscriptionName); + + subscriberClient.deleteSubscription(subscriptionName); + publisherClient.deleteTopic(topicName); + } + } + + private void testPublisherSubscriberHelper(TopicName topicName, SubscriptionName subscriptionName) + throws Exception { + String messageToPublish = "my-message"; + + Publisher publisher = null; + try { + publisher = Publisher.newBuilder(topicName).build(); + PublisherSnippets snippets = new PublisherSnippets(publisher); + final SettableApiFuture done = SettableApiFuture.create(); + ApiFutures.addCallback( + snippets.publish(messageToPublish), + new ApiFutureCallback() { + public void onSuccess(String messageId) { + done.set(null); + } + + public void onFailure(Throwable t) { + done.setException(t); + } + }); + done.get(); + } finally { + if (publisher != null) { + publisher.shutdown(); + } + } + + final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final SettableApiFuture done = SettableApiFuture.create(); + final SettableApiFuture received = SettableApiFuture.create(); + SubscriberSnippets snippets = + new SubscriberSnippets( + subscriptionName, + new MessageReceiverSnippets(queue).messageReceiver(), + done, + MoreExecutors.directExecutor()); + new Thread( + new Runnable() { + @Override + public void run() { + try { + received.set(queue.poll(10, TimeUnit.MINUTES)); + } catch (InterruptedException e) { + received.set(null); + } + done.set(null); // signal the subscriber to clean up + } + }) + .start(); + snippets.startAndWait(); // blocks until done is set + + PubsubMessage message = received.get(); + assertNotNull(message); + assertEquals(message.getData().toStringUtf8(), messageToPublish); + } +} diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITSubscriptionAdminClientSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITSubscriptionAdminClientSnippets.java index 82de90561295..e128b007d5dd 100644 --- a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITSubscriptionAdminClientSnippets.java +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITSubscriptionAdminClientSnippets.java @@ -94,26 +94,6 @@ private Subscription createSubscription(String topicName, String subscriptionNam return subscription; } - @Test - public void publishAndPullMessagesIsSuccessful() throws Exception { - String topicName = topics[0]; - String subscriptionName = subscriptions[0]; - createSubscription(topicName, subscriptionName); - Set messages = publishMessages(topicName, 5); - //pulls max 100 messages - PullResponse response = subscriptionAdminClientSnippets.pull(subscriptionName); - assertNotNull(response); - //remove messages that match sent - for (ReceivedMessage receivedMessage : response.getReceivedMessagesList()) { - String message = receivedMessage.getMessage().getData().toStringUtf8(); - if (messages.contains(message)) { - messages.remove(message); - } - } - //all messages published were received - assertTrue(messages.isEmpty()); - } - @Test public void replacePushConfigIsSuccessful() throws Exception { String topicName = topics[0]; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriptionAdminClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriptionAdminClient.java index a9dd992088cb..531dfc99fa88 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriptionAdminClient.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriptionAdminClient.java @@ -463,7 +463,7 @@ public final UnaryCallable getSubscription * @param request The request object containing all of the parameters for the API call. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - public final Subscription updateSubscription(UpdateSubscriptionRequest request) { + /* package-private */ final Subscription updateSubscription(UpdateSubscriptionRequest request) { return updateSubscriptionCallable().call(request); } @@ -488,7 +488,8 @@ public final Subscription updateSubscription(UpdateSubscriptionRequest request) * } * */ - public final UnaryCallable updateSubscriptionCallable() { + /* package-private */ final UnaryCallable + updateSubscriptionCallable() { return updateSubscriptionCallable; } @@ -707,8 +708,7 @@ public final UnaryCallable deleteSubscriptionC * seconds. The maximum deadline you can specify is 600 seconds (10 minutes). * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final void modifyAckDeadline( + /* package-private */ final void modifyAckDeadline( SubscriptionName subscription, List ackIds, int ackDeadlineSeconds) { ModifyAckDeadlineRequest request = @@ -746,8 +746,7 @@ public final void modifyAckDeadline( * @param request The request object containing all of the parameters for the API call. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final void modifyAckDeadline(ModifyAckDeadlineRequest request) { + /* package-private */ final void modifyAckDeadline(ModifyAckDeadlineRequest request) { modifyAckDeadlineCallable().call(request); } @@ -776,8 +775,8 @@ public final void modifyAckDeadline(ModifyAckDeadlineRequest request) { * } * */ - @Deprecated - public final UnaryCallable modifyAckDeadlineCallable() { + /* package-private */ final UnaryCallable + modifyAckDeadlineCallable() { return modifyAckDeadlineCallable; } @@ -805,8 +804,7 @@ public final UnaryCallable modifyAckDeadlineCal * the Pub/Sub system in the `Pull` response. Must not be empty. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final void acknowledge(SubscriptionName subscription, List ackIds) { + /* package-private */ final void acknowledge(SubscriptionName subscription, List ackIds) { AcknowledgeRequest request = AcknowledgeRequest.newBuilder() @@ -841,8 +839,7 @@ public final void acknowledge(SubscriptionName subscription, List ackIds * @param request The request object containing all of the parameters for the API call. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final void acknowledge(AcknowledgeRequest request) { + /* package-private */ final void acknowledge(AcknowledgeRequest request) { acknowledgeCallable().call(request); } @@ -870,8 +867,7 @@ public final void acknowledge(AcknowledgeRequest request) { * } * */ - @Deprecated - public final UnaryCallable acknowledgeCallable() { + /* package-private */ final UnaryCallable acknowledgeCallable() { return acknowledgeCallable; } @@ -903,8 +899,7 @@ public final UnaryCallable acknowledgeCallable() { * may return fewer than the number specified. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final PullResponse pull( + /* package-private */ final PullResponse pull( SubscriptionName subscription, boolean returnImmediately, int maxMessages) { PullRequest request = @@ -939,8 +934,7 @@ public final PullResponse pull( * @param request The request object containing all of the parameters for the API call. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final PullResponse pull(PullRequest request) { + /* package-private */ final PullResponse pull(PullRequest request) { return pullCallable().call(request); } @@ -966,8 +960,7 @@ public final PullResponse pull(PullRequest request) { * } * */ - @Deprecated - public final UnaryCallable pullCallable() { + /* package-private */ final UnaryCallable pullCallable() { return pullCallable; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/TopicAdminClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/TopicAdminClient.java index d17c03ad1ab7..5212f2844096 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/TopicAdminClient.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/TopicAdminClient.java @@ -297,8 +297,8 @@ public final UnaryCallable createTopicCallable() { * @param messages The messages to publish. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final PublishResponse publish(TopicName topic, List messages) { + /* package-private */ final PublishResponse publish( + TopicName topic, List messages) { PublishRequest request = PublishRequest.newBuilder().setTopicWithTopicName(topic).addAllMessages(messages).build(); @@ -332,8 +332,7 @@ public final PublishResponse publish(TopicName topic, List messag * @param request The request object containing all of the parameters for the API call. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - @Deprecated - public final PublishResponse publish(PublishRequest request) { + /* package-private */ final PublishResponse publish(PublishRequest request) { return publishCallable().call(request); } @@ -363,8 +362,7 @@ public final PublishResponse publish(PublishRequest request) { * } * */ - @Deprecated - public final UnaryCallable publishCallable() { + /* package-private */ final UnaryCallable publishCallable() { return publishCallable; }