From 8e9b9bddec55847d247181faf3bc47e679e0398d Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 1 May 2019 10:01:30 -0400 Subject: [PATCH 1/3] Pub/Sub: more refactoring - Using `Preconditions` - Adding `onSuccess` and `onFailure` to `OutstandingPublish` --- .../com/google/cloud/pubsub/v1/Publisher.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 9b0390ecde06..bdfb36b5651e 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -192,9 +192,7 @@ public String getTopicNameString() { * @return the message ID wrapped in a future. */ public ApiFuture publish(PubsubMessage message) { - if (shutdown.get()) { - throw new IllegalStateException("Cannot publish on a shut-down publisher."); - } + Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); @@ -288,23 +286,14 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { public void onSuccess(PublishResponse result) { try { if (result.getMessageIdsCount() != outstandingBatch.size()) { - Throwable t = - new IllegalStateException( - String.format( - "The publish result count %s does not match " - + "the expected %s results. Please contact Cloud Pub/Sub support " - + "if this frequently occurs", - result.getMessageIdsCount(), outstandingBatch.size())); - for (OutstandingPublish oustandingMessage : outstandingBatch.outstandingPublishes) { - oustandingMessage.publishResult.setException(t); - } - return; - } - - Iterator messagesResultsIt = - outstandingBatch.outstandingPublishes.iterator(); - for (String messageId : result.getMessageIdsList()) { - messagesResultsIt.next().publishResult.set(messageId); + outstandingBatch.onFailure(new IllegalStateException( + String.format( + "The publish result count %s does not match " + + "the expected %s results. Please contact Cloud Pub/Sub support " + + "if this frequently occurs", + result.getMessageIdsCount(), outstandingBatch.size()))); + } else { + outstandingBatch.onSuccess(result.getMessageIds()); } } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); @@ -314,9 +303,7 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { try { - for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { - outstandingPublish.publishResult.setException(t); - } + outstandingBatch.onFailure(t); } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } @@ -350,6 +337,19 @@ private List getMessages() { } return results; } + + private void onFailure(Throwable t) { + for (OutstandingPublish outstandingPublish : outstandingPublishes) { + outstandingPublish.publishResult.setException(t); + } + } + + private void onSuccess(Iterable results){ + Iterator messagesResultsIt = outstandingPublishes.iterator(); + for (String messageId : results) { + messagesResultsIt.next().publishResult.set(messageId); + } + } } private static final class OutstandingPublish { @@ -376,10 +376,9 @@ public BatchingSettings getBatchingSettings() { * should be invoked prior to deleting the {@link Publisher} object in order to ensure that no * pending messages are lost. */ - public void shutdown() throws Exception { - if (shutdown.getAndSet(true)) { - throw new IllegalStateException("Cannot shut down a publisher already shut-down."); - } + public void shutdown() { + Preconditions.checkState( + !shutdown.getAndSet(true), "Cannot shut down a publisher already shut-down."); if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { currentAlarmFuture.cancel(false); } From b6596531194bb23da04731159da673c263b798bb Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 1 May 2019 10:19:35 -0400 Subject: [PATCH 2/3] Fixing a typo --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index bdfb36b5651e..d7741bb28332 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -293,7 +293,7 @@ public void onSuccess(PublishResponse result) { + "if this frequently occurs", result.getMessageIdsCount(), outstandingBatch.size()))); } else { - outstandingBatch.onSuccess(result.getMessageIds()); + outstandingBatch.onSuccess(result.getMessageIdsList()); } } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); From ceb2b47be69cd2f5414be861116f1fce86052df8 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 1 May 2019 12:29:57 -0400 Subject: [PATCH 3/3] Fixing formating --- .../com/google/cloud/pubsub/v1/Publisher.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d7741bb28332..31a57fcc9daa 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -286,12 +286,13 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { public void onSuccess(PublishResponse result) { try { if (result.getMessageIdsCount() != outstandingBatch.size()) { - outstandingBatch.onFailure(new IllegalStateException( - String.format( - "The publish result count %s does not match " - + "the expected %s results. Please contact Cloud Pub/Sub support " - + "if this frequently occurs", - result.getMessageIdsCount(), outstandingBatch.size()))); + outstandingBatch.onFailure( + new IllegalStateException( + String.format( + "The publish result count %s does not match " + + "the expected %s results. Please contact Cloud Pub/Sub support " + + "if this frequently occurs", + result.getMessageIdsCount(), outstandingBatch.size()))); } else { outstandingBatch.onSuccess(result.getMessageIdsList()); } @@ -344,7 +345,7 @@ private void onFailure(Throwable t) { } } - private void onSuccess(Iterable results){ + private void onSuccess(Iterable results) { Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { messagesResultsIt.next().publishResult.set(messageId);