diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3ed22ab5c263..fe9d28f421fb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -231,11 +231,12 @@ public ApiFuture publish(PubsubMessage message) { messagesBatches.remove(orderingKey); } setupAlarm(); - if (!batchesToSend.isEmpty()) { - // TODO: if this is not an ordering keys scenario, will this do anything? + // For messages with an ordering key, then we need to publish with messagesBatchLock held in + // order to ensure another publish doesn't slip in and send a batch before these batches we + // already want to send. + if (!batchesToSend.isEmpty() && !orderingKey.isEmpty()) { publishAllWithoutInflight(); - // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock? for (final OutstandingBatch batch : batchesToSend) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); publishOutstandingBatch(batch); @@ -246,6 +247,16 @@ public ApiFuture publish(PubsubMessage message) { } messagesWaiter.incrementPendingMessages(1); + + // For messages without ordering keys, it is okay to send batches without holding + // messagesBatchLock. + if (!batchesToSend.isEmpty() && orderingKey.isEmpty()) { + for (final OutstandingBatch batch : batchesToSend) { + logger.log(Level.FINER, "Scheduling a batch for immediate sending."); + publishOutstandingBatch(batch); + } + } + return outstandingPublish.publishResult; } @@ -318,6 +329,7 @@ public void publishAllOutstanding() { * for messages to send, call {@code get} on the futures returned from {@code publish}. */ private void publishAllWithoutInflight() { + OutstandingBatch unorderedOutstandingBatch = null; messagesBatchLock.lock(); try { Iterator> it = messagesBatches.entrySet().iterator(); @@ -327,8 +339,11 @@ private void publishAllWithoutInflight() { String key = entry.getKey(); if (batch.isEmpty()) { it.remove(); - } else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) { - // TODO: Will this cause a performance problem for non-ordering keys scenarios? + } else if (key.isEmpty()) { + // We will publish the batch with no ordering key outside messagesBatchLock. + unorderedOutstandingBatch = batch.popOutstandingBatch(); + it.remove(); + } else if (!sequentialExecutor.hasTasksInflight(key)) { publishOutstandingBatch(batch.popOutstandingBatch()); it.remove(); } @@ -336,6 +351,9 @@ private void publishAllWithoutInflight() { } finally { messagesBatchLock.unlock(); } + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } } private ApiFuture publishCall(OutstandingBatch outstandingBatch) {