From a6381d1b0b9bb17850f84653183eea9e3e262b16 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 1 Apr 2022 18:11:08 +0200 Subject: [PATCH 1/3] use long id instead of short id in `ChannelRelay` --- .../eclair/payment/relay/ChannelRelay.scala | 115 ++++++++++-------- .../eclair/payment/relay/ChannelRelayer.scala | 68 +++++------ .../acinq/eclair/payment/relay/Relayer.scala | 2 + .../payment/relay/ChannelRelayerSpec.scala | 56 ++++----- .../eclair/payment/relay/RelayerSpec.scala | 2 +- 5 files changed, 123 insertions(+), 120 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 818ab5b80b..7909f505e0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -28,7 +28,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags} import fr.acinq.eclair.payment.relay.Relayer.OutgoingChannel import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket} import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, TimestampSecond, channel, nodeFee} +import fr.acinq.eclair.{Logs, NodeParams, TimestampSecond, channel, nodeFee} import java.util.UUID @@ -37,17 +37,17 @@ object ChannelRelay { // @formatter:off sealed trait Command private case object DoRelay extends Command - private case class WrappedForwardShortIdFailure(failure: Register.ForwardShortIdFailure[CMD_ADD_HTLC]) extends Command + private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command // @formatter:on // @formatter:off sealed trait RelayResult case class RelayFailure(cmdFail: CMD_FAIL_HTLC) extends RelayResult - case class RelaySuccess(shortChannelId: ShortChannelId, cmdAdd: CMD_ADD_HTLC) extends RelayResult + case class RelaySuccess(selectedChannelId: ByteVector32, cmdAdd: CMD_ADD_HTLC) extends RelayResult // @formatter:on - def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ShortChannelId, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] = + def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ByteVector32, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] = Behaviors.setup { context => Behaviors.withMdc(Logs.mdc( category_opt = Some(Logs.LogCategory.PAYMENT), @@ -92,16 +92,16 @@ object ChannelRelay { */ class ChannelRelay private(nodeParams: NodeParams, register: ActorRef, - channels: Map[ShortChannelId, Relayer.OutgoingChannel], + channels: Map[ByteVector32, Relayer.OutgoingChannel], r: IncomingPaymentPacket.ChannelRelayPacket, context: ActorContext[ChannelRelay.Command]) { import ChannelRelay._ - private val forwardShortIdAdapter = context.messageAdapter[Register.ForwardShortIdFailure[CMD_ADD_HTLC]](WrappedForwardShortIdFailure) + private val forwardFailureAdapter = context.messageAdapter[Register.ForwardFailure[CMD_ADD_HTLC]](WrappedForwardFailure) private val addResponseAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddResponse) - private case class PreviouslyTried(shortChannelId: ShortChannelId, failure: RES_ADD_FAILED[ChannelException]) + private case class PreviouslyTried(channelId: ByteVector32, failure: RES_ADD_FAILED[ChannelException]) def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = { Behaviors.receiveMessagePartial { @@ -115,17 +115,17 @@ class ChannelRelay private(nodeParams: NodeParams, Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) context.log.info(s"rejecting htlc reason=${cmdFail.reason}") safeSendAndStop(r.add.channelId, cmdFail) - case RelaySuccess(selectedShortChannelId, cmdAdd) => - context.log.info(s"forwarding htlc to shortChannelId=$selectedShortChannelId") - register ! Register.ForwardShortId(forwardShortIdAdapter.toClassic, selectedShortChannelId, cmdAdd) - waitForAddResponse(selectedShortChannelId, previousFailures) + case RelaySuccess(selectedChannelId, cmdAdd) => + context.log.info(s"forwarding htlc to channelId=$selectedChannelId") + register ! Register.Forward(forwardFailureAdapter.toClassic, selectedChannelId, cmdAdd) + waitForAddResponse(selectedChannelId, previousFailures) } } } - def waitForAddResponse(selectedShortChannelId: ShortChannelId, previousFailures: Seq[PreviouslyTried]): Behavior[Command] = + def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] = Behaviors.receiveMessagePartial { - case WrappedForwardShortIdFailure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) => + case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) => context.log.warn(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${o.add.id}") val cmdFail = CMD_FAIL_HTLC(o.add.id, Right(UnknownNextPeer), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) @@ -134,7 +134,7 @@ class ChannelRelay private(nodeParams: NodeParams, case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, _: Origin.ChannelRelayedHot, _), _, _)) => context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName) context.self ! DoRelay - relay(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed)) + relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed)) case WrappedAddResponse(_: RES_SUCCESS[_]) => context.log.debug("sent htlc to the downstream channel") @@ -170,14 +170,13 @@ class ChannelRelay private(nodeParams: NodeParams, * - a CMD_ADD_HTLC to propagate downstream */ def handleRelay(previousFailures: Seq[PreviouslyTried]): RelayResult = { - val alreadyTried = previousFailures.map(_.shortChannelId) - selectPreferredChannel(alreadyTried) - .flatMap(selectedShortChannelId => channels.get(selectedShortChannelId)) match { + val alreadyTried = previousFailures.map(_.channelId) + selectPreferredChannel(alreadyTried) match { case None if previousFailures.nonEmpty => // no more channels to try val error = previousFailures // we return the error for the initially requested channel if it exists - .find(_.shortChannelId == r.payload.outgoingChannelId) + .find(failure => requestedChannelId_opt.contains(failure.channelId)) // otherwise we return the error for the first channel tried .getOrElse(previousFailures.head) .failure @@ -190,58 +189,68 @@ class ChannelRelay private(nodeParams: NodeParams, /** all the channels point to the same next node, we take the first one */ private val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId) + /** channel id explicitly requested in the onion payload */ + private val requestedChannelId_opt = channels.find(_._2.channelUpdate.shortChannelId == r.payload.outgoingChannelId).map(_._1) + /** * Select a channel to the same node to relay the payment to, that has the lowest capacity and balance and is * compatible in terms of fees, expiry_delta, etc. * * If no suitable channel is found we default to the originally requested channel. */ - def selectPreferredChannel(alreadyTried: Seq[ShortChannelId]): Option[ShortChannelId] = { + def selectPreferredChannel(alreadyTried: Seq[ByteVector32]): Option[OutgoingChannel] = { val requestedShortChannelId = r.payload.outgoingChannelId context.log.debug("selecting next channel with requestedShortChannelId={}", requestedShortChannelId) nextNodeId_opt match { case Some(_) => // we then filter out channels that we have already tried - val candidateChannels: Map[ShortChannelId, OutgoingChannel] = channels -- alreadyTried + val candidateChannels: Map[ByteVector32, OutgoingChannel] = channels -- alreadyTried // and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta) candidateChannels - .map { case (shortChannelId, channelInfo) => - val relayResult = relayOrFail(Some(channelInfo)) - context.log.debug(s"candidate channel: shortChannelId=$shortChannelId availableForSend={} capacity={} channelUpdate={} result={}", - channelInfo.commitments.availableBalanceForSend, - channelInfo.commitments.capacity, - channelInfo.channelUpdate, + .values + .map { channel => + val relayResult = relayOrFail(Some(channel)) + context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}", + channel.commitments.availableBalanceForSend, + channel.commitments.capacity, + channel.channelUpdate, relayResult match { case _: RelaySuccess => "success" case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason case other => other }) - (shortChannelId, channelInfo, relayResult) + (channel, relayResult) } .collect { // we only keep channels that have enough balance to handle this payment - case (shortChannelId, channelInfo, _: RelaySuccess) if channelInfo.commitments.availableBalanceForSend > r.payload.amountToForward => (shortChannelId, channelInfo.commitments) + case (channel, _: RelaySuccess) if channel.commitments.availableBalanceForSend > r.payload.amountToForward => channel } .toList // needed for ordering // we want to use the channel with: // - the lowest available capacity to ensure we keep high-capacity channels for big payments // - the lowest available balance to increase our incoming liquidity - .sortBy { case (_, commitments) => (commitments.capacity, commitments.availableBalanceForSend) } + .sortBy { channel => (channel.commitments.capacity, channel.commitments.availableBalanceForSend) } .headOption match { - case Some((preferredShortChannelId, commitments)) if preferredShortChannelId != requestedShortChannelId => - context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, commitments.availableBalanceForSend) - Some(preferredShortChannelId) - case Some(_) => - context.log.debug("requested short channel id is our preferred channel") - Some(requestedShortChannelId) - case None if !alreadyTried.contains(requestedShortChannelId) => - context.log.debug("no channel seems to work for this payment, we will try to use the requested short channel id") - Some(requestedShortChannelId) + case Some(channel) => + if (requestedChannelId_opt.contains(channel.channelId)) { + context.log.debug("requested short channel id is our preferred channel") + Some(channel) + } else { + context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, channel.channelUpdate.shortChannelId, channel.commitments.availableBalanceForSend) + Some(channel) + } case None => - context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up") - None + val requestedChannel_opt = requestedChannelId_opt.flatMap(channels.get) + requestedChannel_opt match { + case Some(requestedChannel) if alreadyTried.contains(requestedChannel.channelId) => + context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up") + None + case _ => + context.log.debug("no channel seems to work for this payment, we will try to use the one requested") + requestedChannel_opt + } } - case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id + case _ => requestedChannelId_opt.flatMap(channels.get) // we don't have a channel_update for this short_channel_id } } @@ -252,23 +261,23 @@ class ChannelRelay private(nodeParams: NodeParams, */ def relayOrFail(outgoingChannel_opt: Option[OutgoingChannel]): RelayResult = { import r._ - outgoingChannel_opt.map(_.channelUpdate) match { + outgoingChannel_opt match { case None => RelayFailure(CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)) - case Some(channelUpdate) if !channelUpdate.channelFlags.isEnabled => - RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true)) - case Some(channelUpdate) if payload.amountToForward < channelUpdate.htlcMinimumMsat => - RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, channelUpdate)), commit = true)) - case Some(channelUpdate) if r.expiryDelta < channelUpdate.cltvExpiryDelta => - RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, channelUpdate)), commit = true)) - case Some(channelUpdate) if r.relayFeeMsat < nodeFee(channelUpdate.relayFees, payload.amountToForward) && + case Some(c) if !c.channelUpdate.channelFlags.isEnabled => + RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(c.channelUpdate.messageFlags, c.channelUpdate.channelFlags, c.channelUpdate)), commit = true)) + case Some(c) if payload.amountToForward < c.channelUpdate.htlcMinimumMsat => + RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, c.channelUpdate)), commit = true)) + case Some(c) if r.expiryDelta < c.channelUpdate.cltvExpiryDelta => + RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, c.channelUpdate)), commit = true)) + case Some(c) if r.relayFeeMsat < nodeFee(c.channelUpdate.relayFees, payload.amountToForward) && // fees also do not satisfy the previous channel update for `enforcementDelay` seconds after current update - (TimestampSecond.now() - channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay || + (TimestampSecond.now() - c.channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay || outgoingChannel_opt.flatMap(_.prevChannelUpdate).forall(c => r.relayFeeMsat < nodeFee(c.relayFees, payload.amountToForward))) => - RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true)) - case Some(channelUpdate) => + RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, c.channelUpdate)), commit = true)) + case Some(c) => val origin = Origin.ChannelRelayedHot(addResponseAdapter.toClassic, add, payload.amountToForward) - RelaySuccess(channelUpdate.shortChannelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true)) + RelaySuccess(c.channelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true)) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index 14abc832bf..f874479681 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -20,6 +20,7 @@ import akka.actor.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.payment.IncomingPaymentPacket @@ -53,13 +54,11 @@ object ChannelRelayer { case _ => Map.empty } - private type ChannelUpdates = Map[ShortChannelId, Relayer.OutgoingChannel] - private type NodeChannels = mutable.MultiDict[PublicKey, ShortChannelId] - def apply(nodeParams: NodeParams, register: ActorRef, - channelUpdates: ChannelUpdates = Map.empty, - node2channels: NodeChannels = mutable.MultiDict.empty[PublicKey, ShortChannelId]): Behavior[Command] = + channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, + scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, + node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = Behaviors.setup { context => context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelUpdate](WrappedLocalChannelUpdate)) context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelDown](WrappedLocalChannelDown)) @@ -70,61 +69,56 @@ object ChannelRelayer { Behaviors.receiveMessage { case Relay(channelRelayPacket) => val relayId = UUID.randomUUID() - val nextNodeId_opt: Option[PublicKey] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match { - case Some(channel) => Some(channel.nextNodeId) + val nextNodeId_opt: Option[PublicKey] = scid2channels.get(channelRelayPacket.payload.outgoingChannelId) match { + case Some(channelId) => channels.get(channelId).map(_.nextNodeId) case None => None } - val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = nextNodeId_opt match { - case Some(nextNodeId) => node2channels.get(nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap + val nextChannels: Map[ByteVector32, Relayer.OutgoingChannel] = nextNodeId_opt match { + case Some(nextNodeId) => node2channels.get(nextNodeId).flatMap(channels.get).map(c => c.channelId -> c).toMap case None => Map.empty } - context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), channels.keys.mkString(",")) - context.spawn(ChannelRelay.apply(nodeParams, register, channels, relayId, channelRelayPacket), name = relayId.toString) + context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(",")) + context.spawn(ChannelRelay.apply(nodeParams, register, nextChannels, relayId, channelRelayPacket), name = relayId.toString) Behaviors.same case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) => - val channels = if (enabledOnly) { - channelUpdates.values.filter(o => o.channelUpdate.channelFlags.isEnabled) + val selected = if (enabledOnly) { + channels.values.filter(o => o.channelUpdate.channelFlags.isEnabled) } else { - channelUpdates.values + channels.values } - replyTo ! Relayer.OutgoingChannels(channels.toSeq) + replyTo ! Relayer.OutgoingChannels(selected.toSeq) Behaviors.same case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) => context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments) - val prevChannelUpdate = channelUpdates.get(shortChannelId).map(_.channelUpdate) - val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, prevChannelUpdate, commitments)) - val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId) - apply(nodeParams, register, channelUpdates1, node2channels1) + val prevChannelUpdate = channels.get(channelId).map(_.channelUpdate) + val channel = Relayer.OutgoingChannel(remoteNodeId, channelUpdate, prevChannelUpdate, commitments) + val channels1 = channels + (channelId -> channel) + val scid2channels1 = scid2channels + (channelUpdate.shortChannelId -> channelId) + val node2channels1 = node2channels.addOne(remoteNodeId, channelId) + apply(nodeParams, register, channels1, scid2channels1, node2channels1) case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortChannelId, remoteNodeId)) => context.log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId") - val node2channels1 = node2channels.subtractOne(remoteNodeId, shortChannelId) - apply(nodeParams, register, channelUpdates - shortChannelId, node2channels1) + val channels1 = channels - channelId + val scid2Channels1 = scid2channels - shortChannelId + val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId) + apply(nodeParams, register, channels1, scid2Channels1, node2channels1) case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortChannelId, commitments)) => - val channelUpdates1 = channelUpdates.get(shortChannelId) match { + val channels1 = channels.get(channelId) match { case Some(c: Relayer.OutgoingChannel) => context.log.debug(s"available balance changed for channelId=$channelId shortChannelId=$shortChannelId availableForSend={} availableForReceive={}", commitments.availableBalanceForSend, commitments.availableBalanceForReceive) - channelUpdates + (shortChannelId -> c.copy(commitments = commitments)) - case None => channelUpdates // we only consider the balance if we have the channel_update + channels + (channelId -> c.copy(commitments = commitments)) + case None => channels // we only consider the balance if we have the channel_update } - apply(nodeParams, register, channelUpdates1, node2channels) + apply(nodeParams, register, channels1, scid2channels, node2channels) case WrappedShortChannelIdAssigned(ShortChannelIdAssigned(_, channelId, shortChannelId, previousShortChannelId_opt)) => - val (channelUpdates1, node2channels1) = previousShortChannelId_opt match { - case Some(previousShortChannelId) if previousShortChannelId != shortChannelId => - context.log.debug(s"shortChannelId changed for channelId=$channelId ($previousShortChannelId->$shortChannelId, probably due to chain re-org)") - // We simply remove the old entry: we should receive a LocalChannelUpdate with the new shortChannelId shortly. - val node2channels1 = channelUpdates.get(previousShortChannelId).map(_.nextNodeId) match { - case Some(remoteNodeId) => node2channels.subtractOne(remoteNodeId, previousShortChannelId) - case None => node2channels - } - (channelUpdates - previousShortChannelId, node2channels1) - case _ => (channelUpdates, node2channels) - } - apply(nodeParams, register, channelUpdates1, node2channels1) + context.log.debug(s"added new mapping shortChannelId=$shortChannelId for channelId=$channelId") + val scid2channels1 = scid2channels + (shortChannelId -> channelId) + apply(nodeParams, register, channels, scid2channels1, node2channels) } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala index d47d77d09a..6a95189bb5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala @@ -23,6 +23,7 @@ import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, ActorRef, DiagnosticActorLogging, Props, typed} import akka.event.Logging.MDC import akka.event.LoggingAdapter +import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.PendingCommandsDb @@ -141,6 +142,7 @@ object Relayer extends Logging { */ case class GetOutgoingChannels(enabledOnly: Boolean = true) case class OutgoingChannel(nextNodeId: PublicKey, channelUpdate: ChannelUpdate, prevChannelUpdate: Option[ChannelUpdate], commitments: AbstractCommitments) { + val channelId: ByteVector32 = commitments.channelId def toChannelBalance: ChannelBalance = ChannelBalance( remoteNodeId = nextNodeId, shortChannelId = channelUpdate.shortChannelId, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index 268953a09a..3aba714c92 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -62,9 +62,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd } - def expectFwdAdd(register: TestProbe[Any], shortChannelId: ShortChannelId, outAmount: MilliSatoshi, outExpiry: CltvExpiry): Register.ForwardShortId[CMD_ADD_HTLC] = { - val fwd = register.expectMessageType[Register.ForwardShortId[CMD_ADD_HTLC]] - assert(fwd.shortChannelId === shortChannelId) + def expectFwdAdd(register: TestProbe[Any], channelId: ByteVector32, outAmount: MilliSatoshi, outExpiry: CltvExpiry): Register.Forward[CMD_ADD_HTLC] = { + val fwd = register.expectMessageType[Register.Forward[CMD_ADD_HTLC]] + assert(fwd.channelId === channelId) assert(fwd.message.amount === outAmount) assert(fwd.message.cltvExpiry === outExpiry) assert(fwd.message.origin.isInstanceOf[Origin.ChannelRelayedHot]) @@ -83,7 +83,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) } test("relay an htlc-add with onion tlv payload") { f => @@ -97,7 +97,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) } test("relay an htlc-add with retries") { f => @@ -117,12 +117,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! Relay(r) // first try - val fwd1 = expectFwdAdd(register, shortId2, outgoingAmount, outgoingExpiry) + val fwd1 = expectFwdAdd(register, channelIds(shortId2), outgoingAmount, outgoingExpiry) // channel returns an error fwd1.message.replyTo ! RES_ADD_FAILED(fwd1.message, HtlcValueTooHighInFlight(channelIds(shortId2), UInt64(1000000000L), 1516977616L msat), Some(u2.channelUpdate)) // second try - val fwd2 = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + val fwd2 = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) // failure again fwd1.message.replyTo ! RES_ADD_FAILED(fwd2.message, HtlcValueTooHighInFlight(channelIds(shortId1), UInt64(1000000000L), 1516977616L msat), Some(u1.channelUpdate)) @@ -151,8 +151,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) - fwd.replyTo ! Register.ForwardShortIdFailure(fwd) + val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) + fwd.replyTo ! Register.ForwardFailure(fwd) expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer), commit = true)) } @@ -208,7 +208,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - expectFwdAdd(register, shortId1, payload.amountToForward, payload.outgoingCltv).message + expectFwdAdd(register, channelIds(shortId1), payload.amountToForward, payload.outgoingCltv).message } test("fail to relay an htlc-add (expiry too small)") { f => @@ -248,7 +248,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! Relay(r) // relay succeeds with current channel update (u1) with lower fees - expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) val u2 = createLocalUpdate(shortId1, timestamp = TimestampSecond.now() - 530) @@ -256,7 +256,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! Relay(r) // relay succeeds because the current update (u2) with higher fees occurred less than 10 minutes ago - expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) val u3 = createLocalUpdate(shortId1, timestamp = TimestampSecond.now() - 601) @@ -290,7 +290,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, testCase.exc, Some(testCase.update)) expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true)) } @@ -303,7 +303,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a def dummyLocalUpdate(shortChannelId: ShortChannelId, remoteNodeId: PublicKey, availableBalanceForSend: MilliSatoshi, capacity: Satoshi) = { val channelId = randomBytes32() val update = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey(), remoteNodeId, shortChannelId, CltvExpiryDelta(10), 100 msat, 1000 msat, 100, capacity.toMilliSatoshi) - val commitments = PaymentPacketSpec.makeCommitments(ByteVector32.Zeroes, availableBalanceForSend, testCapacity = capacity) + val commitments = PaymentPacketSpec.makeCommitments(channelId, availableBalanceForSend, testCapacity = capacity) LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, commitments) } @@ -325,16 +325,16 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(1000000 msat, CltvExpiry(70), payload) channelRelayer ! Relay(r) // select the channel to the same node, with the lowest capacity and balance but still high enough to handle the payment - val cmd1 = expectFwdAdd(register, ShortChannelId(22223), payload.amountToForward, payload.outgoingCltv).message + val cmd1 = expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, payload.amountToForward, payload.outgoingCltv).message cmd1.replyTo ! RES_ADD_FAILED(cmd1, ChannelUnavailable(randomBytes32()), None) // select 2nd-to-best channel: higher capacity and balance - val cmd2 = expectFwdAdd(register, ShortChannelId(22222), payload.amountToForward, payload.outgoingCltv).message + val cmd2 = expectFwdAdd(register, channelUpdates(ShortChannelId(22222)).channelId, payload.amountToForward, payload.outgoingCltv).message cmd2.replyTo ! RES_ADD_FAILED(cmd2, TooManyAcceptedHtlcs(randomBytes32(), 42), Some(channelUpdates(ShortChannelId(22222)).channelUpdate)) // select 3rd-to-best channel: same balance but higher capacity - val cmd3 = expectFwdAdd(register, ShortChannelId(12345), payload.amountToForward, payload.outgoingCltv).message + val cmd3 = expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, payload.amountToForward, payload.outgoingCltv).message cmd3.replyTo ! RES_ADD_FAILED(cmd3, TooManyAcceptedHtlcs(randomBytes32(), 42), Some(channelUpdates(ShortChannelId(12345)).channelUpdate)) // select 4th-to-best channel: same capacity but higher balance - val cmd4 = expectFwdAdd(register, ShortChannelId(11111), payload.amountToForward, payload.outgoingCltv).message + val cmd4 = expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, payload.amountToForward, payload.outgoingCltv).message cmd4.replyTo ! RES_ADD_FAILED(cmd4, HtlcValueTooHighInFlight(randomBytes32(), UInt64(100000000), 100000000 msat), Some(channelUpdates(ShortChannelId(11111)).channelUpdate)) // all the suitable channels have been tried expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(channelUpdates(ShortChannelId(12345)).channelUpdate)), commit = true)) @@ -344,28 +344,28 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = RelayLegacyPayload(ShortChannelId(12345), 50000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(60000000 msat, CltvExpiry(70), payload) channelRelayer ! Relay(r) - expectFwdAdd(register, ShortChannelId(11111), payload.amountToForward, payload.outgoingCltv).message + expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, payload.amountToForward, payload.outgoingCltv).message } { // lower amount payment val payload = RelayLegacyPayload(ShortChannelId(12345), 1000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(60000000 msat, CltvExpiry(70), payload) channelRelayer ! Relay(r) - expectFwdAdd(register, ShortChannelId(33333), payload.amountToForward, payload.outgoingCltv).message + expectFwdAdd(register, channelUpdates(ShortChannelId(33333)).channelId, payload.amountToForward, payload.outgoingCltv).message } { // payment too high, no suitable channel found, we keep the requested one val payload = RelayLegacyPayload(ShortChannelId(12345), 1000000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(1010000000 msat, CltvExpiry(70), payload) channelRelayer ! Relay(r) - expectFwdAdd(register, ShortChannelId(12345), payload.amountToForward, payload.outgoingCltv).message + expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, payload.amountToForward, payload.outgoingCltv).message } { // cltv expiry larger than our requirements val payload = RelayLegacyPayload(ShortChannelId(12345), 998900 msat, CltvExpiry(50)) val r = createValidIncomingPacket(1000000 msat, CltvExpiry(70), payload) channelRelayer ! Relay(r) - expectFwdAdd(register, ShortChannelId(22223), payload.amountToForward, payload.outgoingCltv).message + expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, payload.amountToForward, payload.outgoingCltv).message } { // cltv expiry too small, no suitable channel found @@ -399,7 +399,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1) fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream_htlc, testCase.result) expectFwdFail(register, r.add.channelId, testCase.cmd) @@ -428,7 +428,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r) - val fwd1 = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry) + val fwd1 = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry) fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1) fwd1.message.origin.replyTo ! RES_ADD_SETTLED(fwd1.message.origin, downstream_htlc, testCase.result) @@ -488,14 +488,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a // Simulate a chain re-org that changes the shortChannelId: channelRelayer ! WrappedShortChannelIdAssigned(ShortChannelIdAssigned(null, channelId_ab, ShortChannelId(42), Some(channelUpdate_ab.shortChannelId))) - val channels7 = getOutgoingChannels(true) - assert(channels7.isEmpty) // We should receive the updated channel update containing the new shortChannelId: channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), makeCommitments(channelId_ab, 100000 msat, 200000 msat))) - val channels8 = getOutgoingChannels(true) - assert(channels8.size === 1) - assert(channels8.head.channelUpdate.shortChannelId === ShortChannelId(42)) + val channels7 = getOutgoingChannels(true) + assert(channels7.size === 1) + assert(channels7.head.channelUpdate.shortChannelId === ShortChannelId(42)) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala index 29ad5083fe..12d477466f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala @@ -90,7 +90,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat // and then manually build an htlc val add_ab = UpdateAddHtlc(channelId = randomBytes32(), id = 123456, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion) relayer ! RelayForward(add_ab) - register.expectMessageType[Register.ForwardShortId[CMD_ADD_HTLC]] + register.expectMessageType[Register.Forward[CMD_ADD_HTLC]] } test("relay an htlc-add at the final node to the payment handler") { f => From 52774c8d4a74ecad18d7bca3fcb6e7e81c6eab06 Mon Sep 17 00:00:00 2001 From: pm47 Date: Wed, 18 May 2022 17:52:11 +0200 Subject: [PATCH 2/3] address review comment --- .../scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 7909f505e0..4e65787f6b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -125,8 +125,8 @@ class ChannelRelay private(nodeParams: NodeParams, def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] = Behaviors.receiveMessagePartial { - case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) => - context.log.warn(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${o.add.id}") + case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, channelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) => + context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${o.add.id}") val cmdFail = CMD_FAIL_HTLC(o.add.id, Right(UnknownNextPeer), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) safeSendAndStop(o.add.channelId, cmdFail) From 848062ff99142b595c98d143588c8abd40435597 Mon Sep 17 00:00:00 2001 From: pm47 Date: Thu, 19 May 2022 10:35:36 +0200 Subject: [PATCH 3/3] simplify selectPreferredChannel --- .../eclair/payment/relay/ChannelRelay.scala | 94 +++++++++---------- 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 4e65787f6b..bcec91b1d0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -201,56 +201,52 @@ class ChannelRelay private(nodeParams: NodeParams, def selectPreferredChannel(alreadyTried: Seq[ByteVector32]): Option[OutgoingChannel] = { val requestedShortChannelId = r.payload.outgoingChannelId context.log.debug("selecting next channel with requestedShortChannelId={}", requestedShortChannelId) - nextNodeId_opt match { - case Some(_) => - // we then filter out channels that we have already tried - val candidateChannels: Map[ByteVector32, OutgoingChannel] = channels -- alreadyTried - // and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta) - candidateChannels - .values - .map { channel => - val relayResult = relayOrFail(Some(channel)) - context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}", - channel.commitments.availableBalanceForSend, - channel.commitments.capacity, - channel.channelUpdate, - relayResult match { - case _: RelaySuccess => "success" - case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason - case other => other - }) - (channel, relayResult) - } - .collect { - // we only keep channels that have enough balance to handle this payment - case (channel, _: RelaySuccess) if channel.commitments.availableBalanceForSend > r.payload.amountToForward => channel - } - .toList // needed for ordering - // we want to use the channel with: - // - the lowest available capacity to ensure we keep high-capacity channels for big payments - // - the lowest available balance to increase our incoming liquidity - .sortBy { channel => (channel.commitments.capacity, channel.commitments.availableBalanceForSend) } - .headOption match { - case Some(channel) => - if (requestedChannelId_opt.contains(channel.channelId)) { - context.log.debug("requested short channel id is our preferred channel") - Some(channel) - } else { - context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, channel.channelUpdate.shortChannelId, channel.commitments.availableBalanceForSend) - Some(channel) - } - case None => - val requestedChannel_opt = requestedChannelId_opt.flatMap(channels.get) - requestedChannel_opt match { - case Some(requestedChannel) if alreadyTried.contains(requestedChannel.channelId) => - context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up") - None - case _ => - context.log.debug("no channel seems to work for this payment, we will try to use the one requested") - requestedChannel_opt - } + // we filter out channels that we have already tried + val candidateChannels: Map[ByteVector32, OutgoingChannel] = channels -- alreadyTried + // and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta) + candidateChannels + .values + .map { channel => + val relayResult = relayOrFail(Some(channel)) + context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}", + channel.commitments.availableBalanceForSend, + channel.commitments.capacity, + channel.channelUpdate, + relayResult match { + case _: RelaySuccess => "success" + case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason + case other => other + }) + (channel, relayResult) + } + .collect { + // we only keep channels that have enough balance to handle this payment + case (channel, _: RelaySuccess) if channel.commitments.availableBalanceForSend > r.payload.amountToForward => channel + } + .toList // needed for ordering + // we want to use the channel with: + // - the lowest available capacity to ensure we keep high-capacity channels for big payments + // - the lowest available balance to increase our incoming liquidity + .sortBy { channel => (channel.commitments.capacity, channel.commitments.availableBalanceForSend) } + .headOption match { + case Some(channel) => + if (requestedChannelId_opt.contains(channel.channelId)) { + context.log.debug("requested short channel id is our preferred channel") + Some(channel) + } else { + context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, channel.channelUpdate.shortChannelId, channel.commitments.availableBalanceForSend) + Some(channel) + } + case None => + val requestedChannel_opt = requestedChannelId_opt.flatMap(channels.get) + requestedChannel_opt match { + case Some(requestedChannel) if alreadyTried.contains(requestedChannel.channelId) => + context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up") + None + case _ => + context.log.debug("no channel seems to work for this payment, we will try to use the one requested") + requestedChannel_opt } - case _ => requestedChannelId_opt.flatMap(channels.get) // we don't have a channel_update for this short_channel_id } }