From 722374b18de4fcc7126a9721adfa5b000f2deb99 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 12:59:17 +0200 Subject: [PATCH 1/9] disable router rebroadcast in tests --- .../scala/fr/acinq/eclair/TestConstants.scala | 4 +- .../acinq/eclair/router/BaseRouterSpec.scala | 1 - .../acinq/eclair/router/FrontRouterSpec.scala | 53 ++++++++++++------- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index b77d9fa3af..ad38f3e92c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -161,7 +161,7 @@ object TestConstants { routerConf = RouterConf( watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, - routerBroadcastInterval = 5 seconds, + routerBroadcastInterval = 1 day, // "disables" rebroadcast requestNodeAnnouncements = true, encodingType = EncodingType.COMPRESSED_ZLIB, channelRangeChunkSize = 20, @@ -299,7 +299,7 @@ object TestConstants { routerConf = RouterConf( watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, - routerBroadcastInterval = 5 seconds, + routerBroadcastInterval = 1 day, // "disables" rebroadcast requestNodeAnnouncements = true, encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = 20, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index 3c2c2cff05..12e4b3cf27 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -122,7 +122,6 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi import com.softwaremill.quicklens._ val nodeParams = Alice.nodeParams .modify(_.nodeKeyManager).setTo(testNodeKeyManager) - .modify(_.routerConf.routerBroadcastInterval).setTo(1 day) // "disable" auto rebroadcast val router = system.actorOf(Router.props(nodeParams, watcher.ref)) // we announce channels peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab)) diff --git a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala index d05ea1f218..404d3a2c7f 100644 --- a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala +++ b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.router import akka.actor.ActorSystem import akka.actor.typed.scaladsl.adapter.actorRefAdapter -import akka.testkit.{TestKit, TestProbe} +import akka.testkit.{TestFSMRef, TestKit, TestProbe} import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.bitcoin.scalacompat.{Block, SatoshiLong, Transaction, TxOut} @@ -32,9 +32,6 @@ import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol.Color import org.scalatest.funsuite.AnyFunSuiteLike -import scodec.bits._ - -import scala.concurrent.duration._ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike { @@ -131,12 +128,14 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab)) peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab)) - // we have to wait 2 times the broadcast interval because there is an additional per-peer delay - val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second - peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) - peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) - peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty)) - peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty)) + // manual rebroadcast + front1 ! Router.TickBroadcast + peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + front2 ! Router.TickBroadcast + peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty)) + front3 ! Router.TickBroadcast + peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty)) } test("aggregate gossip") { @@ -149,9 +148,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike val system2 = ActorSystem("front-system-2") val system3 = ActorSystem("front-system-3") - val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router)) - val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router)) - val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router)) + val front1 = { + implicit val system: ActorSystem = system1 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } + val front2 = { + implicit val system: ActorSystem = system2 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } + val front3 = { + implicit val system: ActorSystem = system3 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } val peerConnection1a = TestProbe("peerconn-1a") val peerConnection1b = TestProbe("peerconn-1b") @@ -182,7 +190,6 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc)) peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc)) - watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab)) @@ -207,12 +214,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b)) peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b)) - // we have to wait 2 times the broadcast interval because there is an additional per-peer delay - val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second - peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a)))) + awaitCond(front1.stateData.nodes.size == 2) + awaitCond(front2.stateData.nodes.size == 2) + awaitCond(front3.stateData.nodes.size == 2) + + // manual rebroadcast + front1 ! Router.TickBroadcast + peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + front2 ! Router.TickBroadcast + peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + front3 ! Router.TickBroadcast + peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a)))) } test("do not forward duplicate gossip") { From f6eb44880b13bb9bd89c30234b575101f09d0ff1 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 16:48:25 +0200 Subject: [PATCH 2/9] use separate ActorSystem for alice and bob in tests --- .../ChannelStateTestsHelperMethods.scala | 26 ++++++++++++++----- .../b/WaitForFundingSignedStateSpec.scala | 2 +- .../c/WaitForFundingConfirmedStateSpec.scala | 10 +++---- .../channel/states/e/NormalStateSpec.scala | 12 ++++----- .../channel/states/e/OfflineStateSpec.scala | 2 +- .../channel/states/h/ClosingStateSpec.scala | 22 +++++++++------- 6 files changed, 46 insertions(+), 28 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index cbffd9cc3a..b144a2c462 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -17,8 +17,8 @@ package fr.acinq.eclair.channel.states import akka.actor.typed.scaladsl.adapter.actorRefAdapter -import akka.actor.{ActorContext, ActorRef} -import akka.testkit.{TestFSMRef, TestKitBase, TestProbe} +import akka.actor.{ActorContext, ActorRef, ActorSystem} +import akka.testkit.{TestFSMRef, TestKit, TestKitBase, TestProbe} import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.ScriptFlags import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -106,6 +106,12 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { def currentBlockHeight: BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight } + val systemA: ActorSystem = ActorSystem("system-alice") + val systemB: ActorSystem = ActorSystem("system-bob") + + system.registerOnTermination(TestKit.shutdownActorSystem(systemA)) + system.registerOnTermination(TestKit.shutdownActorSystem(systemB)) + def init(nodeParamsA: NodeParams = TestConstants.Alice.nodeParams, nodeParamsB: NodeParams = TestConstants.Bob.nodeParams, wallet: OnChainWallet = new DummyOnChainWallet(), tags: Set[String] = Set.empty): SetupFixture = { val aliceOrigin = TestProbe() val alice2bob = TestProbe() @@ -119,8 +125,10 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { val alice2relayer = TestProbe() val bob2relayer = TestProbe() val channelUpdateListener = TestProbe() - system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) - system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) + systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) + systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) + systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) + systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) val router = TestProbe() val finalNodeParamsA = nodeParamsA .modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(5000 sat) @@ -132,8 +140,14 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { .modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(5000 sat) .modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(10000 sat) .modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(10000 sat) - val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref) - val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) + val alice: TestFSMRef[ChannelState, ChannelData, Channel] = { + implicit val system: ActorSystem = systemA + TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref) + } + val bob: TestFSMRef[ChannelState, ChannelData, Channel] = { + implicit val system: ActorSystem = systemB + TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) + } SetupFixture(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, bob2blockchain, router, alice2relayer, bob2relayer, channelUpdateListener, wallet, alicePeer, bobPeer) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala index b03b8d00e3..2a8a475072 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala @@ -82,7 +82,7 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS test("recv FundingSigned with valid signature") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) bob2alice.expectMsgType[FundingSigned] bob2alice.forward(alice) awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala index 8b7a8c7480..3bb70833cb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala @@ -54,7 +54,7 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF within(30 seconds) { val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) alice ! INPUT_INIT_FUNDER(ByteVector32.Zeroes, TestConstants.fundingSatoshis, pushMsat, TestConstants.feeratePerKw, TestConstants.feeratePerKw, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType) alice2blockchain.expectMsgType[TxPublisher.SetChannelId] bob ! INPUT_INIT_FUNDEE(ByteVector32.Zeroes, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType) @@ -83,8 +83,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF import f._ // we create a new listener that registers after alice has published the funding tx val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) // make bob send a FundingLocked msg val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get bob ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx) @@ -102,8 +102,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF import f._ // we create a new listener that registers after alice has published the funding tx val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get alice ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx) assert(listener.expectMsgType[TransactionConfirmed].tx === fundingTx) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index 59981ba15b..75d30245ac 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -74,7 +74,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with val initialState = alice.stateData.asInstanceOf[DATA_NORMAL] val sender = TestProbe() val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) val h = randomBytes32() val add = CMD_ADD_HTLC(sender.ref, 50000000 msat, h, CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, localOrigin(sender.ref)) alice ! add @@ -881,7 +881,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with bob2alice.expectMsgType[UpdateFulfillHtlc] // we listen to channel_update events val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate]) // actual test starts here // when signing the fulfill, bob will have its main output go above reserve in alice's commitment tx @@ -896,7 +896,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv CMD_SIGN (after CMD_UPDATE_FEE)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) alice ! CMD_UPDATE_FEE(FeeratePerKw(654564 sat)) alice2bob.expectMsgType[UpdateFee] alice ! CMD_SIGN() @@ -2734,7 +2734,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill @@ -2767,7 +2767,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign @@ -2800,7 +2800,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index c467c5d6ca..de429c16fe 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -527,7 +527,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] val initialCommitTx = initialState.commitments.localCommit.commitTxAndRemoteSig.commitTx.tx diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 6fcaca1652..1d1440ebac 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -89,15 +89,19 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with bob2blockchain.expectMsgType[WatchFundingConfirmed] awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED) awaitCond(bob.stateName == WAIT_FOR_FUNDING_CONFIRMED) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) withFixture(test.toNoArgTest(FixtureParam(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, alice2relayer, bob2relayer, channelUpdateListener, eventListener, Nil))) } } else { within(30 seconds) { reachNormal(setup, test.tags) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) val bobCommitTxs: List[CommitTxAndRemoteSig] = (for (amt <- List(100000000 msat, 200000000 msat, 300000000 msat)) yield { val (r, htlc) = addHtlc(amt, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) @@ -368,8 +372,8 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.channelFeatures === channelFeatures) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed]) - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) // alice sends an htlc to bob val (_, htlca1) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice) @@ -473,7 +477,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (local commit with htlcs only signed by local)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val aliceCommitTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx // alice sends an htlc val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice) @@ -522,7 +526,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (local commit with fail not acked by remote)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val (_, htlc) = addHtlc(25000000 msat, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) failHtlc(htlc.id, bob, alice, bob2alice, alice2bob) @@ -603,7 +607,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (remote commit with htlcs only signed by local in next remote commit)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx // alice sends an htlc val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice) From f36bac7e243f8a95ea1d0c1e7210556fc2a54a16 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 11:45:44 +0200 Subject: [PATCH 3/9] add debug command to router --- .../src/main/scala/fr/acinq/eclair/router/Router.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 2e2ecdcde1..c4dda53138 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -187,6 +187,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm sender() ! updates stay() + case Event(PrintChannelUpdates, d) => + println("public:") + d.channels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } + println("private:") + d.privateChannels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } + println("---------------------------------------------------") + stay() + case Event(GetRouterData, d) => sender() ! d stay() @@ -558,6 +566,7 @@ object Router { case object GetChannels case object GetChannelsMap case object GetChannelUpdates + case object PrintChannelUpdates // @formatter:on // @formatter:off From ddd3f85a49f486ad9a0313bdf50c069fe94eb916 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 10:03:13 +0200 Subject: [PATCH 4/9] add a simple channel-router integration test The test currently fails, which looks like a bug to me. When a local channel graduates from private to public, we do not copy existing known `channel_update`s. Current implementation guarantees that we will process our local `channel_update` immediately, but what about our peer? --- .../scala/fr/acinq/eclair/router/Router.scala | 13 ++- .../ChannelStateTestsHelperMethods.scala | 3 +- .../router/ChannelRouterIntegrationSpec.scala | 107 ++++++++++++++++++ 3 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index c4dda53138..5ff2b42b80 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.router import akka.Done import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, typed} -import akka.event.DiagnosticLoggingAdapter +import akka.event.{DiagnosticLoggingAdapter, EventStream} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} @@ -50,7 +50,7 @@ import scala.util.{Random, Try} /** * Created by PM on 24/05/2016. */ -class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { +class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None, eventStream_opt: Option[EventStream] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { import Router._ @@ -59,9 +59,12 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm // we pass these to helpers classes so that they have the logging context implicit def implicitLog: DiagnosticLoggingAdapter = diagLog - context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate]) - context.system.eventStream.subscribe(self, classOf[LocalChannelDown]) - context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged]) + // this allows overriding the default eventstream in tests + val eventStream = eventStream_opt.getOrElse(context.system.eventStream) + + eventStream.subscribe(self, classOf[LocalChannelUpdate]) + eventStream.subscribe(self, classOf[LocalChannelDown]) + eventStream.subscribe(self, classOf[AvailableBalanceChanged]) startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval) startTimerWithFixedDelay(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index b144a2c462..663d639939 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -193,7 +193,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { (aliceParams, bobParams, channelType) } - def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Unit = { + def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Transaction = { import setup._ @@ -245,6 +245,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { // x2 because alice and bob share the same relayer channelUpdateListener.expectMsgType[LocalChannelUpdate] channelUpdateListener.expectMsgType[LocalChannelUpdate] + fundingTx } def localOrigin(replyTo: ActorRef): Origin.LocalHot = Origin.LocalHot(replyTo, UUID.randomUUID()) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala new file mode 100644 index 0000000000..c83be953ad --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -0,0 +1,107 @@ +package fr.acinq.eclair.router + +import akka.actor.ActorSystem +import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingDeeplyBuriedTriggered +import fr.acinq.eclair.blockchain.bitcoind.{ZmqWatcher, ZmqWatcherSpec} +import fr.acinq.eclair.channel.DATA_NORMAL +import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.wire.protocol.AnnouncementSignatures +import fr.acinq.eclair.{BlockHeight, TestKitBaseClass} +import org.scalatest.funsuite.FixtureAnyFunSuiteLike +import org.scalatest.{Outcome, Tag} + +/** + * This test checks the integration between Channel and Router (events, etc.) + */ +class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { + + case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], channels: SetupFixture, testTags: Set[String]) + + implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging + + override def withFixture(test: OneArgTest): Outcome = { + val channels = init(tags = test.tags) + val router: TestFSMRef[Router.State, Router.Data, Router] = { + // we use alice's actor system so we share the same event stream + implicit val system: ActorSystem = channels.alice.underlying.system + TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None)) + } + withFixture(test.toNoArgTest(FixtureParam(router, channels, test.tags))) + } + + test("private local channel") { f => + import f._ + + reachNormal(channels, testTags) + + awaitCond(router.stateData.privateChannels.size == 1) + + { + // only the local channel_update is known + val pc = router.stateData.privateChannels.head._2 + assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) + } + + val peerConnection = TestProbe() + val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate + router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) + + awaitCond { + // only the local channel_update is known + val pc = router.stateData.privateChannels.head._2 + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + + } + + test("public local channel", Tag(ChannelStateTestsTags.ChannelsPublic)) { f => + import f._ + + val fundingTx = reachNormal(channels, testTags) + + awaitCond(router.stateData.privateChannels.size == 1) + + { + val pc = router.stateData.privateChannels.head._2 + // only the local channel_update is known + assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) + } + + val peerConnection = TestProbe() + val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate + router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) + + awaitCond { + val pc = router.stateData.privateChannels.head._2 + // both channel_updates are known + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + + // funding tx reaches 6 blocks, announcements are exchanged + channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) + channels.alice2bob.expectMsgType[AnnouncementSignatures] + channels.alice2bob.forward(channels.bob) + + channels.bob ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) + channels.bob2alice.expectMsgType[AnnouncementSignatures] + channels.bob2alice.forward(channels.alice) + + // router gets notified and attempts to validate the local channel + val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest] + vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent))) + + awaitCond { + router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1 + } + + awaitCond { + val pc = router.stateData.channels.head._2 + // both channel updates are preserved + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + + } + +} From 6528dda44ea53f99d9036adce67ca5776eb7366f Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 10:06:20 +0200 Subject: [PATCH 5/9] fix bug found in new channel-router test --- .../fr/acinq/eclair/router/Validation.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index ed0cce7918..f61cdd86db 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -118,9 +118,14 @@ object Validation { val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures()) ctx.self ! nodeAnn } - // public channels that haven't yet been announced are considered as private channels - val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta) - Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt)) + // maybe this previously was a local unannounced channel + val privateChannel_opt = d0.privateChannels.get(c.shortChannelId) + Some(PublicChannel(c, + tx.txid, + capacity, + update_1_opt = privateChannel_opt.flatMap(_.update_1_opt), + update_2_opt = privateChannel_opt.flatMap(_.update_2_opt), + meta_opt = privateChannel_opt.map(_.meta))) } case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) => if (fundingTxStatus.spendingTxConfirmed) { @@ -135,7 +140,7 @@ object Validation { db.removeChannel(c.shortChannelId) None } - // we also reprocess node and channel_update announcements related to channels that were just analyzed + // we also reprocess node and channel_update announcements related to the channel that was just analyzed val reprocessUpdates = d0.stash.updates.view.filterKeys(u => u.shortChannelId == c.shortChannelId) val reprocessNodes = d0.stash.nodes.view.filterKeys(n => isRelatedTo(c, n.nodeId)) // and we remove the reprocessed messages from the stash @@ -145,12 +150,13 @@ object Validation { publicChannel_opt match { case Some(pc) => - // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update - // right after the channel_announcement, channel_updates will be moved from private to public at that time val d1 = d0.copy( channels = d0.channels + (c.shortChannelId -> pc), privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before - rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue + rebroadcast = d0.rebroadcast.copy( + channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers + updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).map(_ -> Set.empty[GossipOrigin]).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them + ), // we also add the newly validated channels to the rebroadcast queue stash = stash1, awaiting = awaiting1) // we only reprocess updates and nodes if validation succeeded From 31de18816abb173985cc72013633d1d118bdfdd4 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 13:47:59 +0200 Subject: [PATCH 6/9] fix rebroadcast for local announcements We fix a second bug where gossip origin wasn't properly set for local announcements --- .../scala/fr/acinq/eclair/router/Router.scala | 15 ++++----- .../fr/acinq/eclair/router/Validation.scala | 25 +++++++------- .../router/ChannelRouterIntegrationSpec.scala | 33 ++++++++++++++----- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 5ff2b42b80..85b4419420 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.router import akka.Done import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, typed} -import akka.event.{DiagnosticLoggingAdapter, EventStream} +import akka.event.DiagnosticLoggingAdapter import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} @@ -50,7 +50,7 @@ import scala.util.{Random, Try} /** * Created by PM on 24/05/2016. */ -class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None, eventStream_opt: Option[EventStream] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { +class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { import Router._ @@ -59,12 +59,9 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm // we pass these to helpers classes so that they have the logging context implicit def implicitLog: DiagnosticLoggingAdapter = diagLog - // this allows overriding the default eventstream in tests - val eventStream = eventStream_opt.getOrElse(context.system.eventStream) - - eventStream.subscribe(self, classOf[LocalChannelUpdate]) - eventStream.subscribe(self, classOf[LocalChannelDown]) - eventStream.subscribe(self, classOf[AvailableBalanceChanged]) + context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate]) + context.system.eventStream.subscribe(self, classOf[LocalChannelDown]) + context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged]) startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval) startTimerWithFixedDelay(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour) @@ -616,7 +613,7 @@ object Router { channels: SortedMap[ShortChannelId, PublicChannel], stash: Stash, rebroadcast: Rebroadcast, - awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done + awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done privateChannels: Map[ShortChannelId, PrivateChannel], excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graph: DirectedGraph, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index f61cdd86db..1e74f5d1d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -81,17 +81,20 @@ object Validation { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors import nodeParams.db.{network => db} import r.c - d0.awaiting.get(c) match { - case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement + // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement + // (the other ones have already been acknowledged as duplicates) + d0.awaiting.getOrElse(c, Seq.empty).headOption match { + case Some(origin: RemoteGossip) => origin.peerConnection ! TransportHandler.ReadAck(c) + case Some(LocalGossip) => () // there is nothing to ack if it was a local gossip case _ => () } - val remoteOrigins_opt = d0.awaiting.get(c) - Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first + val remoteOrigins = d0.awaiting.getOrElse(c, Set.empty).collect { case rg: RemoteGossip => rg } + Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins.headOption.map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first log.debug("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size) val publicChannel_opt = r match { case ValidateResult(c, Left(t)) => log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))) None case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) => val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId) @@ -103,12 +106,12 @@ object Validation { } if (fundingOutputIsInvalid) { log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))) None } else { watcher ! WatchExternalChannelSpent(ctx.self, tx.txid, outputIndex, c.shortChannelId) log.debug("added channel channelId={}", c.shortChannelId) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil)) db.addChannel(c, tx.txid, capacity) @@ -131,10 +134,10 @@ object Validation { if (fundingTxStatus.spendingTxConfirmed) { log.debug("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid) // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))) } else { log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))) } // there may be a record if we have just restarted db.removeChannel(c.shortChannelId) @@ -155,7 +158,7 @@ object Validation { privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before rebroadcast = d0.rebroadcast.copy( channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers - updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).map(_ -> Set.empty[GossipOrigin]).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them + updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet).map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them ), // we also add the newly validated channels to the rebroadcast queue stash = stash1, awaiting = awaiting1) @@ -425,7 +428,7 @@ object Validation { case Some(c) => // channel wasn't announced but here is the announcement, we will process it *before* the channel_update watcher ! ValidateRequest(ctx.self, c) - val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin + val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip))) // no origin // maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks) db.removeFromPruned(c.shortChannelId) handleChannelUpdate(d1, db, routerConf, Left(lcu)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala index c83be953ad..eb2c61bd6a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -2,11 +2,12 @@ package fr.acinq.eclair.router import akka.actor.ActorSystem import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingDeeplyBuriedTriggered -import fr.acinq.eclair.blockchain.bitcoind.{ZmqWatcher, ZmqWatcherSpec} import fr.acinq.eclair.channel.DATA_NORMAL import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.router.Router.{GossipOrigin, LocalGossip} import fr.acinq.eclair.wire.protocol.AnnouncementSignatures import fr.acinq.eclair.{BlockHeight, TestKitBaseClass} import org.scalatest.funsuite.FixtureAnyFunSuiteLike @@ -17,18 +18,20 @@ import org.scalatest.{Outcome, Tag} */ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { - case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], channels: SetupFixture, testTags: Set[String]) + case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String]) implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging override def withFixture(test: OneArgTest): Outcome = { val channels = init(tags = test.tags) + val rebroadcastListener = TestProbe() val router: TestFSMRef[Router.State, Router.Data, Router] = { // we use alice's actor system so we share the same event stream implicit val system: ActorSystem = channels.alice.underlying.system + system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast]) TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None)) } - withFixture(test.toNoArgTest(FixtureParam(router, channels, test.tags))) + withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags))) } test("private local channel") { f => @@ -40,7 +43,7 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu { // only the local channel_update is known - val pc = router.stateData.privateChannels.head._2 + val pc = router.stateData.privateChannels.values.head assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) } @@ -50,10 +53,14 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu awaitCond { // only the local channel_update is known - val pc = router.stateData.privateChannels.head._2 + val pc = router.stateData.privateChannels.values.head pc.update_1_opt.isDefined && pc.update_2_opt.isDefined } + // manual rebroadcast + router ! Router.TickBroadcast + rebroadcastListener.expectNoMessage() + } test("public local channel", Tag(ChannelStateTestsTags.ChannelsPublic)) { f => @@ -64,20 +71,22 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu awaitCond(router.stateData.privateChannels.size == 1) { - val pc = router.stateData.privateChannels.head._2 + val pc = router.stateData.privateChannels.values.head // only the local channel_update is known assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) } val peerConnection = TestProbe() + val aliceChannelUpdate = channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelUpdate val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) awaitCond { - val pc = router.stateData.privateChannels.head._2 + val pc = router.stateData.privateChannels.values.head // both channel_updates are known pc.update_1_opt.isDefined && pc.update_2_opt.isDefined } + val privateChannel = router.stateData.privateChannels.values.head // funding tx reaches 6 blocks, announcements are exchanged channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) @@ -97,11 +106,19 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu } awaitCond { - val pc = router.stateData.channels.head._2 + val pc = router.stateData.channels.values.head // both channel updates are preserved pc.update_1_opt.isDefined && pc.update_2_opt.isDefined } + // manual rebroadcast + router ! Router.TickBroadcast + rebroadcastListener.expectMsg(Router.Rebroadcast( + channels = Map(vr.ann -> Set[GossipOrigin](LocalGossip)), + updates = Map(aliceChannelUpdate -> Set[GossipOrigin](LocalGossip), bobChannelUpdate -> Set.empty[GossipOrigin]), // broadcast the channel_updates (they were previously unannounced) + nodes = Map(router.underlyingActor.stateData.nodes.values.head -> Set[GossipOrigin](LocalGossip)), // new node_announcement + )) + } } From 30412766fdd701b03b4d5fc8922f42e9a7f131d6 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 13 May 2022 19:28:27 +0200 Subject: [PATCH 7/9] increase ram for tests to 2G --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 52490a781a..f6272e3e76 100644 --- a/pom.xml +++ b/pom.xml @@ -235,7 +235,7 @@ ${project.build.directory} - -Xmx1024m -Dfile.encoding=UTF-8 + -Xmx2048m -Dfile.encoding=UTF-8 From a7a35c3bb836df507cdc6a39548111f4ca61bbf3 Mon Sep 17 00:00:00 2001 From: pm47 Date: Wed, 18 May 2022 17:21:08 +0200 Subject: [PATCH 8/9] address review comments --- .../scala/fr/acinq/eclair/router/Router.scala | 9 -------- .../fr/acinq/eclair/router/Validation.scala | 9 ++++++-- .../router/ChannelRouterIntegrationSpec.scala | 21 ++++++++++--------- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 85b4419420..9cca8e5dac 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -187,14 +187,6 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm sender() ! updates stay() - case Event(PrintChannelUpdates, d) => - println("public:") - d.channels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } - println("private:") - d.privateChannels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } - println("---------------------------------------------------") - stay() - case Event(GetRouterData, d) => sender() ! d stay() @@ -566,7 +558,6 @@ object Router { case object GetChannels case object GetChannelsMap case object GetChannelUpdates - case object PrintChannelUpdates // @formatter:on // @formatter:off diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 1e74f5d1d8..2607a04651 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -19,6 +19,7 @@ package fr.acinq.eclair.router import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{ActorContext, ActorRef, typed} import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter} +import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher @@ -153,12 +154,16 @@ object Validation { publicChannel_opt match { case Some(pc) => + // those updates are only defined if this was a previously an unannounced local channel, we broadcast them if they use the real scid + val updates1 = (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet) + .map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])) + .toMap val d1 = d0.copy( channels = d0.channels + (c.shortChannelId -> pc), - privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before + privateChannels = d0.privateChannels - c.shortChannelId, // we remove the corresponding unannounced channel that we may have until now rebroadcast = d0.rebroadcast.copy( channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers - updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet).map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them + updates = d0.rebroadcast.updates ++ updates1 ), // we also add the newly validated channels to the rebroadcast queue stash = stash1, awaiting = awaiting1) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala index eb2c61bd6a..ad98627e6f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -8,7 +8,7 @@ import fr.acinq.eclair.channel.DATA_NORMAL import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.router.Router.{GossipOrigin, LocalGossip} -import fr.acinq.eclair.wire.protocol.AnnouncementSignatures +import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate} import fr.acinq.eclair.{BlockHeight, TestKitBaseClass} import org.scalatest.funsuite.FixtureAnyFunSuiteLike import org.scalatest.{Outcome, Tag} @@ -39,21 +39,22 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu reachNormal(channels, testTags) - awaitCond(router.stateData.privateChannels.size == 1) + awaitAssert(router.stateData.privateChannels.size === 1) { - // only the local channel_update is known + // only the local channel_update is known (bob won't send his before the channel is deeply buried) val pc = router.stateData.privateChannels.values.head assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) } val peerConnection = TestProbe() + // bob hasn't yet sent his channel_update but we can get it by looking at its internal data val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) - awaitCond { - // only the local channel_update is known + awaitAssert { val pc = router.stateData.privateChannels.values.head + // both channel_updates are known pc.update_1_opt.isDefined && pc.update_2_opt.isDefined } @@ -68,7 +69,7 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu val fundingTx = reachNormal(channels, testTags) - awaitCond(router.stateData.privateChannels.size == 1) + awaitAssert(router.stateData.privateChannels.size === 1) { val pc = router.stateData.privateChannels.values.head @@ -77,16 +78,16 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu } val peerConnection = TestProbe() + // alice and bob haven't yet sent their channel_updates but we can get them by looking at their internal data val aliceChannelUpdate = channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelUpdate val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) - awaitCond { + awaitAssert { val pc = router.stateData.privateChannels.values.head // both channel_updates are known pc.update_1_opt.isDefined && pc.update_2_opt.isDefined } - val privateChannel = router.stateData.privateChannels.values.head // funding tx reaches 6 blocks, announcements are exchanged channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) @@ -101,11 +102,11 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest] vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent))) - awaitCond { + awaitAssert { router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1 } - awaitCond { + awaitAssert { val pc = router.stateData.channels.values.head // both channel updates are preserved pc.update_1_opt.isDefined && pc.update_2_opt.isDefined From ae123907586d50e48a58023b45508e978ead8b6d Mon Sep 17 00:00:00 2001 From: pm47 Date: Wed, 18 May 2022 13:52:51 +0200 Subject: [PATCH 9/9] improve debuggability of integration tests --- .../integration/PaymentIntegrationSpec.scala | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala index 1e85d07c88..ff3b80b4b0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala @@ -117,19 +117,21 @@ class PaymentIntegrationSpec extends IntegrationSpec { def awaitAnnouncements(subset: Map[String, Kit], nodes: Int, channels: Int, updates: Int): Unit = { val sender = TestProbe() subset.foreach { - case (_, setup) => - awaitCond({ - sender.send(setup.router, Router.GetNodes) - sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes - }, max = 60 seconds, interval = 1 second) - awaitCond({ - sender.send(setup.router, Router.GetChannels) - sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels - }, max = 60 seconds, interval = 1 second) - awaitCond({ - sender.send(setup.router, Router.GetChannelUpdates) - sender.expectMsgType[Iterable[ChannelUpdate]].size == updates - }, max = 60 seconds, interval = 1 second) + case (node, setup) => + withClue(node) { + awaitAssert({ + sender.send(setup.router, Router.GetNodes) + assert(sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes) + }, max = 10 seconds, interval = 1 second) + awaitAssert({ + sender.send(setup.router, Router.GetChannels) + sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels + }, max = 10 seconds, interval = 1 second) + awaitAssert({ + sender.send(setup.router, Router.GetChannelUpdates) + sender.expectMsgType[Iterable[ChannelUpdate]].size == updates + }, max = 10 seconds, interval = 1 second) + } } }