diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 9b0390ecde06..31a57fcc9daa 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -192,9 +192,7 @@ public String getTopicNameString() { * @return the message ID wrapped in a future. */ public ApiFuture publish(PubsubMessage message) { - if (shutdown.get()) { - throw new IllegalStateException("Cannot publish on a shut-down publisher."); - } + Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); @@ -288,23 +286,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { public void onSuccess(PublishResponse result) { try { if (result.getMessageIdsCount() != outstandingBatch.size()) { - Throwable t = + outstandingBatch.onFailure( new IllegalStateException( String.format( "The publish result count %s does not match " + "the expected %s results. Please contact Cloud Pub/Sub support " + "if this frequently occurs", - result.getMessageIdsCount(), outstandingBatch.size())); - for (OutstandingPublish oustandingMessage : outstandingBatch.outstandingPublishes) { - oustandingMessage.publishResult.setException(t); - } - return; - } - - Iterator messagesResultsIt = - outstandingBatch.outstandingPublishes.iterator(); - for (String messageId : result.getMessageIdsList()) { - messagesResultsIt.next().publishResult.set(messageId); + result.getMessageIdsCount(), outstandingBatch.size()))); + } else { + outstandingBatch.onSuccess(result.getMessageIdsList()); } } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); @@ -314,9 +304,7 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { try { - for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { - outstandingPublish.publishResult.setException(t); - } + outstandingBatch.onFailure(t); } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } @@ -350,6 +338,19 @@ private List getMessages() { } return results; } + + private void onFailure(Throwable t) { + for (OutstandingPublish outstandingPublish : outstandingPublishes) { + outstandingPublish.publishResult.setException(t); + } + } + + private void onSuccess(Iterable results) { + Iterator messagesResultsIt = outstandingPublishes.iterator(); + for (String messageId : results) { + messagesResultsIt.next().publishResult.set(messageId); + } + } } private static final class OutstandingPublish { @@ -376,10 +377,9 @@ public BatchingSettings getBatchingSettings() { * should be invoked prior to deleting the {@link Publisher} object in order to ensure that no * pending messages are lost. */ - public void shutdown() throws Exception { - if (shutdown.getAndSet(true)) { - throw new IllegalStateException("Cannot shut down a publisher already shut-down."); - } + public void shutdown() { + Preconditions.checkState( + !shutdown.getAndSet(true), "Cannot shut down a publisher already shut-down."); if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { currentAlarmFuture.cancel(false); }