diff --git a/eclair-core/pom.xml b/eclair-core/pom.xml index a6de36c710..cd169b8dc1 100644 --- a/eclair-core/pom.xml +++ b/eclair-core/pom.xml @@ -212,6 +212,12 @@ ${guava.version} + + com.softwaremill.quicklens + quicklens_${scala.version.short} + 1.4.11 + test + com.whisk docker-testkit-scalatest_${scala.version.short} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 98538fc7c2..0e774d0147 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -633,7 +633,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu self ! TickRefreshChannelUpdate } context.system.eventStream.publish(ChannelSignatureSent(self, commitments1)) - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat)) // note that remoteCommit.toRemote == toLocal + context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat, commitments1.availableBalanceForSendMsat)) // note that remoteCommit.toRemote == toLocal // we expect a quick response from our peer setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer = context.parent), timeout = nodeParams.revocationTimeout, repeat = false) handleCommandSuccess(sender, store(d.copy(commitments = commitments1))) sending commit diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index 8f89c12370..12007e1c53 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -52,7 +52,7 @@ case class ChannelFailed(channel: ActorRef, channelId: BinaryData, remoteNodeId: case class NetworkFeePaid(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, tx: Transaction, fee: Satoshi, txType: String) extends ChannelEvent // NB: this event is only sent when the channel is available -case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long) extends ChannelEvent +case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long, availableBalanceForSendMsat: Long) extends ChannelEvent case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, data: Data) extends ChannelEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index f835423006..699e1b4f9c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -71,6 +71,12 @@ case class Commitments(localParams: LocalParams, remoteParams: RemoteParams, def addRemoteProposal(proposal: UpdateMessage): Commitments = Commitments.addRemoteProposal(this, proposal) def announceChannel: Boolean = (channelFlags & 0x01) != 0 + + def availableBalanceForSendMsat: Long = { + val reduced = CommitmentSpec.reduce(remoteCommit.spec, remoteChanges.acked, localChanges.proposed) + val fees = if (localParams.isFunder) Transactions.commitTxFee(Satoshi(remoteParams.dustLimitSatoshis), reduced).amount else 0 + reduced.toRemoteMsat / 1000 - remoteParams.channelReserveSatoshis - fees + } } object Commitments { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Relayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Relayer.scala index fd573470ea..cf59c4255c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Relayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Relayer.scala @@ -70,16 +70,15 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR case LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments) => log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments) - val availableLocalBalance = commitments.remoteCommit.spec.toRemoteMsat // note that remoteCommit.toRemote == toLocal - context become main(channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, availableLocalBalance)), node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId)) + context become main(channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, commitments.availableBalanceForSendMsat)), node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId)) case LocalChannelDown(_, channelId, shortChannelId, remoteNodeId) => log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId") context become main(channelUpdates - shortChannelId, node2channels.removeBinding(remoteNodeId, shortChannelId)) - case AvailableBalanceChanged(_, _, shortChannelId, localBalanceMsat) => + case AvailableBalanceChanged(_, _, shortChannelId, _, availableBalanceForSendMsat) => val channelUpdates1 = channelUpdates.get(shortChannelId) match { - case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(availableBalanceMsat = localBalanceMsat)) + case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(availableBalanceMsat = availableBalanceForSendMsat)) case None => channelUpdates // we only consider the balance if we have the channel_update } context become main(channelUpdates1, node2channels) @@ -200,7 +199,7 @@ object Relayer { sealed trait NextPayload case class FinalPayload(add: UpdateAddHtlc, payload: PerHopPayload) extends NextPayload case class RelayPayload(add: UpdateAddHtlc, payload: PerHopPayload, nextPacket: Sphinx.Packet) extends NextPayload { - val relayFeeSatoshi = add.amountMsat - payload.amtToForward + val relayFeeMsat = add.amountMsat - payload.amtToForward val expiryDelta = add.cltvExpiry - payload.outgoingCltvValue } // @formatter:on @@ -264,19 +263,19 @@ object Relayer { case Some(channelUpdate) if !Announcements.isEnabled(channelUpdate.channelFlags) => Left(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true)) case Some(channelUpdate) if payload.amtToForward < channelUpdate.htlcMinimumMsat => - Left(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(add.amountMsat, channelUpdate)), commit = true)) + Left(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amtToForward, channelUpdate)), commit = true)) case Some(channelUpdate) if relayPayload.expiryDelta != channelUpdate.cltvExpiryDelta => - Left(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(add.cltvExpiry, channelUpdate)), commit = true)) - case Some(channelUpdate) if relayPayload.relayFeeSatoshi < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amtToForward) => + Left(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltvValue, channelUpdate)), commit = true)) + case Some(channelUpdate) if relayPayload.relayFeeMsat < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amtToForward) => Left(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true)) case Some(channelUpdate) => - val isRedirected = (channelUpdate.shortChannelId != payload.shortChannelId) // we may decide to use another channel (to the same node) that the one requested + val isRedirected = (channelUpdate.shortChannelId != payload.shortChannelId) // we may decide to use another channel (to the same node) from the one requested Right(CMD_ADD_HTLC(payload.amtToForward, add.paymentHash, payload.outgoingCltvValue, nextPacket.serialize, upstream_opt = Some(add), commit = true, redirected = isRedirected)) } } /** - * Select a channel to the same node to the relay the payment to, that has the highest balance and is compatible in + * Select a channel to the same node to the relay the payment to, that has the lowest balance and is compatible in * terms of fees, expiry_delta, etc. * * If no suitable channel is found we default to the originally requested channel. @@ -293,14 +292,13 @@ object Relayer { log.debug(s"selecting next channel for htlc #{} paymentHash={} from channelId={} to requestedShortChannelId={}", add.id, add.paymentHash, add.channelId, requestedShortChannelId) // first we find out what is the next node channelUpdates.get(requestedShortChannelId) match { - case Some(OutgoingChannel(nextNodeId, _, requestedChannelId)) => + case Some(OutgoingChannel(nextNodeId, _, _)) => log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId) // then we retrieve all known channels to this node - val candidateChannels = node2channels.get(nextNodeId).getOrElse(Set.empty) + val candidateChannels = node2channels.get(nextNodeId).getOrElse(Set.empty[ShortChannelId]) // and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta) candidateChannels - .map { - case shortChannelId => + .map { shortChannelId => val channelInfo_opt = channelUpdates.get(shortChannelId) val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate) val relayResult = handleRelay(relayPayload, channelUpdate_opt) @@ -308,9 +306,10 @@ object Relayer { (shortChannelId, channelInfo_opt, relayResult) } .collect { case (shortChannelId, Some(channelInfo), Right(_)) => (shortChannelId, channelInfo.availableBalanceMsat) } + .filter(_._2 > relayPayload.payload.amtToForward) // we only keep channels that have enough balance to handle this payment .toList // needed for ordering - .sortBy(_._2) // we want to use the channel with the highest available balance - .lastOption match { + .sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment + .headOption match { case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId => log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat) preferredShortChannelId diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/ChannelSelectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/ChannelSelectionSpec.scala new file mode 100644 index 0000000000..a300dfae60 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/ChannelSelectionSpec.scala @@ -0,0 +1,98 @@ +package fr.acinq.eclair.payment + +import akka.http.impl.util.DefaultNoLogging +import fr.acinq.bitcoin.Block +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.channel.{CMD_ADD_HTLC, CMD_FAIL_HTLC} +import fr.acinq.eclair.crypto.Sphinx +import fr.acinq.eclair.payment.Relayer.{OutgoingChannel, RelayPayload} +import fr.acinq.eclair.router.Announcements +import fr.acinq.eclair.wire._ +import fr.acinq.eclair.{ShortChannelId, randomBytes, randomKey} +import org.scalatest.FunSuite + +import scala.collection.mutable + +class ChannelSelectionSpec extends FunSuite { + + /** + * This is just a simplified helper function with random values for fields we are not using here + */ + def dummyUpdate(shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true) = + Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey, randomKey.publicKey, shortChannelId, cltvExpiryDelta, htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, htlcMaximumMsat, enable) + + test("handle relay") { + val relayPayload = RelayPayload( + add = UpdateAddHtlc(randomBytes(32), 42, 1000000, randomBytes(32), 70, ""), + payload = PerHopPayload(ShortChannelId(12345), amtToForward = 998900, outgoingCltvValue = 60), + nextPacket = Sphinx.LAST_PACKET // just a placeholder + ) + + val channelUpdate = dummyUpdate(ShortChannelId(12345), 10, 100, 1000, 100, 10000000, true) + + implicit val log = DefaultNoLogging + + // nominal case + assert(Relayer.handleRelay(relayPayload, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = false))) + // redirected to preferred channel + assert(Relayer.handleRelay(relayPayload, Some(channelUpdate.copy(shortChannelId = ShortChannelId(1111)))) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = true))) + // no channel_update + assert(Relayer.handleRelay(relayPayload, channelUpdate_opt = None) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true))) + // channel disabled + val channelUpdate_disabled = channelUpdate.copy(channelFlags = Announcements.makeChannelFlags(true, enable = false)) + assert(Relayer.handleRelay(relayPayload, Some(channelUpdate_disabled)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true))) + // amount too low + val relayPayload_toolow = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 99)) + assert(Relayer.handleRelay(relayPayload_toolow, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amtToForward, channelUpdate)), commit = true))) + // incorrect cltv expiry + val relayPayload_incorrectcltv = relayPayload.copy(payload = relayPayload.payload.copy(outgoingCltvValue = 42)) + assert(Relayer.handleRelay(relayPayload_incorrectcltv, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltvValue, channelUpdate)), commit = true))) + // insufficient fee + val relayPayload_insufficientfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 998910)) + assert(Relayer.handleRelay(relayPayload_insufficientfee, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true))) + // note that a generous fee is ok! + val relayPayload_highfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 900000)) + assert(Relayer.handleRelay(relayPayload_highfee, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload_highfee.payload.amtToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltvValue, relayPayload_highfee.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = false))) + } + + test("relay channel selection") { + + val relayPayload = RelayPayload( + add = UpdateAddHtlc(randomBytes(32), 42, 1000000, randomBytes(32), 70, ""), + payload = PerHopPayload(ShortChannelId(12345), amtToForward = 998900, outgoingCltvValue = 60), + nextPacket = Sphinx.LAST_PACKET // just a placeholder + ) + + val (a, b) = (randomKey.publicKey, randomKey.publicKey) + val channelUpdate = dummyUpdate(ShortChannelId(12345), 10, 100, 1000, 100, 10000000, true) + + val channelUpdates = Map( + ShortChannelId(11111) -> OutgoingChannel(a, channelUpdate, 100000000), + ShortChannelId(12345) -> OutgoingChannel(a, channelUpdate, 20000000), + ShortChannelId(22222) -> OutgoingChannel(a, channelUpdate, 10000000), + ShortChannelId(33333) -> OutgoingChannel(a, channelUpdate, 100000), + ShortChannelId(44444) -> OutgoingChannel(b, channelUpdate, 1000000) + ) + + val node2channels = new mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId] + node2channels.put(a, mutable.Set(ShortChannelId(12345), ShortChannelId(11111), ShortChannelId(22222), ShortChannelId(33333))) + node2channels.put(b, mutable.Set(ShortChannelId(44444))) + + implicit val log = DefaultNoLogging + + import com.softwaremill.quicklens._ + + // select the channel to the same node, with the lowest balance but still high enough to handle the payment + assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels) === ShortChannelId(22222)) + // higher amount payment (have to increased incoming htlc amount for fees to be sufficient) + assert(Relayer.selectPreferredChannel(relayPayload.modify(_.add.amountMsat).setTo(60000000).modify(_.payload.amtToForward).setTo(50000000), channelUpdates, node2channels) === ShortChannelId(11111)) + // lower amount payment + assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000), channelUpdates, node2channels) === ShortChannelId(33333)) + // payment too high, no suitable channel, we keep the requested one + assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000000000), channelUpdates, node2channels) === ShortChannelId(12345)) + // invalid cltv expiry, no suitable channel, we keep the requested one + assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.outgoingCltvValue).setTo(40), channelUpdates, node2channels) === ShortChannelId(12345)) + + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/RelayerSpec.scala index 3d2283928a..690881cef6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/RelayerSpec.scala @@ -55,9 +55,11 @@ class RelayerSpec extends TestkitBaseClass { val channelId_ab: BinaryData = randomBytes(32) val channelId_bc: BinaryData = randomBytes(32) - def makeCommitments(channelId: BinaryData) = Commitments(null, null, 0.toByte, null, + def makeCommitments(channelId: BinaryData) = new Commitments(null, null, 0.toByte, null, RemoteCommit(42, CommitmentSpec(Set.empty, 20000, 5000000, 100000000), "00" * 32, randomKey.toPoint), - null, null, 0, 0, Map.empty, null, null, null, channelId) + null, null, 0, 0, Map.empty, null, null, null, channelId) { + override def availableBalanceForSendMsat: Long = remoteCommit.spec.toRemoteMsat // approximation + } test("relay an htlc-add") { f => import f._