Skip to content

Commit 185c433

Browse files
committed
When splitting and merging tokens, update both tokens involved
Before the introduction of persistent masks, split and merge tasks could focus only on the newly created or deleted segment. However, now that masks are persistent, the other segment involved must be persisted to store its modified mask.
1 parent ee92993 commit 185c433

5 files changed

Lines changed: 142 additions & 27 deletions

File tree

messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/MergeTask.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,22 +153,24 @@ private Boolean mergeSegments(Segment thisSegment, TrackingToken thisToken,
153153
Segment thatSegment, TrackingToken thatToken) {
154154
try {
155155
Segment mergedSegment = thisSegment.mergedWith(thatSegment);
156-
// We want to keep the token with the segmentId obtained by the merge operation, and to delete the other
157-
int tokenToDelete = mergedSegment.getSegmentId() == thisSegment.getSegmentId()
158-
? thatSegment.getSegmentId() : thisSegment.getSegmentId();
159156
TrackingToken mergedToken = thatSegment.getSegmentId() < thisSegment.getSegmentId()
160157
? MergedTrackingToken.merged(thatToken, thisToken)
161158
: MergedTrackingToken.merged(thisToken, thatToken);
162159

163160
joinAndUnwrap(unitOfWorkFactory.create().executeWithResult(
164-
context -> tokenStore.deleteToken(name, tokenToDelete, context)
165-
.thenCompose(result -> tokenStore.storeToken(mergedToken,
166-
name,
167-
mergedSegment.getSegmentId(),
168-
context))
169-
.thenCompose(result -> tokenStore.releaseClaim(name,
170-
mergedSegment.getSegmentId(),
171-
context))
161+
context -> tokenStore.deleteToken(name, thisSegment.getSegmentId(), context)
162+
.thenCompose(result -> tokenStore.deleteToken(name, thatSegment.getSegmentId(), context))
163+
.thenCompose(result -> tokenStore.initializeSegment(
164+
mergedToken,
165+
name,
166+
mergedSegment,
167+
context
168+
))
169+
.thenCompose(result -> tokenStore.releaseClaim(
170+
name,
171+
mergedSegment.getSegmentId(),
172+
context
173+
))
172174
));
173175

174176
logger.info("Processor [{}] successfully merged {} with {} into {}.",

messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/SplitTask.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,17 @@ private CompletableFuture<Void> splitAndRelease(@Nonnull TrackerStatus[] splitSt
158158
splitStatuses[0].getSegment().getSegmentId(),
159159
context
160160
))
161+
.thenCompose(result -> tokenStore.deleteToken(
162+
name,
163+
splitStatuses[0].getSegment().getSegmentId(),
164+
context
165+
))
166+
.thenCompose(result -> tokenStore.initializeSegment(
167+
splitStatuses[0].getTrackingToken(),
168+
name,
169+
splitStatuses[0].getSegment(),
170+
context
171+
))
161172
.thenRun(() -> logger.info(
162173
"Processor [{}] successfully split {} into {} and {}.",
163174
name, segmentToSplit, splitStatuses[0].getSegment(), splitStatuses[1].getSegment()

messaging/src/test/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/MergeTaskTest.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ void runMergeSegmentsFromWorkPackages() throws ExecutionException, InterruptedEx
117117
when(tokenStore.releaseClaim(anyString(), anyInt(), any())).thenReturn(FutureUtils.emptyCompletedFuture());
118118
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any()))
119119
.thenReturn(completedFuture(testTokenToBeMerged));
120+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(0, 0)), any()))
121+
.thenReturn(FutureUtils.emptyCompletedFuture());
122+
120123
workPackages.put(SEGMENT_TO_BE_MERGED, workPackageTwo);
121124

