diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index a6ac85de83..13fddf73c8 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -121,6 +121,8 @@ eclair { sync { request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know encoding-type = zlib // encoding for short_channel_ids and timestamps in query channel sync messages; other possible value is "uncompressed" + channel-range-chunk-size = 2500 // max number of short_channel_ids (+ timestamps + checksums) in reply_channel_range *do not change this unless you know what you are doing* + channel-query-chunk-size = 100 // max number of short_channel_ids in query_short_channel_ids *do not change this unless you know what you are doing* } // the values below will be used to perform route searching diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 21e20c532f..ecadd9dbc0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -246,6 +246,8 @@ object NodeParams { randomizeRouteSelection = config.getBoolean("router.randomize-route-selection"), requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"), encodingType = routerSyncEncodingType, + channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"), + channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"), searchMaxRouteLength = config.getInt("router.path-finding.max-route-length"), searchMaxCltv = CltvExpiryDelta(config.getInt("router.path-finding.max-cltv")), searchMaxFeeBase = Satoshi(config.getLong("router.path-finding.fee-threshold-sat")), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 7001f3bb0f..a42fecb688 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -53,6 +53,8 @@ case class RouterConf(randomizeRouteSelection: Boolean, routerBroadcastInterval: FiniteDuration, requestNodeAnnouncements: Boolean, encodingType: EncodingType, + channelRangeChunkSize: Int, + channelQueryChunkSize: Int, searchMaxFeeBase: Satoshi, searchMaxFeePct: Double, searchMaxRouteLength: Int, @@ -163,8 +165,6 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ setTimer(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval, repeat = true) setTimer(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour, repeat = true) - val SHORTID_WINDOW = 100 - val defaultRouteParams = getDefaultRouteParams(nodeParams.routerConf) val db = nodeParams.db.network @@ -541,7 +541,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ // keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks] val shortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _)) log.info("replying with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks) - split(shortChannelIds) + split(shortChannelIds, nodeParams.routerConf.channelRangeChunkSize) .foreach(chunk => { val (timestamps, checksums) = routingMessage.queryFlags_opt match { case Some(extension) if extension.wantChecksums | extension.wantTimestamps => @@ -589,7 +589,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding) // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time) val replies = shortChannelIdAndFlags - .grouped(SHORTID_WINDOW) + .grouped(nodeParams.routerConf.channelQueryChunkSize) .map(chunk => QueryShortChannelIds(chainHash, shortChannelIds = EncodedShortChannelIds(shortChannelIds.encoding, chunk.map(_.shortChannelId)), if (routingMessage.timestamps_opt.isDefined || routingMessage.checksums_opt.isDefined) @@ -811,7 +811,6 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ } object Router { - val SHORTID_WINDOW = 100 def props(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) = Props(new Router(nodeParams, watcher, initialized)) @@ -1070,15 +1069,14 @@ object Router { * @param shortChannelIds * @return */ - def split(shortChannelIds: SortedSet[ShortChannelId]): List[ShortChannelIdsChunk] = { + def split(shortChannelIds: SortedSet[ShortChannelId], channelRangeChunkSize: Int): List[ShortChannelIdsChunk] = { // this algorithm can split blocks (meaning that we can in theory generate several replies with the same first_block/num_blocks // and a different set of short_channel_ids) but it doesn't matter - val SPLIT_SIZE = 3500 // we can theoretically fit 4091 uncompressed channel ids in a single lightning message (max size 65 Kb) if (shortChannelIds.isEmpty) { List(ShortChannelIdsChunk(0, 0, List.empty)) } else { shortChannelIds - .grouped(SPLIT_SIZE) + .grouped(channelRangeChunkSize) .toList .map { group => // NB: group is never empty diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/ReplyChannelRangeTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/ReplyChannelRangeTlv.scala index bde4605551..c7ea06fa67 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/ReplyChannelRangeTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/ReplyChannelRangeTlv.scala @@ -39,8 +39,8 @@ object ReplyChannelRangeTlv { case class EncodedChecksums(checksums: List[Checksums]) extends ReplyChannelRangeTlv val timestampsCodec: Codec[Timestamps] = ( - ("checksum1" | uint32) :: - ("checksum2" | uint32) + ("timestamp1" | uint32) :: + ("timestamp2" | uint32) ).as[Timestamps] val encodedTimestampsCodec: Codec[EncodedTimestamps] = variableSizeBytesLong(varintoverflow, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 1495bc9686..5d6a9b11b5 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -109,6 +109,8 @@ object TestConstants { routerBroadcastInterval = 5 seconds, requestNodeAnnouncements = true, encodingType = EncodingType.COMPRESSED_ZLIB, + channelRangeChunkSize = 20, + channelQueryChunkSize = 5, searchMaxFeeBase = 21 sat, searchMaxFeePct = 0.03, searchMaxCltv = CltvExpiryDelta(2016), @@ -182,6 +184,8 @@ object TestConstants { routerBroadcastInterval = 5 seconds, requestNodeAnnouncements = true, encodingType = EncodingType.UNCOMPRESSED, + channelRangeChunkSize = 20, + channelQueryChunkSize = 5, searchMaxFeeBase = 21 sat, searchMaxFeePct = 0.03, searchMaxCltv = CltvExpiryDelta(2016), diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index 9144657412..fefa68b5e4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -43,7 +43,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val fakeRoutingInfo: TreeMap[ShortChannelId, (PublicChannel, NodeAnnouncement, NodeAnnouncement)] = RoutingSyncSpec .shortChannelIds - .take(4567) + .take(60) .foldLeft(TreeMap.empty[ShortChannelId, (PublicChannel, NodeAnnouncement, NodeAnnouncement)]) { case (m, shortChannelId) => m + (shortChannelId -> makeFakeRoutingInfo(shortChannelId)) } @@ -137,7 +137,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { awaitCond(alice.stateData.nodes === bob.stateData.nodes) // add some channels and updates to bob and resync - fakeRoutingInfo.take(40).values.foreach { + fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) @@ -145,17 +145,17 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } - awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 40) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) + awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10) + assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) // add some updates to bob and resync - fakeRoutingInfo.take(40).values.foreach { + fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) } - awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 80) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 80, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts) + awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2) + assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10 * 2, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels) // add everything (duplicates will be ignored) @@ -168,7 +168,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds) - assert(BasicSyncResult(ranges = 2, queries = 46, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 3, queries = 12, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) } @@ -185,7 +185,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { awaitCond(alice.stateData.channels === bob.stateData.channels) // add some channels and updates to bob and resync - fakeRoutingInfo.take(40).values.foreach { + fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get)) @@ -193,18 +193,18 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1)) sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } - awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 40) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = if (requestNodeAnnouncements) 80 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10) + assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes) // add some updates to bob and resync - fakeRoutingInfo.take(40).values.foreach { + fakeRoutingInfo.take(10).values.foreach { case (pc, na1, na2) => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get)) } - awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 80) - assert(BasicSyncResult(ranges = 1, queries = 1, channels = 0, updates = 40, nodes = if (requestNodeAnnouncements) 80 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2) + assert(BasicSyncResult(ranges = 1, queries = 2, channels = 0, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) // add everything (duplicates will be ignored) @@ -217,7 +217,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2)) } awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds) - assert(BasicSyncResult(ranges = 2, queries = 46, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 40) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 3, queries = 10, channels = fakeRoutingInfo.size - 10, updates = 2 * (fakeRoutingInfo.size - 10), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 10) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) // bump random channel_updates @@ -226,9 +226,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { makeNewerChannelUpdate(c, if (side) u1 else u2) } - val bumpedUpdates = (List(0, 42, 147, 153, 654, 834, 4301).map(touchUpdate(_, true)) ++ List(1, 42, 150, 200).map(touchUpdate(_, false))).toSet + val bumpedUpdates = (List(0, 3, 7).map(touchUpdate(_, true)) ++ List(1, 3, 9).map(touchUpdate(_, false))).toSet bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c))) - assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 20 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) + assert(BasicSyncResult(ranges = 3, queries = 1, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 5 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts) awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds) if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes) } @@ -254,7 +254,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike { val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] - val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(100).keys.toList), None, None) + val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None) // send first block sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, block1))