Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.relay.Relayer.OutgoingChannel
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, TimestampSecond, channel, nodeFee}
import fr.acinq.eclair.{Logs, NodeParams, TimestampSecond, channel, nodeFee}

import java.util.UUID

Expand All @@ -37,17 +37,17 @@ object ChannelRelay {
// @formatter:off
sealed trait Command
private case object DoRelay extends Command
private case class WrappedForwardShortIdFailure(failure: Register.ForwardShortIdFailure[CMD_ADD_HTLC]) extends Command
private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command
private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command
// @formatter:on

// @formatter:off
sealed trait RelayResult
case class RelayFailure(cmdFail: CMD_FAIL_HTLC) extends RelayResult
case class RelaySuccess(shortChannelId: ShortChannelId, cmdAdd: CMD_ADD_HTLC) extends RelayResult
case class RelaySuccess(selectedChannelId: ByteVector32, cmdAdd: CMD_ADD_HTLC) extends RelayResult
// @formatter:on

def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ShortChannelId, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ByteVector32, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(
category_opt = Some(Logs.LogCategory.PAYMENT),
Expand Down Expand Up @@ -92,16 +92,16 @@ object ChannelRelay {
*/
class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
channels: Map[ShortChannelId, Relayer.OutgoingChannel],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
context: ActorContext[ChannelRelay.Command]) {

import ChannelRelay._

private val forwardShortIdAdapter = context.messageAdapter[Register.ForwardShortIdFailure[CMD_ADD_HTLC]](WrappedForwardShortIdFailure)
private val forwardFailureAdapter = context.messageAdapter[Register.ForwardFailure[CMD_ADD_HTLC]](WrappedForwardFailure)
private val addResponseAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddResponse)

private case class PreviouslyTried(shortChannelId: ShortChannelId, failure: RES_ADD_FAILED[ChannelException])
private case class PreviouslyTried(channelId: ByteVector32, failure: RES_ADD_FAILED[ChannelException])

def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
Expand All @@ -115,26 +115,26 @@ class ChannelRelay private(nodeParams: NodeParams,
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info(s"rejecting htlc reason=${cmdFail.reason}")
safeSendAndStop(r.add.channelId, cmdFail)
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
context.log.info(s"forwarding htlc to shortChannelId=$selectedShortChannelId")
register ! Register.ForwardShortId(forwardShortIdAdapter.toClassic, selectedShortChannelId, cmdAdd)
waitForAddResponse(selectedShortChannelId, previousFailures)
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info(s"forwarding htlc to channelId=$selectedChannelId")
register ! Register.Forward(forwardFailureAdapter.toClassic, selectedChannelId, cmdAdd)
waitForAddResponse(selectedChannelId, previousFailures)
}
}
}

def waitForAddResponse(selectedShortChannelId: ShortChannelId, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
Behaviors.receiveMessagePartial {
case WrappedForwardShortIdFailure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) =>
context.log.warn(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${o.add.id}")
case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, channelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) =>
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${o.add.id}")
val cmdFail = CMD_FAIL_HTLC(o.add.id, Right(UnknownNextPeer), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
safeSendAndStop(o.add.channelId, cmdFail)

case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, _: Origin.ChannelRelayedHot, _), _, _)) =>
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
context.self ! DoRelay
relay(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed))
relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))