122125
ArgumentCaptor<TrackingToken> mergedTokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
@@ -126,10 +129,11 @@ void runMergeSegmentsFromWorkPackages() throws ExecutionException, InterruptedEx
126129
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ZERO.getSegmentId()), any());
127130
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ONE.getSegmentId()), any());
128131
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any());
129-
verify(tokenStore).storeToken(mergedTokenCaptor.capture(),
130-
eq(PROCESSOR_NAME),
131-
eq(SEGMENT_TO_MERGE),
132-
any());
132+
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_MERGE), any());
133+
verify(tokenStore).initializeSegment(mergedTokenCaptor.capture(),
134+
eq(PROCESSOR_NAME),
135+
eq(new Segment(0, 0)),
136+
any());
133137
TrackingToken resultToken = mergedTokenCaptor.getValue();
134138
assertTrue(resultToken.getClass().isAssignableFrom(MergedTrackingToken.class));
135139
assertEquals(testTokenToMerge, ((MergedTrackingToken) resultToken).lowerSegmentToken());
@@ -151,6 +155,8 @@ void runMergeSegmentsAfterClaimingBoth() throws ExecutionException, InterruptedE
151155
.thenReturn(completedFuture(testTokenToMerge));
152156
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any()))
153157
.thenReturn(completedFuture(testTokenToBeMerged));
158+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(0, 0)), any()))
159+
.thenReturn(FutureUtils.emptyCompletedFuture());
154160

155161
ArgumentCaptor<TrackingToken> mergedTokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
156162

@@ -159,10 +165,11 @@ void runMergeSegmentsAfterClaimingBoth() throws ExecutionException, InterruptedE
159165
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ZERO.getSegmentId()), any());
160166
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ONE.getSegmentId()), any());
161167
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any());
162-
verify(tokenStore).storeToken(mergedTokenCaptor.capture(),
163-
eq(PROCESSOR_NAME),
164-
eq(SEGMENT_TO_MERGE),
165-
any());
168+
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_MERGE), any());
169+
verify(tokenStore).initializeSegment(mergedTokenCaptor.capture(),
170+
eq(PROCESSOR_NAME),
171+
eq(new Segment(0, 0)),
172+
any());
166173
TrackingToken resultToken = mergedTokenCaptor.getValue();
167174
assertTrue(resultToken.getClass().isAssignableFrom(MergedTrackingToken.class));
168175
assertEquals(testTokenToMerge, ((MergedTrackingToken) resultToken).lowerSegmentToken());
@@ -188,6 +195,8 @@ void runMergeSegmentsFromWorkPackageAndClaimedSegment() throws ExecutionExceptio
188195
when(tokenStore.releaseClaim(anyString(), anyInt(), any())).thenReturn(FutureUtils.emptyCompletedFuture());
189196
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any()))
190197
.thenReturn(completedFuture(testTokenToBeMerged));
198+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(0, 0)), any()))
199+
.thenReturn(FutureUtils.emptyCompletedFuture());
191200

192201
ArgumentCaptor<TrackingToken> mergedTokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
193202

@@ -196,10 +205,11 @@ void runMergeSegmentsFromWorkPackageAndClaimedSegment() throws ExecutionExceptio
196205
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ZERO.getSegmentId()), any());
197206
verify(tokenStore).fetchSegment(eq(PROCESSOR_NAME), eq(SEGMENT_ONE.getSegmentId()), any());
198207
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any());
199-
verify(tokenStore).storeToken(mergedTokenCaptor.capture(),
200-
eq(PROCESSOR_NAME),
201-
eq(SEGMENT_TO_MERGE),
202-
any());
208+
verify(tokenStore).deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_MERGE), any());
209+
verify(tokenStore).initializeSegment(mergedTokenCaptor.capture(),
210+
eq(PROCESSOR_NAME),
211+
eq(new Segment(0, 0)),
212+
any());
203213
TrackingToken resultToken = mergedTokenCaptor.getValue();
204214
assertTrue(resultToken.getClass().isAssignableFrom(MergedTrackingToken.class));
205215
assertEquals(testTokenToMerge, ((MergedTrackingToken) resultToken).lowerSegmentToken());
@@ -236,6 +246,9 @@ void runCompletesExceptionallyThroughUnableToClaimTokenExceptionOnDelete() {
236246
when(workPackageTwo.abort(null)).thenReturn(FutureUtils.emptyCompletedFuture());
237247
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any()))
238248
.thenReturn(completedFuture(new GlobalSequenceTrackingToken(1)));
249+
when(tokenStore.deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_MERGE), any()))
250+
.thenReturn(FutureUtils.emptyCompletedFuture());
251+
239252
workPackages.put(SEGMENT_TO_BE_MERGED, workPackageTwo);
240253

