Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 46 additions & 56 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -467,24 +467,26 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val now = Platform.currentTime.milliseconds.toSeconds
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
// we do this to make sure that the channel state has been written to disk when we publish the funding tx
val nextState = store(DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), now, None, Left(fundingCreated)))
blockchain ! WatchSpent(self, commitments.commitInput.outPoint.txid, commitments.commitInput.outPoint.index.toInt, commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT) // TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchConfirmed(self, commitments.commitInput.outPoint.txid, commitments.commitInput.txOut.publicKeyScript, nodeParams.minDepthBlocks, BITCOIN_FUNDING_DEPTHOK)
log.info(s"committing txid=${fundingTx.txid}")
wallet.commit(fundingTx).onComplete {
case Success(true) =>
// NB: funding tx isn't confirmed at this point, so technically we didn't really pay the network fee yet, so this is a (fair) approximation
feePaid(fundingTxFee, fundingTx, "funding", commitments.channelId)
replyToUser(Right(s"created channel $channelId"))
case Success(false) =>
replyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx"))))
self ! BITCOIN_FUNDING_PUBLISH_FAILED // fail-fast: this should be returned only when we are really sure the tx has *not* been published
case Failure(t) =>
replyToUser(Left(LocalError(t)))
log.error(t, s"error while committing funding tx: ") // tx may still have been published, can't fail-fast
// we will publish the funding tx only after the channel state has been written to disk because we want to
// make sure we first persist the commitment that returns back the funds to us in case of problem
def publishFundingTx(): Unit = {
wallet.commit(fundingTx).onComplete {
case Success(true) =>
// NB: funding tx isn't confirmed at this point, so technically we didn't really pay the network fee yet, so this is a (fair) approximation
feePaid(fundingTxFee, fundingTx, "funding", commitments.channelId)
replyToUser(Right(s"created channel $channelId"))
case Success(false) =>
replyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx"))))
self ! BITCOIN_FUNDING_PUBLISH_FAILED // fail-fast: this should be returned only when we are really sure the tx has *not* been published
case Failure(t) =>
replyToUser(Left(LocalError(t)))
log.error(t, s"error while committing funding tx: ") // tx may still have been published, can't fail-fast
}
}
goto(WAIT_FOR_FUNDING_CONFIRMED) using nextState
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), now, None, Left(fundingCreated)) storing() calling(publishFundingTx)
}

case Event(CMD_CLOSE(_) | CMD_FORCECLOSE, d: DATA_WAIT_FOR_FUNDING_SIGNED) =>
Expand Down Expand Up @@ -1210,25 +1212,17 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Try(Commitments.sendFulfill(d.commitments, c)) match {
case Success((commitments1, _)) =>
log.info(s"got valid payment preimage, recalculating transactions to redeem the corresponding htlc on-chain")
val localCommitPublished1 = d.localCommitPublished.map {
localCommitPublished =>
val localCommitPublished1 = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(localCommitPublished1)
localCommitPublished1
}
val remoteCommitPublished1 = d.remoteCommitPublished.map {
remoteCommitPublished =>
val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(remoteCommitPublished1)
remoteCommitPublished1
}
val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map {
remoteCommitPublished =>
val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(remoteCommitPublished1)
remoteCommitPublished1
val localCommitPublished1 = d.localCommitPublished.map(localCommitPublished => Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets))
val remoteCommitPublished1 = d.remoteCommitPublished.map(remoteCommitPublished => Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets))
val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map(remoteCommitPublished => Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets))

def republish(): Unit = {
Comment thread
pm47 marked this conversation as resolved.
localCommitPublished1.foreach(doPublish)
remoteCommitPublished1.foreach(doPublish)
nextRemoteCommitPublished1.foreach(doPublish)
}
stay using d.copy(commitments = commitments1, localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1) storing()

stay using d.copy(commitments = commitments1, localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1) storing() calling(republish)
case Failure(cause) => handleCommandError(cause, c)
}

Expand Down Expand Up @@ -1900,13 +1894,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def handleMutualClose(closingTx: Transaction, d: Either[DATA_NEGOTIATING, DATA_CLOSING]) = {
log.info(s"closing tx published: closingTxId=${closingTx.txid}")

doPublish(closingTx)

val nextData = d match {
case Left(negotiating) => DATA_CLOSING(negotiating.commitments, fundingTx = None, waitingSince = now, negotiating.closingTxProposed.flatten.map(_.unsignedTx), mutualClosePublished = closingTx :: Nil)
case Right(closing) => closing.copy(mutualClosePublished = closing.mutualClosePublished :+ closingTx)
}
goto(CLOSING) using nextData storing()

goto(CLOSING) using nextData storing() calling(doPublish(closingTx))
}