case WrappedAddResponse(_: RES_SUCCESS[_]) =>
context.log.debug("sent htlc to the downstream channel")
Expand Down Expand Up @@ -170,14 +170,13 @@ class ChannelRelay private(nodeParams: NodeParams,
* - a CMD_ADD_HTLC to propagate downstream
*/
def handleRelay(previousFailures: Seq[PreviouslyTried]): RelayResult = {
val alreadyTried = previousFailures.map(_.shortChannelId)
selectPreferredChannel(alreadyTried)
.flatMap(selectedShortChannelId => channels.get(selectedShortChannelId)) match {
val alreadyTried = previousFailures.map(_.channelId)
selectPreferredChannel(alreadyTried) match {
case None if previousFailures.nonEmpty =>
// no more channels to try
val error = previousFailures
// we return the error for the initially requested channel if it exists
.find(_.shortChannelId == r.payload.outgoingChannelId)
.find(failure => requestedChannelId_opt.contains(failure.channelId))
// otherwise we return the error for the first channel tried
.getOrElse(previousFailures.head)
.failure
Expand All @@ -190,58 +189,64 @@ class ChannelRelay private(nodeParams: NodeParams,
/** all the channels point to the same next node, we take the first one */
private val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId)

/** channel id explicitly requested in the onion payload */
private val requestedChannelId_opt = channels.find(_._2.channelUpdate.shortChannelId == r.payload.outgoingChannelId).map(_._1)

/**
* Select a channel to the same node to relay the payment to, that has the lowest capacity and balance and is
* compatible in terms of fees, expiry_delta, etc.
*
* If no suitable channel is found we default to the originally requested channel.
*/
def selectPreferredChannel(alreadyTried: Seq[ShortChannelId]): Option[ShortChannelId] = {
def selectPreferredChannel(alreadyTried: Seq[ByteVector32]): Option[OutgoingChannel] = {
val requestedShortChannelId = r.payload.outgoingChannelId
context.log.debug("selecting next channel with requestedShortChannelId={}", requestedShortChannelId)
nextNodeId_opt match {
case Some(_) =>
// we then filter out channels that we have already tried
val candidateChannels: Map[ShortChannelId, OutgoingChannel] = channels -- alreadyTried
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.map { case (shortChannelId, channelInfo) =>
val relayResult = relayOrFail(Some(channelInfo))
context.log.debug(s"candidate channel: shortChannelId=$shortChannelId availableForSend={} capacity={} channelUpdate={} result={}",
channelInfo.commitments.availableBalanceForSend,
channelInfo.commitments.capacity,
channelInfo.channelUpdate,
relayResult match {
case _: RelaySuccess => "success"
case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason
case other => other
})
(shortChannelId, channelInfo, relayResult)
}
.collect {
// we only keep channels that have enough balance to handle this payment
case (shortChannelId, channelInfo, _: RelaySuccess) if channelInfo.commitments.availableBalanceForSend > r.payload.amountToForward => (shortChannelId, channelInfo.commitments)
}
.toList // needed for ordering
// we want to use the channel with:
// - the lowest available capacity to ensure we keep high-capacity channels for big payments
// - the lowest available balance to increase our incoming liquidity
.sortBy { case (_, commitments) => (commitments.capacity, commitments.availableBalanceForSend) }
.headOption match {
case Some((preferredShortChannelId, commitments)) if preferredShortChannelId != requestedShortChannelId =>
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, commitments.availableBalanceForSend)
Some(preferredShortChannelId)
case Some(_) =>
context.log.debug("requested short channel id is our preferred channel")
Some(requestedShortChannelId)
case None if !alreadyTried.contains(requestedShortChannelId) =>
context.log.debug("no channel seems to work for this payment, we will try to use the requested short channel id")
Some(requestedShortChannelId)
case None =>
// we filter out channels that we have already tried
val candidateChannels: Map[ByteVector32, OutgoingChannel] = channels -- alreadyTried
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.values
.map { channel =>
val relayResult = relayOrFail(Some(channel))
context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}",
channel.commitments.availableBalanceForSend,
channel.commitments.capacity,
channel.channelUpdate,
relayResult match {
case _: RelaySuccess => "success"
case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason
case other => other
})
(channel, relayResult)
}
.collect {
// we only keep channels that have enough balance to handle this payment
case (channel, _: RelaySuccess) if channel.commitments.availableBalanceForSend > r.payload.amountToForward => channel
}
.toList // needed for ordering
// we want to use the channel with:
// - the lowest available capacity to ensure we keep high-capacity channels for big payments
// - the lowest available balance to increase our incoming liquidity
.sortBy { channel => (channel.commitments.capacity, channel.commitments.availableBalanceForSend) }
.headOption match {
case Some(channel) =>
if (requestedChannelId_opt.contains(channel.channelId)) {
context.log.debug("requested short channel id is our preferred channel")
Some(channel)
} else {
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, channel.channelUpdate.shortChannelId, channel.commitments.availableBalanceForSend)
Some(channel)
}
case None =>
val requestedChannel_opt = requestedChannelId_opt.flatMap(channels.get)
requestedChannel_opt match {
case Some(requestedChannel) if alreadyTried.contains(requestedChannel.channelId) =>
context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up")
None
case _ =>
context.log.debug("no channel seems to work for this payment, we will try to use the one requested")
requestedChannel_opt
}
case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id
}
}

Expand All @@ -252,23 +257,23 @@ class ChannelRelay private(nodeParams: NodeParams,
*/
def relayOrFail(outgoingChannel_opt: Option[OutgoingChannel]): RelayResult = {
import r._
outgoingChannel_opt.map(_.channelUpdate) match {
outgoingChannel_opt match {
case None =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true))
case Some(channelUpdate) if !channelUpdate.channelFlags.isEnabled =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true))
case Some(channelUpdate) if payload.amountToForward < channelUpdate.htlcMinimumMsat =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, channelUpdate)), commit = true))
case Some(channelUpdate) if r.expiryDelta < channelUpdate.cltvExpiryDelta =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, channelUpdate)), commit = true))
case Some(channelUpdate) if r.relayFeeMsat < nodeFee(channelUpdate.relayFees, payload.amountToForward) &&
case Some(c) if !c.channelUpdate.channelFlags.isEnabled =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(c.channelUpdate.messageFlags, c.channelUpdate.channelFlags, c.channelUpdate)), commit = true))
case Some(c) if payload.amountToForward < c.channelUpdate.htlcMinimumMsat =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, c.channelUpdate)), commit = true))
case Some(c) if r.expiryDelta < c.channelUpdate.cltvExpiryDelta =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, c.channelUpdate)), commit = true))
case Some(c) if r.relayFeeMsat < nodeFee(c.channelUpdate.relayFees, payload.amountToForward) &&
// fees also do not satisfy the previous channel update for `enforcementDelay` seconds after current update
(TimestampSecond.now() - channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay ||
(TimestampSecond.now() - c.channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay ||
outgoingChannel_opt.flatMap(_.prevChannelUpdate).forall(c => r.relayFeeMsat < nodeFee(c.relayFees, payload.amountToForward))) =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
case Some(channelUpdate) =>
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, c.channelUpdate)), commit = true))
case Some(c) =>
val origin = Origin.ChannelRelayedHot(addResponseAdapter.toClassic, add, payload.amountToForward)
RelaySuccess(channelUpdate.shortChannelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true))
RelaySuccess(c.channelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true))
}
}

Expand Down
Loading