From 279ba093cf6cfae1a47334975cfd0ca97d402253 Mon Sep 17 00:00:00 2001 From: pm47 Date: Wed, 11 May 2022 18:43:28 +0200 Subject: [PATCH 1/2] router: index private channels on channelId --- .../eclair/router/RouteCalculation.scala | 28 +- .../scala/fr/acinq/eclair/router/Router.scala | 30 +- .../fr/acinq/eclair/router/Validation.scala | 303 +++++++++--------- .../acinq/eclair/router/BaseRouterSpec.scala | 8 +- .../fr/acinq/eclair/router/RouterSpec.scala | 4 +- 5 files changed, 202 insertions(+), 171 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala index cf51a2cc97..58d75a6c4c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/RouteCalculation.scala @@ -66,18 +66,22 @@ object RouteCalculation { case PredefinedChannelRoute(targetNodeId, shortChannelIds) => val (end, hops) = shortChannelIds.foldLeft((localNodeId, Seq.empty[ChannelHop])) { case ((currentNode, previousHops), shortChannelId) => - val channelDesc_opt = d.channels.get(shortChannelId).flatMap(c => currentNode match { - case c.ann.nodeId1 => Some(ChannelDesc(shortChannelId, c.ann.nodeId1, c.ann.nodeId2)) - case c.ann.nodeId2 => Some(ChannelDesc(shortChannelId, c.ann.nodeId2, c.ann.nodeId1)) - case _ => None - }).orElse(d.privateChannels.get(shortChannelId).flatMap(c => currentNode match { - case c.nodeId1 => Some(ChannelDesc(shortChannelId, c.nodeId1, c.nodeId2)) - case c.nodeId2 => Some(ChannelDesc(shortChannelId, c.nodeId2, c.nodeId1)) - case _ => None - })).orElse(assistedChannels.get(shortChannelId).flatMap(c => currentNode match { - case c.nodeId => Some(ChannelDesc(shortChannelId, c.nodeId, c.nextNodeId)) - case _ => None - })) + val channelDesc_opt = d.resolve(shortChannelId) match { + case Some(c: PublicChannel) => currentNode match { + case c.nodeId1 => Some(ChannelDesc(shortChannelId, c.nodeId1, c.nodeId2)) + case c.nodeId2 => Some(ChannelDesc(shortChannelId, c.nodeId2, c.nodeId1)) + case _ => None + } + case Some(c: PrivateChannel) => currentNode match { + case c.nodeId1 => Some(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2)) + case c.nodeId2 => Some(ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1)) + case _ => None + } + case None => assistedChannels.get(shortChannelId).flatMap(c => currentNode match { + case c.nodeId => Some(ChannelDesc(shortChannelId, c.nodeId, c.nextNodeId)) + case _ => None + }) + } channelDesc_opt.flatMap(c => g.getEdge(c)) match { case Some(edge) => (edge.desc.b, previousHops :+ ChannelHop(edge.desc.shortChannelId, edge.desc.a, edge.desc.b, edge.params)) case None => (currentNode, previousHops) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 9cca8e5dac..3424867682 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -24,6 +24,7 @@ import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} import fr.acinq.eclair.Logs.LogCategory +import fr.acinq.eclair.ShortChannelId.outputIndex import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered} @@ -102,7 +103,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm log.info(s"initialization completed, ready to process messages") Try(initialized.map(_.success(Done))) - startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) + startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, resolveScid = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) } when(NORMAL) { @@ -320,6 +321,8 @@ object Router { case class ChannelMeta(balance1: MilliSatoshi, balance2: MilliSatoshi) sealed trait KnownChannel { val capacity: Satoshi + val nodeId1: PublicKey + val nodeId2: PublicKey def getNodeIdSameSideAs(u: ChannelUpdate): PublicKey def getChannelUpdateSameSideAs(u: ChannelUpdate): Option[ChannelUpdate] def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] @@ -331,6 +334,10 @@ object Router { update_1_opt.foreach(u => assert(u.channelFlags.isNode1)) update_2_opt.foreach(u => assert(!u.channelFlags.isNode1)) + val nodeId1: PublicKey = ann.nodeId1 + val nodeId2: PublicKey = ann.nodeId2 + def shortChannelId: ShortChannelId = ann.shortChannelId + def channelId: ByteVector32 = toLongId(fundingTxid.reverse, outputIndex(ann.shortChannelId)) def getNodeIdSameSideAs(u: ChannelUpdate): PublicKey = if (u.channelFlags.isNode1) ann.nodeId1 else ann.nodeId2 def getChannelUpdateSameSideAs(u: ChannelUpdate): Option[ChannelUpdate] = if (u.channelFlags.isNode1) update_1_opt else update_2_opt def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2) @@ -345,7 +352,7 @@ object Router { case Right(rcu) => updateChannelUpdateSameSideAs(rcu.channelUpdate) } } - case class PrivateChannel(localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) extends KnownChannel { + case class PrivateChannel(shortChannelId: ShortChannelId, channelId: ByteVector32, localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) extends KnownChannel { val (nodeId1, nodeId2) = if (Announcements.isNode1(localNodeId, remoteNodeId)) (localNodeId, remoteNodeId) else (remoteNodeId, localNodeId) val capacity: Satoshi = (meta.balance1 + meta.balance2).truncateToSatoshi @@ -605,11 +612,26 @@ object Router { stash: Stash, rebroadcast: Rebroadcast, awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done - privateChannels: Map[ShortChannelId, PrivateChannel], + privateChannels: Map[ByteVector32, PrivateChannel], // indexed by channel id + resolveScid: Map[ShortChannelId, ByteVector32], // scid to channel_id excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graph: DirectedGraph, sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message - ) + ) { + + def resolve(scid: ShortChannelId): Option[KnownChannel] = { + // let's assume this is a real scid + channels.get(scid) match { + case Some(publicChannel) => Some(publicChannel) + case None => + // maybe it's an alias or a real scid + resolveScid.get(scid).flatMap(privateChannels.get) match { + case Some(privateChannel) => Some(privateChannel) + case None => None + } + } + } + } // @formatter:off sealed trait State diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 2607a04651..dfc8bc1591 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -32,7 +32,7 @@ import fr.acinq.eclair.router.Monitoring.Metrics import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{Logs, MilliSatoshiLong, NodeParams, ShortChannelId, TxCoordinates} +import fr.acinq.eclair.{Logs, MilliSatoshiLong, NodeParams, ShortChannelId, TxCoordinates, toLongId} object Validation { @@ -123,7 +123,8 @@ object Validation { ctx.self ! nodeAnn } // maybe this previously was a local unannounced channel - val privateChannel_opt = d0.privateChannels.get(c.shortChannelId) + val channelId = toLongId(tx.txid.reverse, outputIndex) + val privateChannel_opt = d0.privateChannels.get(channelId) Some(PublicChannel(c, tx.txid, capacity, @@ -160,7 +161,7 @@ object Validation { .toMap val d1 = d0.copy( channels = d0.channels + (c.shortChannelId -> pc), - privateChannels = d0.privateChannels - c.shortChannelId, // we remove the corresponding unannounced channel that we may have until now + privateChannels = d0.privateChannels - pc.channelId, // we remove the corresponding unannounced channel that we may have until now rebroadcast = d0.rebroadcast.copy( channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers updates = d0.rebroadcast.updates ++ updates1 @@ -264,158 +265,157 @@ object Validation { def handleChannelUpdate(d: Data, db: NetworkDb, routerConf: RouterConf, update: Either[LocalChannelUpdate, RemoteChannelUpdate], wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val (u: ChannelUpdate, origins: Set[GossipOrigin]) = update match { - case Left(lcu) => (lcu.channelUpdate, Set(LocalGossip)) + val (pc_opt: Option[KnownChannel], u: ChannelUpdate, origins: Set[GossipOrigin]) = update match { + case Left(lcu) => (d.resolve(lcu.shortChannelId), lcu.channelUpdate, Set(LocalGossip)) case Right(rcu) => rcu.origins.collect { case RemoteGossip(peerConnection, _) if !wasStashed => // stashed changes have already been acknowledged log.debug("received channel update for shortChannelId={}", rcu.channelUpdate.shortChannelId) peerConnection ! TransportHandler.ReadAck(rcu.channelUpdate) } - (rcu.channelUpdate, rcu.origins) + (d.resolve(rcu.channelUpdate.shortChannelId), rcu.channelUpdate, rcu.origins) } - if (d.channels.contains(u.shortChannelId)) { - // related channel is already known (note: this means no related channel_update is in the stash) - val publicChannel = true - val pc = d.channels(u.shortChannelId) - if (d.rebroadcast.updates.contains(u)) { - log.debug("ignoring {} (pending rebroadcast)", u) - sendDecision(origins, GossipDecision.Accepted(u)) - val origins1 = d.rebroadcast.updates(u) ++ origins - // NB: we update the channels because the balances may have changed even if the channel_update is the same. - val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins1)), channels = d.channels + (u.shortChannelId -> pc1), graph = graph1) - } else if (StaleChannels.isStale(u)) { - log.debug("ignoring {} (stale)", u) - sendDecision(origins, GossipDecision.Stale(u)) - d - } else if (pc.getChannelUpdateSameSideAs(u).exists(_.timestamp >= u.timestamp)) { - log.debug("ignoring {} (duplicate)", u) - sendDecision(origins, GossipDecision.Duplicate(u)) - update match { - case Left(_) => - // NB: we update the graph because the balances may have changed even if the channel_update is the same. - val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - d.copy(channels = d.channels + (u.shortChannelId -> pc1), graph = graph1) - case Right(_) => d - } - } else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) { - log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u) - sendDecision(origins, GossipDecision.InvalidSignature(u)) - d - } else if (pc.getChannelUpdateSameSideAs(u).isDefined) { - log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) - Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel) - sendDecision(origins, GossipDecision.Accepted(u)) - ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) - db.updateChannel(u) - // update the graph - val pc1 = pc.applyChannelUpdate(update) - val graph1 = if (u.channelFlags.isEnabled) { + pc_opt match { + case Some(pc: PublicChannel) => + // related channel is already known (note: this means no related channel_update is in the stash) + val publicChannel = true + if (d.rebroadcast.updates.contains(u)) { + log.debug("ignoring {} (pending rebroadcast)", u) + sendDecision(origins, GossipDecision.Accepted(u)) + val origins1 = d.rebroadcast.updates(u) ++ origins + // NB: we update the channels because the balances may have changed even if the channel_update is the same. + val pc1 = pc.applyChannelUpdate(update) + val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) + d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins1)), channels = d.channels + (pc.shortChannelId -> pc1), graph = graph1) + } else if (StaleChannels.isStale(u)) { + log.debug("ignoring {} (stale)", u) + sendDecision(origins, GossipDecision.Stale(u)) + d + } else if (pc.getChannelUpdateSameSideAs(u).exists(_.timestamp >= u.timestamp)) { + log.debug("ignoring {} (duplicate)", u) + sendDecision(origins, GossipDecision.Duplicate(u)) + update match { + case Left(_) => + // NB: we update the graph because the balances may have changed even if the channel_update is the same. + val pc1 = pc.applyChannelUpdate(update) + val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), graph = graph1) + case Right(_) => d + } + } else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) { + log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u) + sendDecision(origins, GossipDecision.InvalidSignature(u)) + d + } else if (pc.getChannelUpdateSameSideAs(u).isDefined) { + log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) + Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel) + sendDecision(origins, GossipDecision.Accepted(u)) + ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) + db.updateChannel(u) + // update the graph + val pc1 = pc.applyChannelUpdate(update) + val graph1 = if (u.channelFlags.isEnabled) { + update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) + d.graph.addEdge(GraphEdge(u, pc1)) + } else { + update.left.foreach(_ => log.info("removed local shortChannelId={} public={} from the network graph", u.shortChannelId, publicChannel)) + d.graph.removeEdge(ChannelDesc(u, pc1.ann)) + } + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) + } else { + log.debug("added channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) + sendDecision(origins, GossipDecision.Accepted(u)) + ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) + db.updateChannel(u) + // we also need to update the graph + val pc1 = pc.applyChannelUpdate(update) + val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.graph.addEdge(GraphEdge(u, pc1)) + d.copy(channels = d.channels + (pc.shortChannelId -> pc1), privateChannels = d.privateChannels - pc1.channelId, rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) + } + case Some(pc: PrivateChannel) => + val publicChannel = false + if (StaleChannels.isStale(u)) { + log.debug("ignoring {} (stale)", u) + sendDecision(origins, GossipDecision.Stale(u)) + d + } else if (pc.getChannelUpdateSameSideAs(u).exists(_.timestamp >= u.timestamp)) { + log.debug("ignoring {} (already know same or newer)", u) + sendDecision(origins, GossipDecision.Duplicate(u)) + d + } else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) { + log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u) + sendDecision(origins, GossipDecision.InvalidSignature(u)) + d + } else if (pc.getChannelUpdateSameSideAs(u).isDefined) { + log.debug("updated channel_update for channelId={} public={} flags={} {}", pc.channelId, publicChannel, u.channelFlags, u) + Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel) + sendDecision(origins, GossipDecision.Accepted(u)) + ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) + // we also need to update the graph + val pc1 = pc.applyChannelUpdate(update) + val graph1 = if (u.channelFlags.isEnabled) { + update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel)) + d.graph.addEdge(GraphEdge(u, pc1)) + } else { + update.left.foreach(_ => log.info("removed local channelId={} public={} from the network graph", pc.channelId, publicChannel)) + d.graph.removeEdge(ChannelDesc(u, pc1)) + } + d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graph = graph1) } else { - update.left.foreach(_ => log.info("removed local shortChannelId={} public={} from the network graph", u.shortChannelId, publicChannel)) - d.graph.removeEdge(ChannelDesc(u, pc.ann)) + log.debug("added channel_update for channelId={} public={} flags={} {}", pc.channelId, publicChannel, u.channelFlags, u) + sendDecision(origins, GossipDecision.Accepted(u)) + ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) + // we also need to update the graph + val pc1 = pc.applyChannelUpdate(update) + val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) + update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel)) + d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graph = graph1) } - d.copy(channels = d.channels + (u.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) - } else { - log.debug("added channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) - sendDecision(origins, GossipDecision.Accepted(u)) - ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) - db.updateChannel(u) - // we also need to update the graph - val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.copy(channels = d.channels + (u.shortChannelId -> pc1), privateChannels = d.privateChannels - u.shortChannelId, rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graph = graph1) - } - } else if (d.awaiting.keys.exists(c => c.shortChannelId == u.shortChannelId)) { - // channel is currently being validated - if (d.stash.updates.contains(u)) { - log.debug("ignoring {} (already stashed)", u) - val origins1 = d.stash.updates(u) ++ origins - d.copy(stash = d.stash.copy(updates = d.stash.updates + (u -> origins1))) - } else { - log.debug("stashing {}", u) - d.copy(stash = d.stash.copy(updates = d.stash.updates + (u -> origins))) - } - } else if (d.privateChannels.contains(u.shortChannelId)) { - val publicChannel = false - val pc = d.privateChannels(u.shortChannelId) - if (StaleChannels.isStale(u)) { - log.debug("ignoring {} (stale)", u) - sendDecision(origins, GossipDecision.Stale(u)) - d - } else if (pc.getChannelUpdateSameSideAs(u).exists(_.timestamp >= u.timestamp)) { - log.debug("ignoring {} (already know same or newer)", u) - sendDecision(origins, GossipDecision.Duplicate(u)) - d - } else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) { - log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u) - sendDecision(origins, GossipDecision.InvalidSignature(u)) - d - } else if (pc.getChannelUpdateSameSideAs(u).isDefined) { - log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) - Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel) - sendDecision(origins, GossipDecision.Accepted(u)) - ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) - // we also need to update the graph - val pc1 = pc.applyChannelUpdate(update) - val graph1 = if (u.channelFlags.isEnabled) { - update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.graph.addEdge(GraphEdge(u, pc1)) + case None if d.awaiting.keys.exists(c => c.shortChannelId == u.shortChannelId) => + // channel is currently being validated + if (d.stash.updates.contains(u)) { + log.debug("ignoring {} (already stashed)", u) + val origins1 = d.stash.updates(u) ++ origins + d.copy(stash = d.stash.copy(updates = d.stash.updates + (u -> origins1))) } else { - update.left.foreach(_ => log.info("removed local shortChannelId={} public={} from the network graph", u.shortChannelId, publicChannel)) - d.graph.removeEdge(ChannelDesc(u, pc1)) + log.debug("stashing {}", u) + d.copy(stash = d.stash.copy(updates = d.stash.updates + (u -> origins))) } - d.copy(privateChannels = d.privateChannels + (u.shortChannelId -> pc1), graph = graph1) - } else { - log.debug("added channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u) - sendDecision(origins, GossipDecision.Accepted(u)) - ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil)) - // we also need to update the graph - val pc1 = pc.applyChannelUpdate(update) - val graph1 = d.graph.addEdge(GraphEdge(u, pc1)) - update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel)) - d.copy(privateChannels = d.privateChannels + (u.shortChannelId -> pc1), graph = graph1) - } - } else if (db.isPruned(u.shortChannelId) && !StaleChannels.isStale(u)) { - // the channel was recently pruned, but if we are here, it means that the update is not stale so this is the case - // of a zombie channel coming back from the dead. they probably sent us a channel_announcement right before this update, - // but we ignored it because the channel was in the 'pruned' list. Now that we know that the channel is alive again, - // let's remove the channel from the zombie list and ask the sender to re-send announcements (channel_announcement + updates) - // about that channel. We can ignore this update since we will receive it again - log.info(s"channel shortChannelId=${u.shortChannelId} is back from the dead! requesting announcements about this channel") - sendDecision(origins, GossipDecision.RelatedChannelPruned(u)) - db.removeFromPruned(u.shortChannelId) - // peerConnection_opt will contain a valid peerConnection only when we're handling an update that we received from a peer, not - // when we're sending updates to ourselves - origins head match { - case RemoteGossip(peerConnection, remoteNodeId) => - val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(routerConf.encodingType, List(u.shortChannelId)), TlvStream.empty) - d.sync.get(remoteNodeId) match { - case Some(sync) if sync.started => - // we already have a pending request to that node, let's add this channel to the list and we'll get it later - // TODO: we only request channels with old style channel_query - d.copy(sync = d.sync + (remoteNodeId -> sync.copy(remainingQueries = sync.remainingQueries :+ query, totalQueries = sync.totalQueries + 1))) - case _ => - // otherwise we send the query right away - peerConnection ! query - d.copy(sync = d.sync + (remoteNodeId -> Syncing(remainingQueries = Nil, totalQueries = 1))) - } - case _ => - // we don't know which node this update came from (maybe it was stashed and the channel got pruned in the meantime or some other corner case). - // or we don't have a peerConnection to send our query to. - // anyway, that's not really a big deal because we have removed the channel from the pruned db so next time it shows up we will revalidate it - d - } - } else { - log.debug("ignoring announcement {} (unknown channel)", u) - sendDecision(origins, GossipDecision.NoRelatedChannel(u)) - d + case None if db.isPruned(u.shortChannelId) && !StaleChannels.isStale(u) => + // the channel was recently pruned, but if we are here, it means that the update is not stale so this is the case + // of a zombie channel coming back from the dead. they probably sent us a channel_announcement right before this update, + // but we ignored it because the channel was in the 'pruned' list. Now that we know that the channel is alive again, + // let's remove the channel from the zombie list and ask the sender to re-send announcements (channel_announcement + updates) + // about that channel. We can ignore this update since we will receive it again + log.info(s"channel shortChannelId=${u.shortChannelId} is back from the dead! requesting announcements about this channel") + sendDecision(origins, GossipDecision.RelatedChannelPruned(u)) + db.removeFromPruned(u.shortChannelId) + // peerConnection_opt will contain a valid peerConnection only when we're handling an update that we received from a peer, not + // when we're sending updates to ourselves + origins head match { + case RemoteGossip(peerConnection, remoteNodeId) => + val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(routerConf.encodingType, List(u.shortChannelId)), TlvStream.empty) + d.sync.get(remoteNodeId) match { + case Some(sync) if sync.started => + // we already have a pending request to that node, let's add this channel to the list and we'll get it later + // TODO: we only request channels with old style channel_query + d.copy(sync = d.sync + (remoteNodeId -> sync.copy(remainingQueries = sync.remainingQueries :+ query, totalQueries = sync.totalQueries + 1))) + case _ => + // otherwise we send the query right away + peerConnection ! query + d.copy(sync = d.sync + (remoteNodeId -> Syncing(remainingQueries = Nil, totalQueries = 1))) + } + case _ => + // we don't know which node this update came from (maybe it was stashed and the channel got pruned in the meantime or some other corner case). + // or we don't have a peerConnection to send our query to. + // anyway, that's not really a big deal because we have removed the channel from the pruned db so next time it shows up we will revalidate it + d + } + case None => + log.debug("ignoring announcement {} (unknown channel)", u) + sendDecision(origins, GossipDecision.NoRelatedChannel(u)) + d } } @@ -437,15 +437,18 @@ object Validation { // maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks) db.removeFromPruned(c.shortChannelId) handleChannelUpdate(d1, db, routerConf, Left(lcu)) - case None if d.privateChannels.contains(lcu.shortChannelId) => + case None if d.privateChannels.contains(lcu.channelId) => // channel isn't announced but we already know about it, we can process the channel_update handleChannelUpdate(d, db, routerConf, Left(lcu)) case None => // channel isn't announced and we never heard of it (maybe it is a private channel or maybe it is a public channel that doesn't yet have 6 confirmations) // let's create a corresponding private channel and process the channel_update log.debug("adding unannounced local channel to remote={} shortChannelId={}", lcu.remoteNodeId, lcu.shortChannelId) - val pc = PrivateChannel(localNodeId, lcu.remoteNodeId, None, None, ChannelMeta(0 msat, 0 msat)).updateBalances(lcu.commitments) - val d1 = d.copy(privateChannels = d.privateChannels + (lcu.shortChannelId -> pc)) + val pc = PrivateChannel(lcu.shortChannelId, lcu.channelId, localNodeId, lcu.remoteNodeId, None, None, ChannelMeta(0 msat, 0 msat)).updateBalances(lcu.commitments) + val d1 = d.copy( + privateChannels = d.privateChannels + (lcu.channelId -> pc), + resolveScid = d.resolveScid + (lcu.shortChannelId -> lcu.channelId) + ) handleChannelUpdate(d1, db, routerConf, Left(lcu)) } } @@ -458,7 +461,7 @@ object Validation { // the channel was public, we will receive (or have already received) a WatchEventSpentBasic event, that will trigger a clean up of the channel // so let's not do anything here d - } else if (d.privateChannels.contains(shortChannelId)) { + } else if (d.privateChannels.contains(channelId)) { // the channel was private or public-but-not-yet-announced, let's do the clean up log.info("removing private local channel and channel_update for channelId={} shortChannelId={}", channelId, shortChannelId) val desc1 = ChannelDesc(shortChannelId, localNodeId, remoteNodeId) @@ -468,7 +471,7 @@ object Validation { .removeEdge(desc1) .removeEdge(desc2) // and we remove the channel and channel_update from our state - d.copy(privateChannels = d.privateChannels - shortChannelId, graph = graph1) + d.copy(privateChannels = d.privateChannels - channelId, graph = graph1) } else { d } @@ -485,13 +488,13 @@ object Validation { case None => (d.channels, d.graph) } - val (privateChannels1, graph2) = d.privateChannels.get(e.shortChannelId) match { + val (privateChannels1, graph2) = d.privateChannels.get(e.channelId) match { case Some(pc) => val pc1 = pc.updateBalances(e.commitments) log.debug("private channel balance updated: {}", pc1) val update_opt = if (e.commitments.localNodeId == pc1.nodeId1) pc1.update_1_opt else pc1.update_2_opt val graph2 = update_opt.map(u => graph1.addEdge(GraphEdge(u, pc1))).getOrElse(graph1) - (d.privateChannels + (e.shortChannelId -> pc1), graph2) + (d.privateChannels + (e.channelId -> pc1), graph2) case None => (d.privateChannels, graph1) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index 12e4b3cf27..155df8c7dd 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -80,6 +80,8 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi val scid_ag_private = ShortChannelId(BlockHeight(420000), 5, 0) val scid_gh = ShortChannelId(BlockHeight(420000), 6, 0) + val channelId_ag_private = randomBytes32() + val chan_ab = channelAnnouncement(scid_ab, priv_a, priv_b, priv_funding_a, priv_funding_b) val chan_bc = channelAnnouncement(scid_bc, priv_b, priv_c, priv_funding_b, priv_funding_c) val chan_cd = channelAnnouncement(scid_cd, priv_c, priv_d, priv_funding_c, priv_funding_d) @@ -110,8 +112,8 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi assert(ChannelDesc(update_bc, chan_bc) === ChannelDesc(chan_bc.shortChannelId, b, c)) assert(ChannelDesc(update_cd, chan_cd) === ChannelDesc(chan_cd.shortChannelId, c, d)) assert(ChannelDesc(update_ef, chan_ef) === ChannelDesc(chan_ef.shortChannelId, e, f)) - assert(ChannelDesc(update_ag_private, PrivateChannel(a, g, None, None, ChannelMeta(1000 msat, 2000 msat))) === ChannelDesc(scid_ag_private, a, g)) - assert(ChannelDesc(update_ag_private, PrivateChannel(g, a, None, None, ChannelMeta(2000 msat, 1000 msat))) === ChannelDesc(scid_ag_private, a, g)) + assert(ChannelDesc(update_ag_private, PrivateChannel(scid_ag_private, channelId_ag_private, a, g, None, None, ChannelMeta(1000 msat, 2000 msat))) === ChannelDesc(scid_ag_private, a, g)) + assert(ChannelDesc(update_ag_private, PrivateChannel(scid_ag_private, channelId_ag_private, g, a, None, None, ChannelMeta(2000 msat, 1000 msat))) === ChannelDesc(scid_ag_private, a, g)) assert(ChannelDesc(update_gh, chan_gh) === ChannelDesc(chan_gh.shortChannelId, g, h)) // let's set up the router @@ -149,7 +151,7 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_gh)) peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_hg)) // then private channels - sender.send(router, LocalChannelUpdate(sender.ref, randomBytes32(), scid_ag_private, g, None, update_ag_private, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false))) + sender.send(router, LocalChannelUpdate(sender.ref, channelId_ag_private, scid_ag_private, g, None, update_ag_private, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false))) // watcher receives the get tx requests assert(watcher.expectMsgType[ValidateRequest].ann === chan_ab) assert(watcher.expectMsgType[ValidateRequest].ann === chan_bc) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 6afaebf073..f433335c63 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -670,10 +670,10 @@ class RouterSpec extends BaseRouterSpec { // Private channels should also update the graph when HTLCs are relayed through them. val balances = Set(33000000 msat, 5000000 msat) val commitments = CommitmentsSpec.makeCommitments(33000000 msat, 5000000 msat, a, g, announceChannel = false) - sender.send(router, AvailableBalanceChanged(sender.ref, null, scid_ag_private, commitments)) + sender.send(router, AvailableBalanceChanged(sender.ref, channelId_ag_private, scid_ag_private, commitments)) sender.send(router, Router.GetRouterData) val data = sender.expectMsgType[Data] - val channel_ag = data.privateChannels(scid_ag_private) + val channel_ag = data.privateChannels(channelId_ag_private) assert(Set(channel_ag.meta.balance1, channel_ag.meta.balance2) === balances) // And the graph should be updated too. val edge_ag = data.graph.getEdge(ChannelDesc(scid_ag_private, a, g)).get From 1c0f4535589f0839f41080ad2fd708bfdac6aa98 Mon Sep 17 00:00:00 2001 From: pm47 Date: Thu, 19 May 2022 11:23:00 +0200 Subject: [PATCH 2/2] rename resolveScid->scid2PrivateChannels --- .../src/main/scala/fr/acinq/eclair/router/Router.scala | 6 +++--- .../src/main/scala/fr/acinq/eclair/router/Validation.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 3424867682..e1adb6a0a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -103,7 +103,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm log.info(s"initialization completed, ready to process messages") Try(initialized.map(_.success(Done))) - startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, resolveScid = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) + startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, scid2PrivateChannels = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) } when(NORMAL) { @@ -613,7 +613,7 @@ object Router { rebroadcast: Rebroadcast, awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done privateChannels: Map[ByteVector32, PrivateChannel], // indexed by channel id - resolveScid: Map[ShortChannelId, ByteVector32], // scid to channel_id + scid2PrivateChannels: Map[ShortChannelId, ByteVector32], // scid to channel_id, only to be used for private channels excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graph: DirectedGraph, sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message @@ -625,7 +625,7 @@ object Router { case Some(publicChannel) => Some(publicChannel) case None => // maybe it's an alias or a real scid - resolveScid.get(scid).flatMap(privateChannels.get) match { + scid2PrivateChannels.get(scid).flatMap(privateChannels.get) match { case Some(privateChannel) => Some(privateChannel) case None => None } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index dfc8bc1591..a1ca814820 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -447,7 +447,7 @@ object Validation { val pc = PrivateChannel(lcu.shortChannelId, lcu.channelId, localNodeId, lcu.remoteNodeId, None, None, ChannelMeta(0 msat, 0 msat)).updateBalances(lcu.commitments) val d1 = d.copy( privateChannels = d.privateChannels + (lcu.channelId -> pc), - resolveScid = d.resolveScid + (lcu.shortChannelId -> lcu.channelId) + scid2PrivateChannels = d.scid2PrivateChannels + (lcu.shortChannelId -> lcu.channelId) ) handleChannelUpdate(d1, db, routerConf, Left(lcu)) }