diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index b2de25ee7c..fc78e6c470 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -68,6 +68,11 @@ eclair { max-to-local-delay-blocks = 2016 // maximum number of blocks that we are ready to accept for our own delayed outputs (2016 ~ 2 weeks) mindepth-blocks = 3 expiry-delta-blocks = 144 + // When we receive the pre-image for an HTLC and want to fulfill it but the upstream peer stops responding, we want to + // avoid letting its HTLC-timeout transaction become enforceable on-chain (otherwise there is a race condition between + // our HTLC-success and their HTLC-timeout). + // We will close the channel when the HTLC-timeout will happen in less than this number. + fulfill-safety-before-timeout-blocks = 6 fee-base-msat = 1000 fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 9266d701e7..d707fa9e6c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -50,6 +50,7 @@ case class NodeParams(keyManager: KeyManager, maxHtlcValueInFlightMsat: UInt64, maxAcceptedHtlcs: Int, expiryDeltaBlocks: Int, + fulfillSafetyBeforeTimeoutBlocks: Int, htlcMinimumMsat: Int, toRemoteDelayBlocks: Int, maxToLocalDelayBlocks: Int, @@ -149,6 +150,10 @@ object NodeParams { val offeredCLTV = config.getInt("to-remote-delay-blocks") require(maxToLocalCLTV <= Channel.MAX_TO_SELF_DELAY && offeredCLTV <= Channel.MAX_TO_SELF_DELAY, s"CLTV delay values too high, max is ${Channel.MAX_TO_SELF_DELAY}") + val expiryDeltaBlocks = config.getInt("expiry-delta-blocks") + val fulfillSafetyBeforeTimeoutBlocks = config.getInt("fulfill-safety-before-timeout-blocks") + require(fulfillSafetyBeforeTimeoutBlocks < expiryDeltaBlocks, "fulfill-safety-before-timeout-blocks must be smaller than expiry-delta-blocks") + val nodeAlias = config.getString("node-alias") require(nodeAlias.getBytes("UTF-8").length <= 32, "invalid alias, too long (max allowed 32 bytes)") @@ -156,7 +161,7 @@ object NodeParams { val p = PublicKey(ByteVector.fromValidHex(e.getString("nodeid"))) val gf = ByteVector.fromValidHex(e.getString("global-features")) val lf = ByteVector.fromValidHex(e.getString("local-features")) - (p -> (gf, lf)) + p -> (gf, lf) }.toMap val socksProxy_opt = if (config.getBoolean("socks5.enabled")) { @@ -187,7 +192,8 @@ object NodeParams { dustLimitSatoshis = dustLimitSatoshis, maxHtlcValueInFlightMsat = UInt64(config.getLong("max-htlc-value-in-flight-msat")), maxAcceptedHtlcs = maxAcceptedHtlcs, - expiryDeltaBlocks = config.getInt("expiry-delta-blocks"), + expiryDeltaBlocks = expiryDeltaBlocks, + fulfillSafetyBeforeTimeoutBlocks = fulfillSafetyBeforeTimeoutBlocks, htlcMinimumMsat = config.getInt("htlc-minimum-msat"), toRemoteDelayBlocks = config.getInt("to-remote-delay-blocks"), maxToLocalDelayBlocks = config.getInt("max-to-local-delay-blocks"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 3199abcbe4..1ce4c5a51f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy} import akka.event.Logging.MDC import akka.pattern.pipe -import fr.acinq.bitcoin.Crypto.{PublicKey, PrivateKey, sha256} +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, sha256} import fr.acinq.bitcoin._ import fr.acinq.eclair._ import fr.acinq.eclair.blockchain._ @@ -98,7 +98,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId import nodeParams.keyManager // we pass these to helpers classes so that they have the logging context - implicit def implicitLog = log + implicit def implicitLog: akka.event.LoggingAdapter = log val forwarder = context.actorOf(Props(new Forwarder(nodeParams)), "forwarder") @@ -146,7 +146,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId when(WAIT_FOR_INIT_INTERNAL)(handleExceptions { case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, _, localParams, remote, _, channelFlags), Nothing) => - context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, true, temporaryChannelId)) + context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, isFunder = true, temporaryChannelId)) forwarder ! remote val open = OpenChannel(nodeParams.chainHash, temporaryChannelId = temporaryChannelId, @@ -270,7 +270,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId Try(Helpers.validateParamsFundee(nodeParams, open)) match { case Failure(t) => handleLocalError(t, d, Some(open)) case Success(_) => - context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, false, open.temporaryChannelId)) + context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, isFunder = false, open.temporaryChannelId)) // TODO: maybe also check uniqueness of temporary channel id val minimumDepth = nodeParams.minDepthBlocks val accept = AcceptChannel(temporaryChannelId = open.temporaryChannelId, @@ -762,7 +762,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we were waiting for our pending htlcs to be signed before replying with our local shutdown val localShutdown = Shutdown(d.channelId, commitments1.localParams.defaultFinalScriptPubKey) // note: it means that we had pending htlcs to sign, therefore we go to SHUTDOWN, not to NEGOTIATING - require(commitments1.remoteCommit.spec.htlcs.size > 0, "we must have just signed new htlcs, otherwise we would have sent our Shutdown earlier") + require(commitments1.remoteCommit.spec.htlcs.nonEmpty, "we must have just signed new htlcs, otherwise we would have sent our Shutdown earlier") goto(SHUTDOWN) using store(DATA_SHUTDOWN(commitments1, localShutdown, d.remoteShutdown.get)) sending localShutdown } else { stay using store(d.copy(commitments = commitments1)) @@ -849,8 +849,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } - case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c: CurrentBlockCount, d: DATA_NORMAL) => handleNewBlock(c, d) case Event(c@CurrentFeerates(feeratesPerKw), d: DATA_NORMAL) => val networkFeeratePerKw = feeratesPerKw.blocks_2 @@ -934,7 +933,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) => handleRemoteSpentOther(tx, d) @@ -1132,8 +1131,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d) - case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c: CurrentBlockCount, d: DATA_SHUTDOWN) => handleNewBlock(c, d) case Event(c@CurrentFeerates(feerates), d: DATA_SHUTDOWN) => val networkFeeratePerKw = feerates.blocks_2 @@ -1148,7 +1146,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) => handleRemoteSpentOther(tx, d) @@ -1162,7 +1160,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c@ClosingSigned(_, remoteClosingFee, remoteSig), d: DATA_NEGOTIATING) => log.info(s"received closingFeeSatoshis=$remoteClosingFee") Closing.checkClosingSignature(keyManager, d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey, Satoshi(remoteClosingFee), remoteSig) match { - case Success(signedClosingTx) if Some(remoteClosingFee) == d.closingTxProposed.last.lastOption.map(_.localClosingSigned.feeSatoshis) || d.closingTxProposed.flatten.size >= MAX_NEGOTIATION_ITERATIONS => + case Success(signedClosingTx) if d.closingTxProposed.last.lastOption.map(_.localClosingSigned.feeSatoshis).contains(remoteClosingFee) || d.closingTxProposed.flatten.size >= MAX_NEGOTIATION_ITERATIONS => // we close when we converge or when there were too many iterations handleMutualClose(signedClosingTx, Left(d.copy(bestUnpublishedClosingTx_opt = Some(signedClosingTx)))) case Success(signedClosingTx) => @@ -1172,7 +1170,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId localClosingFee = lastLocalClosingFee.getOrElse(Closing.firstClosingFee(d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey)), remoteClosingFee = Satoshi(remoteClosingFee)) val (closingTx, closingSigned) = Closing.makeClosingTx(keyManager, d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey, nextClosingFee) - if (Some(nextClosingFee) == lastLocalClosingFee) { + if (lastLocalClosingFee.contains(nextClosingFee)) { // next computed fee is the same than the one we previously sent (probably because of rounding), let's close now handleMutualClose(signedClosingTx, Left(d.copy(bestUnpublishedClosingTx_opt = Some(signedClosingTx)))) } else if (nextClosingFee == Satoshi(remoteClosingFee)) { @@ -1197,7 +1195,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) => handleRemoteSpentOther(tx, d) @@ -1213,19 +1211,19 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Success((commitments1, _)) => log.info(s"got valid payment preimage, recalculating transactions to redeem the corresponding htlc on-chain") val localCommitPublished1 = d.localCommitPublished.map { - case localCommitPublished => + localCommitPublished => val localCommitPublished1 = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx) doPublish(localCommitPublished1) localCommitPublished1 } val remoteCommitPublished1 = d.remoteCommitPublished.map { - case remoteCommitPublished => + remoteCommitPublished => val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx) doPublish(remoteCommitPublished1) remoteCommitPublished1 } val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map { - case remoteCommitPublished => + remoteCommitPublished => val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx) doPublish(remoteCommitPublished1) remoteCommitPublished1 @@ -1247,22 +1245,22 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } else if (d.mutualCloseProposed.map(_.txid).contains(tx.txid)) { // at any time they can publish a closing tx with any sig we sent them handleMutualClose(tx, Right(d)) - } else if (Some(tx.txid) == d.localCommitPublished.map(_.commitTx.txid)) { + } else if (d.localCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.remoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.remoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.nextRemoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.nextRemoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.futureRemoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.futureRemoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay } else if (tx.txid == d.commitments.remoteCommit.txid) { // counterparty may attempt to spend its last commit tx at any time handleRemoteSpentCurrent(tx, d) - } else if (Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid)) { + } else if (d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid)) { // counterparty may attempt to spend its last commit tx at any time handleRemoteSpentNext(tx, d) } else { @@ -1307,7 +1305,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val futureRemoteCommitPublished1 = d.futureRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx)) val revokedCommitPublished1 = d.revokedCommitPublished.map(Closing.updateRevokedCommitPublished(_, tx)) // if the local commitment tx just got confirmed, let's send an event telling when we will get the main output refund - if (localCommitPublished1.map(_.commitTx.txid) == Some(tx.txid)) { + if (localCommitPublished1.map(_.commitTx.txid).contains(tx.txid)) { context.system.eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.remoteParams.toSelfDelay)) } // we may need to fail some htlcs in case a commitment tx was published and they have reached the timeout threshold @@ -1345,7 +1343,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we update the channel data val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1) // and we also send events related to fee - Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } + Closing.networkFeePaid(tx, d1) foreach { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } // then let's see if any of the possible close scenarii can be considered done val closingType_opt = Closing.isClosed(d1, Some(tx)) // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) @@ -1379,7 +1377,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case d: HasCommitments => log.info(s"deleting database record for channelId=${d.channelId}") nodeParams.db.channels.removeChannel(d.channelId) - case _ => {} + case _ => } log.info("shutting down") stop(FSM.Normal) @@ -1422,11 +1420,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(SYNCING) using d1 sending channelReestablish - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - // note: this can only happen if state is NORMAL or SHUTDOWN - // -> in NEGOTIATING there are no more htlcs - // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + // note: this can only happen if state is NORMAL or SHUTDOWN + // -> in NEGOTIATING there are no more htlcs + // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway + case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d) case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d) @@ -1475,7 +1472,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // if next_remote_revocation_number is greater than our local commitment index, it means that either we are using an outdated commitment, or they are lying // but first we need to make sure that the last per_commitment_secret that they claim to have received from us is correct for that next_remote_revocation_number minus 1 if (keyManager.commitmentSecret(d.commitments.localParams.channelKeyPath, nextRemoteRevocationNumber - 1) == yourLastPerCommitmentSecret) { - log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=${nextRemoteRevocationNumber}") + log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=$nextRemoteRevocationNumber") // their data checks out, we indeed seem to be using an old revoked commitment, and must absolutely *NOT* publish it, because that would be a cheating attempt and they // would punish us by taking all the funds in the channel val exc = PleasePublishYourCommitment(d.channelId) @@ -1487,7 +1484,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } case ChannelReestablish(_, nextLocalCommitmentNumber, _, _, _) if !Helpers.checkRemoteCommit(d, nextLocalCommitmentNumber) => // if next_local_commit_number is more than one more our remote commitment index, it means that either we are using an outdated commitment, or they are lying - log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=${nextLocalCommitmentNumber}") + log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=$nextLocalCommitmentNumber") // there is no way to make sure that they are saying the truth, the best thing to do is ask them to publish their commitment right now // maybe they will publish their commitment, in that case we need to remember their commitment point in order to be able to claim our outputs // not that if they don't comply, we could publish our own commitment (it is not stale, otherwise we would be in the case above) @@ -1560,8 +1557,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(NEGOTIATING) using d.copy(closingTxProposed = closingTxProposed1) sending d.localShutdown } - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d) case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSince, d.fundingTx) @@ -1723,13 +1719,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This function is used to return feedback to user at channel opening */ - def replyToUser(message: Either[Channel.ChannelError, String]) = { + def replyToUser(message: Either[Channel.ChannelError, String]): Unit = { val m = message match { case Left(LocalError(t)) => Status.Failure(t) case Left(RemoteError(e)) => Status.Failure(new RuntimeException(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}")) case Right(s) => s } - origin_opt.map(_ ! m) + origin_opt.foreach(_ ! m) } def handleCommandSuccess(sender: ActorRef, newData: Data) = { @@ -1749,10 +1745,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * When we are funder, we use this function to detect when our funding tx has been double-spent (by another transaction * that we made for some reason). If the funding tx has been double spent we can forget about the channel. - * - * @param fundingTx */ - def checkDoubleSpent(fundingTx: Transaction) = { + def checkDoubleSpent(fundingTx: Transaction): Unit = { log.debug(s"checking status of funding tx txid=${fundingTx.txid}") wallet.doubleSpent(fundingTx).onComplete { case Success(true) => @@ -1777,7 +1771,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we also check if the funding tx has been double-spent checkDoubleSpent(fundingTx) context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta(txid)) - case None if (now.seconds - waitingSince.seconds) > FUNDING_TIMEOUT_FUNDEE && (now.seconds - lastBlockTimestamp.seconds) < 1.hour => + case None if (now.seconds - waitingSince.seconds) > FUNDING_TIMEOUT_FUNDEE && (now.seconds - lastBlockTimestamp.seconds) < 1.hour => // if we are fundee, we give up after some time // NB: we want to be sure that the blockchain is in sync to prevent false negatives log.warning(s"funding tx hasn't been published in ${(now.seconds - waitingSince.seconds).toDays} days and blockchain is fresh from ${(now.seconds - lastBlockTimestamp.seconds).toMinutes} minutes ago") @@ -1837,6 +1831,34 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } + def handleNewBlock(c: CurrentBlockCount, d: HasCommitments) = { + val timedOutOutgoing = d.commitments.timedOutOutgoingHtlcs(c.blockCount) + val almostTimedOutIncoming = d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks) + if (timedOutOutgoing.nonEmpty) { + // Downstream timed out. + handleLocalError(HtlcTimedout(d.channelId, timedOutOutgoing), d, Some(c)) + } else if (almostTimedOutIncoming.nonEmpty) { + // Upstream is close to timing out. + val relayedFulfills = d.commitments.localChanges.all.collect { case u: UpdateFulfillHtlc => u.id }.toSet + val offendingRelayedHtlcs = almostTimedOutIncoming.filter(htlc => relayedFulfills.contains(htlc.id)) + if (offendingRelayedHtlcs.nonEmpty) { + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, offendingRelayedHtlcs), d, Some(c)) + } else { + // There might be pending fulfill commands that we haven't relayed yet. + // Since this involves a DB call, we only want to check it if all the previous checks failed (this is the slow path). + val pendingRelayFulfills = nodeParams.db.pendingRelay.listPendingRelay(d.channelId).collect { case CMD_FULFILL_HTLC(id, r, _) => id } + val offendingPendingRelayFulfills = almostTimedOutIncoming.filter(htlc => pendingRelayFulfills.contains(htlc.id)) + if (offendingPendingRelayFulfills.nonEmpty) { + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, offendingPendingRelayFulfills), d, Some(c)) + } else { + stay + } + } + } else { + stay + } + } + def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = { cause match { case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request") @@ -1887,7 +1909,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(CLOSING) using store(nextData) } - def doPublish(closingTx: Transaction) = { + def doPublish(closingTx: Transaction): Unit = { blockchain ! PublishAsap(closingTx) blockchain ! WatchConfirmed(self, closingTx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(closingTx)) } @@ -1922,11 +1944,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will publish txes only if they haven't yet reached minDepth - * - * @param txes - * @param irrevocablySpent */ - def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach { tx => log.info(s"publishing txid=${tx.txid}") @@ -1937,11 +1956,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will watch txes only if they haven't yet reached minDepth - * - * @param txes - * @param irrevocablySpent */ - def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach(tx => blockchain ! WatchConfirmed(self, tx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(tx))) skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed")) @@ -1949,18 +1965,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will watch txes only if the utxo they spend hasn't already been irrevocably spent - * - * @param parentTx - * @param txes - * @param irrevocablySpent */ - def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT)) skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed")) } - def doPublish(localCommitPublished: LocalCommitPublished) = { + def doPublish(localCommitPublished: LocalCommitPublished): Unit = { import localCommitPublished._ val publishQueue = List(commitTx) ++ claimMainDelayedOutputTx ++ htlcSuccessTxs ++ htlcTimeoutTxs ++ claimHtlcDelayedTxs @@ -2024,7 +2036,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(CLOSING) using store(nextData) } - def doPublish(remoteCommitPublished: RemoteCommitPublished) = { + def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = { import remoteCommitPublished._ val publishQueue = claimMainOutputTx ++ claimHtlcSuccessTxs ++ claimHtlcTimeoutTxs @@ -2066,7 +2078,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } - def doPublish(revokedCommitPublished: RevokedCommitPublished) = { + def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = { import revokedCommitPublished._ val publishQueue = claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs @@ -2109,7 +2121,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId log.debug(s"localNextHtlcId=${d.commitments.localNextHtlcId}->${commitments1.localNextHtlcId}") log.debug(s"remoteNextHtlcId=${d.commitments.remoteNextHtlcId}->${commitments1.remoteNextHtlcId}") - def resendRevocation = { + def resendRevocation(): Unit = { // let's see the state of remote sigs if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber) { // nothing to do @@ -2134,24 +2146,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // they had received the new sig but their revocation was lost during the disconnection // they will send us the revocation, nothing to do here log.debug(s"waiting for them to re-send their last revocation") - resendRevocation + resendRevocation() case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index == channelReestablish.nextLocalCommitmentNumber => // we had sent a new sig and were waiting for their revocation // they didn't receive the new sig because of the disconnection // we just resend the same updates and the same sig val revWasSentLast = commitments1.localCommit.index > waitingForRevocation.sentAfterLocalCommitIndex - if (!revWasSentLast) resendRevocation + if (!revWasSentLast) resendRevocation() log.debug(s"re-sending previously local signed changes: ${commitments1.localChanges.signed.map(Commitments.msg2String(_)).mkString(",")}") commitments1.localChanges.signed.foreach(forwarder ! _) log.debug(s"re-sending the exact same previous sig") forwarder ! waitingForRevocation.sent - if (revWasSentLast) resendRevocation + if (revWasSentLast) resendRevocation() case Right(_) if commitments1.remoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber => // there wasn't any sig in-flight when the disconnection occurred - resendRevocation + resendRevocation() case _ => throw CommitmentSyncError(d.channelId) } @@ -2196,7 +2208,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Right(u) => Relayed(u.channelId, u.id, u.amountMsat, c.amountMsat) // this is a relayed payment } - def feePaid(fee: Satoshi, tx: Transaction, desc: String, channelId: ByteVector32) = { + def feePaid(fee: Satoshi, tx: Transaction, desc: String, channelId: ByteVector32): Unit = { log.info(s"paid feeSatoshi=${fee.amount} for txid=${tx.txid} desc=$desc") context.system.eventStream.publish(NetworkFeePaid(self, remoteNodeId, channelId, tx, fee, desc)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala index f5b45e77c6..3546f22af6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.channel import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, Transaction} -import fr.acinq.eclair.{ShortChannelId, UInt64} +import fr.acinq.eclair.UInt64 import fr.acinq.eclair.payment.Origin import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc} @@ -27,6 +27,7 @@ import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc} */ class ChannelException(val channelId: ByteVector32, message: String) extends RuntimeException(message) + // @formatter:off case class DebugTriggeredException (override val channelId: ByteVector32) extends ChannelException(channelId, "debug-mode triggered failure") case class InvalidChainHash (override val channelId: ByteVector32, local: ByteVector32, remote: ByteVector32) extends ChannelException(channelId, s"invalid chainHash (local=$local remote=$remote)") @@ -49,6 +50,7 @@ case class InvalidFinalScript (override val channelId: ByteVect case class FundingTxTimedout (override val channelId: ByteVector32) extends ChannelException(channelId, "funding tx timed out") case class FundingTxSpent (override val channelId: ByteVector32, spendingTx: Transaction) extends ChannelException(channelId, s"funding tx has been spent by txid=${spendingTx.txid}") case class HtlcTimedout (override val channelId: ByteVector32, htlcs: Set[UpdateAddHtlc]) extends ChannelException(channelId, s"one or more htlcs timed out: ids=${htlcs.take(10).map(_.id).mkString}") // we only display the first 10 ids +case class HtlcWillTimeoutUpstream (override val channelId: ByteVector32, htlcs: Set[UpdateAddHtlc]) extends ChannelException(channelId, s"one or more htlcs that should be fulfilled are close to timing out upstream: ids=${htlcs.take(10).map(_.id).mkString}") // we only display the first 10 ids case class HtlcOverridenByLocalCommit (override val channelId: ByteVector32) extends ChannelException(channelId, "htlc was overriden by local commit") case class FeerateTooSmall (override val channelId: ByteVector32, remoteFeeratePerKw: Long) extends ChannelException(channelId, s"remote fee rate is too small: remoteFeeratePerKw=$remoteFeeratePerKw") case class FeerateTooDifferent (override val channelId: ByteVector32, localFeeratePerKw: Long, remoteFeeratePerKw: Long) extends ChannelException(channelId, s"local/remote feerates are too different: remoteFeeratePerKw=$remoteFeeratePerKw localFeeratePerKw=$localFeeratePerKw") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 579f8bac9f..40aa32a9f0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -62,11 +62,23 @@ case class Commitments(channelVersion: ChannelVersion, def hasNoPendingHtlcs: Boolean = localCommit.spec.htlcs.isEmpty && remoteCommit.spec.htlcs.isEmpty && remoteNextCommitInfo.isRight - def timedoutOutgoingHtlcs(blockheight: Long): Set[UpdateAddHtlc] = + def timedOutOutgoingHtlcs(blockheight: Long): Set[UpdateAddHtlc] = (localCommit.spec.htlcs.filter(htlc => htlc.direction == OUT && blockheight >= htlc.add.cltvExpiry) ++ remoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry) ++ remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry)).getOrElse(Set.empty[DirectedHtlc])).map(_.add) + /** + * HTLCs that are close to timing out upstream are potentially dangerous. If we received the pre-image for those + * HTLCs, we need to get a remote signed updated commitment that removes this HTLC. + * Otherwise when we get close to the upstream timeout, we risk an on-chain race condition between their HTLC timeout + * and our HTLC success in case of a force-close. + */ + def almostTimedOutIncomingHtlcs(blockheight: Long, fulfillSafety: Int): Set[UpdateAddHtlc] = { + localCommit.spec.htlcs.collect { + case htlc if htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - fulfillSafety => htlc.add + } + } + def addLocalProposal(proposal: UpdateMessage): Commitments = Commitments.addLocalProposal(this, proposal) def addRemoteProposal(proposal: UpdateMessage): Commitments = Commitments.addRemoteProposal(this, proposal) @@ -87,12 +99,13 @@ case class Commitments(channelVersion: ChannelVersion, } object Commitments { + /** - * add a change to our proposed change list + * Add a change to our proposed change list. * - * @param commitments - * @param proposal - * @return an updated commitment instance + * @param commitments current commitments. + * @param proposal proposed change to add. + * @return an updated commitment instance. */ private def addLocalProposal(commitments: Commitments, proposal: UpdateMessage): Commitments = commitments.copy(localChanges = commitments.localChanges.copy(proposed = commitments.localChanges.proposed :+ proposal)) @@ -212,14 +225,14 @@ object Commitments { val fulfill = UpdateFulfillHtlc(commitments.channelId, cmd.id, cmd.r) val commitments1 = addLocalProposal(commitments, fulfill) (commitments1, fulfill) - case Some(htlc) => throw InvalidHtlcPreimage(commitments.channelId, cmd.id) + case Some(_) => throw InvalidHtlcPreimage(commitments.channelId, cmd.id) case None => throw UnknownHtlcId(commitments.channelId, cmd.id) } def receiveFulfill(commitments: Commitments, fulfill: UpdateFulfillHtlc): Either[Commitments, (Commitments, Origin, UpdateAddHtlc)] = getHtlcCrossSigned(commitments, OUT, fulfill.id) match { case Some(htlc) if htlc.paymentHash == sha256(fulfill.paymentPreimage) => Right((addRemoteProposal(commitments, fulfill), commitments.originChannels(fulfill.id), htlc)) - case Some(htlc) => throw InvalidHtlcPreimage(commitments.channelId, fulfill.id) + case Some(_) => throw InvalidHtlcPreimage(commitments.channelId, fulfill.id) case None => throw UnknownHtlcId(commitments.channelId, fulfill.id) } @@ -244,7 +257,7 @@ object Commitments { val fail = UpdateFailHtlc(commitments.channelId, cmd.id, reason) val commitments1 = addLocalProposal(commitments, fail) (commitments1, fail) - case Failure(_) => throw new CannotExtractSharedSecret(commitments.channelId, htlc) + case Failure(_) => throw CannotExtractSharedSecret(commitments.channelId, htlc) } case None => throw UnknownHtlcId(commitments.channelId, cmd.id) } @@ -263,7 +276,7 @@ object Commitments { } => // we have already sent a fail/fulfill for this htlc throw UnknownHtlcId(commitments.channelId, cmd.id) - case Some(htlc) => + case Some(_) => val fail = UpdateFailMalformedHtlc(commitments.channelId, cmd.id, cmd.onionHash, cmd.failureCode) val commitments1 = addLocalProposal(commitments, fail) (commitments1, fail) @@ -348,9 +361,9 @@ object Commitments { def remoteHasUnsignedOutgoingHtlcs(commitments: Commitments): Boolean = commitments.remoteChanges.proposed.collectFirst { case u: UpdateAddHtlc => u }.isDefined - def localHasChanges(commitments: Commitments): Boolean = commitments.remoteChanges.acked.size > 0 || commitments.localChanges.proposed.size > 0 + def localHasChanges(commitments: Commitments): Boolean = commitments.remoteChanges.acked.nonEmpty || commitments.localChanges.proposed.nonEmpty - def remoteHasChanges(commitments: Commitments): Boolean = commitments.localChanges.acked.size > 0 || commitments.remoteChanges.proposed.size > 0 + def remoteHasChanges(commitments: Commitments): Boolean = commitments.localChanges.acked.nonEmpty || commitments.remoteChanges.proposed.nonEmpty def revocationPreimage(seed: ByteVector32, index: Long): ByteVector32 = ShaChain.shaChainFromSeed(seed, 0xFFFFFFFFFFFFFFFFL - index) @@ -428,7 +441,7 @@ object Commitments { val sortedHtlcTxs: Seq[TransactionWithInputInfo] = (htlcTimeoutTxs ++ htlcSuccessTxs).sortBy(_.input.outPoint.index) if (commit.htlcSignatures.size != sortedHtlcTxs.size) { - throw new HtlcSigCountMismatch(commitments.channelId, sortedHtlcTxs.size, commit.htlcSignatures.size) + throw HtlcSigCountMismatch(commitments.channelId, sortedHtlcTxs.size, commit.htlcSignatures.size) } val htlcSigs = sortedHtlcTxs.map(keyManager.sign(_, keyManager.htlcPoint(localParams.channelKeyPath), localPerCommitmentPoint)) val remoteHtlcPubkey = Generators.derivePubKey(remoteParams.htlcBasepoint, localPerCommitmentPoint) @@ -436,13 +449,13 @@ object Commitments { val htlcTxsAndSigs = (sortedHtlcTxs, htlcSigs, commit.htlcSignatures).zipped.toList.collect { case (htlcTx: HtlcTimeoutTx, localSig, remoteSig) => if (Transactions.checkSpendable(Transactions.addSigs(htlcTx, localSig, remoteSig)).isFailure) { - throw new InvalidHtlcSignature(commitments.channelId, htlcTx.tx) + throw InvalidHtlcSignature(commitments.channelId, htlcTx.tx) } HtlcTxAndSigs(htlcTx, localSig, remoteSig) case (htlcTx: HtlcSuccessTx, localSig, remoteSig) => // we can't check that htlc-success tx are spendable because we need the payment preimage; thus we only check the remote sig - if (Transactions.checkSig(htlcTx, remoteSig, remoteHtlcPubkey) == false) { - throw new InvalidHtlcSignature(commitments.channelId, htlcTx.tx) + if (!Transactions.checkSig(htlcTx, remoteSig, remoteHtlcPubkey)) { + throw InvalidHtlcSignature(commitments.channelId, htlcTx.tx) } HtlcTxAndSigs(htlcTx, localSig, remoteSig) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 79770b5833..7dc094c9e8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,11 +23,11 @@ import fr.acinq.bitcoin.{Block, ByteVector32, Script} import fr.acinq.eclair.NodeParams.BITCOIND import fr.acinq.eclair.crypto.LocalKeyManager import fr.acinq.eclair.db._ -import fr.acinq.eclair.db.sqlite._ import fr.acinq.eclair.io.Peer import fr.acinq.eclair.router.RouterConf import fr.acinq.eclair.wire.{Color, NodeAddress} import scodec.bits.ByteVector + import scala.concurrent.duration._ /** @@ -42,7 +42,6 @@ object TestConstants { def inMemoryDb(connection: Connection = sqliteInMemory()): Databases = Databases.databaseByConnections(connection, connection, connection) - object Alice { val seed = ByteVector32(ByteVector.fill(32)(1)) val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash) @@ -60,6 +59,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64(150000000), maxAcceptedHtlcs = 100, expiryDeltaBlocks = 144, + fulfillSafetyBeforeTimeoutBlocks = 6, htlcMinimumMsat = 0, minDepthBlocks = 3, toRemoteDelayBlocks = 144, @@ -69,7 +69,7 @@ object TestConstants { feeProportionalMillionth = 10, reserveToFundingRatio = 0.01, // note: not used (overridden below) maxReserveToFundingRatio = 0.05, - db = inMemoryDb(sqliteInMemory), + db = inMemoryDb(sqliteInMemory()), revocationTimeout = 20 seconds, pingInterval = 30 seconds, pingTimeout = 10 seconds, @@ -126,6 +126,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64.MaxValue, // Bob has no limit on the combined max value of in-flight htlcs maxAcceptedHtlcs = 30, expiryDeltaBlocks = 144, + fulfillSafetyBeforeTimeoutBlocks = 6, htlcMinimumMsat = 1000, minDepthBlocks = 3, toRemoteDelayBlocks = 144, @@ -135,7 +136,7 @@ object TestConstants { feeProportionalMillionth = 10, reserveToFundingRatio = 0.01, // note: not used (overridden below) maxReserveToFundingRatio = 0.05, - db = inMemoryDb(sqliteInMemory), + db = inMemoryDb(sqliteInMemory()), revocationTimeout = 20 seconds, pingInterval = 30 seconds, pingTimeout = 10 seconds, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index b37eb9418b..ca6a1dd373 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -27,13 +27,13 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.UInt64.Conversions._ import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.fee.FeeratesPerKw -import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, Reconnected, RevocationTimeout} -import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.Channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods +import fr.acinq.eclair.channel.{ChannelErrorOccured, _} import fr.acinq.eclair.io.Peer import fr.acinq.eclair.payment._ import fr.acinq.eclair.router.Announcements -import fr.acinq.eclair.transactions.Transactions.{htlcSuccessWeight, htlcTimeoutWeight, weight2fee} +import fr.acinq.eclair.transactions.Transactions.{HtlcSuccessTx, htlcSuccessWeight, htlcTimeoutWeight, weight2fee} import fr.acinq.eclair.transactions.{IN, OUT} import fr.acinq.eclair.wire.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc} import fr.acinq.eclair.{Globals, TestConstants, TestkitBaseClass, randomBytes32} @@ -50,6 +50,8 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { type FixtureParam = SetupFixture + implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging + override def withFixture(test: OneArgTest): Outcome = { val setup = init() import setup._ @@ -1677,10 +1679,120 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { alice2blockchain.expectMsgType[PublishAsap] // main delayed alice2blockchain.expectMsgType[PublishAsap] // htlc timeout alice2blockchain.expectMsgType[PublishAsap] // htlc delayed - val watch = alice2blockchain.expectMsgType[WatchConfirmed] + val watch = alice2blockchain.expectMsgType[WatchConfirmed] assert(watch.event === BITCOIN_TX_CONFIRMED(aliceCommitTx)) } + test("recv CurrentBlockCount (fulfilled signed htlc ignored by upstream peer)") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill + // * Alice does not react to the fulfill (drops the message for some reason) + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) + sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + + test("recv CurrentBlockCount (fulfilled proposed htlc ignored by upstream peer)") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign + // * Alice does not react to the fulfill (drops the message for some reason) + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = false)) + sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + + test("recv CurrentBlockCount (fulfilled proposed htlc acked but not committed by upstream peer)") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill + // * Alice acks but doesn't commit + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) + sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] + bob2alice.forward(alice) + bob2alice.expectMsgType[CommitSig] + bob2alice.forward(alice) + alice2bob.expectMsgType[RevokeAndAck] + alice2bob.forward(bob) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + test("recv CurrentFeerate (when funder, triggers an UpdateFee)") { f => import f._ val sender = TestProbe() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index e3a2c2dfcc..d51b69525c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -16,16 +16,20 @@ package fr.acinq.eclair.channel.states.e -import akka.actor.Status import java.util.UUID -import akka.testkit.TestProbe -import fr.acinq.bitcoin.Crypto.{PrivateKey} +import akka.actor.Status +import akka.testkit.{TestActorRef, TestProbe} +import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, ScriptFlags, Transaction} -import fr.acinq.eclair.blockchain.{PublishAsap, WatchEventSpent} +import fr.acinq.eclair.blockchain.{CurrentBlockCount, PublishAsap, WatchConfirmed, WatchEventSpent} +import fr.acinq.eclair.channel.Channel.LocalError import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods +import fr.acinq.eclair.payment.CommandBuffer +import fr.acinq.eclair.payment.CommandBuffer.CommandSend import fr.acinq.eclair.router.Announcements +import fr.acinq.eclair.transactions.Transactions.HtlcSuccessTx import fr.acinq.eclair.wire._ import fr.acinq.eclair.{TestConstants, TestkitBaseClass, randomBytes32} import org.scalatest.Outcome @@ -52,6 +56,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { } def aliceInit = Init(TestConstants.Alice.nodeParams.globalFeatures, TestConstants.Alice.nodeParams.localFeatures) + def bobInit = Init(TestConstants.Bob.nodeParams.globalFeatures, TestConstants.Bob.nodeParams.localFeatures) /** @@ -180,7 +185,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // b will re-send the lost revocation val ba_rev_0_re = bob2alice.expectMsg(ba_rev_0) // rev ->a - bob2alice.forward(alice, ba_rev_0) + bob2alice.forward(alice, ba_rev_0_re) // and b will attempt a new signature bob2alice.expectMsg(ba_sig_0) @@ -256,7 +261,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // we start by storing the current state val oldStateData = alice.stateData // then we add an htlc and sign it - val (ra1, htlca1) = addHtlc(250000000, alice, bob, alice2bob, bob2alice) + addHtlc(250000000, alice, bob, alice2bob, bob2alice) sender.send(alice, CMD_SIGN) sender.expectMsg("ok") alice2bob.expectMsgType[CommitSig] @@ -362,7 +367,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate assert(channelUpdate.feeBaseMsat === 4200) assert(channelUpdate.feeProportionalMillionths === 123456) - assert(Announcements.isEnabled(channelUpdate.channelFlags) == true) + assert(Announcements.isEnabled(channelUpdate.channelFlags)) // no more messages channelUpdateListener.expectNoMsg(300 millis) @@ -388,7 +393,70 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // alice will broadcast a new disabled channel_update val update = channelUpdateListener.expectMsgType[LocalChannelUpdate] - assert(Announcements.isEnabled(update.channelUpdate.channelFlags) == false) + assert(!Announcements.isEnabled(update.channelUpdate.channelFlags)) + } + + test("pending non-relayed fulfill htlcs will timeout upstream") { f => + import f._ + val sender = TestProbe() + val register = TestProbe() + val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref)) + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // We simulate a pending fulfill on that HTLC but not relayed. + // When it is close to expiring upstream, we should close the channel. + sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true))) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + bob2blockchain.expectMsgType[WatchConfirmed] // main delayed + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + alice2blockchain.expectNoMsg(500 millis) + } + + test("pending non-relayed fail htlcs will timeout upstream") { f => + import f._ + val sender = TestProbe() + val register = TestProbe() + val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref)) + val (_, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // We simulate a pending failure on that HTLC. + // Even if we get close to expiring upstream we shouldn't close the channel, because we have nothing to lose. + sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0))))) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + bob2blockchain.expectNoMsg(250 millis) + alice2blockchain.expectNoMsg(250 millis) } }