diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
index 2e2ecdcde1..9cca8e5dac 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
@@ -604,7 +604,7 @@ object Router {
channels: SortedMap[ShortChannelId, PublicChannel],
stash: Stash,
rebroadcast: Rebroadcast,
- awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
+ awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
index ed0cce7918..2607a04651 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
@@ -19,6 +19,7 @@ package fr.acinq.eclair.router
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorContext, ActorRef, typed}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
+import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
@@ -81,17 +82,20 @@ object Validation {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
import nodeParams.db.{network => db}
import r.c
- d0.awaiting.get(c) match {
- case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
+ // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
+ // (the other ones have already been acknowledged as duplicates)
+ d0.awaiting.getOrElse(c, Seq.empty).headOption match {
+ case Some(origin: RemoteGossip) => origin.peerConnection ! TransportHandler.ReadAck(c)
+ case Some(LocalGossip) => () // there is nothing to ack if it was a local gossip
case _ => ()
}
- val remoteOrigins_opt = d0.awaiting.get(c)
- Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
+ val remoteOrigins = d0.awaiting.getOrElse(c, Set.empty).collect { case rg: RemoteGossip => rg }
+ Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins.headOption.map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
log.debug("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
val publicChannel_opt = r match {
case ValidateResult(c, Left(t)) =>
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
- remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
+ remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c)))
None
case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
@@ -103,12 +107,12 @@ object Validation {
}
if (fundingOutputIsInvalid) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
- remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
+ remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c)))
None
} else {
watcher ! WatchExternalChannelSpent(ctx.self, tx.txid, outputIndex, c.shortChannelId)
log.debug("added channel channelId={}", c.shortChannelId)
- remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
+ remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
db.addChannel(c, tx.txid, capacity)
@@ -118,24 +122,29 @@ object Validation {
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
ctx.self ! nodeAnn
}
- // public channels that haven't yet been announced are considered as private channels
- val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
- Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
+ // maybe this previously was a local unannounced channel
+ val privateChannel_opt = d0.privateChannels.get(c.shortChannelId)
+ Some(PublicChannel(c,
+ tx.txid,
+ capacity,
+ update_1_opt = privateChannel_opt.flatMap(_.update_1_opt),
+ update_2_opt = privateChannel_opt.flatMap(_.update_2_opt),
+ meta_opt = privateChannel_opt.map(_.meta)))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
- remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
+ remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c)))
} else {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
- remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
+ remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c)))
}
// there may be a record if we have just restarted
db.removeChannel(c.shortChannelId)
None
}
- // we also reprocess node and channel_update announcements related to channels that were just analyzed
+ // we also reprocess node and channel_update announcements related to the channel that was just analyzed
val reprocessUpdates = d0.stash.updates.view.filterKeys(u => u.shortChannelId == c.shortChannelId)
val reprocessNodes = d0.stash.nodes.view.filterKeys(n => isRelatedTo(c, n.nodeId))
// and we remove the reprocessed messages from the stash
@@ -145,12 +154,17 @@ object Validation {
publicChannel_opt match {
case Some(pc) =>
- // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
- // right after the channel_announcement, channel_updates will be moved from private to public at that time
+ // those updates are only defined if this was a previously an unannounced local channel, we broadcast them if they use the real scid
+ val updates1 = (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet)
+ .map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin]))
+ .toMap
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
- privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
- rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
+ privateChannels = d0.privateChannels - c.shortChannelId, // we remove the corresponding unannounced channel that we may have until now
+ rebroadcast = d0.rebroadcast.copy(
+ channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers
+ updates = d0.rebroadcast.updates ++ updates1
+ ), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
@@ -419,7 +433,7 @@ object Validation {
case Some(c) =>
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(ctx.self, c)
- val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin
+ val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip))) // no origin
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
handleChannelUpdate(d1, db, routerConf, Left(lcu))
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
index b77d9fa3af..ad38f3e92c 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
@@ -161,7 +161,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
- routerBroadcastInterval = 5 seconds,
+ routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
@@ -299,7 +299,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
- routerBroadcastInterval = 5 seconds,
+ routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala
index cbffd9cc3a..663d639939 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala
@@ -17,8 +17,8 @@
package fr.acinq.eclair.channel.states
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
-import akka.actor.{ActorContext, ActorRef}
-import akka.testkit.{TestFSMRef, TestKitBase, TestProbe}
+import akka.actor.{ActorContext, ActorRef, ActorSystem}
+import akka.testkit.{TestFSMRef, TestKit, TestKitBase, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@@ -106,6 +106,12 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
def currentBlockHeight: BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight
}
+ val systemA: ActorSystem = ActorSystem("system-alice")
+ val systemB: ActorSystem = ActorSystem("system-bob")
+
+ system.registerOnTermination(TestKit.shutdownActorSystem(systemA))
+ system.registerOnTermination(TestKit.shutdownActorSystem(systemB))
+
def init(nodeParamsA: NodeParams = TestConstants.Alice.nodeParams, nodeParamsB: NodeParams = TestConstants.Bob.nodeParams, wallet: OnChainWallet = new DummyOnChainWallet(), tags: Set[String] = Set.empty): SetupFixture = {
val aliceOrigin = TestProbe()
val alice2bob = TestProbe()
@@ -119,8 +125,10 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
val alice2relayer = TestProbe()
val bob2relayer = TestProbe()
val channelUpdateListener = TestProbe()
- system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
- system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
+ systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
+ systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
+ systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
+ systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
val router = TestProbe()
val finalNodeParamsA = nodeParamsA
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(5000 sat)
@@ -132,8 +140,14 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(5000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(10000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(10000 sat)
- val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
- val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
+ val alice: TestFSMRef[ChannelState, ChannelData, Channel] = {
+ implicit val system: ActorSystem = systemA
+ TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
+ }
+ val bob: TestFSMRef[ChannelState, ChannelData, Channel] = {
+ implicit val system: ActorSystem = systemB
+ TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
+ }
SetupFixture(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, bob2blockchain, router, alice2relayer, bob2relayer, channelUpdateListener, wallet, alicePeer, bobPeer)
}
@@ -179,7 +193,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
(aliceParams, bobParams, channelType)
}
- def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Unit = {
+ def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Transaction = {
import setup._
@@ -231,6 +245,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
// x2 because alice and bob share the same relayer
channelUpdateListener.expectMsgType[LocalChannelUpdate]
channelUpdateListener.expectMsgType[LocalChannelUpdate]
+ fundingTx
}
def localOrigin(replyTo: ActorRef): Origin.LocalHot = Origin.LocalHot(replyTo, UUID.randomUUID())
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala
index b03b8d00e3..2a8a475072 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala
@@ -82,7 +82,7 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS
test("recv FundingSigned with valid signature") { f =>
import f._
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
bob2alice.expectMsgType[FundingSigned]
bob2alice.forward(alice)
awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED)
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala
index 8b7a8c7480..3bb70833cb 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala
@@ -54,7 +54,7 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
within(30 seconds) {
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice ! INPUT_INIT_FUNDER(ByteVector32.Zeroes, TestConstants.fundingSatoshis, pushMsat, TestConstants.feeratePerKw, TestConstants.feeratePerKw, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType)
alice2blockchain.expectMsgType[TxPublisher.SetChannelId]
bob ! INPUT_INIT_FUNDEE(ByteVector32.Zeroes, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType)
@@ -83,8 +83,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
- system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
// make bob send a FundingLocked msg
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
bob ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
@@ -102,8 +102,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
- system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
alice ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
assert(listener.expectMsgType[TransactionConfirmed].tx === fundingTx)
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala
index 59981ba15b..75d30245ac 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala
@@ -74,7 +74,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
val initialState = alice.stateData.asInstanceOf[DATA_NORMAL]
val sender = TestProbe()
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
val h = randomBytes32()
val add = CMD_ADD_HTLC(sender.ref, 50000000 msat, h, CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, localOrigin(sender.ref))
alice ! add
@@ -881,7 +881,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
bob2alice.expectMsgType[UpdateFulfillHtlc]
// we listen to channel_update events
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate])
// actual test starts here
// when signing the fulfill, bob will have its main output go above reserve in alice's commitment tx
@@ -896,7 +896,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv CMD_SIGN (after CMD_UPDATE_FEE)") { f =>
import f._
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
alice ! CMD_UPDATE_FEE(FeeratePerKw(654564 sat))
alice2bob.expectMsgType[UpdateFee]
alice ! CMD_SIGN()
@@ -2734,7 +2734,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill
@@ -2767,7 +2767,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign
@@ -2800,7 +2800,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala
index c467c5d6ca..de429c16fe 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala
@@ -527,7 +527,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
+ bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
val initialCommitTx = initialState.commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala
index 6fcaca1652..1d1440ebac 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala
@@ -89,15 +89,19 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
bob2blockchain.expectMsgType[WatchFundingConfirmed]
awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED)
awaitCond(bob.stateName == WAIT_FOR_FUNDING_CONFIRMED)
- system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
- system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
+ alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
+ alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
+ bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
+ bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
withFixture(test.toNoArgTest(FixtureParam(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, alice2relayer, bob2relayer, channelUpdateListener, eventListener, Nil)))
}
} else {
within(30 seconds) {
reachNormal(setup, test.tags)
- system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
- system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
+ alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
+ alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
+ bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
+ bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
val bobCommitTxs: List[CommitTxAndRemoteSig] = (for (amt <- List(100000000 msat, 200000000 msat, 300000000 msat)) yield {
val (r, htlc) = addHtlc(amt, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
@@ -368,8 +372,8 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.channelFeatures === channelFeatures)
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed])
- system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
// alice sends an htlc to bob
val (_, htlca1) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
@@ -473,7 +477,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (local commit with htlcs only signed by local)") { f =>
import f._
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val aliceCommitTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
// alice sends an htlc
val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice)
@@ -522,7 +526,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (local commit with fail not acked by remote)") { f =>
import f._
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val (_, htlc) = addHtlc(25000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
failHtlc(htlc.id, bob, alice, bob2alice, alice2bob)
@@ -603,7 +607,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (remote commit with htlcs only signed by local in next remote commit)") { f =>
import f._
val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
+ alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
// alice sends an htlc
val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice)
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala
index 1e85d07c88..ff3b80b4b0 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala
@@ -117,19 +117,21 @@ class PaymentIntegrationSpec extends IntegrationSpec {
def awaitAnnouncements(subset: Map[String, Kit], nodes: Int, channels: Int, updates: Int): Unit = {
val sender = TestProbe()
subset.foreach {
- case (_, setup) =>
- awaitCond({
- sender.send(setup.router, Router.GetNodes)
- sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes
- }, max = 60 seconds, interval = 1 second)
- awaitCond({
- sender.send(setup.router, Router.GetChannels)
- sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
- }, max = 60 seconds, interval = 1 second)
- awaitCond({
- sender.send(setup.router, Router.GetChannelUpdates)
- sender.expectMsgType[Iterable[ChannelUpdate]].size == updates
- }, max = 60 seconds, interval = 1 second)
+ case (node, setup) =>
+ withClue(node) {
+ awaitAssert({
+ sender.send(setup.router, Router.GetNodes)
+ assert(sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes)
+ }, max = 10 seconds, interval = 1 second)
+ awaitAssert({
+ sender.send(setup.router, Router.GetChannels)
+ sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
+ }, max = 10 seconds, interval = 1 second)
+ awaitAssert({
+ sender.send(setup.router, Router.GetChannelUpdates)
+ sender.expectMsgType[Iterable[ChannelUpdate]].size == updates
+ }, max = 10 seconds, interval = 1 second)
+ }
}
}
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala
index 3c2c2cff05..12e4b3cf27 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala
@@ -122,7 +122,6 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
import com.softwaremill.quicklens._
val nodeParams = Alice.nodeParams
.modify(_.nodeKeyManager).setTo(testNodeKeyManager)
- .modify(_.routerConf.routerBroadcastInterval).setTo(1 day) // "disable" auto rebroadcast
val router = system.actorOf(Router.props(nodeParams, watcher.ref))
// we announce channels
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab))
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala
new file mode 100644
index 0000000000..ad98627e6f
--- /dev/null
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala
@@ -0,0 +1,125 @@
+package fr.acinq.eclair.router
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestFSMRef, TestProbe}
+import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
+import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingDeeplyBuriedTriggered
+import fr.acinq.eclair.channel.DATA_NORMAL
+import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags}
+import fr.acinq.eclair.io.Peer.PeerRoutingMessage
+import fr.acinq.eclair.router.Router.{GossipOrigin, LocalGossip}
+import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate}
+import fr.acinq.eclair.{BlockHeight, TestKitBaseClass}
+import org.scalatest.funsuite.FixtureAnyFunSuiteLike
+import org.scalatest.{Outcome, Tag}
+
+/**
+ * This test checks the integration between Channel and Router (events, etc.)
+ */
+class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
+
+ case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String])
+
+ implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging
+
+ override def withFixture(test: OneArgTest): Outcome = {
+ val channels = init(tags = test.tags)
+ val rebroadcastListener = TestProbe()
+ val router: TestFSMRef[Router.State, Router.Data, Router] = {
+ // we use alice's actor system so we share the same event stream
+ implicit val system: ActorSystem = channels.alice.underlying.system
+ system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast])
+ TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None))
+ }
+ withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags)))
+ }
+
+ test("private local channel") { f =>
+ import f._
+
+ reachNormal(channels, testTags)
+
+ awaitAssert(router.stateData.privateChannels.size === 1)
+
+ {
+ // only the local channel_update is known (bob won't send his before the channel is deeply buried)
+ val pc = router.stateData.privateChannels.values.head
+ assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined)
+ }
+
+ val peerConnection = TestProbe()
+ // bob hasn't yet sent his channel_update but we can get it by looking at its internal data
+ val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
+ router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate)
+
+ awaitAssert {
+ val pc = router.stateData.privateChannels.values.head
+ // both channel_updates are known
+ pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
+ }
+
+ // manual rebroadcast
+ router ! Router.TickBroadcast
+ rebroadcastListener.expectNoMessage()
+
+ }
+
+ test("public local channel", Tag(ChannelStateTestsTags.ChannelsPublic)) { f =>
+ import f._
+
+ val fundingTx = reachNormal(channels, testTags)
+
+ awaitAssert(router.stateData.privateChannels.size === 1)
+
+ {
+ val pc = router.stateData.privateChannels.values.head
+ // only the local channel_update is known
+ assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined)
+ }
+
+ val peerConnection = TestProbe()
+ // alice and bob haven't yet sent their channel_updates but we can get them by looking at their internal data
+ val aliceChannelUpdate = channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
+ val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
+ router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate)
+
+ awaitAssert {
+ val pc = router.stateData.privateChannels.values.head
+ // both channel_updates are known
+ pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
+ }
+
+ // funding tx reaches 6 blocks, announcements are exchanged
+ channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null)
+ channels.alice2bob.expectMsgType[AnnouncementSignatures]
+ channels.alice2bob.forward(channels.bob)
+
+ channels.bob ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null)
+ channels.bob2alice.expectMsgType[AnnouncementSignatures]
+ channels.bob2alice.forward(channels.alice)
+
+ // router gets notified and attempts to validate the local channel
+ val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest]
+ vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent)))
+
+ awaitAssert {
+ router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1
+ }
+
+ awaitAssert {
+ val pc = router.stateData.channels.values.head
+ // both channel updates are preserved
+ pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
+ }
+
+ // manual rebroadcast
+ router ! Router.TickBroadcast
+ rebroadcastListener.expectMsg(Router.Rebroadcast(
+ channels = Map(vr.ann -> Set[GossipOrigin](LocalGossip)),
+ updates = Map(aliceChannelUpdate -> Set[GossipOrigin](LocalGossip), bobChannelUpdate -> Set.empty[GossipOrigin]), // broadcast the channel_updates (they were previously unannounced)
+ nodes = Map(router.underlyingActor.stateData.nodes.values.head -> Set[GossipOrigin](LocalGossip)), // new node_announcement
+ ))
+
+ }
+
+}
diff --git a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala
index d05ea1f218..404d3a2c7f 100644
--- a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala
+++ b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala
@@ -18,7 +18,7 @@ package fr.acinq.eclair.router
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
-import akka.testkit.{TestKit, TestProbe}
+import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.bitcoin.scalacompat.{Block, SatoshiLong, Transaction, TxOut}
@@ -32,9 +32,6 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol.Color
import org.scalatest.funsuite.AnyFunSuiteLike
-import scodec.bits._
-
-import scala.concurrent.duration._
class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike {
@@ -131,12 +128,14 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab))
peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab))
- // we have to wait 2 times the broadcast interval because there is an additional per-peer delay
- val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
- peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
- peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
- peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty))
- peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty))
+ // manual rebroadcast
+ front1 ! Router.TickBroadcast
+ peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
+ peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
+ front2 ! Router.TickBroadcast
+ peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty))
+ front3 ! Router.TickBroadcast
+ peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty))
}
test("aggregate gossip") {
@@ -149,9 +148,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
val system2 = ActorSystem("front-system-2")
val system3 = ActorSystem("front-system-3")
- val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router))
- val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router))
- val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router))
+ val front1 = {
+ implicit val system: ActorSystem = system1
+ TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
+ }
+ val front2 = {
+ implicit val system: ActorSystem = system2
+ TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
+ }
+ val front3 = {
+ implicit val system: ActorSystem = system3
+ TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
+ }
val peerConnection1a = TestProbe("peerconn-1a")
val peerConnection1b = TestProbe("peerconn-1b")
@@ -182,7 +190,6 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc))
peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc))
-
watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab))
@@ -207,12 +214,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b))
peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b))
- // we have to wait 2 times the broadcast interval because there is an additional per-peer delay
- val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
- peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
- peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
- peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
- peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a))))
+ awaitCond(front1.stateData.nodes.size == 2)
+ awaitCond(front2.stateData.nodes.size == 2)
+ awaitCond(front3.stateData.nodes.size == 2)
+
+ // manual rebroadcast
+ front1 ! Router.TickBroadcast
+ peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
+ peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
+ front2 ! Router.TickBroadcast
+ peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
+ front3 ! Router.TickBroadcast
+ peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a))))
}
test("do not forward duplicate gossip") {
diff --git a/pom.xml b/pom.xml
index 52490a781a..f6272e3e76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,7 @@
${project.build.directory}
- -Xmx1024m -Dfile.encoding=UTF-8
+ -Xmx2048m -Dfile.encoding=UTF-8