From 6236a3ae84c8147d3a49b69e8d331baddde17f6b Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Wed, 17 Jul 2019 10:48:56 +0200 Subject: [PATCH 1/8] Detect upstream HTLC timeout and close channel --- eclair-core/src/main/resources/reference.conf | 5 +++ .../scala/fr/acinq/eclair/NodeParams.scala | 4 +- .../fr/acinq/eclair/channel/Channel.scala | 28 ++++++++---- .../eclair/channel/ChannelExceptions.scala | 4 +- .../fr/acinq/eclair/channel/Commitments.scala | 43 ++++++++++++------- .../scala/fr/acinq/eclair/TestConstants.scala | 9 ++-- .../channel/states/e/NormalStateSpec.scala | 35 ++++++++++++++- 7 files changed, 97 insertions(+), 31 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index b2de25ee7c..4d6a2ac691 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -68,6 +68,11 @@ eclair { max-to-local-delay-blocks = 2016 // maximum number of blocks that we are ready to accept for our own delayed outputs (2016 ~ 2 weeks) mindepth-blocks = 3 expiry-delta-blocks = 144 + // When we receive the pre-image for an HTLC and want to fulfill it but the upstream peer stops responding, we want to + // avoid letting its HTLC-timeout transaction become enforceable on-chain (otherwise there is a race condition between + // our HTLC-success and their HTLC-timeout). + // We will close the channel when the HTLC-timeout will happen in less than this number. + upstream-timeout-safety-blocks = 6 fee-base-msat = 1000 fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 9266d701e7..a8ec636e06 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -50,6 +50,7 @@ case class NodeParams(keyManager: KeyManager, maxHtlcValueInFlightMsat: UInt64, maxAcceptedHtlcs: Int, expiryDeltaBlocks: Int, + upstreamTimeoutSafetyBlocks: Int, htlcMinimumMsat: Int, toRemoteDelayBlocks: Int, maxToLocalDelayBlocks: Int, @@ -156,7 +157,7 @@ object NodeParams { val p = PublicKey(ByteVector.fromValidHex(e.getString("nodeid"))) val gf = ByteVector.fromValidHex(e.getString("global-features")) val lf = ByteVector.fromValidHex(e.getString("local-features")) - (p -> (gf, lf)) + p -> (gf, lf) }.toMap val socksProxy_opt = if (config.getBoolean("socks5.enabled")) { @@ -188,6 +189,7 @@ object NodeParams { maxHtlcValueInFlightMsat = UInt64(config.getLong("max-htlc-value-in-flight-msat")), maxAcceptedHtlcs = maxAcceptedHtlcs, expiryDeltaBlocks = config.getInt("expiry-delta-blocks"), + upstreamTimeoutSafetyBlocks = config.getInt("upstream-timeout-safety-blocks"), htlcMinimumMsat = config.getInt("htlc-minimum-msat"), toRemoteDelayBlocks = config.getInt("to-remote-delay-blocks"), maxToLocalDelayBlocks = config.getInt("max-to-local-delay-blocks"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 3199abcbe4..ac728c21fb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -849,8 +849,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } - case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => + handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) + + case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) case Event(c@CurrentFeerates(feeratesPerKw), d: DATA_NORMAL) => val networkFeeratePerKw = feeratesPerKw.blocks_2 @@ -1132,8 +1135,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d) - case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => + handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) + + case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) case Event(c@CurrentFeerates(feerates), d: DATA_SHUTDOWN) => val networkFeeratePerKw = feerates.blocks_2 @@ -1422,11 +1428,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(SYNCING) using d1 sending channelReestablish - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => // note: this can only happen if state is NORMAL or SHUTDOWN // -> in NEGOTIATING there are no more htlcs // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) + + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d) @@ -1560,8 +1569,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(NEGOTIATING) using d.copy(closingTxProposed = closingTxProposed1) sending d.localShutdown } - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedoutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => + handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) + + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSince, d.fundingTx) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala index f5b45e77c6..3546f22af6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.channel import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, Transaction} -import fr.acinq.eclair.{ShortChannelId, UInt64} +import fr.acinq.eclair.UInt64 import fr.acinq.eclair.payment.Origin import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc} @@ -27,6 +27,7 @@ import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc} */ class ChannelException(val channelId: ByteVector32, message: String) extends RuntimeException(message) + // @formatter:off case class DebugTriggeredException (override val channelId: ByteVector32) extends ChannelException(channelId, "debug-mode triggered failure") case class InvalidChainHash (override val channelId: ByteVector32, local: ByteVector32, remote: ByteVector32) extends ChannelException(channelId, s"invalid chainHash (local=$local remote=$remote)") @@ -49,6 +50,7 @@ case class InvalidFinalScript (override val channelId: ByteVect case class FundingTxTimedout (override val channelId: ByteVector32) extends ChannelException(channelId, "funding tx timed out") case class FundingTxSpent (override val channelId: ByteVector32, spendingTx: Transaction) extends ChannelException(channelId, s"funding tx has been spent by txid=${spendingTx.txid}") case class HtlcTimedout (override val channelId: ByteVector32, htlcs: Set[UpdateAddHtlc]) extends ChannelException(channelId, s"one or more htlcs timed out: ids=${htlcs.take(10).map(_.id).mkString}") // we only display the first 10 ids +case class HtlcWillTimeoutUpstream (override val channelId: ByteVector32, htlcs: Set[UpdateAddHtlc]) extends ChannelException(channelId, s"one or more htlcs that should be fulfilled are close to timing out upstream: ids=${htlcs.take(10).map(_.id).mkString}") // we only display the first 10 ids case class HtlcOverridenByLocalCommit (override val channelId: ByteVector32) extends ChannelException(channelId, "htlc was overriden by local commit") case class FeerateTooSmall (override val channelId: ByteVector32, remoteFeeratePerKw: Long) extends ChannelException(channelId, s"remote fee rate is too small: remoteFeeratePerKw=$remoteFeeratePerKw") case class FeerateTooDifferent (override val channelId: ByteVector32, localFeeratePerKw: Long, remoteFeeratePerKw: Long) extends ChannelException(channelId, s"local/remote feerates are too different: remoteFeeratePerKw=$remoteFeeratePerKw localFeeratePerKw=$localFeeratePerKw") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 579f8bac9f..964ffe9c7a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -62,11 +62,23 @@ case class Commitments(channelVersion: ChannelVersion, def hasNoPendingHtlcs: Boolean = localCommit.spec.htlcs.isEmpty && remoteCommit.spec.htlcs.isEmpty && remoteNextCommitInfo.isRight - def timedoutOutgoingHtlcs(blockheight: Long): Set[UpdateAddHtlc] = + def timedOutOutgoingHtlcs(blockheight: Long): Set[UpdateAddHtlc] = (localCommit.spec.htlcs.filter(htlc => htlc.direction == OUT && blockheight >= htlc.add.cltvExpiry) ++ remoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry) ++ remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry)).getOrElse(Set.empty[DirectedHtlc])).map(_.add) + /** + * Once we have the pre-image for incoming htlcs, we are able to spend the HTLC success transaction. + * However, if the upstream peer doesn't update its commitment to remove that HTLC and waits for the HTLC timeout, + * there will be an on-chain race condition between their HTLC timeout and our HTLC success (both will be enforceable). + * If we get too close to the timeout, we must close the channel to enforce our HTLC success transactions safely. + */ + def almostTimedOutIncomingHtlcs(blockheight: Long, timeoutSafety: Int): Set[UpdateAddHtlc] = + localCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - timeoutSafety && localChanges.signed.exists({ + case f: UpdateFulfillHtlc => f.id == htlc.add.id + case _ => false + })).map(_.add) + def addLocalProposal(proposal: UpdateMessage): Commitments = Commitments.addLocalProposal(this, proposal) def addRemoteProposal(proposal: UpdateMessage): Commitments = Commitments.addRemoteProposal(this, proposal) @@ -87,12 +99,13 @@ case class Commitments(channelVersion: ChannelVersion, } object Commitments { + /** - * add a change to our proposed change list + * Add a change to our proposed change list. * - * @param commitments - * @param proposal - * @return an updated commitment instance + * @param commitments current commitments. + * @param proposal proposed change to add. + * @return an updated commitment instance. */ private def addLocalProposal(commitments: Commitments, proposal: UpdateMessage): Commitments = commitments.copy(localChanges = commitments.localChanges.copy(proposed = commitments.localChanges.proposed :+ proposal)) @@ -212,14 +225,14 @@ object Commitments { val fulfill = UpdateFulfillHtlc(commitments.channelId, cmd.id, cmd.r) val commitments1 = addLocalProposal(commitments, fulfill) (commitments1, fulfill) - case Some(htlc) => throw InvalidHtlcPreimage(commitments.channelId, cmd.id) + case Some(_) => throw InvalidHtlcPreimage(commitments.channelId, cmd.id) case None => throw UnknownHtlcId(commitments.channelId, cmd.id) } def receiveFulfill(commitments: Commitments, fulfill: UpdateFulfillHtlc): Either[Commitments, (Commitments, Origin, UpdateAddHtlc)] = getHtlcCrossSigned(commitments, OUT, fulfill.id) match { case Some(htlc) if htlc.paymentHash == sha256(fulfill.paymentPreimage) => Right((addRemoteProposal(commitments, fulfill), commitments.originChannels(fulfill.id), htlc)) - case Some(htlc) => throw InvalidHtlcPreimage(commitments.channelId, fulfill.id) + case Some(_) => throw InvalidHtlcPreimage(commitments.channelId, fulfill.id) case None => throw UnknownHtlcId(commitments.channelId, fulfill.id) } @@ -244,7 +257,7 @@ object Commitments { val fail = UpdateFailHtlc(commitments.channelId, cmd.id, reason) val commitments1 = addLocalProposal(commitments, fail) (commitments1, fail) - case Failure(_) => throw new CannotExtractSharedSecret(commitments.channelId, htlc) + case Failure(_) => throw CannotExtractSharedSecret(commitments.channelId, htlc) } case None => throw UnknownHtlcId(commitments.channelId, cmd.id) } @@ -263,7 +276,7 @@ object Commitments { } => // we have already sent a fail/fulfill for this htlc throw UnknownHtlcId(commitments.channelId, cmd.id) - case Some(htlc) => + case Some(_) => val fail = UpdateFailMalformedHtlc(commitments.channelId, cmd.id, cmd.onionHash, cmd.failureCode) val commitments1 = addLocalProposal(commitments, fail) (commitments1, fail) @@ -348,9 +361,9 @@ object Commitments { def remoteHasUnsignedOutgoingHtlcs(commitments: Commitments): Boolean = commitments.remoteChanges.proposed.collectFirst { case u: UpdateAddHtlc => u }.isDefined - def localHasChanges(commitments: Commitments): Boolean = commitments.remoteChanges.acked.size > 0 || commitments.localChanges.proposed.size > 0 + def localHasChanges(commitments: Commitments): Boolean = commitments.remoteChanges.acked.nonEmpty || commitments.localChanges.proposed.nonEmpty - def remoteHasChanges(commitments: Commitments): Boolean = commitments.localChanges.acked.size > 0 || commitments.remoteChanges.proposed.size > 0 + def remoteHasChanges(commitments: Commitments): Boolean = commitments.localChanges.acked.nonEmpty || commitments.remoteChanges.proposed.nonEmpty def revocationPreimage(seed: ByteVector32, index: Long): ByteVector32 = ShaChain.shaChainFromSeed(seed, 0xFFFFFFFFFFFFFFFFL - index) @@ -428,7 +441,7 @@ object Commitments { val sortedHtlcTxs: Seq[TransactionWithInputInfo] = (htlcTimeoutTxs ++ htlcSuccessTxs).sortBy(_.input.outPoint.index) if (commit.htlcSignatures.size != sortedHtlcTxs.size) { - throw new HtlcSigCountMismatch(commitments.channelId, sortedHtlcTxs.size, commit.htlcSignatures.size) + throw HtlcSigCountMismatch(commitments.channelId, sortedHtlcTxs.size, commit.htlcSignatures.size) } val htlcSigs = sortedHtlcTxs.map(keyManager.sign(_, keyManager.htlcPoint(localParams.channelKeyPath), localPerCommitmentPoint)) val remoteHtlcPubkey = Generators.derivePubKey(remoteParams.htlcBasepoint, localPerCommitmentPoint) @@ -436,13 +449,13 @@ object Commitments { val htlcTxsAndSigs = (sortedHtlcTxs, htlcSigs, commit.htlcSignatures).zipped.toList.collect { case (htlcTx: HtlcTimeoutTx, localSig, remoteSig) => if (Transactions.checkSpendable(Transactions.addSigs(htlcTx, localSig, remoteSig)).isFailure) { - throw new InvalidHtlcSignature(commitments.channelId, htlcTx.tx) + throw InvalidHtlcSignature(commitments.channelId, htlcTx.tx) } HtlcTxAndSigs(htlcTx, localSig, remoteSig) case (htlcTx: HtlcSuccessTx, localSig, remoteSig) => // we can't check that htlc-success tx are spendable because we need the payment preimage; thus we only check the remote sig - if (Transactions.checkSig(htlcTx, remoteSig, remoteHtlcPubkey) == false) { - throw new InvalidHtlcSignature(commitments.channelId, htlcTx.tx) + if (!Transactions.checkSig(htlcTx, remoteSig, remoteHtlcPubkey)) { + throw InvalidHtlcSignature(commitments.channelId, htlcTx.tx) } HtlcTxAndSigs(htlcTx, localSig, remoteSig) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 79770b5833..04907cc040 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,11 +23,11 @@ import fr.acinq.bitcoin.{Block, ByteVector32, Script} import fr.acinq.eclair.NodeParams.BITCOIND import fr.acinq.eclair.crypto.LocalKeyManager import fr.acinq.eclair.db._ -import fr.acinq.eclair.db.sqlite._ import fr.acinq.eclair.io.Peer import fr.acinq.eclair.router.RouterConf import fr.acinq.eclair.wire.{Color, NodeAddress} import scodec.bits.ByteVector + import scala.concurrent.duration._ /** @@ -42,7 +42,6 @@ object TestConstants { def inMemoryDb(connection: Connection = sqliteInMemory()): Databases = Databases.databaseByConnections(connection, connection, connection) - object Alice { val seed = ByteVector32(ByteVector.fill(32)(1)) val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash) @@ -60,6 +59,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64(150000000), maxAcceptedHtlcs = 100, expiryDeltaBlocks = 144, + upstreamTimeoutSafetyBlocks = 6, htlcMinimumMsat = 0, minDepthBlocks = 3, toRemoteDelayBlocks = 144, @@ -69,7 +69,7 @@ object TestConstants { feeProportionalMillionth = 10, reserveToFundingRatio = 0.01, // note: not used (overridden below) maxReserveToFundingRatio = 0.05, - db = inMemoryDb(sqliteInMemory), + db = inMemoryDb(sqliteInMemory()), revocationTimeout = 20 seconds, pingInterval = 30 seconds, pingTimeout = 10 seconds, @@ -126,6 +126,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64.MaxValue, // Bob has no limit on the combined max value of in-flight htlcs maxAcceptedHtlcs = 30, expiryDeltaBlocks = 144, + upstreamTimeoutSafetyBlocks = 6, htlcMinimumMsat = 1000, minDepthBlocks = 3, toRemoteDelayBlocks = 144, @@ -135,7 +136,7 @@ object TestConstants { feeProportionalMillionth = 10, reserveToFundingRatio = 0.01, // note: not used (overridden below) maxReserveToFundingRatio = 0.05, - db = inMemoryDb(sqliteInMemory), + db = inMemoryDb(sqliteInMemory()), revocationTimeout = 20 seconds, pingInterval = 30 seconds, pingTimeout = 10 seconds, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index b37eb9418b..48453e8568 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -33,7 +33,7 @@ import fr.acinq.eclair.channel.states.StateTestsHelperMethods import fr.acinq.eclair.io.Peer import fr.acinq.eclair.payment._ import fr.acinq.eclair.router.Announcements -import fr.acinq.eclair.transactions.Transactions.{htlcSuccessWeight, htlcTimeoutWeight, weight2fee} +import fr.acinq.eclair.transactions.Transactions.{HtlcSuccessTx, htlcSuccessWeight, htlcTimeoutWeight, weight2fee} import fr.acinq.eclair.transactions.{IN, OUT} import fr.acinq.eclair.wire.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc} import fr.acinq.eclair.{Globals, TestConstants, TestkitBaseClass, randomBytes32} @@ -50,6 +50,8 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { type FixtureParam = SetupFixture + implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging + override def withFixture(test: OneArgTest): Outcome = { val setup = init() import setup._ @@ -1677,10 +1679,39 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { alice2blockchain.expectMsgType[PublishAsap] // main delayed alice2blockchain.expectMsgType[PublishAsap] // htlc timeout alice2blockchain.expectMsgType[PublishAsap] // htlc delayed - val watch = alice2blockchain.expectMsgType[WatchConfirmed] + val watch = alice2blockchain.expectMsgType[WatchConfirmed] assert(watch.event === BITCOIN_TX_CONFIRMED(aliceCommitTx)) } + test("recv CurrentBlockCount (an htlc that should be fulfilled will timeout upstream") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill + // * Alice does not send signed commitment with the fulfilled HTLC + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + val (bobFulfilled, _) = Commitments.sendFulfill(initialState.commitments, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) + val (bobSigned, _) = Commitments.sendCommit(bobFulfilled, TestConstants.Bob.keyManager) + bob.setState(stateData = initialState.copy(commitments = bobSigned)) + + sender.send(bob, CurrentBlockCount(400144 - Bob.nodeParams.upstreamTimeoutSafetyBlocks)) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] + assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + val watch = bob2blockchain.expectMsgType[WatchConfirmed] + assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + test("recv CurrentFeerate (when funder, triggers an UpdateFee)") { f => import f._ val sender = TestProbe() From bc2f2f0c70b96583c61a0a1c5fa3d43234f07103 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Wed, 17 Jul 2019 10:57:39 +0200 Subject: [PATCH 2/8] Clean-up: fix IntelliJ warnings --- .../fr/acinq/eclair/channel/Channel.scala | 90 ++++++++----------- 1 file changed, 39 insertions(+), 51 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index ac728c21fb..09d0fc1fec 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy} import akka.event.Logging.MDC import akka.pattern.pipe -import fr.acinq.bitcoin.Crypto.{PublicKey, PrivateKey, sha256} +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, sha256} import fr.acinq.bitcoin._ import fr.acinq.eclair._ import fr.acinq.eclair.blockchain._ @@ -98,7 +98,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId import nodeParams.keyManager // we pass these to helpers classes so that they have the logging context - implicit def implicitLog = log + implicit def implicitLog: akka.event.LoggingAdapter = log val forwarder = context.actorOf(Props(new Forwarder(nodeParams)), "forwarder") @@ -146,7 +146,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId when(WAIT_FOR_INIT_INTERNAL)(handleExceptions { case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, _, localParams, remote, _, channelFlags), Nothing) => - context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, true, temporaryChannelId)) + context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, isFunder = true, temporaryChannelId)) forwarder ! remote val open = OpenChannel(nodeParams.chainHash, temporaryChannelId = temporaryChannelId, @@ -270,7 +270,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId Try(Helpers.validateParamsFundee(nodeParams, open)) match { case Failure(t) => handleLocalError(t, d, Some(open)) case Success(_) => - context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, false, open.temporaryChannelId)) + context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, isFunder = false, open.temporaryChannelId)) // TODO: maybe also check uniqueness of temporary channel id val minimumDepth = nodeParams.minDepthBlocks val accept = AcceptChannel(temporaryChannelId = open.temporaryChannelId, @@ -762,7 +762,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we were waiting for our pending htlcs to be signed before replying with our local shutdown val localShutdown = Shutdown(d.channelId, commitments1.localParams.defaultFinalScriptPubKey) // note: it means that we had pending htlcs to sign, therefore we go to SHUTDOWN, not to NEGOTIATING - require(commitments1.remoteCommit.spec.htlcs.size > 0, "we must have just signed new htlcs, otherwise we would have sent our Shutdown earlier") + require(commitments1.remoteCommit.spec.htlcs.nonEmpty, "we must have just signed new htlcs, otherwise we would have sent our Shutdown earlier") goto(SHUTDOWN) using store(DATA_SHUTDOWN(commitments1, localShutdown, d.remoteShutdown.get)) sending localShutdown } else { stay using store(d.copy(commitments = commitments1)) @@ -937,7 +937,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) => handleRemoteSpentOther(tx, d) @@ -1154,7 +1154,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_SHUTDOWN) => handleRemoteSpentOther(tx, d) @@ -1168,7 +1168,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c@ClosingSigned(_, remoteClosingFee, remoteSig), d: DATA_NEGOTIATING) => log.info(s"received closingFeeSatoshis=$remoteClosingFee") Closing.checkClosingSignature(keyManager, d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey, Satoshi(remoteClosingFee), remoteSig) match { - case Success(signedClosingTx) if Some(remoteClosingFee) == d.closingTxProposed.last.lastOption.map(_.localClosingSigned.feeSatoshis) || d.closingTxProposed.flatten.size >= MAX_NEGOTIATION_ITERATIONS => + case Success(signedClosingTx) if d.closingTxProposed.last.lastOption.map(_.localClosingSigned.feeSatoshis).contains(remoteClosingFee) || d.closingTxProposed.flatten.size >= MAX_NEGOTIATION_ITERATIONS => // we close when we converge or when there were too many iterations handleMutualClose(signedClosingTx, Left(d.copy(bestUnpublishedClosingTx_opt = Some(signedClosingTx)))) case Success(signedClosingTx) => @@ -1178,7 +1178,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId localClosingFee = lastLocalClosingFee.getOrElse(Closing.firstClosingFee(d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey)), remoteClosingFee = Satoshi(remoteClosingFee)) val (closingTx, closingSigned) = Closing.makeClosingTx(keyManager, d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey, nextClosingFee) - if (Some(nextClosingFee) == lastLocalClosingFee) { + if (lastLocalClosingFee.contains(nextClosingFee)) { // next computed fee is the same than the one we previously sent (probably because of rounding), let's close now handleMutualClose(signedClosingTx, Left(d.copy(bestUnpublishedClosingTx_opt = Some(signedClosingTx)))) } else if (nextClosingFee == Satoshi(remoteClosingFee)) { @@ -1203,7 +1203,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) - case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d) + case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) if d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid) => handleRemoteSpentNext(tx, d) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NEGOTIATING) => handleRemoteSpentOther(tx, d) @@ -1219,19 +1219,19 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Success((commitments1, _)) => log.info(s"got valid payment preimage, recalculating transactions to redeem the corresponding htlc on-chain") val localCommitPublished1 = d.localCommitPublished.map { - case localCommitPublished => + localCommitPublished => val localCommitPublished1 = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx) doPublish(localCommitPublished1) localCommitPublished1 } val remoteCommitPublished1 = d.remoteCommitPublished.map { - case remoteCommitPublished => + remoteCommitPublished => val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx) doPublish(remoteCommitPublished1) remoteCommitPublished1 } val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map { - case remoteCommitPublished => + remoteCommitPublished => val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx) doPublish(remoteCommitPublished1) remoteCommitPublished1 @@ -1253,22 +1253,22 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } else if (d.mutualCloseProposed.map(_.txid).contains(tx.txid)) { // at any time they can publish a closing tx with any sig we sent them handleMutualClose(tx, Right(d)) - } else if (Some(tx.txid) == d.localCommitPublished.map(_.commitTx.txid)) { + } else if (d.localCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.remoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.remoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.nextRemoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.nextRemoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay - } else if (Some(tx.txid) == d.futureRemoteCommitPublished.map(_.commitTx.txid)) { + } else if (d.futureRemoteCommitPublished.map(_.commitTx.txid).contains(tx.txid)) { // this is because WatchSpent watches never expire and we are notified multiple times stay } else if (tx.txid == d.commitments.remoteCommit.txid) { // counterparty may attempt to spend its last commit tx at any time handleRemoteSpentCurrent(tx, d) - } else if (Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid)) { + } else if (d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid).contains(tx.txid)) { // counterparty may attempt to spend its last commit tx at any time handleRemoteSpentNext(tx, d) } else { @@ -1313,7 +1313,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val futureRemoteCommitPublished1 = d.futureRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx)) val revokedCommitPublished1 = d.revokedCommitPublished.map(Closing.updateRevokedCommitPublished(_, tx)) // if the local commitment tx just got confirmed, let's send an event telling when we will get the main output refund - if (localCommitPublished1.map(_.commitTx.txid) == Some(tx.txid)) { + if (localCommitPublished1.map(_.commitTx.txid).contains(tx.txid)) { context.system.eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.remoteParams.toSelfDelay)) } // we may need to fail some htlcs in case a commitment tx was published and they have reached the timeout threshold @@ -1351,7 +1351,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we update the channel data val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1) // and we also send events related to fee - Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } + Closing.networkFeePaid(tx, d1) foreach { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } // then let's see if any of the possible close scenarii can be considered done val closingType_opt = Closing.isClosed(d1, Some(tx)) // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) @@ -1385,7 +1385,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case d: HasCommitments => log.info(s"deleting database record for channelId=${d.channelId}") nodeParams.db.channels.removeChannel(d.channelId) - case _ => {} + case _ => } log.info("shutting down") stop(FSM.Normal) @@ -1484,7 +1484,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // if next_remote_revocation_number is greater than our local commitment index, it means that either we are using an outdated commitment, or they are lying // but first we need to make sure that the last per_commitment_secret that they claim to have received from us is correct for that next_remote_revocation_number minus 1 if (keyManager.commitmentSecret(d.commitments.localParams.channelKeyPath, nextRemoteRevocationNumber - 1) == yourLastPerCommitmentSecret) { - log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=${nextRemoteRevocationNumber}") + log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=$nextRemoteRevocationNumber") // their data checks out, we indeed seem to be using an old revoked commitment, and must absolutely *NOT* publish it, because that would be a cheating attempt and they // would punish us by taking all the funds in the channel val exc = PleasePublishYourCommitment(d.channelId) @@ -1496,7 +1496,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } case ChannelReestablish(_, nextLocalCommitmentNumber, _, _, _) if !Helpers.checkRemoteCommit(d, nextLocalCommitmentNumber) => // if next_local_commit_number is more than one more our remote commitment index, it means that either we are using an outdated commitment, or they are lying - log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=${nextLocalCommitmentNumber}") + log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=$nextLocalCommitmentNumber") // there is no way to make sure that they are saying the truth, the best thing to do is ask them to publish their commitment right now // maybe they will publish their commitment, in that case we need to remember their commitment point in order to be able to claim our outputs // not that if they don't comply, we could publish our own commitment (it is not stale, otherwise we would be in the case above) @@ -1735,13 +1735,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This function is used to return feedback to user at channel opening */ - def replyToUser(message: Either[Channel.ChannelError, String]) = { + def replyToUser(message: Either[Channel.ChannelError, String]): Unit = { val m = message match { case Left(LocalError(t)) => Status.Failure(t) case Left(RemoteError(e)) => Status.Failure(new RuntimeException(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}")) case Right(s) => s } - origin_opt.map(_ ! m) + origin_opt.foreach(_ ! m) } def handleCommandSuccess(sender: ActorRef, newData: Data) = { @@ -1761,10 +1761,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * When we are funder, we use this function to detect when our funding tx has been double-spent (by another transaction * that we made for some reason). If the funding tx has been double spent we can forget about the channel. - * - * @param fundingTx */ - def checkDoubleSpent(fundingTx: Transaction) = { + def checkDoubleSpent(fundingTx: Transaction): Unit = { log.debug(s"checking status of funding tx txid=${fundingTx.txid}") wallet.doubleSpent(fundingTx).onComplete { case Success(true) => @@ -1899,7 +1897,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(CLOSING) using store(nextData) } - def doPublish(closingTx: Transaction) = { + def doPublish(closingTx: Transaction): Unit = { blockchain ! PublishAsap(closingTx) blockchain ! WatchConfirmed(self, closingTx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(closingTx)) } @@ -1934,11 +1932,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will publish txes only if they haven't yet reached minDepth - * - * @param txes - * @param irrevocablySpent */ - def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach { tx => log.info(s"publishing txid=${tx.txid}") @@ -1949,11 +1944,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will watch txes only if they haven't yet reached minDepth - * - * @param txes - * @param irrevocablySpent */ - def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach(tx => blockchain ! WatchConfirmed(self, tx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(tx))) skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed")) @@ -1961,18 +1953,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId /** * This helper method will watch txes only if the utxo they spend hasn't already been irrevocably spent - * - * @param parentTx - * @param txes - * @param irrevocablySpent */ - def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) = { + def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = { val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT)) skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed")) } - def doPublish(localCommitPublished: LocalCommitPublished) = { + def doPublish(localCommitPublished: LocalCommitPublished): Unit = { import localCommitPublished._ val publishQueue = List(commitTx) ++ claimMainDelayedOutputTx ++ htlcSuccessTxs ++ htlcTimeoutTxs ++ claimHtlcDelayedTxs @@ -2036,7 +2024,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(CLOSING) using store(nextData) } - def doPublish(remoteCommitPublished: RemoteCommitPublished) = { + def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = { import remoteCommitPublished._ val publishQueue = claimMainOutputTx ++ claimHtlcSuccessTxs ++ claimHtlcTimeoutTxs @@ -2078,7 +2066,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } - def doPublish(revokedCommitPublished: RevokedCommitPublished) = { + def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = { import revokedCommitPublished._ val publishQueue = claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs @@ -2121,7 +2109,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId log.debug(s"localNextHtlcId=${d.commitments.localNextHtlcId}->${commitments1.localNextHtlcId}") log.debug(s"remoteNextHtlcId=${d.commitments.remoteNextHtlcId}->${commitments1.remoteNextHtlcId}") - def resendRevocation = { + def resendRevocation(): Unit = { // let's see the state of remote sigs if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber) { // nothing to do @@ -2146,24 +2134,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // they had received the new sig but their revocation was lost during the disconnection // they will send us the revocation, nothing to do here log.debug(s"waiting for them to re-send their last revocation") - resendRevocation + resendRevocation() case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index == channelReestablish.nextLocalCommitmentNumber => // we had sent a new sig and were waiting for their revocation // they didn't receive the new sig because of the disconnection // we just resend the same updates and the same sig val revWasSentLast = commitments1.localCommit.index > waitingForRevocation.sentAfterLocalCommitIndex - if (!revWasSentLast) resendRevocation + if (!revWasSentLast) resendRevocation() log.debug(s"re-sending previously local signed changes: ${commitments1.localChanges.signed.map(Commitments.msg2String(_)).mkString(",")}") commitments1.localChanges.signed.foreach(forwarder ! _) log.debug(s"re-sending the exact same previous sig") forwarder ! waitingForRevocation.sent - if (revWasSentLast) resendRevocation + if (revWasSentLast) resendRevocation() case Right(_) if commitments1.remoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber => // there wasn't any sig in-flight when the disconnection occurred - resendRevocation + resendRevocation() case _ => throw CommitmentSyncError(d.channelId) } @@ -2208,7 +2196,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Right(u) => Relayed(u.channelId, u.id, u.amountMsat, c.amountMsat) // this is a relayed payment } - def feePaid(fee: Satoshi, tx: Transaction, desc: String, channelId: ByteVector32) = { + def feePaid(fee: Satoshi, tx: Transaction, desc: String, channelId: ByteVector32): Unit = { log.info(s"paid feeSatoshi=${fee.amount} for txid=${tx.txid} desc=$desc") context.system.eventStream.publish(NetworkFeePaid(self, remoteNodeId, channelId, tx, fee, desc)) } From f3c00c75c8c1a852b5fa3813e487fa85ab5fbb93 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Wed, 17 Jul 2019 14:29:27 +0200 Subject: [PATCH 3/8] Add assertion that upstream-timeout must be smaller than cltv-expiry-delta --- eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index a8ec636e06..2d80f616d3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -78,6 +78,7 @@ case class NodeParams(keyManager: KeyManager, routerConf: RouterConf, socksProxy_opt: Option[Socks5ProxyParams], maxPaymentAttempts: Int) { + require(upstreamTimeoutSafetyBlocks < expiryDeltaBlocks, "upstream-timeout-safety-blocks must be smaller than expiry-delta-blocks") val privateKey = keyManager.nodeKey.privateKey val nodeId = keyManager.nodeId From 49a543239aeafb735317bd22e1018d83a5c386c9 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Wed, 17 Jul 2019 15:54:10 +0200 Subject: [PATCH 4/8] Rename configuration parameter. Add more tests and fix edge cases. --- eclair-core/src/main/resources/reference.conf | 2 +- .../scala/fr/acinq/eclair/NodeParams.scala | 6 +- .../fr/acinq/eclair/channel/Channel.scala | 16 ++-- .../fr/acinq/eclair/channel/Commitments.scala | 11 +-- .../scala/fr/acinq/eclair/TestConstants.scala | 4 +- .../channel/states/e/NormalStateSpec.scala | 76 +++++++++++++++++-- 6 files changed, 90 insertions(+), 25 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 4d6a2ac691..fc78e6c470 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -72,7 +72,7 @@ eclair { // avoid letting its HTLC-timeout transaction become enforceable on-chain (otherwise there is a race condition between // our HTLC-success and their HTLC-timeout). // We will close the channel when the HTLC-timeout will happen in less than this number. - upstream-timeout-safety-blocks = 6 + fulfill-safety-before-timeout-blocks = 6 fee-base-msat = 1000 fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 2d80f616d3..194e582ce6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -50,7 +50,7 @@ case class NodeParams(keyManager: KeyManager, maxHtlcValueInFlightMsat: UInt64, maxAcceptedHtlcs: Int, expiryDeltaBlocks: Int, - upstreamTimeoutSafetyBlocks: Int, + fulfillSafetyBeforeTimeoutBlocks: Int, htlcMinimumMsat: Int, toRemoteDelayBlocks: Int, maxToLocalDelayBlocks: Int, @@ -78,7 +78,7 @@ case class NodeParams(keyManager: KeyManager, routerConf: RouterConf, socksProxy_opt: Option[Socks5ProxyParams], maxPaymentAttempts: Int) { - require(upstreamTimeoutSafetyBlocks < expiryDeltaBlocks, "upstream-timeout-safety-blocks must be smaller than expiry-delta-blocks") + require(fulfillSafetyBeforeTimeoutBlocks < expiryDeltaBlocks, "fulfill-safety-before-timeout-blocks must be smaller than expiry-delta-blocks") val privateKey = keyManager.nodeKey.privateKey val nodeId = keyManager.nodeId @@ -190,7 +190,7 @@ object NodeParams { maxHtlcValueInFlightMsat = UInt64(config.getLong("max-htlc-value-in-flight-msat")), maxAcceptedHtlcs = maxAcceptedHtlcs, expiryDeltaBlocks = config.getInt("expiry-delta-blocks"), - upstreamTimeoutSafetyBlocks = config.getInt("upstream-timeout-safety-blocks"), + fulfillSafetyBeforeTimeoutBlocks = config.getInt("fulfill-safety-before-timeout-blocks"), htlcMinimumMsat = config.getInt("htlc-minimum-msat"), toRemoteDelayBlocks = config.getInt("to-remote-delay-blocks"), maxToLocalDelayBlocks = config.getInt("max-to-local-delay-blocks"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 09d0fc1fec..52fcbd2354 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -852,8 +852,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) case Event(c@CurrentFeerates(feeratesPerKw), d: DATA_NORMAL) => val networkFeeratePerKw = feeratesPerKw.blocks_2 @@ -1138,8 +1138,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) case Event(c@CurrentFeerates(feerates), d: DATA_SHUTDOWN) => val networkFeeratePerKw = feerates.blocks_2 @@ -1434,8 +1434,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d) @@ -1572,8 +1572,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.upstreamTimeoutSafetyBlocks)), d, Some(c)) + case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSince, d.fundingTx) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 964ffe9c7a..a5a4d83428 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -73,11 +73,12 @@ case class Commitments(channelVersion: ChannelVersion, * there will be an on-chain race condition between their HTLC timeout and our HTLC success (both will be enforceable). * If we get too close to the timeout, we must close the channel to enforce our HTLC success transactions safely. */ - def almostTimedOutIncomingHtlcs(blockheight: Long, timeoutSafety: Int): Set[UpdateAddHtlc] = - localCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - timeoutSafety && localChanges.signed.exists({ - case f: UpdateFulfillHtlc => f.id == htlc.add.id - case _ => false - })).map(_.add) + def almostTimedOutIncomingHtlcs(blockheight: Long, fulfillSafety: Int): Set[UpdateAddHtlc] = { + val pendingFulfills = (localChanges.proposed ++ localChanges.signed ++ localChanges.acked).collect { case u: UpdateFulfillHtlc => u.id }.toSet + localCommit.spec.htlcs.collect { + case htlc if htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - fulfillSafety && pendingFulfills.contains(htlc.add.id) => htlc.add + } + } def addLocalProposal(proposal: UpdateMessage): Commitments = Commitments.addLocalProposal(this, proposal) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 04907cc040..7dc094c9e8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -59,7 +59,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64(150000000), maxAcceptedHtlcs = 100, expiryDeltaBlocks = 144, - upstreamTimeoutSafetyBlocks = 6, + fulfillSafetyBeforeTimeoutBlocks = 6, htlcMinimumMsat = 0, minDepthBlocks = 3, toRemoteDelayBlocks = 144, @@ -126,7 +126,7 @@ object TestConstants { maxHtlcValueInFlightMsat = UInt64.MaxValue, // Bob has no limit on the combined max value of in-flight htlcs maxAcceptedHtlcs = 30, expiryDeltaBlocks = 144, - upstreamTimeoutSafetyBlocks = 6, + fulfillSafetyBeforeTimeoutBlocks = 6, htlcMinimumMsat = 1000, minDepthBlocks = 3, toRemoteDelayBlocks = 144, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index 48453e8568..bf3b2ae994 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -1683,7 +1683,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { assert(watch.event === BITCOIN_TX_CONFIRMED(aliceCommitTx)) } - test("recv CurrentBlockCount (an htlc that should be fulfilled will timeout upstream") { f => + test("recv CurrentBlockCount (fulfilled signed htlc ignored by upstream peer)") { f => import f._ val sender = TestProbe() val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) @@ -1691,17 +1691,81 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill - // * Alice does not send signed commitment with the fulfilled HTLC + // * Alice does not react to the fulfill (drops the message for some reason) // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race // condition between his HTLC-success and Alice's HTLC-timeout val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo - val (bobFulfilled, _) = Commitments.sendFulfill(initialState.commitments, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) - val (bobSigned, _) = Commitments.sendCommit(bobFulfilled, TestConstants.Bob.keyManager) - bob.setState(stateData = initialState.copy(commitments = bobSigned)) - sender.send(bob, CurrentBlockCount(400144 - Bob.nodeParams.upstreamTimeoutSafetyBlocks)) + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) + sender.expectMsg("ok") + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] + assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + val watch = bob2blockchain.expectMsgType[WatchConfirmed] + assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + + test("recv CurrentBlockCount (fulfilled proposed htlc ignored by upstream peer)") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign + // * Alice does not react to the fulfill (drops the message for some reason) + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = false)) + sender.expectMsg("ok") + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] + assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + val watch = bob2blockchain.expectMsgType[WatchConfirmed] + assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + alice2blockchain.expectNoMsg(500 millis) + } + + test("recv CurrentBlockCount (fulfilled proposed htlc acked but not committed by upstream peer)") { f => + import f._ + val sender = TestProbe() + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + // actual test begins: + // * Bob receives the HTLC pre-image and wants to fulfill + // * Alice acks but doesn't commit + // * When the HTLC timeout on Alice side is near, Bob needs to close the channel to avoid an on-chain race + // condition between his HTLC-success and Alice's HTLC-timeout + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) + sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] + bob2alice.forward(alice) + bob2alice.expectMsgType[CommitSig] + bob2alice.forward(alice) + alice2bob.expectMsgType[RevokeAndAck] + alice2bob.forward(bob) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] From 9df297fd96035c39f3de7b411d5a07c4d0ae29ff Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Fri, 19 Jul 2019 14:04:35 +0200 Subject: [PATCH 5/8] NodeParams: move _require_ inside _makeNodeParams_ --- .../src/main/scala/fr/acinq/eclair/NodeParams.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 194e582ce6..d707fa9e6c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -78,7 +78,6 @@ case class NodeParams(keyManager: KeyManager, routerConf: RouterConf, socksProxy_opt: Option[Socks5ProxyParams], maxPaymentAttempts: Int) { - require(fulfillSafetyBeforeTimeoutBlocks < expiryDeltaBlocks, "fulfill-safety-before-timeout-blocks must be smaller than expiry-delta-blocks") val privateKey = keyManager.nodeKey.privateKey val nodeId = keyManager.nodeId @@ -151,6 +150,10 @@ object NodeParams { val offeredCLTV = config.getInt("to-remote-delay-blocks") require(maxToLocalCLTV <= Channel.MAX_TO_SELF_DELAY && offeredCLTV <= Channel.MAX_TO_SELF_DELAY, s"CLTV delay values too high, max is ${Channel.MAX_TO_SELF_DELAY}") + val expiryDeltaBlocks = config.getInt("expiry-delta-blocks") + val fulfillSafetyBeforeTimeoutBlocks = config.getInt("fulfill-safety-before-timeout-blocks") + require(fulfillSafetyBeforeTimeoutBlocks < expiryDeltaBlocks, "fulfill-safety-before-timeout-blocks must be smaller than expiry-delta-blocks") + val nodeAlias = config.getString("node-alias") require(nodeAlias.getBytes("UTF-8").length <= 32, "invalid alias, too long (max allowed 32 bytes)") @@ -189,8 +192,8 @@ object NodeParams { dustLimitSatoshis = dustLimitSatoshis, maxHtlcValueInFlightMsat = UInt64(config.getLong("max-htlc-value-in-flight-msat")), maxAcceptedHtlcs = maxAcceptedHtlcs, - expiryDeltaBlocks = config.getInt("expiry-delta-blocks"), - fulfillSafetyBeforeTimeoutBlocks = config.getInt("fulfill-safety-before-timeout-blocks"), + expiryDeltaBlocks = expiryDeltaBlocks, + fulfillSafetyBeforeTimeoutBlocks = fulfillSafetyBeforeTimeoutBlocks, htlcMinimumMsat = config.getInt("htlc-minimum-msat"), toRemoteDelayBlocks = config.getInt("to-remote-delay-blocks"), maxToLocalDelayBlocks = config.getInt("max-to-local-delay-blocks"), From f94a38486b862c73f65ff0debfcf2bf4265d6b00 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Fri, 19 Jul 2019 14:31:16 +0200 Subject: [PATCH 6/8] Channel: refactor new block handler --- .../fr/acinq/eclair/channel/Channel.scala | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 52fcbd2354..73f83c3a0e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -849,11 +849,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } - case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - - case Event(c@CurrentBlockCount(count), d: DATA_NORMAL) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + case Event(c: CurrentBlockCount, d: DATA_NORMAL) => handleNewBlock(c, d) case Event(c@CurrentFeerates(feeratesPerKw), d: DATA_NORMAL) => val networkFeeratePerKw = feeratesPerKw.blocks_2 @@ -1135,11 +1131,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d) - case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - - case Event(c@CurrentBlockCount(count), d: DATA_SHUTDOWN) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + case Event(c: CurrentBlockCount, d: DATA_SHUTDOWN) => handleNewBlock(c, d) case Event(c@CurrentFeerates(feerates), d: DATA_SHUTDOWN) => val networkFeeratePerKw = feerates.blocks_2 @@ -1428,14 +1420,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(SYNCING) using d1 sending channelReestablish - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => - // note: this can only happen if state is NORMAL or SHUTDOWN - // -> in NEGOTIATING there are no more htlcs - // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + // note: this can only happen if state is NORMAL or SHUTDOWN + // -> in NEGOTIATING there are no more htlcs + // -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway + case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d) case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d) @@ -1569,11 +1557,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(NEGOTIATING) using d.copy(closingTxProposed = closingTxProposed1) sending d.localShutdown } - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.timedOutOutgoingHtlcs(count).nonEmpty => - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(count)), d, Some(c)) - - case Event(c@CurrentBlockCount(count), d: HasCommitments) if d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty => - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(count, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d) case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSince, d.fundingTx) @@ -1847,6 +1831,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } } + def handleNewBlock(c: CurrentBlockCount, d: HasCommitments) = { + if (d.commitments.timedOutOutgoingHtlcs(c.blockCount).nonEmpty) { + handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(c.blockCount)), d, Some(c)) + } else if (d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty) { + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + } else { + stay + } + } + def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = { cause match { case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request") From f534bfb1bee2195e839cdc1115eb660708e376ae Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Fri, 19 Jul 2019 17:53:49 +0200 Subject: [PATCH 7/8] Handle the case where some fulfills are pending in the CommandBuffer. --- .../fr/acinq/eclair/channel/Channel.scala | 28 +++++-- .../fr/acinq/eclair/channel/Commitments.scala | 21 +++-- .../channel/states/e/OfflineStateSpec.scala | 78 +++++++++++++++++-- 3 files changed, 109 insertions(+), 18 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 73f83c3a0e..b31e95484d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -1771,7 +1771,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId // we also check if the funding tx has been double-spent checkDoubleSpent(fundingTx) context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta(txid)) - case None if (now.seconds - waitingSince.seconds) > FUNDING_TIMEOUT_FUNDEE && (now.seconds - lastBlockTimestamp.seconds) < 1.hour => + case None if (now.seconds - waitingSince.seconds) > FUNDING_TIMEOUT_FUNDEE && (now.seconds - lastBlockTimestamp.seconds) < 1.hour => // if we are fundee, we give up after some time // NB: we want to be sure that the blockchain is in sync to prevent false negatives log.warning(s"funding tx hasn't been published in ${(now.seconds - waitingSince.seconds).toDays} days and blockchain is fresh from ${(now.seconds - lastBlockTimestamp.seconds).toMinutes} minutes ago") @@ -1832,10 +1832,28 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } def handleNewBlock(c: CurrentBlockCount, d: HasCommitments) = { - if (d.commitments.timedOutOutgoingHtlcs(c.blockCount).nonEmpty) { - handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedOutOutgoingHtlcs(c.blockCount)), d, Some(c)) - } else if (d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks).nonEmpty) { - handleLocalError(HtlcWillTimeoutUpstream(d.channelId, d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks)), d, Some(c)) + val timedOutOutgoing = d.commitments.timedOutOutgoingHtlcs(c.blockCount) + val almostTimedOutIncoming = d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeoutBlocks) + if (timedOutOutgoing.nonEmpty) { + // Downstream timed out. + handleLocalError(HtlcTimedout(d.channelId, timedOutOutgoing), d, Some(c)) + } else if (almostTimedOutIncoming.nonEmpty) { + // Upstream is close to timing out. + val relayedFulfills = d.commitments.pendingFulfillHtlcs().map(_.id) + val offendingRelayedHtlcs = almostTimedOutIncoming.filter(htlc => relayedFulfills.contains(htlc.id)) + if (offendingRelayedHtlcs.nonEmpty) { + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, offendingRelayedHtlcs), d, Some(c)) + } else { + // There might be pending fulfill commands that we haven't relayed yet. + // Since this involves a DB call, we only want to check it if all the previous checks failed (this is the slow path). + val pendingRelayFulfills = nodeParams.db.pendingRelay.listPendingRelay(d.channelId).collect { case CMD_FULFILL_HTLC(id, r, _) => id } + val offendingPendingRelayFulfills = almostTimedOutIncoming.filter(htlc => pendingRelayFulfills.contains(htlc.id)) + if (offendingPendingRelayFulfills.nonEmpty) { + handleLocalError(HtlcWillTimeoutUpstream(d.channelId, offendingPendingRelayFulfills), d, Some(c)) + } else { + stay + } + } } else { stay } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index a5a4d83428..156f7f6ceb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -67,17 +67,28 @@ case class Commitments(channelVersion: ChannelVersion, remoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry) ++ remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.spec.htlcs.filter(htlc => htlc.direction == IN && blockheight >= htlc.add.cltvExpiry)).getOrElse(Set.empty[DirectedHtlc])).map(_.add) + /** + * HTLCs that are close to timing out upstream are potentially dangerous. If we received the pre-image for those + * HTLCs, we need to get a remote signed updated commitment that removes this HTLC. + * Otherwise when we get close to the upstream timeout, we risk an on-chain race condition between their HTLC timeout + * and our HTLC success in case of a force-close. + */ + def almostTimedOutIncomingHtlcs(blockheight: Long, fulfillSafety: Int): Set[UpdateAddHtlc] = { + localCommit.spec.htlcs.collect { + case htlc if htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - fulfillSafety => htlc.add + } + } + /** * Once we have the pre-image for incoming htlcs, we are able to spend the HTLC success transaction. * However, if the upstream peer doesn't update its commitment to remove that HTLC and waits for the HTLC timeout, * there will be an on-chain race condition between their HTLC timeout and our HTLC success (both will be enforceable). * If we get too close to the timeout, we must close the channel to enforce our HTLC success transactions safely. */ - def almostTimedOutIncomingHtlcs(blockheight: Long, fulfillSafety: Int): Set[UpdateAddHtlc] = { - val pendingFulfills = (localChanges.proposed ++ localChanges.signed ++ localChanges.acked).collect { case u: UpdateFulfillHtlc => u.id }.toSet - localCommit.spec.htlcs.collect { - case htlc if htlc.direction == IN && blockheight >= htlc.add.cltvExpiry - fulfillSafety && pendingFulfills.contains(htlc.add.id) => htlc.add - } + def pendingFulfillHtlcs(): Set[UpdateFulfillHtlc] = { + localChanges.all.collect { + case u: UpdateFulfillHtlc => u + }.toSet } def addLocalProposal(proposal: UpdateMessage): Commitments = Commitments.addLocalProposal(this, proposal) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index e3a2c2dfcc..e1aef1d2de 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -16,16 +16,19 @@ package fr.acinq.eclair.channel.states.e -import akka.actor.Status import java.util.UUID -import akka.testkit.TestProbe -import fr.acinq.bitcoin.Crypto.{PrivateKey} +import akka.actor.Status +import akka.testkit.{TestActorRef, TestProbe} +import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, ScriptFlags, Transaction} -import fr.acinq.eclair.blockchain.{PublishAsap, WatchEventSpent} +import fr.acinq.eclair.blockchain.{CurrentBlockCount, PublishAsap, WatchConfirmed, WatchEventSpent} import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods +import fr.acinq.eclair.payment.CommandBuffer +import fr.acinq.eclair.payment.CommandBuffer.CommandSend import fr.acinq.eclair.router.Announcements +import fr.acinq.eclair.transactions.Transactions.HtlcSuccessTx import fr.acinq.eclair.wire._ import fr.acinq.eclair.{TestConstants, TestkitBaseClass, randomBytes32} import org.scalatest.Outcome @@ -52,6 +55,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { } def aliceInit = Init(TestConstants.Alice.nodeParams.globalFeatures, TestConstants.Alice.nodeParams.localFeatures) + def bobInit = Init(TestConstants.Bob.nodeParams.globalFeatures, TestConstants.Bob.nodeParams.localFeatures) /** @@ -180,7 +184,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // b will re-send the lost revocation val ba_rev_0_re = bob2alice.expectMsg(ba_rev_0) // rev ->a - bob2alice.forward(alice, ba_rev_0) + bob2alice.forward(alice, ba_rev_0_re) // and b will attempt a new signature bob2alice.expectMsg(ba_sig_0) @@ -256,7 +260,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // we start by storing the current state val oldStateData = alice.stateData // then we add an htlc and sign it - val (ra1, htlca1) = addHtlc(250000000, alice, bob, alice2bob, bob2alice) + addHtlc(250000000, alice, bob, alice2bob, bob2alice) sender.send(alice, CMD_SIGN) sender.expectMsg("ok") alice2bob.expectMsgType[CommitSig] @@ -362,7 +366,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate assert(channelUpdate.feeBaseMsat === 4200) assert(channelUpdate.feeProportionalMillionths === 123456) - assert(Announcements.isEnabled(channelUpdate.channelFlags) == true) + assert(Announcements.isEnabled(channelUpdate.channelFlags)) // no more messages channelUpdateListener.expectNoMsg(300 millis) @@ -388,7 +392,65 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { // alice will broadcast a new disabled channel_update val update = channelUpdateListener.expectMsgType[LocalChannelUpdate] - assert(Announcements.isEnabled(update.channelUpdate.channelFlags) == false) + assert(!Announcements.isEnabled(update.channelUpdate.channelFlags)) + } + + test("pending non-relayed fulfill htlcs will timeout upstream") { f => + import f._ + val sender = TestProbe() + val register = TestProbe() + val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref)) + val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] + val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx + val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo + + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // We simulate a pending fulfill on that HTLC but not relayed. + // When it is close to expiring upstream, we should close the channel. + sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true))) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + val watch = bob2blockchain.expectMsgType[WatchConfirmed] + assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + bob2blockchain.expectMsgType[WatchConfirmed] // main delayed + + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) + bob2blockchain.expectMsgType[PublishAsap] // main delayed + val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] + assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + bob2blockchain.expectMsgType[PublishAsap] // htlc delayed + alice2blockchain.expectNoMsg(500 millis) + } + + test("pending non-relayed fail htlcs will timeout upstream") { f => + import f._ + val sender = TestProbe() + val register = TestProbe() + val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref)) + val (_, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // We simulate a pending failure on that HTLC. + // Even if we get close to expiring upstream we shouldn't close the channel, because we have nothing to lose. + sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0))))) + sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + + bob2blockchain.expectNoMsg(250 millis) + alice2blockchain.expectNoMsg(250 millis) } } From fae61f7b7605971b9c6b4eda350477d098ae3d99 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier Date: Mon, 22 Jul 2019 09:30:05 +0200 Subject: [PATCH 8/8] Update tests to verify channel error broadcast. --- .../fr/acinq/eclair/channel/Channel.scala | 2 +- .../fr/acinq/eclair/channel/Commitments.scala | 12 ----- .../channel/states/e/NormalStateSpec.scala | 45 +++++++++++++------ .../channel/states/e/OfflineStateSpec.scala | 14 ++++-- 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index b31e95484d..1ce4c5a51f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -1839,7 +1839,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId handleLocalError(HtlcTimedout(d.channelId, timedOutOutgoing), d, Some(c)) } else if (almostTimedOutIncoming.nonEmpty) { // Upstream is close to timing out. - val relayedFulfills = d.commitments.pendingFulfillHtlcs().map(_.id) + val relayedFulfills = d.commitments.localChanges.all.collect { case u: UpdateFulfillHtlc => u.id }.toSet val offendingRelayedHtlcs = almostTimedOutIncoming.filter(htlc => relayedFulfills.contains(htlc.id)) if (offendingRelayedHtlcs.nonEmpty) { handleLocalError(HtlcWillTimeoutUpstream(d.channelId, offendingRelayedHtlcs), d, Some(c)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 156f7f6ceb..40aa32a9f0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -79,18 +79,6 @@ case class Commitments(channelVersion: ChannelVersion, } } - /** - * Once we have the pre-image for incoming htlcs, we are able to spend the HTLC success transaction. - * However, if the upstream peer doesn't update its commitment to remove that HTLC and waits for the HTLC timeout, - * there will be an on-chain race condition between their HTLC timeout and our HTLC success (both will be enforceable). - * If we get too close to the timeout, we must close the channel to enforce our HTLC success transactions safely. - */ - def pendingFulfillHtlcs(): Set[UpdateFulfillHtlc] = { - localChanges.all.collect { - case u: UpdateFulfillHtlc => u - }.toSet - } - def addLocalProposal(proposal: UpdateMessage): Commitments = Commitments.addLocalProposal(this, proposal) def addRemoteProposal(proposal: UpdateMessage): Commitments = Commitments.addRemoteProposal(this, proposal) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index bf3b2ae994..ca6a1dd373 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -27,9 +27,9 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.UInt64.Conversions._ import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.fee.FeeratesPerKw -import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, Reconnected, RevocationTimeout} -import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.Channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods +import fr.acinq.eclair.channel.{ChannelErrorOccured, _} import fr.acinq.eclair.io.Peer import fr.acinq.eclair.payment._ import fr.acinq.eclair.router.Announcements @@ -1689,6 +1689,9 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill // * Alice does not react to the fulfill (drops the message for some reason) @@ -1700,15 +1703,18 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = true)) sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed - val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] - assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) bob2blockchain.expectMsgType[PublishAsap] // htlc delayed - val watch = bob2blockchain.expectMsgType[WatchConfirmed] - assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) alice2blockchain.expectNoMsg(500 millis) } @@ -1718,6 +1724,9 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign // * Alice does not react to the fulfill (drops the message for some reason) @@ -1729,15 +1738,18 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { sender.send(bob, CMD_FULFILL_HTLC(htlc.id, r, commit = false)) sender.expectMsg("ok") + bob2alice.expectMsgType[UpdateFulfillHtlc] sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed - val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] - assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) bob2blockchain.expectMsgType[PublishAsap] // htlc delayed - val watch = bob2blockchain.expectMsgType[WatchConfirmed] - assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) alice2blockchain.expectNoMsg(500 millis) } @@ -1747,6 +1759,9 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill // * Alice acks but doesn't commit @@ -1766,13 +1781,15 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { alice2bob.forward(bob) sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - Bob.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed - val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] - assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) bob2blockchain.expectMsgType[PublishAsap] // htlc delayed - val watch = bob2blockchain.expectMsgType[WatchConfirmed] - assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) alice2blockchain.expectNoMsg(500 millis) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index e1aef1d2de..d51b69525c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -23,6 +23,7 @@ import akka.testkit.{TestActorRef, TestProbe} import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{ByteVector32, ScriptFlags, Transaction} import fr.acinq.eclair.blockchain.{CurrentBlockCount, PublishAsap, WatchConfirmed, WatchEventSpent} +import fr.acinq.eclair.channel.Channel.LocalError import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods import fr.acinq.eclair.payment.CommandBuffer @@ -403,6 +404,9 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val (r, htlc) = addHtlc(50000000, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) + val listener = TestProbe() + system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccured]) + val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] val initialCommitTx = initialState.commitments.localCommit.publishableTxs.commitTx.tx val HtlcSuccessTx(_, htlcSuccessTx, _) = initialState.commitments.localCommit.publishableTxs.htlcTxsAndSigs.head.txinfo @@ -417,16 +421,18 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true))) sender.send(bob, CurrentBlockCount(htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks)) + val ChannelErrorOccured(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccured] + assert(isFatal) + assert(err.isInstanceOf[HtlcWillTimeoutUpstream]) + bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed - val watch = bob2blockchain.expectMsgType[WatchConfirmed] - assert(watch.event === BITCOIN_TX_CONFIRMED(initialCommitTx)) + assert(bob2blockchain.expectMsgType[WatchConfirmed].event === BITCOIN_TX_CONFIRMED(initialCommitTx)) bob2blockchain.expectMsgType[WatchConfirmed] // main delayed bob2blockchain.expectMsg(PublishAsap(initialCommitTx)) bob2blockchain.expectMsgType[PublishAsap] // main delayed - val htlcPublishedTx = bob2blockchain.expectMsgType[PublishAsap] - assert(htlcPublishedTx.tx.txOut === htlcSuccessTx.txOut) + assert(bob2blockchain.expectMsgType[PublishAsap].tx.txOut === htlcSuccessTx.txOut) bob2blockchain.expectMsgType[PublishAsap] // htlc delayed alice2blockchain.expectNoMsg(500 millis) }