def doPublish(closingTx: Transaction): Unit = {
Expand All @@ -1929,7 +1922,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val commitTx = d.commitments.localCommit.publishableTxs.commitTx.tx

val localCommitPublished = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, d.commitments, commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(localCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(localCommitPublished = Some(localCommitPublished))
Expand All @@ -1938,7 +1930,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, localCommitPublished = Some(localCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(localCommitPublished))
}
}

Expand Down Expand Up @@ -1994,7 +1986,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
require(commitTx.txid == d.commitments.remoteCommit.txid, "txid mismatch")

val remoteCommitPublished = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, d.commitments, d.commitments.remoteCommit, commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(remoteCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished))
Expand All @@ -2003,7 +1994,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, remoteCommitPublished = Some(remoteCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def handleRemoteSpentFuture(commitTx: Transaction, d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) = {
Expand All @@ -2013,8 +2004,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val remoteCommitPublished = Helpers.Closing.claimRemoteCommitMainOutput(keyManager, d.commitments, remotePerCommitmentPoint, commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
val nextData = DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, Nil, futureRemoteCommitPublished = Some(remoteCommitPublished))

doPublish(remoteCommitPublished)
goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def handleRemoteSpentNext(commitTx: Transaction, d: HasCommitments) = {
Expand All @@ -2024,7 +2014,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
require(commitTx.txid == remoteCommit.txid, "txid mismatch")

val remoteCommitPublished = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, d.commitments, remoteCommit, commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(remoteCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished))
Expand All @@ -2033,7 +2022,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, nextRemoteCommitPublished = Some(remoteCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
Expand Down Expand Up @@ -2062,15 +2051,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val exc = FundingTxSpent(d.channelId, tx)
val error = Error(d.channelId, exc.getMessage)

doPublish(revokedCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(revokedCommitPublished = closing.revokedCommitPublished :+ revokedCommitPublished)
case negotiating: DATA_NEGOTIATING => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, negotiating.closingTxProposed.flatten.map(_.unsignedTx), revokedCommitPublished = revokedCommitPublished :: Nil)
// NB: if there is a revoked commitment, we can't be in DATA_WAIT_FOR_FUNDING_CONFIRMED so we don't have the case where fundingTx is defined
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, revokedCommitPublished = revokedCommitPublished :: Nil)
}
goto(CLOSING) using nextData storing() sending error
goto(CLOSING) using nextData storing() calling(doPublish(revokedCommitPublished)) sending error
case None =>
// the published tx was neither their current commitment nor a revoked one
log.error(s"couldn't identify txid=${tx.txid}, something very bad is going on!!!")
Expand Down Expand Up @@ -2104,9 +2091,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// let's try to spend our current local tx
val commitTx = d.commitments.localCommit.publishableTxs.commitTx.tx
val localCommitPublished = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, d.commitments, commitTx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
doPublish(localCommitPublished)

goto(ERR_INFORMATION_LEAK) sending error
goto(ERR_INFORMATION_LEAK) calling(doPublish(localCommitPublished)) sending error
}

def handleSync(channelReestablish: ChannelReestablish, d: HasCommitments): Commitments = {
Expand Down Expand Up @@ -2215,21 +2201,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(NetworkFeePaid(self, remoteNodeId, channelId, tx, fee, desc))
}

def store(d: HasCommitments) = {
log.debug(s"updating database record for channelId={}", d.channelId)
nodeParams.db.channels.addOrUpdateChannel(d)
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
d
}

implicit def state2mystate(state: FSM.State[fr.acinq.eclair.channel.State, Data]): MyState = MyState(state)

case class MyState(state: FSM.State[fr.acinq.eclair.channel.State, Data]) {

def storing(): FSM.State[fr.acinq.eclair.channel.State, Data] = {
state.stateData match {
case d: HasCommitments =>
store(d)
log.debug(s"updating database record for channelId={}", d.channelId)
nodeParams.db.channels.addOrUpdateChannel(d)
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
state
case _ =>
log.error(s"can't store data=${state.stateData} in state=${state.stateName}")
Expand All @@ -2247,6 +2228,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
state
}

/**
* This method allows performing actions during the transition, e.g. after a call to [[MyState.storing]]. This is
* particularly useful to publish transactions only after we are sure that the state has been persisted.
*/
def calling(f: => Unit): FSM.State[fr.acinq.eclair.channel.State, Data] = {
f
state
}

}

def now = Platform.currentTime.milliseconds.toSeconds
Expand Down