241254
doThrow(new UnableToClaimTokenException("some exception"))
@@ -263,6 +276,8 @@ void runCompletesExceptionallyThroughOtherException() {
263276
when(workPackageTwo.abort(null)).thenReturn(FutureUtils.emptyCompletedFuture());
264277
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_BE_MERGED), any()))
265278
.thenReturn(completedFuture(new GlobalSequenceTrackingToken(1)));
279+
when(tokenStore.deleteToken(eq(PROCESSOR_NAME), eq(SEGMENT_TO_MERGE), any()))
280+
.thenReturn(FutureUtils.emptyCompletedFuture());
266281
workPackages.put(SEGMENT_TO_BE_MERGED, workPackageTwo);
267282

268283
doThrow(new IllegalStateException("some exception"))

messaging/src/test/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/PooledStreamingEventProcessorTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,70 @@ void splitSegment() {
11141114
() -> assertNotNull(testSubject.processingStatus().get(1))
11151115
);
11161116
}
1117+
1118+
@Test
1119+
void splitAndMergeSegmentOfGroupOf4() {
1120+
// given
1121+
int testSegmentId = 2;
1122+
int splitSegmentId = 6; // splitting segment 2 when there are 4 segments results in a new segment 6/7
1123+
int testTokenClaimInterval = 500;
1124+
1125+
withTestSubject(
1126+
List.of(),
1127+
c -> c.initialSegmentCount(4).tokenClaimInterval(testTokenClaimInterval)
1128+
);
1129+
1130+
// when
1131+
startEventProcessor();
1132+
1133+
// wait until the segment we want to split is in use, and verify all segments are correct in the token store:
1134+
await().untilAsserted(() -> {
1135+
assertNotNull(testSubject.processingStatus().get(testSegmentId));
1136+
assertThat(tokenStore.fetchSegments(PROCESSOR_NAME, null).join())
1137+
.containsExactlyInAnyOrder(
1138+
new Segment(0, 3),
1139+
new Segment(1, 3),
1140+
new Segment(2, 3),
1141+
new Segment(3, 3)
1142+
);
1143+
});
1144+
1145+
// split segment:
1146+
boolean success = testSubject.splitSegment(testSegmentId).join();
1147+
1148+
assertThat(success).isTrue();
1149+
1150+
// wait until the two split segments are in use, and verify all segments are correct in the token store:
1151+
await().untilAsserted(() -> {
1152+
assertNotNull(testSubject.processingStatus().get(testSegmentId));
1153+
assertNotNull(testSubject.processingStatus().get(splitSegmentId));
1154+
assertThat(tokenStore.fetchSegments(PROCESSOR_NAME, null).join())
1155+
.containsExactlyInAnyOrder(
1156+
new Segment(0, 3),
1157+
new Segment(1, 3),
1158+
new Segment(3, 3),
1159+
new Segment(2, 7),
1160+
new Segment(6, 7)
1161+
);
1162+
});
1163+
1164+
// merge segment:
1165+
success = testSubject.mergeSegment(1).join();
1166+
1167+
assertThat(success).isTrue();
1168+
1169+
// wait until the merged segments is in use, and verify all segments are correct in the token store:
1170+
await().untilAsserted(() -> {
1171+
assertNotNull(testSubject.processingStatus().get(1));
1172+
assertThat(tokenStore.fetchSegments(PROCESSOR_NAME, null).join())
1173+
.containsExactlyInAnyOrder(
1174+
new Segment(0, 3),
1175+
new Segment(1, 1),
1176+
new Segment(2, 7),
1177+
new Segment(6, 7)
1178+
);
1179+
});
1180+
}
11171181
}
11181182

