From 223e6356c833125245221cb1ea895a462b4c6f3e Mon Sep 17 00:00:00 2001 From: sstone Date: Fri, 23 Aug 2019 16:29:18 +0200 Subject: [PATCH 1/5] Channel Range Queries: send back node announcements if requested This PR adds support for sending back node announcements when replying to channel range queries: - when explicitly requested (bit is set in the optional query flag) - when query flags are not used and a channel announcement is sent (as per the BOLTs) A new configuration option `request-node-announcements` has been added in the `router` section. If set to true, we will request node announcements when we receive a channel id (through channel range queries) that we don't know of. This is a setting that we will probably turn off on mobile devices. --- eclair-core/src/main/resources/reference.conf | 1 + .../scala/fr/acinq/eclair/NodeParams.scala | 1 + .../scala/fr/acinq/eclair/router/Router.scala | 83 +++++++++++++------ .../eclair/wire/QueryShortChannelIdsTlv.scala | 12 ++- .../scala/fr/acinq/eclair/TestConstants.scala | 2 + .../router/ChannelRangeQueriesSpec.scala | 33 ++++---- .../acinq/eclair/router/RoutingSyncSpec.scala | 71 +++++++++++----- 7 files changed, 138 insertions(+), 65 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index cadbbec31b..8a6717d75e 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -116,6 +116,7 @@ eclair { channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration broadcast-interval = 60 seconds // see BOLT #7 init-timeout = 5 minutes + request-node-announcements = true // if true we will ask for node annnouncements when we receive channel ids that we don't know // the values below will be used to perform route searching path-finding { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 093d3b8a64..c66ee80d76 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -235,6 +235,7 @@ object NodeParams { channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS), routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS), randomizeRouteSelection = config.getBoolean("router.randomize-route-selection"), + requestNodeAnnouncements = config.getBoolean("router.request-node-announcements"), searchMaxRouteLength = config.getInt("router.path-finding.max-route-length"), searchMaxCltv = config.getInt("router.path-finding.max-cltv"), searchMaxFeeBase = Satoshi(config.getLong("router.path-finding.fee-threshold-sat")), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 3d80cba7e0..2f86ba3d84 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -50,6 +50,7 @@ import scala.util.{Random, Try} case class RouterConf(randomizeRouteSelection: Boolean, channelExcludeDuration: FiniteDuration, routerBroadcastInterval: FiniteDuration, + requestNodeAnnouncements: Boolean, searchMaxFeeBase: Satoshi, searchMaxFeePct: Double, searchMaxRouteLength: Int, @@ -535,7 +536,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ ids match { case Nil => acc.reverse case head :: tail => - val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption) + val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption, nodeParams.routerConf.requestNodeAnnouncements) // 0 means nothing to query, just don't include it val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc loop(tail, timestamps.drop(1), checksums.drop(1), acc1) @@ -549,7 +550,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) { case ((c, u), ShortChannelIdAndFlag(_, flag)) => - val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) 1 else 0) + val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0) val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0) (c1, u1) } @@ -573,26 +574,58 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ case Event(PeerRoutingMessage(transport, _, routingMessage@QueryShortChannelIds(chainHash, shortChannelIds, queryFlags_opt)), d) => sender ! TransportHandler.ReadAck(routingMessage) - val (channelCount, updatesCount) = shortChannelIds.array - .zipWithIndex - .foldLeft((0, 0)) { - case ((c, u), (shortChannelId, idx)) => - var c1 = c - var u1 = u - val flag = routingMessage.queryFlags_opt.map(_.array(idx)).getOrElse(QueryShortChannelIdsTlv.QueryFlagType.INCLUDE_ALL) - d.channels.get(shortChannelId) match { - case None => log.warning("received query for shortChannelId={} that we don't have", shortChannelId) - case Some(ca) => - if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) { - transport ! ca - c1 = c1 + 1 - } - if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => transport ! u; u1 = u1 + 1 } - if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => transport ! u; u1 = u1 + 1 } + val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long]) + + // we loop over channel ids and query flag and send what the sender requested + // we keep track of how many announcements and updates we've sent, and we also track node Ids for node announcement + // we've already sent to avoid sending them multiple times, as requested by the BOLTs + @tailrec + def loop(ids: List[ShortChannelId], flags: List[Long], numca: Int = 0, numcu: Int = 0, sent: Set[PublicKey] = Set.empty[PublicKey]): (Int, Int, Int) = ids match { + case Nil => (numca, numcu, sent.size) + case head :: tail if !d.channels.contains(head) => + log.warning("received query for shortChannelId={} that we don't have", head) + loop(tail, flags.drop(1), numca, numcu, sent) + case head :: tail => + var numca1 = numca + var numcu1 = numcu + var sent1 = sent + val ca = d.channels(head) + val flag_opt = flags.headOption + // no flag means send everything + + if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement).getOrElse(true)) { + transport ! ca + numca1 = numca1 + 1 + } + if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1).getOrElse(true)) { + d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => + transport ! u + numcu1 = numcu1 + 1 } - (c1, u1) - } - log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates", shortChannelIds.array.size, channelCount, updatesCount) + } + if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2).getOrElse(true)) { + d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => + transport ! u + numcu1 = numcu1 + 1 + } + } + if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement1).getOrElse(true) && !sent1.contains(ca.nodeId1)) { + d.nodes.get(ca.nodeId1).foreach { a => + transport ! a + sent1 = sent1 + ca.nodeId1 + } + } + if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement2).getOrElse(true) && !sent1.contains(ca.nodeId2)) { + d.nodes.get(ca.nodeId2).foreach { a => + transport ! a + sent1 = sent1 + ca.nodeId2 + } + } + loop(tail, flags.drop(1), numca1, numcu1, sent1) + } + + val (channelCount, updateCount, nodeCount) = loop(shortChannelIds.array, flags) + log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} node announcements", shortChannelIds.array.size, channelCount, updateCount, nodeCount) transport ! ReplyShortChannelIdsEnd(chainHash, 1) stay @@ -856,7 +889,8 @@ object Router { def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])( shortChannelId: ShortChannelId, timestamps_opt: Option[ReplyChannelRangeTlv.Timestamps], - checksums_opt: Option[ReplyChannelRangeTlv.Checksums]): Long = { + checksums_opt: Option[ReplyChannelRangeTlv.Checksums], + includeNodeAnnouncements: Boolean): Long = { import QueryShortChannelIdsTlv.QueryFlagType var flag = 0L (timestamps_opt, checksums_opt) match { @@ -884,8 +918,9 @@ object Router { // we know this channel: we only request their channel updates flag = QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 case _ => - // we don't know this channel: we request everything - flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 + // we don't know this channel: we request its channel announcement and updates + flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 + if (includeNodeAnnouncements) flag = flag | QueryFlagType.INCLUDE_NODE_ANNOUNCEMENT_1 | QueryFlagType.INCLUDE_NODE_ANNOUNCEMENT_2 } flag } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala index 3c878b2d7c..59ce2acad4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala @@ -1,7 +1,7 @@ package fr.acinq.eclair.wire import fr.acinq.eclair.UInt64 -import fr.acinq.eclair.wire.CommonCodecs.{shortchannelid, varint, varintoverflow} +import fr.acinq.eclair.wire.CommonCodecs.{varint, varintoverflow} import scodec.Codec import scodec.codecs.{byte, discriminated, list, provide, variableSizeBytesLong, zlib} @@ -20,13 +20,19 @@ object QueryShortChannelIdsTlv { val INCLUDE_CHANNEL_ANNOUNCEMENT: Long = 1 val INCLUDE_CHANNEL_UPDATE_1: Long = 2 val INCLUDE_CHANNEL_UPDATE_2: Long = 4 - val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2) + val INCLUDE_NODE_ANNOUNCEMENT_1: Long = 8 + val INCLUDE_NODE_ANNOUNCEMENT_2: Long = 16 + val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2) - def includeAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0 + def includeChannelAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0 def includeUpdate1(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_1) != 0 def includeUpdate2(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_2) != 0 + + def includeNodeAnnouncement1(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_1) != 0 + + def includeNodeAnnouncement2(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_2) != 0 } val encodedQueryFlagsCodec: Codec[EncodedQueryFlags] = diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 9c340e5a8e..f909aa70e0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -106,6 +106,7 @@ object TestConstants { randomizeRouteSelection = false, channelExcludeDuration = 60 seconds, routerBroadcastInterval = 5 seconds, + requestNodeAnnouncements = true, searchMaxFeeBase = Satoshi(21), searchMaxFeePct = 0.03, searchMaxCltv = 2016, @@ -176,6 +177,7 @@ object TestConstants { randomizeRouteSelection = false, channelExcludeDuration = 60 seconds, routerBroadcastInterval = 5 seconds, + requestNodeAnnouncements = true, searchMaxFeeBase = Satoshi(21), searchMaxFeePct = 0.03, searchMaxCltv = 2016, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala index 065ee73ac3..f57aebeb81 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala @@ -17,7 +17,6 @@ package fr.acinq.eclair.router import fr.acinq.eclair.wire.ReplyChannelRangeTlv._ -import fr.acinq.eclair.wire._ import fr.acinq.eclair.{MilliSatoshi, randomKey} import org.scalatest.FunSuite @@ -27,21 +26,21 @@ import scala.compat.Platform class ChannelRangeQueriesSpec extends FunSuite { - test("compute flag tests") { + test("compute flag tests") { val now = Platform.currentTime / 1000 val a = randomKey.publicKey val b = randomKey.publicKey val ab = RouteCalculationSpec.makeChannel(123466L, a, b) - val (ab1, uab1) = RouteCalculationSpec.makeUpdateShort(ab.shortChannelId, ab.nodeId1, ab.nodeId2, MilliSatoshi(0), 0, timestamp = now) - val (ab2, uab2) = RouteCalculationSpec.makeUpdateShort(ab.shortChannelId, ab.nodeId2, ab.nodeId1, MilliSatoshi(0), 0, timestamp = now) + val (ab1, uab1) = RouteCalculationSpec.makeUpdateShort(ab.shortChannelId, ab.nodeId1, ab.nodeId2, MilliSatoshi(0), 0, timestamp = now) + val (ab2, uab2) = RouteCalculationSpec.makeUpdateShort(ab.shortChannelId, ab.nodeId2, ab.nodeId1, MilliSatoshi(0), 0, timestamp = now) val c = randomKey.publicKey val d = randomKey.publicKey val cd = RouteCalculationSpec.makeChannel(451312L, c, d) - val (cd1, ucd1) = RouteCalculationSpec.makeUpdateShort(cd.shortChannelId, cd.nodeId1, cd.nodeId2, MilliSatoshi(0), 0, timestamp = now) - val (_, ucd2) = RouteCalculationSpec.makeUpdateShort(cd.shortChannelId, cd.nodeId2, cd.nodeId1, MilliSatoshi(0), 0, timestamp = now) + val (cd1, ucd1) = RouteCalculationSpec.makeUpdateShort(cd.shortChannelId, cd.nodeId1, cd.nodeId2, MilliSatoshi(0), 0, timestamp = now) + val (_, ucd2) = RouteCalculationSpec.makeUpdateShort(cd.shortChannelId, cd.nodeId2, cd.nodeId1, MilliSatoshi(0), 0, timestamp = now) val e = randomKey.publicKey val f = randomKey.publicKey @@ -63,23 +62,23 @@ class ChannelRangeQueriesSpec extends FunSuite { assert(Router.getChannelDigestInfo(channels, updates)(ab.shortChannelId) == (Timestamps(now, now), Checksums(3297511804L, 3297511804L))) // no extended info but we know the channel: we ask for the updates - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, None, None) === (INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2).toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, None, None, true) === (INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2).toByte) // same checksums, newer timestamps: we don't ask anything - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now + 1)), Some(Checksums(3297511804L, 3297511804L))) === 0.toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now + 1)), Some(Checksums(3297511804L, 3297511804L)), true) === 0.toByte) // different checksums, newer timestamps: we ask for the updates - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now)), Some(Checksums(154654604, 3297511804L))) === INCLUDE_CHANNEL_UPDATE_1) - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now, now + 1)), Some(Checksums(3297511804L, 45664546))) === INCLUDE_CHANNEL_UPDATE_2) - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now + 1)), Some(Checksums(154654604, 45664546+6))) === (INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2).toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now)), Some(Checksums(154654604, 3297511804L)), true) === INCLUDE_CHANNEL_UPDATE_1) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now, now + 1)), Some(Checksums(3297511804L, 45664546)), true) === INCLUDE_CHANNEL_UPDATE_2) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now + 1, now + 1)), Some(Checksums(154654604, 45664546 + 6)), true) === (INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2).toByte) // different checksums, older timestamps: we don't ask anything - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now - 1, now)), Some(Checksums(154654604, 3297511804L))) === 0.toByte) - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now, now - 1)), Some(Checksums(3297511804L, 45664546))) === 0.toByte) - assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now - 1, now - 1)), Some(Checksums(154654604, 45664546))) === 0.toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now - 1, now)), Some(Checksums(154654604, 3297511804L)), true) === 0.toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now, now - 1)), Some(Checksums(3297511804L, 45664546)), true) === 0.toByte) + assert(Router.computeFlag(channels, updates)(ab.shortChannelId, Some(Timestamps(now - 1, now - 1)), Some(Checksums(154654604, 45664546)), true) === 0.toByte) // missing channel update: we ask for it - assert(Router.computeFlag(channels, updates)(cd.shortChannelId, Some(Timestamps(now, now)), Some(Checksums(3297511804L, 3297511804L))) === INCLUDE_CHANNEL_UPDATE_2) + assert(Router.computeFlag(channels, updates)(cd.shortChannelId, Some(Timestamps(now, now)), Some(Checksums(3297511804L, 3297511804L)), true) === INCLUDE_CHANNEL_UPDATE_2) // unknown channel: we ask everything - assert(Router.computeFlag(channels, updates)(ef.shortChannelId, None, None) === INCLUDE_ALL) - + assert(Router.computeFlag(channels, updates)(ef.shortChannelId, None, None, false) === (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2)) + assert(Router.computeFlag(channels, updates)(ef.shortChannelId, None, None, true) === (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2)) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index 95f451ee45..0d0e13999a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -61,10 +61,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { } } - case class BasicSyncResult(ranges: Int, queries: Int, channels: Int, updates: Int) + case class BasicSyncResult(ranges: Int, queries: Int, channels: Int, updates: Int, nodes: Int) - case class SyncResult(ranges: Seq[ReplyChannelRange], queries: Seq[QueryShortChannelIds], channels: Seq[ChannelAnnouncement], updates: Seq[ChannelUpdate]) { - def counts = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size) + case class SyncResult(ranges: Seq[ReplyChannelRange], queries: Seq[QueryShortChannelIds], channels: Seq[ChannelAnnouncement], updates: Seq[ChannelUpdate], nodes: Seq[NodeAnnouncement]) { + def counts = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size, nodes.size) } def sync(src: TestFSMRef[State, Data, Router], tgt: TestFSMRef[State, Data, Router], extendedQueryFlags_opt: Option[QueryChannelRangeTlv]): SyncResult = { @@ -92,6 +92,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { var queries = Vector.empty[QueryShortChannelIds] var channels = Vector.empty[ChannelAnnouncement] var updates = Vector.empty[ChannelUpdate] + var nodes = Vector.empty[NodeAnnouncement] while (src.stateData.sync.nonEmpty) { // for each chunk, src sends a query_short_channel_id val query = pipe.expectMsgType[QueryShortChannelIds] @@ -104,6 +105,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { case u: ChannelUpdate => updates = updates :+ u u + case n: NodeAnnouncement => + nodes = nodes :+ n + n } // tgt replies with announcements announcements.foreach(ann => pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, ann))) @@ -111,7 +115,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val rscie = pipe.expectMsgType[ReplyShortChannelIdsEnd] pipe.send(src, PeerRoutingMessage(pipe.ref, tgtId, rscie)) } - SyncResult(rcrs, queries, channels, updates) + SyncResult(rcrs, queries, channels, updates, nodes) } test("handle channel range extended") { @@ -123,34 +127,46 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val extendedQueryFlags_opt = None // tell alice to sync with bob - assert(BasicSyncResult(ranges = 1, queries = 0, channels = 0, updates = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 0, channels = 0, updates = 0, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) + awaitCond(alice.stateData.nodes === bob.stateData.nodes) // add some channels and updates to bob and resync - fakeRoutingInfo.take(40).map(_._2._1).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) - fakeRoutingInfo.take(40).map(_._2._2).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) + fakeRoutingInfo.take(40).values.foreach { + case (ca, cu1, cu2, na1, na2) => + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, ca)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu1)) + // we don't send channel_update #2 + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 40) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) // add some updates to bob and resync - fakeRoutingInfo.take(40).map(_._2._3).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) + fakeRoutingInfo.take(40).values.foreach { + case (ca, cu1, cu2, na1, na2) => + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu2)) + } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 80) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 80, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) // add everything (duplicates will be ignored) fakeRoutingInfo.values.foreach { - case (c, u1, u2, _, _) => + case (c, u1, u2, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, u1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, u2)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size) - assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) } @@ -164,34 +180,46 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val extendedQueryFlags_opt = Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) // tell alice to sync with bob - assert(BasicSyncResult(ranges = 1, queries = 0, channels = 0, updates = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 0, channels = 0, updates = 0, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) // add some channels and updates to bob and resync - fakeRoutingInfo.take(40).map(_._2._1).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) - fakeRoutingInfo.take(40).map(_._2._2).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) + fakeRoutingInfo.take(40).values.foreach { + case (ca, cu1, cu2, na1, na2) => + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, ca)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu1)) + //sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu2)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) + } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 40) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) + awaitCond(alice.stateData.nodes === bob.stateData.nodes) // add some updates to bob and resync - fakeRoutingInfo.take(40).map(_._2._3).foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) + fakeRoutingInfo.take(40).values.foreach { + case (ca, cu1, cu2, na1, na2) => + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu2)) + } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 80) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 0, updates = 40) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 1, channels = 0, updates = 40, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) // add everything (duplicates will be ignored) fakeRoutingInfo.values.foreach { - case (c, u1, u2, _, _) => + case (c, u1, u2, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, u1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, u2)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) + sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size) - assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40)) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = 2 * (fakeRoutingInfo.size - 40)) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) @@ -203,9 +231,10 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val bumpedUpdates = (List(0, 42, 147, 153, 654, 834, 2301).map(touchUpdate(_, true)) ++ List(1, 42, 150, 200).map(touchUpdate(_, false))).toSet bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) - assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) awaitCond(alice.stateData.updates === bob.stateData.updates) + awaitCond(alice.stateData.nodes === bob.stateData.nodes) } test("reset sync state on reconnection") { From d90743cd4205365493e641930a554e78c0d84843 Mon Sep 17 00:00:00 2001 From: sstone Date: Fri, 23 Aug 2019 17:25:42 +0200 Subject: [PATCH 2/5] Increase tests timeouts There is now more work to do. --- .../fr/acinq/eclair/router/RoutingSyncSpec.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index 0d0e13999a..b313d61c2e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -34,6 +34,7 @@ import org.scalatest.FunSuiteLike import scala.collection.immutable.TreeMap import scala.collection.{SortedSet, immutable, mutable} import scala.compat.Platform +import scala.concurrent.duration._ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { @@ -165,9 +166,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } - awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size) + awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size, max = 20 seconds) assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) - awaitCond(alice.stateData.channels === bob.stateData.channels) + awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) } @@ -195,7 +196,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 40) assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) - awaitCond(alice.stateData.channels === bob.stateData.channels) + awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) awaitCond(alice.stateData.nodes === bob.stateData.nodes) @@ -206,7 +207,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 80) assert(BasicSyncResult(ranges = 1, queries = 1, channels = 0, updates = 40, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) - awaitCond(alice.stateData.channels === bob.stateData.channels) + awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) // add everything (duplicates will be ignored) @@ -218,9 +219,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } - awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size) + awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size, max = 20 seconds) assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = 2 * (fakeRoutingInfo.size - 40)) === sync(alice, bob, extendedQueryFlags_opt).counts) - awaitCond(alice.stateData.channels === bob.stateData.channels) + awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) // bump random channel_updates @@ -232,7 +233,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val bumpedUpdates = (List(0, 42, 147, 153, 654, 834, 2301).map(touchUpdate(_, true)) ++ List(1, 42, 150, 200).map(touchUpdate(_, false))).toSet bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) - awaitCond(alice.stateData.channels === bob.stateData.channels) + awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) awaitCond(alice.stateData.nodes === bob.stateData.nodes) } From be1d85a65f07c1a1779ec70fc70a3d11f9db725a Mon Sep 17 00:00:00 2001 From: sstone Date: Fri, 23 Aug 2019 18:50:22 +0200 Subject: [PATCH 3/5] Test query sync with and without node announcements --- eclair-core/src/main/resources/reference.conf | 2 +- .../scala/fr/acinq/eclair/router/Router.scala | 8 +++---- .../eclair/wire/QueryShortChannelIdsTlv.scala | 1 - .../acinq/eclair/router/RoutingSyncSpec.scala | 24 ++++++++++++------- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 8a6717d75e..49898092e1 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -116,7 +116,7 @@ eclair { channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration broadcast-interval = 60 seconds // see BOLT #7 init-timeout = 5 minutes - request-node-announcements = true // if true we will ask for node annnouncements when we receive channel ids that we don't know + request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know // the values below will be used to perform route searching path-finding { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 2f86ba3d84..03c3b26227 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -610,14 +610,14 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ } } if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement1).getOrElse(true) && !sent1.contains(ca.nodeId1)) { - d.nodes.get(ca.nodeId1).foreach { a => - transport ! a + d.nodes.get(ca.nodeId1).foreach { n => + transport ! n sent1 = sent1 + ca.nodeId1 } } if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement2).getOrElse(true) && !sent1.contains(ca.nodeId2)) { - d.nodes.get(ca.nodeId2).foreach { a => - transport ! a + d.nodes.get(ca.nodeId2).foreach { n => + transport ! n sent1 = sent1 + ca.nodeId2 } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala index 59ce2acad4..12f5ad96fa 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/QueryShortChannelIdsTlv.scala @@ -22,7 +22,6 @@ object QueryShortChannelIdsTlv { val INCLUDE_CHANNEL_UPDATE_2: Long = 4 val INCLUDE_NODE_ANNOUNCEMENT_1: Long = 8 val INCLUDE_NODE_ANNOUNCEMENT_2: Long = 16 - val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2) def includeChannelAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0 diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index b313d61c2e..2b2a0a0804 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -119,7 +119,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { SyncResult(rcrs, queries, channels, updates, nodes) } - test("handle channel range extended") { + test("sync with standard channel queries") { val watcher = system.actorOf(Props(new YesWatcher())) val alice = TestFSMRef(new Router(Alice.nodeParams, watcher)) val bob = TestFSMRef(new Router(Bob.nodeParams, watcher)) @@ -172,9 +172,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { awaitCond(alice.stateData.updates === bob.stateData.updates) } - test("handle channel range extended (extended)") { + def syncWithExtendedQueries(requestChannelAnnouncements: Boolean) = { val watcher = system.actorOf(Props(new YesWatcher())) - val alice = TestFSMRef(new Router(Alice.nodeParams, watcher)) + val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(requestNodeAnnouncements = requestChannelAnnouncements)), watcher)) val bob = TestFSMRef(new Router(Bob.nodeParams, watcher)) val charlieId = randomKey.publicKey val sender = TestProbe() @@ -190,15 +190,15 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { case (ca, cu1, cu2, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, ca)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu1)) - //sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, cu2)) + // we don't send channel_update #2 sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === 40 && bob.stateData.updates.size === 40) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = if (requestChannelAnnouncements) 80 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) - awaitCond(alice.stateData.nodes === bob.stateData.nodes) + if (requestChannelAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes) // add some updates to bob and resync fakeRoutingInfo.take(40).values.foreach { @@ -220,7 +220,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && bob.stateData.updates.size === 2 * fakeRoutingInfo.size, max = 20 seconds) - assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = 2 * (fakeRoutingInfo.size - 40)) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 2, queries = 24, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = if (requestChannelAnnouncements) 2 * (fakeRoutingInfo.size - 40) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) @@ -235,7 +235,15 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size, nodes = 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 20 seconds) awaitCond(alice.stateData.updates === bob.stateData.updates) - awaitCond(alice.stateData.nodes === bob.stateData.nodes) + if (requestChannelAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes) + } + + test("sync with extended channel queries (don't request node announcements)") { + syncWithExtendedQueries(false) + } + + test("sync with extended channel queries (request node announcements)") { + syncWithExtendedQueries(true) } test("reset sync state on reconnection") { From 1d6b3094bf504d5cfa671cdb04e7b7167d916fa8 Mon Sep 17 00:00:00 2001 From: sstone Date: Mon, 26 Aug 2019 10:26:42 +0200 Subject: [PATCH 4/5] Router: minor fix --- eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 03c3b26227..d3da7d4895 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -625,7 +625,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ } val (channelCount, updateCount, nodeCount) = loop(shortChannelIds.array, flags) - log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} node announcements", shortChannelIds.array.size, channelCount, updateCount, nodeCount) + log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount) transport ! ReplyShortChannelIdsEnd(chainHash, 1) stay From 2be5c81c9cbceeed3d60f19c288919e266effd49 Mon Sep 17 00:00:00 2001 From: sstone Date: Mon, 26 Aug 2019 14:28:19 +0200 Subject: [PATCH 5/5] Router: rework query handling --- .../scala/fr/acinq/eclair/router/Router.scala | 219 +++++++++++------- .../router/ChannelRangeQueriesSpec.scala | 42 ++++ 2 files changed, 182 insertions(+), 79 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index d3da7d4895..4c4ceda9ef 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -576,55 +576,26 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ sender ! TransportHandler.ReadAck(routingMessage) val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long]) - // we loop over channel ids and query flag and send what the sender requested - // we keep track of how many announcements and updates we've sent, and we also track node Ids for node announcement - // we've already sent to avoid sending them multiple times, as requested by the BOLTs - @tailrec - def loop(ids: List[ShortChannelId], flags: List[Long], numca: Int = 0, numcu: Int = 0, sent: Set[PublicKey] = Set.empty[PublicKey]): (Int, Int, Int) = ids match { - case Nil => (numca, numcu, sent.size) - case head :: tail if !d.channels.contains(head) => - log.warning("received query for shortChannelId={} that we don't have", head) - loop(tail, flags.drop(1), numca, numcu, sent) - case head :: tail => - var numca1 = numca - var numcu1 = numcu - var sent1 = sent - val ca = d.channels(head) - val flag_opt = flags.headOption - // no flag means send everything - - if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement).getOrElse(true)) { - transport ! ca - numca1 = numca1 + 1 - } - if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1).getOrElse(true)) { - d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => - transport ! u - numcu1 = numcu1 + 1 - } - } - if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2).getOrElse(true)) { - d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => - transport ! u - numcu1 = numcu1 + 1 - } - } - if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement1).getOrElse(true) && !sent1.contains(ca.nodeId1)) { - d.nodes.get(ca.nodeId1).foreach { n => - transport ! n - sent1 = sent1 + ca.nodeId1 - } - } - if (flag_opt.map(QueryShortChannelIdsTlv.QueryFlagType.includeNodeAnnouncement2).getOrElse(true) && !sent1.contains(ca.nodeId2)) { - d.nodes.get(ca.nodeId2).foreach { n => - transport ! n - sent1 = sent1 + ca.nodeId2 - } - } - loop(tail, flags.drop(1), numca1, numcu1, sent1) - } - - val (channelCount, updateCount, nodeCount) = loop(shortChannelIds.array, flags) + var channelCount = 0 + var updateCount = 0 + var nodeCount = 0 + + Router.handleQuery(d.nodes, d.channels, d.updates)( + shortChannelIds.array, + flags, + ca => { + channelCount = channelCount + 1 + transport ! ca + }, + cu => { + updateCount = updateCount + 1 + transport ! cu + }, + na => { + nodeCount = nodeCount + 1 + transport ! na + } + ) log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount) transport ! ReplyShortChannelIdsEnd(chainHash, 1) stay @@ -886,45 +857,135 @@ object Router { height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks) } - def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])( - shortChannelId: ShortChannelId, - timestamps_opt: Option[ReplyChannelRangeTlv.Timestamps], - checksums_opt: Option[ReplyChannelRangeTlv.Checksums], - includeNodeAnnouncements: Boolean): Long = { - import QueryShortChannelIdsTlv.QueryFlagType - var flag = 0L - (timestamps_opt, checksums_opt) match { - case (Some(theirTimestamps), Some(theirChecksums)) if channels.contains(shortChannelId) => - val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId) + def shouldRequestUpdate(ourTimestamp: Long, ourChecksum: Long, theirTimestamp_opt: Option[Long], theirChecksum_opt: Option[Long]): Boolean = { + (theirTimestamp_opt, theirChecksum_opt) match { + case (Some(theirTimestamp), Some(theirChecksum)) => // we request their channel_update if all those conditions are met: // - it is more recent than ours // - it is different from ours, or it is the same but ours is about to be stale - // - it is not stale itself - if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && (ourChecksums.checksum1 != theirChecksums.checksum1 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 - if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && (ourChecksums.checksum2 != theirChecksums.checksum2 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 - case (Some(theirTimestamps), None) if channels.contains(shortChannelId) => - val (ourTimestamps, _) = Router.getChannelDigestInfo(channels, updates)(shortChannelId) - // we request their channel_update if all those conditions are met: - // - it is more recent than ours - // - it is not stale itself - if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 - if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 - case (None, Some(theirChecksums)) if channels.contains(shortChannelId) => - val (_, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId) + // - it is not stale + val theirsIsMoreRecent = ourTimestamp < theirTimestamp + val theirsIsDifferent = ourChecksum != theirChecksum + val oursIsAlmostStale = isAlmostStale(ourTimestamp) + val theirsIsStale = isStale(theirTimestamp) + theirsIsMoreRecent && (theirsIsDifferent || oursIsAlmostStale) && !theirsIsStale + case (Some(theirTimestamp), None) => + val theirsIsMoreRecent = ourTimestamp < theirTimestamp + val theirsIsStale = isStale(theirTimestamp) + theirsIsMoreRecent && !theirsIsStale + case (None, Some(theirChecksum)) => // this should not happen as we will not ask for checksums without asking for timestamps too - if (ourChecksums.checksum1 != theirChecksums.checksum1 && theirChecksums.checksum1 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 - if (ourChecksums.checksum2 != theirChecksums.checksum2 && theirChecksums.checksum2 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 - case (None, None) if channels.contains(shortChannelId) => - // we know this channel: we only request their channel updates - flag = QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 + val theirsIsDifferent = theirChecksum != 0 && ourChecksum != theirChecksum + theirsIsDifferent case _ => - // we don't know this channel: we request its channel announcement and updates - flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2 - if (includeNodeAnnouncements) flag = flag | QueryFlagType.INCLUDE_NODE_ANNOUNCEMENT_1 | QueryFlagType.INCLUDE_NODE_ANNOUNCEMENT_2 + // they did not include timestamp or checksum => ask for the update + true + } + } + + def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])( + shortChannelId: ShortChannelId, + theirTimestamps_opt: Option[ReplyChannelRangeTlv.Timestamps], + theirChecksums_opt: Option[ReplyChannelRangeTlv.Checksums], + includeNodeAnnouncements: Boolean): Long = { + import QueryShortChannelIdsTlv.QueryFlagType._ + + val flag = channels.contains(shortChannelId) match { + case false if includeNodeAnnouncements => + INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2 + case false => + INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 + case true => + // we already know this channel + val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId) + // if they don't provide timestamps or checksums, we set appropriate default values: + // - we assume their timestamp is more recent than ours by setting timestamp = Long.MaxValue + // - we assume their update is different from ours by setting checkum = Long.MaxValue (NB: our default value for checksum is 0) + val shouldRequestUpdate1 = shouldRequestUpdate(ourTimestamps.timestamp1, ourChecksums.checksum1, theirTimestamps_opt.map(_.timestamp1), theirChecksums_opt.map(_.checksum1)) + val shouldRequestUpdate2 = shouldRequestUpdate(ourTimestamps.timestamp2, ourChecksums.checksum2, theirTimestamps_opt.map(_.timestamp2), theirChecksums_opt.map(_.checksum2)) + val flagUpdate1 = if (shouldRequestUpdate1) INCLUDE_CHANNEL_UPDATE_1 else 0 + val flagUpdate2 = if (shouldRequestUpdate2) INCLUDE_CHANNEL_UPDATE_2 else 0 + flagUpdate1 | flagUpdate2 } flag } + /** + * Handle a query message, which includes a list of channel ids and flags. + * + * @param nodes node id -> node announcement + * @param channels channel id -> channel announcement + * @param updates channel description -> channel update + * @param ids list of channel ids + * @param flags list of query flags, either empty one flag per channel id + * @param onChannel called when a channel announcement matches (i.e. its bit is set in the query flag and we have it) + * @param onUpdate called when a channel update matches + * @param onNode called when a node announcement matches + * + */ + def handleQuery(nodes: Map[PublicKey, NodeAnnouncement], + channels: SortedMap[ShortChannelId, ChannelAnnouncement], + updates: Map[ChannelDesc, ChannelUpdate])( + ids: List[ShortChannelId], + flags: List[Long], + onChannel: ChannelAnnouncement => Unit, + onUpdate: ChannelUpdate => Unit, + onNode: NodeAnnouncement => Unit): Unit = { + import QueryShortChannelIdsTlv.QueryFlagType + + // we loop over channel ids and query flag. We track node Ids for node announcement + // we've already sent to avoid sending them multiple times, as requested by the BOLTs + @tailrec + def loop(ids: List[ShortChannelId], flags: List[Long], numca: Int = 0, numcu: Int = 0, nodesSent: Set[PublicKey] = Set.empty[PublicKey]): (Int, Int, Int) = ids match { + case Nil => (numca, numcu, nodesSent.size) + case head :: tail if !channels.contains(head) => + //log.warning("received query for shortChannelId={} that we don't have", head) + loop(tail, flags.drop(1), numca, numcu, nodesSent) + case head :: tail => + var numca1 = numca + var numcu1 = numcu + var sent1 = nodesSent + val ca = channels(head) + val flag_opt = flags.headOption + // no flag means send everything + + val includeChannel = flag_opt.forall(QueryFlagType.includeChannelAnnouncement) + val includeUpdate1 = flag_opt.forall(QueryFlagType.includeUpdate1) + val includeUpdate2 = flag_opt.forall(QueryFlagType.includeUpdate2) + val includeNode1 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement1) + val includeNode2 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement2) + + if (includeChannel) { + onChannel(ca) + } + if (includeUpdate1) { + updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => + onUpdate(u) + } + } + if (includeUpdate2) { + updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => + onUpdate(u) + } + } + if (includeNode1 && !sent1.contains(ca.nodeId1)) { + nodes.get(ca.nodeId1).foreach { n => + onNode(n) + sent1 = sent1 + ca.nodeId1 + } + } + if (includeNode2 && !sent1.contains(ca.nodeId2)) { + nodes.get(ca.nodeId2).foreach { n => + onNode(n) + sent1 = sent1 + ca.nodeId2 + } + } + loop(tail, flags.drop(1), numca1, numcu1, sent1) + } + + loop(ids, flags) + } + /** * Returns overall progress on synchronization * diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala index f57aebeb81..f7f009e970 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRangeQueriesSpec.scala @@ -26,6 +26,48 @@ import scala.compat.Platform class ChannelRangeQueriesSpec extends FunSuite { + test("ask for update test") { + // they don't provide anything => we always ask for the update + assert(Router.shouldRequestUpdate(0, 0, None, None)) + assert(Router.shouldRequestUpdate(Int.MaxValue, 12345, None, None)) + + // their update is older => don't ask + val now = Platform.currentTime / 1000 + assert(!Router.shouldRequestUpdate(now, 0, Some(now - 1), None)) + assert(!Router.shouldRequestUpdate(now, 0, Some(now - 1), Some(12345))) + assert(!Router.shouldRequestUpdate(now, 12344, Some(now - 1), None)) + assert(!Router.shouldRequestUpdate(now, 12344, Some(now - 1), Some(12345))) + + // their update is newer but stale => don't ask + val old = now - 4 * 2016 * 24 * 3600 + assert(!Router.shouldRequestUpdate(old - 1, 0, Some(old), None)) + assert(!Router.shouldRequestUpdate(old - 1, 0, Some(old), Some(12345))) + assert(!Router.shouldRequestUpdate(old - 1, 12344, Some(old), None)) + assert(!Router.shouldRequestUpdate(old - 1, 12344, Some(old), Some(12345))) + + // their update is newer but with the same checksum, and ours is stale or about to be => ask (we want to renew our update) + assert(Router.shouldRequestUpdate(old, 12345, Some(now), Some(12345))) + + // their update is newer but with the same checksum => don't ask + assert(!Router.shouldRequestUpdate(now - 1, 12345, Some(now), Some(12345))) + + // their update is newer with a different checksum => always ask + assert(Router.shouldRequestUpdate(now - 1, 0, Some(now), None)) + assert(Router.shouldRequestUpdate(now - 1, 0, Some(now), Some(12345))) + assert(Router.shouldRequestUpdate(now - 1, 12344, Some(now), None)) + assert(Router.shouldRequestUpdate(now - 1, 12344, Some(now), Some(12345))) + + // they just provided a 0 checksum => don't ask + assert(!Router.shouldRequestUpdate(0, 0, None, Some(0))) + assert(!Router.shouldRequestUpdate(now, 1234, None, Some(0))) + + // they just provided a checksum that is the same as us => don't ask + assert(!Router.shouldRequestUpdate(now, 1234, None, Some(1234))) + + // they just provided a different checksum that is the same as us => ask + assert(Router.shouldRequestUpdate(now, 1234, None, Some(1235))) + } + test("compute flag tests") { val now = Platform.currentTime / 1000