diff --git a/src/main/java/model/exceptions/LightChainNetworkingException.java b/src/main/java/model/exceptions/LightChainNetworkingException.java index cff8c42a..bef1f521 100644 --- a/src/main/java/model/exceptions/LightChainNetworkingException.java +++ b/src/main/java/model/exceptions/LightChainNetworkingException.java @@ -3,4 +3,8 @@ /** * Represents a runtime exception happens on the Networking layer of LightChain. */ -public class LightChainNetworkingException extends Exception{ } +public class LightChainNetworkingException extends Exception { + public LightChainNetworkingException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/network/NetworkAdapter.java b/src/main/java/network/NetworkAdapter.java new file mode 100644 index 00000000..e5151104 --- /dev/null +++ b/src/main/java/network/NetworkAdapter.java @@ -0,0 +1,41 @@ +package network; + +import model.Entity; +import model.exceptions.LightChainDistributedStorageException; +import model.exceptions.LightChainNetworkingException; +import model.lightchain.Identifier; + +/** + * NetworkAdapter models the interface that is exposed to the conduits from the networking layer. + */ +public interface NetworkAdapter { + /** + * Sends the Entity through the Network to the remote target. + * + * @param e the Entity to be sent over the network. + * @param target Identifier of the receiver. + * @param channel channel on which this entity is sent. + * @throws LightChainNetworkingException any unhappy path taken on sending the Entity. + */ + void unicast(Entity e, Identifier target, String channel) throws LightChainNetworkingException; + + /** + * Stores given Entity on the underlying Distributed Hash Table (DHT) of nodes. + * + * @param e the Entity to be stored over the network. + * @param namespace namespace on which this entity is stored. + * @throws LightChainDistributedStorageException any unhappy path taken on storing the Entity. + */ + void put(Entity e, String namespace) throws LightChainDistributedStorageException; + + /** + * Retrieves the entity corresponding to the given identifier form the underlying Distributed Hash Table + * (DHT) of nodes. + * + * @param identifier identifier of the entity to be retrieved. + * @param namespace the namespace on which this query is resolved. + * @return the retrieved entity or null if it does not exist. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entity. + */ + Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException; +} diff --git a/src/test/java/networking/Hub.java b/src/test/java/networking/Hub.java index 01861dbd..77a83555 100644 --- a/src/test/java/networking/Hub.java +++ b/src/test/java/networking/Hub.java @@ -10,6 +10,51 @@ * Models the core communication part of the networking layer that allows stub network instances to talk to each other. */ public class Hub { - private ConcurrentHashMap networks; - private ConcurrentHashMap entities; -} + private final ConcurrentHashMap networks; + private final ConcurrentHashMap entities; + + /** + * Create a hub. + */ + public Hub() { + this.networks = new ConcurrentHashMap<>(); + this.entities = new ConcurrentHashMap<>(); + } + + /** + * Registeration of a network to the Hub. + * + * @param identifier identifier of network. + * @param network to be registered. + */ + public void registerNetwork(Identifier identifier, Network network) { + networks.put(identifier, network); + } + + /** + * Transfer entity from to another network on the same channel. + * + * @param entity entity to be transferred. + * @param target identifier of target. + * @param channel channel on which the entity is delivered to target. + */ + public void transferEntity(Entity entity, Identifier target, String channel) throws IllegalStateException { + StubNetwork net = this.getNetwork(target); + try { + net.receiveUnicast(entity, channel); + } catch (IllegalArgumentException ex) { + throw new IllegalStateException("target network failed on receiving unicast: " + ex.getMessage()); + } + + } + + /** + * Get the network with identifier. + * + * @param identifier identity of network. + * @return network corresponding to identifier. + */ + private StubNetwork getNetwork(Identifier identifier) { + return (StubNetwork) networks.get(identifier); + } +} \ No newline at end of file diff --git a/src/test/java/networking/MockConduit.java b/src/test/java/networking/MockConduit.java new file mode 100644 index 00000000..7ff40438 --- /dev/null +++ b/src/test/java/networking/MockConduit.java @@ -0,0 +1,59 @@ +package networking; + +import model.Entity; +import model.exceptions.LightChainDistributedStorageException; +import model.exceptions.LightChainNetworkingException; +import model.lightchain.Identifier; +import network.Conduit; +import network.NetworkAdapter; + +/** + * MockConduit represents the Networking interface that is exposed to an Engine. + */ +public class MockConduit implements Conduit { + + private final String channel; + private final NetworkAdapter networkAdapter; + + public MockConduit(String channel, NetworkAdapter adapter) { + this.channel = channel; + this.networkAdapter = adapter; + } + + /** + * Sends the Entity through the Network to the remote target. + * + * @param e the Entity to be sent over the network. + * @param target Identifier of the receiver. + * @throws LightChainNetworkingException any unhappy path taken on sending the Entity. + */ + @Override + public void unicast(Entity e, Identifier target) throws LightChainNetworkingException { + this.networkAdapter.unicast(e, target, channel); + } + + /** + * Stores given Entity on the underlying Distributed Hash Table (DHT) of nodes. + * + * @param e the Entity to be stored over the network. + * @throws LightChainDistributedStorageException any unhappy path taken on storing the Entity. + */ + @Override + public void put(Entity e) throws LightChainDistributedStorageException { + + } + + /** + * Retrieves the entity corresponding to the given identifier form the underlying Distributed Hash Table + * (DHT) of nodes. + * + * @param identifier identifier of the entity to be retrieved. + * @return the retrieved entity or null if it does not exist. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entity. + */ + @Override + public Entity get(Identifier identifier) throws LightChainDistributedStorageException { + return null; + } + +} \ No newline at end of file diff --git a/src/test/java/networking/MockEngine.java b/src/test/java/networking/MockEngine.java index 70d82049..6b613b23 100644 --- a/src/test/java/networking/MockEngine.java +++ b/src/test/java/networking/MockEngine.java @@ -1,6 +1,8 @@ package networking; +import java.util.HashSet; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import model.Entity; import model.lightchain.Identifier; @@ -10,10 +12,56 @@ * Represents a mock implementation of Engine interface for testing. */ public class MockEngine implements Engine { - private Set receivedEntityIds; + private final ReentrantReadWriteLock lock; + private final Set receivedEntityIds; + + public MockEngine() { + this.receivedEntityIds = new HashSet<>(); + this.lock = new ReentrantReadWriteLock(); + } + + /** + * Called by Network whenever an Entity is arrived for this engine. + * + * @param e the arrived Entity from the network. + * @throws IllegalArgumentException any unhappy path taken on processing the Entity. + */ @Override public void process(Entity e) throws IllegalArgumentException { - // TODO: put e.Id() in the set. + lock.writeLock().lock(); + + receivedEntityIds.add(e.id()); + + lock.writeLock().unlock(); + } + + /** + * Check whether an entity is received. + * + * @param e the entity. + * @return true if the entity received, otherwise false. + */ + public boolean hasReceived(Entity e) { + lock.readLock().lock(); + + boolean ok = this.receivedEntityIds.contains(e.id()); + + lock.readLock().unlock(); + return ok; + } + + /** + * Total distinct entities this engine received. + * + * @return total messages it received. + */ + public int totalReceived() { + lock.readLock().lock(); + + int size = receivedEntityIds.size(); + + lock.readLock().unlock(); + return size; } -} +} \ No newline at end of file diff --git a/src/test/java/networking/StubNetwork.java b/src/test/java/networking/StubNetwork.java index 3da34d7c..11b13e4e 100644 --- a/src/test/java/networking/StubNetwork.java +++ b/src/test/java/networking/StubNetwork.java @@ -2,24 +2,127 @@ import java.util.concurrent.ConcurrentHashMap; +import model.Entity; +import model.exceptions.LightChainDistributedStorageException; +import model.exceptions.LightChainNetworkingException; +import model.lightchain.Identifier; import network.Conduit; import network.Network; +import network.NetworkAdapter; import protocol.Engine; +import unittest.fixtures.IdentifierFixture; /** * A mock implementation of networking layer as a test util. */ -public class StubNetwork implements Network { +public class StubNetwork implements Network, NetworkAdapter { private final ConcurrentHashMap engines; private final Hub hub; + private final Identifier identifier; + /** + * Create stubNetwork. + * + * @param hub the hub which stubnetwork registered is. + */ public StubNetwork(Hub hub) { this.engines = new ConcurrentHashMap<>(); this.hub = hub; + this.identifier = IdentifierFixture.newIdentifier(); + this.hub.registerNetwork(identifier, this); } + /** + * Get the identifier of the stubnet. + * + * @return identifier. + */ + public Identifier id() { + return this.identifier; + } + + /** + * Forward the incoming entity to the engine whose channel is given. + * + * @param entity received entity + * @param channel the channel through which the received entity is sent + */ + public void receiveUnicast(Entity entity, String channel) throws IllegalArgumentException { + Engine engine = getEngine(channel); + try { + engine.process(entity); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("could not process the entity", e); + } + } + + /** + * Registers an Engine to the Network by providing it with a Conduit. + * + * @param en the Engine to be registered. + * @param channel the unique channel corresponding to the Engine. + * @return unique Conduit object created to connect the Network to the Engine. + * @throws IllegalStateException if the channel is already taken by another Engine. + */ + @Override + public Conduit register(Engine en, String channel) throws IllegalStateException { + Conduit conduit = new MockConduit(channel, this); + try { + if (engines.containsKey(channel)) { + throw new IllegalStateException(); + } + engines.put(channel, en); + } catch (IllegalArgumentException ex) { + throw new IllegalStateException("could not register the engine"); + } + return conduit; + } + + public Engine getEngine(String ch) { + return engines.get(ch); + } + + /** + * Sends the Entity through the Network to the remote target. + * + * @param e the Entity to be sent over the network. + * @param target Identifier of the receiver. + * @param channel channel on which this entity is sent. + * @throws LightChainNetworkingException any unhappy path taken on sending the Entity. + */ + @Override + public void unicast(Entity e, Identifier target, String channel) throws LightChainNetworkingException { + try { + this.hub.transferEntity(e, target, channel); + } catch (IllegalStateException ex) { + throw new LightChainNetworkingException("stub network could not transfer entity", ex); + } + + } + + /** + * Stores given Entity on the underlying Distributed Hash Table (DHT) of nodes. + * + * @param e the Entity to be stored over the network. + * @param namespace namespace on which this entity is stored. + * @throws LightChainDistributedStorageException any unhappy path taken on storing the Entity. + */ + @Override + public void put(Entity e, String namespace) throws LightChainDistributedStorageException { + + } + + /** + * Retrieves the entity corresponding to the given identifier form the underlying Distributed Hash Table + * (DHT) of nodes. + * + * @param identifier identifier of the entity to be retrieved. + * @param namespace the namespace on which this query is resolved. + * @return the retrieved entity or null if it does not exist. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entity. + */ @Override - public Conduit register(Engine e, String channel) throws IllegalStateException { + public Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException { return null; } -} +} \ No newline at end of file diff --git a/src/test/java/networking/StubNetworkEpidemicTest.java b/src/test/java/networking/StubNetworkEpidemicTest.java new file mode 100644 index 00000000..9bd0766a --- /dev/null +++ b/src/test/java/networking/StubNetworkEpidemicTest.java @@ -0,0 +1,236 @@ +package networking; + +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import model.Entity; +import model.exceptions.LightChainNetworkingException; +import network.Conduit; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import unittest.fixtures.EntityFixture; + +/** + * Encapsulates one-to-all test cases of stub network. + */ +public class StubNetworkEpidemicTest { + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; + private ArrayList networks; + private Hub hub; + + + /** + * Creates a hub with 10 connected networks, each network has two mock engines on different channels. + */ + @BeforeEach + void setup() { + this.networks = new ArrayList<>(); + this.hub = new Hub(); + for (int i = 0; i < 9; i++) { + StubNetwork stubNetwork = new StubNetwork(hub); + stubNetwork.register(new MockEngine(), channel1); + stubNetwork.register(new MockEngine(), channel2); + networks.add(stubNetwork); + } + } + + /** + * Test for Unicast one engine to all other stub networks. + */ + @Test + void testUnicastOneToAllSequentially() { + StubNetwork network1 = new StubNetwork(this.hub); + MockEngine a1 = new MockEngine(); + Conduit c1 = network1.register(a1, channel1); + Entity entity = new EntityFixture(); + + for (int i = 0; i < networks.size(); i++) { + try { + c1.unicast(entity, networks.get(i).id()); + MockEngine e1 = (MockEngine) networks.get(i).getEngine(channel1); + MockEngine e2 = (MockEngine) networks.get(i).getEngine(channel2); + + // only engine on channel-1 should receive the entity. + Assertions.assertTrue(e1.hasReceived(entity)); + Assertions.assertFalse(e2.hasReceived(entity)); + } catch (LightChainNetworkingException e) { + Assertions.fail(); + } + } + } + + /** + * Test one engine unicasts to all others concurrently. + */ + @Test + void testUnicastOneToAllConcurrently() { + int concurrencyDegree = 9; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch sendDone = new CountDownLatch(concurrencyDegree); + + StubNetwork network1 = new StubNetwork(hub); + MockEngine a1 = new MockEngine(); + Conduit c1 = network1.register(a1, channel1); + + Entity entity = new EntityFixture(); + Thread[] unicastThreads = new Thread[concurrencyDegree]; + + for (int i = 0; i < networks.size(); i++) { + int finalI = i; + unicastThreads[i] = new Thread(() -> { + try { + c1.unicast(entity, (this.networks.get(finalI).id())); + MockEngine e1 = (MockEngine) this.networks.get(finalI).getEngine(channel1); + MockEngine e2 = (MockEngine) this.networks.get(finalI).getEngine(channel2); + if (!e1.hasReceived(entity)) { + threadError.getAndIncrement(); + } + if (e2.hasReceived(entity)) { + threadError.getAndIncrement(); + } + sendDone.countDown(); + } catch (LightChainNetworkingException e) { + threadError.getAndIncrement(); + } + }); + } + + for (Thread t : unicastThreads) { + t.start(); + } + try { + boolean doneOneTime = sendDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Test one engine sends unicast to some sequentially. + */ + @Test + void testUnicastOneToSomeSequentially() { + StubNetwork network1 = new StubNetwork(hub); + MockEngine a1 = new MockEngine(); + Conduit c1 = network1.register(a1, channel1); + + Entity entity = new EntityFixture(); + + // unicast only to the first half + for (int i = 0; i < networks.size() / 2; i++) { + try { + c1.unicast(entity, this.networks.get(i).id()); + } catch (LightChainNetworkingException e) { + Assertions.fail(); + } + } + + // checks only first half of network should receive it. + for (int i = 0; i < networks.size(); i++) { + // first half of networks should receive unicast + MockEngine e1 = (MockEngine) this.networks.get(i).getEngine(channel1); + MockEngine e2 = (MockEngine) this.networks.get(i).getEngine(channel2); + if (i < networks.size() / 2) { + + Assertions.assertTrue(e1.hasReceived(entity) // only engine on channel-1 should receive it. + && !e2.hasReceived(entity)); + } else { + Assertions.assertFalse(e1.hasReceived(entity) || e2.hasReceived(entity)); + } + } + + } + + /** + * Test one engine send unicast to some concurrently. + */ + @Test + void testUnicastOneToSomeConcurrently() { + int concurrencyDegree = networks.size() / 2; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch sentDone = new CountDownLatch(concurrencyDegree); + StubNetwork network1 = new StubNetwork(hub); + + MockEngine a1 = new MockEngine(); + Conduit c1 = network1.register(a1, channel1); + Entity entity = new EntityFixture(); + Thread[] unicastThreads = new Thread[concurrencyDegree]; + + // concurrently unicasts to the first half of network + for (int i = 0; i < concurrencyDegree; i++) { + int finalI = i; + unicastThreads[i] = new Thread(() -> { + try { + c1.unicast(entity, this.networks.get(finalI).id()); + sentDone.countDown(); + } catch (LightChainNetworkingException e) { + threadError.getAndIncrement(); + } + }); + } + + for (Thread t : unicastThreads) { + t.start(); + } + try { + boolean doneOneTime = sentDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + + Assertions.assertEquals(0, threadError.get()); + + // checks only first half of network should receive it. + for (int i = 0; i < networks.size(); i++) { + // first half of networks should receive unicast + MockEngine e1 = (MockEngine) this.networks.get(i).getEngine(channel1); + MockEngine e2 = (MockEngine) this.networks.get(i).getEngine(channel2); + if (i < networks.size() / 2) { + + Assertions.assertTrue(e1.hasReceived(entity) // only engine on channel-1 should receive it. + && !e2.hasReceived(entity)); + } else { + Assertions.assertFalse(e1.hasReceived(entity) || e2.hasReceived(entity)); + } + } + + } + + /** + * Test two engines sends different distinct entities over distinct channels other engines sequentially. + */ + @Test + void testUnicastOneToAll_SequentiallyTwoEngines() { + StubNetwork network1 = new StubNetwork(hub); + MockEngine a1 = new MockEngine(); + MockEngine a2 = new MockEngine(); + + Conduit c1 = network1.register(a1, channel1); + Conduit c2 = network1.register(a2, channel2); + + Entity entity1 = new EntityFixture(); + Entity entity2 = new EntityFixture(); + + for (StubNetwork network : networks) { + try { + c1.unicast(entity1, network.id()); + c2.unicast(entity2, network.id()); + MockEngine e1 = (MockEngine) network.getEngine(channel1); + MockEngine e2 = (MockEngine) network.getEngine(channel2); + Assertions.assertTrue(e1.hasReceived(entity1) && e2.hasReceived(entity2)); + Assertions.assertFalse(e2.hasReceived(entity1) || e1.hasReceived(entity2)); + + } catch (LightChainNetworkingException e) { + Assertions.fail(); + } + } + } +} diff --git a/src/test/java/networking/StubNetworkTest.java b/src/test/java/networking/StubNetworkTest.java index 2cefc2d3..95ce2e86 100644 --- a/src/test/java/networking/StubNetworkTest.java +++ b/src/test/java/networking/StubNetworkTest.java @@ -1,20 +1,236 @@ package networking; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import model.Entity; +import model.exceptions.LightChainNetworkingException; +import network.Conduit; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import unittest.fixtures.EntityFixture; + /** - * Encapsulates tests for stub network implementation. + * Encapsulates tests for the stubnetwork. */ public class StubNetworkTest { - // TODO: add a test for each of the following scenarios: - // Use mock engines. - // 1. Engine A (on one stub network) can send message to Engine B (on another stub network) through its StubNetwork, - // and the message is received by Engine B. - // 2. Engine A can CONCURRENTLY send 100 messages to Engine B through its StubNetwork, - // and ALL messages received by Engine B. - // 3. Extend case 2 with Engine B also sending a reply message to Engine A for each received messages and all replies - // are received by Engine A. - // 4. Engines A and B on one StubNetwork can CONCURRENTLY send 100 messages to Engines C and D on another StubNetwork - // (A -> C) and (B -> D), and each Engine only - // receives messages destinated for it (C receives all messages from A) and (D receives all messages from B). - // Note that A and C must be on the same channel, and B and B must be on another same channel. - // 5. Stub network throws an exception if an engine is registering itself on an already taken channel. -} + + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; + + /** + * Engine A (on one stub network) can send message to Engine B (on another stub network) through its StubNetwork, + * and the message is received by Engine B. + */ + @Test + void testTwoStubNetworksTwoEngines() { + Hub hub = new Hub(); + StubNetwork networkA = new StubNetwork(hub); + MockEngine engineA = new MockEngine(); + Conduit conduitA = networkA.register(engineA, channel1); + + StubNetwork networkB = new StubNetwork(hub); + MockEngine engineB = new MockEngine(); + networkB.register(engineB, channel1); + + Entity entity = new EntityFixture(); + try { + conduitA.unicast(entity, networkB.id()); + } catch (LightChainNetworkingException e) { + Assertions.fail(); + } + Assertions.assertTrue(engineB.hasReceived(entity)); + } + + /** + * Engine A can CONCURRENTLY send 100 messages to Engine B through its StubNetwork, + * and ALL messages received by Engine B. + */ + @Test + void testTwoStubNetworksTwoEnginesConcurrentMessages() { + Hub hub = new Hub(); + + int concurrencyDegree = 100; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch countDownLatch = new CountDownLatch(concurrencyDegree); + Thread[] unicastThreads = new Thread[concurrencyDegree]; + + StubNetwork networkA = new StubNetwork(hub); + MockEngine engineA = new MockEngine(); + Conduit conduitA = networkA.register(engineA, channel1); + + StubNetwork networkB = new StubNetwork(hub); + MockEngine engineB = new MockEngine(); + networkB.register(engineB, channel1); + + for (int i = 0; i < concurrencyDegree; i++) { + unicastThreads[i] = new Thread(() -> { + Entity entity = new EntityFixture(); + try { + conduitA.unicast(entity, networkB.id()); + if (!engineB.hasReceived(entity)) { + threadError.getAndIncrement(); + } + countDownLatch.countDown(); + } catch (LightChainNetworkingException e) { + threadError.getAndIncrement(); + } + }); + } + + for (Thread t : unicastThreads) { + t.start(); + } + + try { + boolean doneOneTime = countDownLatch.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + Assertions.assertEquals(concurrencyDegree, engineB.totalReceived()); + } + + /** + * Engine A can CONCURRENTLY send 100 messages to Engine B through its StubNetwork, + * and ALL messages received by Engine B. + * Engine B also sending a reply message to Engine A for each received messages and all replies + * are received by Engine A. + */ + @Test + void testTwoStubNetworksTwoEnginesReplyConcurrentMessages() { + Hub hub = new Hub(); + + int concurrencyDegree = 100; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch sendDone = new CountDownLatch(concurrencyDegree); + Thread[] unicastThreads = new Thread[concurrencyDegree]; + + StubNetwork networkA = new StubNetwork(hub); + MockEngine engineA = new MockEngine(); + Conduit conduitA = networkA.register(engineA, channel1); + + StubNetwork networkB = new StubNetwork(hub); + MockEngine engineB = new MockEngine(); + Conduit conduitB = networkB.register(engineB, channel1); + + for (int i = 0; i < concurrencyDegree; i++) { + unicastThreads[i] = new Thread(() -> { + Entity message = new EntityFixture(); + Entity reply = new EntityFixture(); + try { + // A -> B + conduitA.unicast(message, networkB.id()); + if (!engineB.hasReceived(message)) { + threadError.getAndIncrement(); + } + + // B -> A + conduitB.unicast(reply, networkA.id()); + if (!engineA.hasReceived(reply)) { + threadError.getAndIncrement(); + } + sendDone.countDown(); + } catch (LightChainNetworkingException e) { + threadError.getAndIncrement(); + } + }); + } + + for (Thread t : unicastThreads) { + t.start(); + } + + try { + boolean doneOneTime = sendDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Engines A1 and A2 on one StubNetwork can CONCURRENTLY send 100 messages to Engines B1 and B2 on another StubNetwork + * (A1 -> B1) and (A2 -> B2), and each Engine only + * receives messages destinated for it (B1 receives all messages from A1) and (B2 receives all messages from A2). + */ + @Test + void testTwoStubNetworksFourEnginesConcurrentMessages() { + Hub hub = new Hub(); + + int concurrencyDegree = 100; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch sendDone = new CountDownLatch(concurrencyDegree); + Thread[] unicastThreads = new Thread[concurrencyDegree]; + + // network A + StubNetwork networkA = new StubNetwork(hub); + MockEngine engineA1 = new MockEngine(); + Conduit conduitA1 = networkA.register(engineA1, channel1); + + MockEngine engineA2 = new MockEngine(); + Conduit conduitA2 = networkA.register(engineA2, channel2); + + // network B + StubNetwork networkB = new StubNetwork(hub); + MockEngine engineB1 = new MockEngine(); + MockEngine engineB2 = new MockEngine(); + networkB.register(engineB1, channel1); + networkB.register(engineB2, channel2); + + for (int i = 0; i < concurrencyDegree; i++) { + unicastThreads[i] = new Thread(() -> { + Entity messageA1toB1 = new EntityFixture(); + Entity messageA2toB2 = new EntityFixture(); + try { + // A1 -> B1 + // A2 -> B2 + conduitA1.unicast(messageA1toB1, networkB.id()); + conduitA2.unicast(messageA2toB2, networkB.id()); + + if (!engineB1.hasReceived(messageA1toB1) + || engineB1.hasReceived(messageA2toB2) + || !engineB2.hasReceived(messageA2toB2) + || engineB2.hasReceived(messageA1toB1)) { + threadError.getAndIncrement(); + } + sendDone.countDown(); + } catch (LightChainNetworkingException e) { + threadError.getAndIncrement(); + } + }); + } + for (Thread t : unicastThreads) { + t.start(); + } + try { + boolean doneOneTime = sendDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Stub network throws an exception if an engine is registering itself on an already taken channel. + */ + @Test + void testRegisterToOccupiedChannel() { + Hub hub = new Hub(); + + StubNetwork network1 = new StubNetwork(hub); + MockEngine a1 = new MockEngine(); + network1.register(a1, channel1); + MockEngine b1 = new MockEngine(); + try { + network1.register(b1, channel1); + Assertions.fail("fail! method was expected to throw an exception"); + } catch (IllegalStateException ignored) { + // ignored + } + } +} \ No newline at end of file