Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ object Router {
channels: SortedMap[ShortChannelId, PublicChannel],
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
Expand Down
50 changes: 32 additions & 18 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fr.acinq.eclair.router
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorContext, ActorRef, typed}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
Expand Down Expand Up @@ -81,17 +82,20 @@ object Validation {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
import nodeParams.db.{network => db}
import r.c
d0.awaiting.get(c) match {
case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
// now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
// (the other ones have already been acknowledged as duplicates)
d0.awaiting.getOrElse(c, Seq.empty).headOption match {
case Some(origin: RemoteGossip) => origin.peerConnection ! TransportHandler.ReadAck(c)
case Some(LocalGossip) => () // there is nothing to ack if it was a local gossip
case _ => ()
}
val remoteOrigins_opt = d0.awaiting.get(c)
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
val remoteOrigins = d0.awaiting.getOrElse(c, Set.empty).collect { case rg: RemoteGossip => rg }
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins.headOption.map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
log.debug("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
val publicChannel_opt = r match {
case ValidateResult(c, Left(t)) =>
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c)))
None
case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
Expand All @@ -103,12 +107,12 @@ object Validation {
}
if (fundingOutputIsInvalid) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c)))
None
} else {
watcher ! WatchExternalChannelSpent(ctx.self, tx.txid, outputIndex, c.shortChannelId)
log.debug("added channel channelId={}", c.shortChannelId)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
db.addChannel(c, tx.txid, capacity)
Expand All @@ -118,24 +122,29 @@ object Validation {
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
ctx.self ! nodeAnn
}
// public channels that haven't yet been announced are considered as private channels
val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
// maybe this previously was a local unannounced channel
val privateChannel_opt = d0.privateChannels.get(c.shortChannelId)
Some(PublicChannel(c,
tx.txid,
capacity,
update_1_opt = privateChannel_opt.flatMap(_.update_1_opt),
update_2_opt = privateChannel_opt.flatMap(_.update_2_opt),
meta_opt = privateChannel_opt.map(_.meta)))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c)))
} else {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c)))
}
// there may be a record if we have just restarted
db.removeChannel(c.shortChannelId)
None
}
// we also reprocess node and channel_update announcements related to channels that were just analyzed
// we also reprocess node and channel_update announcements related to the channel that was just analyzed
val reprocessUpdates = d0.stash.updates.view.filterKeys(u => u.shortChannelId == c.shortChannelId)
val reprocessNodes = d0.stash.nodes.view.filterKeys(n => isRelatedTo(c, n.nodeId))
// and we remove the reprocessed messages from the stash
Expand All @@ -145,12 +154,17 @@ object Validation {

publicChannel_opt match {
case Some(pc) =>
// note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
// right after the channel_announcement, channel_updates will be moved from private to public at that time
// those updates are only defined if this was a previously an unannounced local channel, we broadcast them if they use the real scid
val updates1 = (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet)
.map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin]))
.toMap
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
privateChannels = d0.privateChannels - c.shortChannelId, // we remove the corresponding unannounced channel that we may have until now
rebroadcast = d0.rebroadcast.copy(
channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers
updates = d0.rebroadcast.updates ++ updates1
), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
Expand Down Expand Up @@ -419,7 +433,7 @@ object Validation {
case Some(c) =>
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(ctx.self, c)
val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin
val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip))) // no origin
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
handleChannelUpdate(d1, db, routerConf, Left(lcu))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
Expand Down Expand Up @@ -299,7 +299,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package fr.acinq.eclair.channel.states

import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorContext, ActorRef}
import akka.testkit.{TestFSMRef, TestKitBase, TestProbe}
import akka.actor.{ActorContext, ActorRef, ActorSystem}
import akka.testkit.{TestFSMRef, TestKit, TestKitBase, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
Expand Down Expand Up @@ -106,6 +106,12 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
def currentBlockHeight: BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight
}

val systemA: ActorSystem = ActorSystem("system-alice")
val systemB: ActorSystem = ActorSystem("system-bob")

system.registerOnTermination(TestKit.shutdownActorSystem(systemA))
system.registerOnTermination(TestKit.shutdownActorSystem(systemB))

def init(nodeParamsA: NodeParams = TestConstants.Alice.nodeParams, nodeParamsB: NodeParams = TestConstants.Bob.nodeParams, wallet: OnChainWallet = new DummyOnChainWallet(), tags: Set[String] = Set.empty): SetupFixture = {
val aliceOrigin = TestProbe()
val alice2bob = TestProbe()
Expand All @@ -119,8 +125,10 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
val alice2relayer = TestProbe()
val bob2relayer = TestProbe()
val channelUpdateListener = TestProbe()
system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
Comment thread
t-bast marked this conversation as resolved.
val router = TestProbe()
val finalNodeParamsA = nodeParamsA
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(5000 sat)
Expand All @@ -132,8 +140,14 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(5000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(10000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(10000 sat)
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = {
implicit val system: ActorSystem = systemA
TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
}
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = {
implicit val system: ActorSystem = systemB
TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
}
SetupFixture(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, bob2blockchain, router, alice2relayer, bob2relayer, channelUpdateListener, wallet, alicePeer, bobPeer)
}

Expand Down Expand Up @@ -179,7 +193,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
(aliceParams, bobParams, channelType)
}

def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Unit = {
def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Transaction = {

import setup._

Expand Down Expand Up @@ -231,6 +245,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
// x2 because alice and bob share the same relayer
channelUpdateListener.expectMsgType[LocalChannelUpdate]
channelUpdateListener.expectMsgType[LocalChannelUpdate]
fundingTx
}

def localOrigin(replyTo: ActorRef): Origin.LocalHot = Origin.LocalHot(replyTo, UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS
test("recv FundingSigned with valid signature") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
bob2alice.expectMsgType[FundingSigned]
bob2alice.forward(alice)
awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF

within(30 seconds) {
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice ! INPUT_INIT_FUNDER(ByteVector32.Zeroes, TestConstants.fundingSatoshis, pushMsat, TestConstants.feeratePerKw, TestConstants.feeratePerKw, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType)
alice2blockchain.expectMsgType[TxPublisher.SetChannelId]
bob ! INPUT_INIT_FUNDEE(ByteVector32.Zeroes, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType)
Expand Down Expand Up @@ -83,8 +83,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
// make bob send a FundingLocked msg
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
bob ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
Expand All @@ -102,8 +102,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
alice ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
assert(listener.expectMsgType[TransactionConfirmed].tx === fundingTx)
Expand Down
Loading