Skip to content

Commit 2616345

Browse files
committed
Avoid division by 0 when computing partition
Fixes #271
1 parent c668137 commit 2616345

File tree

4 files changed

+60
-14
lines changed

4 files changed

+60
-14
lines changed

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -33,10 +33,14 @@ class HashRoutingStrategy implements RoutingStrategy {
3333

3434
@Override
3535
public List<String> route(Message message, Metadata metadata) {
36-
String routingKey = routingKeyExtractor.apply(message);
37-
int hashValue = hash.applyAsInt(routingKey);
3836
List<String> partitions = metadata.partitions();
39-
return Collections.singletonList(
40-
partitions.get(Integer.remainderUnsigned(hashValue, partitions.size())));
37+
if (partitions.isEmpty()) {
38+
return Collections.emptyList();
39+
} else {
40+
String routingKey = routingKeyExtractor.apply(message);
41+
int hashValue = hash.applyAsInt(routingKey);
42+
return Collections.singletonList(
43+
partitions.get(Integer.remainderUnsigned(hashValue, partitions.size())));
44+
}
4145
}
4246
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -167,6 +167,9 @@ private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environ
167167
c -> c.partitions(superStream),
168168
"Partition lookup for super stream '%s'",
169169
superStream));
170+
if (ps.isEmpty()) {
171+
throw new IllegalArgumentException("Super stream '" + superStream + "' has no partition");
172+
}
170173
this.partitions = new CopyOnWriteArrayList<>(ps);
171174
}
172175

src/test/java/com/rabbitmq/stream/impl/HashRoutingStrategyTest.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.stream.RoutingStrategy;
2020
import com.rabbitmq.stream.RoutingStrategy.Metadata;
2121
import java.util.Arrays;
22+
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
@@ -62,23 +63,43 @@ public String apply(Message message) {
6263
};
6364

6465
Metadata metadata =
65-
new Metadata() {
66+
new MetadataAdapter() {
6667
List<String> partitions = Arrays.asList("invoices-01", "invoices-02", "invoices-03");
6768

6869
@Override
6970
public List<String> partitions() {
7071
return partitions;
7172
}
72-
73-
@Override
74-
public List<String> route(String routingKey) {
75-
// not used here
76-
return null;
77-
}
7873
};
7974
for (String key : keys) {
8075
List<String> partitions = routingStrategy.route(null, metadata);
8176
assertThat(partitions).hasSize(1).element(0).isEqualTo(expectedRoutes.get(key));
8277
}
8378
}
79+
80+
@Test
81+
void shouldReturnEmptyListIfNoPartition() {
82+
RoutingStrategy routingStrategy = new HashRoutingStrategy(m -> "rk", HashUtils.MURMUR3);
83+
Metadata metadata =
84+
new MetadataAdapter() {
85+
@Override
86+
public List<String> partitions() {
87+
return Collections.emptyList();
88+
}
89+
};
90+
assertThat(routingStrategy.route(null, metadata)).isEmpty();
91+
}
92+
93+
private static class MetadataAdapter implements Metadata {
94+
95+
@Override
96+
public List<String> partitions() {
97+
return null;
98+
}
99+
100+
@Override
101+
public List<String> route(String routingKey) {
102+
return null;
103+
}
104+
}
84105
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,4 +331,22 @@ void producerShouldNotPublishMessagesOnceClosed() throws Exception {
331331
assertThat(latchAssert(publishLatch)).completes(5);
332332
assertThat(confirmationCodes).hasSize(1).containsExactly(Constants.CODE_PRODUCER_CLOSED);
333333
}
334+
335+
@Test
336+
void producerCreationShouldFailIfNoPartition() throws Exception {
337+
declareSuperStreamTopology(connection, superStream, 0);
338+
String producerName = "super-stream-application";
339+
assertThatThrownBy(
340+
() -> {
341+
environment
342+
.producerBuilder()
343+
.name(producerName)
344+
.superStream(superStream)
345+
.routing(message -> message.getProperties().getMessageIdAsString())
346+
.producerBuilder()
347+
.build();
348+
})
349+
.isInstanceOf(IllegalArgumentException.class)
350+
.hasMessageContaining("no partition");
351+
}
334352
}

0 commit comments

Comments
 (0)