11191183
@Nested

messaging/src/test/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/SplitTaskTest.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,24 @@ void runSplitsSegmentFromWorkPackage() throws ExecutionException, InterruptedExc
8282
when(workPackage.abort(null)).thenReturn(emptyCompletedFuture());
8383
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_ID), any()))
8484
.thenReturn(completedFuture(testTokenToSplit));
85-
when(tokenStore.initializeSegment(any(), anyString(), any(Segment.class), any())).thenReturn(emptyCompletedFuture());
86-
when(tokenStore.releaseClaim(any(), anyInt(), any())).thenReturn(emptyCompletedFuture());
85+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(0, 1)), any()))
86+
.thenReturn(emptyCompletedFuture());
87+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(1, 1)), any()))
88+
.thenReturn(emptyCompletedFuture());
89+
when(tokenStore.releaseClaim(eq(PROCESSOR_NAME), anyInt(), any()))
90+
.thenReturn(emptyCompletedFuture());
91+
when(tokenStore.deleteToken(eq(PROCESSOR_NAME), eq(Segment.ROOT_SEGMENT.getSegmentId()), any()))
92+
.thenReturn(emptyCompletedFuture());
93+
8794
workPackages.put(SEGMENT_ID, workPackage);
8895

8996
testSubject.run();
9097

98+
// original token must be reinitialized (due to mask change):
99+
verify(tokenStore).initializeSegment(eq(expectedOriginal.getTrackingToken()),
100+
eq(PROCESSOR_NAME),
101+
eq(expectedOriginal.getSegment()),
102+
any());
91103
verify(tokenStore).initializeSegment(eq(expectedSplit.getTrackingToken()),
92104
eq(PROCESSOR_NAME),
93105
eq(expectedSplit.getSegment()),
@@ -112,11 +124,22 @@ void runSplitsSegmentAfterClaiming() throws ExecutionException, InterruptedExcep
112124
.thenReturn(completedFuture(Segment.ROOT_SEGMENT));
113125
when(tokenStore.fetchToken(eq(PROCESSOR_NAME), eq(SEGMENT_ID), any()))
114126
.thenReturn(completedFuture(testTokenToSplit));
115-
when(tokenStore.initializeSegment(any(), anyString(), any(Segment.class), any())).thenReturn(emptyCompletedFuture());
116-
when(tokenStore.releaseClaim(any(), anyInt(), any())).thenReturn(emptyCompletedFuture());
127+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(0, 1)), any()))
128+
.thenReturn(emptyCompletedFuture());
129+
when(tokenStore.initializeSegment(any(), eq(PROCESSOR_NAME), eq(new Segment(1, 1)), any()))
130+
.thenReturn(emptyCompletedFuture());
131+
when(tokenStore.releaseClaim(eq(PROCESSOR_NAME), eq(Segment.ROOT_SEGMENT.getSegmentId()), any()))
132+
.thenReturn(emptyCompletedFuture());
133+
when(tokenStore.deleteToken(eq(PROCESSOR_NAME), eq(Segment.ROOT_SEGMENT.getSegmentId()), any()))
134+
.thenReturn(emptyCompletedFuture());
117135

118136
testSubject.run();
119137

138+
// original token must be reinitialized (due to mask change):
139+
verify(tokenStore).initializeSegment(eq(expectedOriginal.getTrackingToken()),
140+
eq(PROCESSOR_NAME),
141+
eq(expectedOriginal.getSegment()),
142+
any());
120143
verify(tokenStore).initializeSegment(eq(expectedSplit.getTrackingToken()),
121144
eq(PROCESSOR_NAME),
122145
eq(expectedSplit.getSegment()),

0 commit comments

Comments
 (0)