diff --git a/datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java b/datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java
index d187a9bb..2d8132ff 100644
--- a/datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java
+++ b/datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java
@@ -99,17 +99,21 @@ public void write(ShardSessionRecord message) {
*
*
Returns the updated record if the update succeeded.
*
- *
Returns {@code Optional.empty()} if the update could not be executed, either because
- * the rules of the passed {@code RecordUpdate} prevented it, or due to a concurrent changes
- * which have happened to the corresponding Datastore entity.
+ *
Returns {@code Optional.empty()} if the update could not be executed, because
+ * the rules of the passed {@code RecordUpdate} prevented it.
*
* @param index
* index of a record to execute an update for
* @param update
* an update to perform
* @return a modified record, or {@code Optional.empty()} if the update could not be executed
+ * @throws DatastoreException
+ * if there is a problem communicating with Datastore, or if the entity could not
+ * be updated due to a concurrent changes which have happened to the corresponding
+ * Datastore entity.
*/
- Optional updateTransactionally(ShardIndex index, RecordUpdate update) {
+ Optional updateTransactionally(ShardIndex index, RecordUpdate update)
+ throws DatastoreException {
try (TransactionWrapper tx = newTransaction()) {
Key key = key(index);
Optional result = tx.read(key);
@@ -125,7 +129,7 @@ Optional updateTransactionally(ShardIndex index, RecordUpdat
}
return updated;
} catch (DatastoreException e) {
- return Optional.empty();
+ throw e;
} catch (RuntimeException e) {
throw newIllegalStateException(
e, "Cannot update the `ShardSessionRecord` with index `%s` in a transaction.",
@@ -168,7 +172,8 @@ private enum Column implements MessageColumn {
worker((m) -> {
WorkerId worker = m.getWorker();
- String value = worker.getNodeId().getValue() + '-' + worker.getValue();
+ String value = worker.getNodeId()
+ .getValue() + '-' + worker.getValue();
return StringValue.of(value);
}),
diff --git a/datastore/src/main/java/io/spine/server/storage/datastore/DsShardProcessingSession.java b/datastore/src/main/java/io/spine/server/storage/datastore/DsShardProcessingSession.java
deleted file mode 100644
index 5f30a78a..00000000
--- a/datastore/src/main/java/io/spine/server/storage/datastore/DsShardProcessingSession.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2022, TeamDev. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Redistribution and use in source and/or binary forms, with or without
- * modification, must retain the above copyright notice and the following
- * disclaimer.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.spine.server.storage.datastore;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.spine.server.delivery.ShardProcessingSession;
-import io.spine.server.delivery.ShardSessionRecord;
-
-/**
- * An implementation of a {@link ShardProcessingSession} based on Datastore.
- */
-final class DsShardProcessingSession extends ShardProcessingSession {
-
- private final Runnable completionCallback;
-
- DsShardProcessingSession(ShardSessionRecord record,
- Runnable completionCallback) {
- super(record);
- this.completionCallback = completionCallback;
- }
-
- @Override
- @VisibleForTesting // Otherwise should stay `protected`.
- public void complete() {
- completionCallback.run();
- }
-}
diff --git a/datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java b/datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java
index 1f10e18d..6d76a570 100644
--- a/datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java
+++ b/datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java
@@ -26,12 +26,13 @@
package io.spine.server.storage.datastore;
+import com.google.cloud.datastore.DatastoreException;
import com.google.protobuf.Duration;
import io.spine.logging.Logging;
import io.spine.server.NodeId;
import io.spine.server.delivery.AbstractWorkRegistry;
+import io.spine.server.delivery.PickUpOutcome;
import io.spine.server.delivery.ShardIndex;
-import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.WorkerId;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -41,6 +42,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.base.Time.currentTime;
+import static io.spine.server.delivery.PickUpOutcomeMixin.alreadyPicked;
+import static io.spine.server.delivery.PickUpOutcomeMixin.pickedUp;
/**
* A {@link io.spine.server.delivery.ShardedWorkRegistry} based on the Google Datastore storage.
@@ -75,16 +78,33 @@ public DsShardedWorkRegistry(DatastoreStorageFactory factory) {
* The potential concurrent access to the same record is handled by using the Datastore
* transaction mechanism. In case of any parallel executions of {@code pickUp} operation,
* the one started earlier wins.
+ *
+ * @throws DatastoreException
+ * if there is a problem updating an entity in Datastore. This exception may signal
+ * about a technical issue communicating with Datastore, or about a concurrent
+ * change of a corresponding entity.
*/
@Override
- public synchronized Optional pickUp(ShardIndex index, NodeId nodeId) {
+ public synchronized PickUpOutcome pickUp(ShardIndex index, NodeId nodeId)
+ throws DatastoreException {
checkNotNull(index);
checkNotNull(nodeId);
WorkerId worker = currentWorkerFor(nodeId);
- Optional result =
- storage.updateTransactionally(index, new UpdateWorkerIfAbsent(index, worker));
- return result.map(this::asSession);
+ UpdateWorkerIfAbsent updateAction = new UpdateWorkerIfAbsent(index, worker);
+ Optional result = storage.updateTransactionally(index, updateAction);
+ if (result.isPresent()) {
+ return pickedUp(result.get());
+ } else {
+ ShardSessionRecord notUpdated = updateAction.previous()
+ .get();
+ return alreadyPicked(notUpdated.getWorker(), notUpdated.getWhenLastPicked());
+ }
+ }
+
+ @Override
+ public void release(ShardSessionRecord session) {
+ clearNode(session);
}
/**
@@ -93,11 +113,12 @@ public synchronized Optional pickUp(ShardIndex index, No
*/
@Override
protected WorkerId currentWorkerFor(NodeId id) {
- long threadId = Thread.currentThread().getId();
+ long threadId = Thread.currentThread()
+ .getId();
return WorkerId.newBuilder()
- .setNodeId(id)
- .setValue(Long.toString(threadId))
- .vBuild();
+ .setNodeId(id)
+ .setValue(Long.toString(threadId))
+ .vBuild();
}
@Override
@@ -126,11 +147,6 @@ protected Optional find(ShardIndex index) {
return read;
}
- @Override
- protected ShardProcessingSession asSession(ShardSessionRecord record) {
- return new DsShardProcessingSession(record, () -> clearNode(record));
- }
-
/**
* Obtains the session storage which persists the session records.
*/
@@ -143,11 +159,14 @@ protected DsSessionStorage storage() {
* {@link ShardIndex} if the record has not been picked by anyone.
*
* If there is no such a record, creates a new record.
+ *
+ *
Preserves the record state before updating if the supplied record is not {@code null}.
*/
private static class UpdateWorkerIfAbsent implements DsSessionStorage.RecordUpdate {
private final ShardIndex index;
private final WorkerId workerToSet;
+ private ShardSessionRecord previous;
private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
this.index = index;
@@ -156,8 +175,11 @@ private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
@Override
public Optional createOrUpdate(@Nullable ShardSessionRecord previous) {
- if (previous != null && previous.hasWorker()) {
- return Optional.empty();
+ if (previous != null) {
+ this.previous = previous;
+ if (previous.hasWorker()) {
+ return Optional.empty();
+ }
}
ShardSessionRecord.Builder builder =
previous == null
@@ -171,5 +193,14 @@ public Optional createOrUpdate(@Nullable ShardSessionRecord
.vBuild();
return Optional.of(updated);
}
+
+ /**
+ * Returns the {@code ShardSessionRecord} state before the update is executed, or empty
+ * {@code Optional} if the is no previous record or
+ * the {@linkplain #createOrUpdate(ShardSessionRecord) createOrUpdate()} is not called yet.
+ */
+ private Optional previous() {
+ return Optional.ofNullable(previous);
+ }
}
}
diff --git a/datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java b/datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java
index fabd89ee..9c86efee 100644
--- a/datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java
+++ b/datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java
@@ -31,8 +31,9 @@
import com.google.protobuf.util.Timestamps;
import io.spine.base.Identifier;
import io.spine.server.NodeId;
+import io.spine.server.delivery.PickUpOutcome;
+import io.spine.server.delivery.ShardAlreadyPickedUp;
import io.spine.server.delivery.ShardIndex;
-import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.ShardedWorkRegistry;
import io.spine.server.delivery.ShardedWorkRegistryTest;
@@ -80,11 +81,11 @@ protected ShardedWorkRegistry registry() {
@Test
@DisplayName("pick up the shard and write a corresponding record to the storage")
void pickUp() {
- Optional session = registry.pickUp(index, nodeId);
+ PickUpOutcome outcome = registry.pickUp(index, nodeId);
WorkerId expectedWorker = registry.currentWorkerFor(nodeId);
- assertThat(session).isPresent();
- assertThat(session.get()
- .shardIndex()).isEqualTo(index);
+ assertThat(outcome.hasSession()).isTrue();
+ assertThat(outcome.getSession()
+ .getIndex()).isEqualTo(index);
ShardSessionRecord record = readSingleRecord(index);
assertThat(record.getIndex()).isEqualTo(index);
@@ -95,42 +96,54 @@ void pickUp() {
@DisplayName("not be able to pick up the shard if it's already picked up")
void cannotPickUpIfTaken() {
- Optional session = registry.pickUp(index, nodeId);
- assertThat(session).isPresent();
+ PickUpOutcome outcome = registry.pickUp(index, nodeId);
+ assertThat(outcome.hasSession()).isTrue();
+ ShardSessionRecord session = outcome.getSession();
- Optional sameIdxSameNode = registry.pickUp(index, nodeId);
- assertThat(sameIdxSameNode).isEmpty();
+ PickUpOutcome sameIdxSameNode = registry.pickUp(index, nodeId);
+ assertThat(sameIdxSameNode.hasSession()).isFalse();
+ assertThat(sameIdxSameNode.hasAlreadyPicked()).isTrue();
- Optional sameIdxAnotherNode = registry.pickUp(index, newNode());
- assertThat(sameIdxAnotherNode).isEmpty();
+ ShardAlreadyPickedUp alreadyPicked = sameIdxSameNode.getAlreadyPicked();
+ assertThat(alreadyPicked.getWorker()).isEqualTo(session.getWorker());
+
+ PickUpOutcome sameIdxAnotherNode = registry.pickUp(index, newNode());
+ assertThat(sameIdxAnotherNode.hasSession()).isFalse();
+ assertThat(sameIdxAnotherNode.hasAlreadyPicked()).isTrue();
+
+ ShardAlreadyPickedUp anotherAlreadyPicked = sameIdxAnotherNode.getAlreadyPicked();
+ assertThat(anotherAlreadyPicked.getWorker()).isEqualTo(session.getWorker());
ShardIndex anotherIdx = newIndex(24, 100);
- Optional anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
- assertThat(anotherIdxSameNode).isPresent();
+ PickUpOutcome anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
+ assertThat(anotherIdxSameNode.hasSession()).isTrue();
+ ShardSessionRecord anotherSession = anotherIdxSameNode.getSession();
+
+ PickUpOutcome anotherIdxAnotherNode = registry.pickUp(anotherIdx, newNode());
+ assertThat(anotherIdxAnotherNode.hasSession()).isFalse();
+ assertThat(anotherIdxAnotherNode.hasAlreadyPicked()).isTrue();
- Optional anotherIdxAnotherNode =
- registry.pickUp(anotherIdx, newNode());
- assertThat(anotherIdxAnotherNode).isEmpty();
+ ShardAlreadyPickedUp oneMoreAnotherPicked = anotherIdxAnotherNode.getAlreadyPicked();
+ assertThat(oneMoreAnotherPicked.getWorker()).isEqualTo(anotherSession.getWorker());
}
@Test
- @DisplayName("complete the shard session (once picked up) and make it available for picking up")
- void completeSessionAndMakeItAvailable() {
- Optional optional = registry.pickUp(index, nodeId);
- assertThat(optional).isPresent();
+ @DisplayName("release the shard session (once picked up) and make it available for picking up")
+ void releaseSessionAndMakeItAvailable() {
+ PickUpOutcome outcome = registry.pickUp(index, nodeId);
+ assertThat(outcome.hasSession()).isTrue();
Timestamp whenPickedFirst = readSingleRecord(index).getWhenLastPicked();
- DsShardProcessingSession session = (DsShardProcessingSession) optional.get();
- session.complete();
+ registry.release(outcome.getSession());
ShardSessionRecord completedRecord = readSingleRecord(index);
assertThat(completedRecord.hasWorker()).isFalse();
NodeId anotherNode = newNode();
WorkerId anotherWorker = registry.currentWorkerFor(anotherNode);
- Optional anotherOptional = registry.pickUp(index, anotherNode);
- assertThat(anotherOptional).isPresent();
+ PickUpOutcome anotherOutcome = registry.pickUp(index, anotherNode);
+ assertThat(anotherOutcome.hasSession()).isTrue();
ShardSessionRecord secondSessionRecord = readSingleRecord(index);
assertThat(secondSessionRecord.getWorker()).isEqualTo(anotherWorker);
@@ -152,7 +165,8 @@ void notAcceptNulls() {
}
private ShardSessionRecord readSingleRecord(ShardIndex index) {
- Optional record = registry.storage().read(index);
+ Optional record = registry.storage()
+ .read(index);
assertThat(record).isPresent();
return record.get();
}
diff --git a/version.gradle.kts b/version.gradle.kts
index be01d0c4..75908b8e 100644
--- a/version.gradle.kts
+++ b/version.gradle.kts
@@ -36,5 +36,5 @@ val cloudPubsubV1Version: String by extra("1.105.8")
val cloudTraceVersion: String by extra("2.14.0")
val spineBaseVersion: String by extra("1.9.0-SNAPSHOT.5")
-val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.10")
-val versionToPublish: String by extra("1.9.0-SNAPSHOT.3")
+val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.11")
+val versionToPublish: String by extra("1.9.0-SNAPSHOT.4")