diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index d0705e6d30..678b4372d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -18,13 +18,15 @@ package fr.acinq.eclair import java.util.UUID -import akka.actor.ActorRef +import akka.actor.{ActorRef, Props} import akka.pattern._ import akka.util.Timeout +import com.softwaremill.sttp.okhttp.OkHttpFutureBackend import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{ByteVector32, Satoshi} import fr.acinq.eclair.TimestampQueryFilters._ import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet +import fr.acinq.eclair.blockchain.bitcoind.rpc.BasicBitcoinJsonRPCClient import fr.acinq.eclair.channel.Register.{Forward, ForwardShortId} import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.{IncomingPayment, NetworkFee, OutgoingPayment, Stats} @@ -33,6 +35,9 @@ import fr.acinq.eclair.io.{NodeURI, Peer} import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentRequest import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, UsableBalance} import fr.acinq.eclair.payment._ +import fr.acinq.eclair.recovery.RecoveryFSM +import fr.acinq.eclair.recovery.RecoveryFSM.RecoveryConnect +import fr.acinq.eclair.router.{ChannelDesc, RouteRequest, RouteResponse, Router} import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment import fr.acinq.eclair.router.{Announcements, ChannelDesc, GetNetworkStats, NetworkStats, PublicChannel, RouteRequest, RouteResponse, Router} import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} @@ -117,6 +122,8 @@ trait Eclair { def getInfoResponse()(implicit timeout: Timeout): Future[GetInfoResponse] def usableBalances()(implicit timeout: Timeout): Future[Iterable[UsableBalance]] + + def doRecovery(uri: NodeURI): Unit } class EclairImpl(appKit: Kit) extends Eclair { @@ -311,4 +318,17 @@ class EclairImpl(appKit: Kit) extends Eclair { override def usableBalances()(implicit timeout: Timeout): Future[Iterable[UsableBalance]] = (appKit.relayer ? GetOutgoingChannels()).mapTo[OutgoingChannels].map(_.channels.map(_.toUsableBalance)) + + override def doRecovery(uri: NodeURI): Unit = { + implicit val shttp = OkHttpFutureBackend() + + val bitcoinRpcClient = new BasicBitcoinJsonRPCClient( + user = appKit.nodeParams.config.getString("bitcoind.rpcuser"), + password = appKit.nodeParams.config.getString("bitcoind.rpcpassword"), + host = appKit.nodeParams.config.getString("bitcoind.host"), + port = appKit.nodeParams.config.getInt("bitcoind.rpcport") + ) + + appKit.system.actorOf(RecoveryFSM.props(uri, appKit.nodeParams, appKit.wallet, bitcoinRpcClient), RecoveryFSM.actorName) + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index ffdea41762..0f43206afc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -41,7 +41,8 @@ import scala.concurrent.duration.FiniteDuration /** * Created by PM on 26/02/2017. */ -case class NodeParams(keyManager: KeyManager, +case class NodeParams(config: Config, + keyManager: KeyManager, private val blockCount: AtomicLong, alias: String, color: Color, @@ -214,6 +215,7 @@ object NodeParams { } NodeParams( + config = config, keyManager = keyManager, blockCount = blockCount, alias = nodeAlias, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryFSM.scala b/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryFSM.scala new file mode 100644 index 0000000000..b8537b1503 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryFSM.scala @@ -0,0 +1,219 @@ +package fr.acinq.eclair.recovery + + +import akka.actor.{ActorRef, FSM, Props} +import fr.acinq.bitcoin.{Base58, Base58Check, Bech32, ByteVector32, OP_0, OP_2, OP_CHECKMULTISIG, OP_CHECKSIG, OP_DUP, OP_EQUAL, OP_EQUALVERIFY, OP_HASH160, OP_PUSHDATA, OutPoint, Script, ScriptWitness, Transaction} +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.NodeParams +import fr.acinq.eclair.blockchain.EclairWallet +import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient} +import fr.acinq.eclair.channel.{Helpers, PleasePublishYourCommitment} +import fr.acinq.eclair.crypto.{Generators, KeyManager} +import fr.acinq.eclair.io.Peer.Disconnect +import fr.acinq.eclair.io.{NodeURI, Peer, PeerConnected} +import fr.acinq.eclair.recovery.RecoveryFSM._ +import fr.acinq.eclair.transactions.Transactions +import fr.acinq.eclair.wire._ +import grizzled.slf4j.Logging +import scodec.bits.ByteVector + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.util.Success + +class RecoveryFSM(remoteNodeURI: NodeURI, nodeParams: NodeParams, wallet: EclairWallet, bitcoinJsonRPCClient: BitcoinJsonRPCClient) extends FSM[State, Data] with Logging { + + implicit val ec = context.system.dispatcher + val bitcoinClient = new ExtendedBitcoinClient(bitcoinJsonRPCClient) + val CHECK_POLL_INTERVAL = 3 seconds + + context.system.eventStream.subscribe(self, classOf[PeerConnected]) + + startWith(RECOVERY_WAIT_FOR_CONNECTION, Nothing) + + self ! RecoveryConnect(remoteNodeURI) + + when(RECOVERY_WAIT_FOR_CONNECTION) { + case Event(RecoveryConnect(nodeURI: NodeURI), Nothing) => + logger.info(s"creating new recovery peer") + val peer = context.actorOf(Props(new RecoveryPeer(nodeParams, nodeURI.nodeId))) + peer ! Peer.Connect(nodeURI.nodeId, Some(nodeURI.address)) + stay using DATA_WAIT_FOR_CONNECTION(nodeURI.nodeId) + + case Event(PeerConnected(peer, nodeId), d: DATA_WAIT_FOR_CONNECTION) if d.remoteNodeId == nodeId => + logger.info(s"connected to remote $nodeId") + goto(RECOVERY_WAIT_FOR_CHANNEL) using DATA_WAIT_FOR_REMOTE_INFO(peer, nodeId) + } + + when(RECOVERY_WAIT_FOR_CHANNEL) { + case Event(ChannelFound(channelId, reestablish), d: DATA_WAIT_FOR_REMOTE_INFO) => + logger.info(s"peer=${d.remoteNodeId} knows channelId=$channelId") + lookupFundingTx(channelId) match { + case None => + logger.info(s"could not find funding transaction...disconnecting") + d.peer ! Disconnect(d.remoteNodeId) + stop() + + case Some((fundingTx, outIndex)) => + logger.info(s"found unspent channel funding_tx=${fundingTx.txid} outputIndex=$outIndex") + logger.info(s"asking remote to close the channel") + d.peer ! SendErrorToRemote(Error(channelId, PleasePublishYourCommitment(channelId).toString)) + context.system.scheduler.scheduleOnce(5 seconds)(self ! CheckCommitmentPublished) + goto(RECOVERY_WAIT_FOR_COMMIT_PUBLISHED) using DATA_WAIT_FOR_REMOTE_PUBLISH(d.peer, reestablish, fundingTx, outIndex) + } + } + + when(RECOVERY_WAIT_FOR_COMMIT_PUBLISHED) { + case Event(CheckCommitmentPublished, d: DATA_WAIT_FOR_REMOTE_PUBLISH) => + logger.info(s"looking for the commitment transaction") + Await.ready(lookForCommitTx(d.fundingTx.txid, d.fundingOutIndex), 30 seconds).value match { + case Some(Success(commitTx)) => + logger.info(s"found commitTx=${commitTx.txid}") + + val Some(remotePerCommitmentSecret) = d.channelReestablish.myCurrentPerCommitmentPoint + val fundingPubKey = recoverFundingKeyFromCommitment(nodeParams, commitTx, d.channelReestablish) + val channelKeyPath = KeyManager.channelKeyPath(fundingPubKey) + val paymentBasePoint = nodeParams.keyManager.paymentPoint(channelKeyPath) + val localPaymentKey = Generators.derivePubKey(paymentBasePoint.publicKey, remotePerCommitmentSecret) + + val finalScriptPubkey = Helpers.getFinalScriptPubKey(wallet, nodeParams.chainHash) + val claimTx = Transactions.makeClaimP2WPKHOutputTx(commitTx, nodeParams.dustLimit, localPaymentKey, finalScriptPubkey, nodeParams.onChainFeeConf.feeEstimator.getFeeratePerKw(6)) + val sig = nodeParams.keyManager.sign(claimTx, paymentBasePoint, remotePerCommitmentSecret) + val claimSigned = Transactions.addSigs(claimTx, localPaymentKey, sig) + logger.info(s"publishing claim-main-output transaction: address=${scriptPubKeyToAddress(finalScriptPubkey)} txid=${claimSigned.tx.txid}") + bitcoinClient.publishTransaction(claimSigned.tx) + context.system.scheduler.scheduleOnce(CHECK_POLL_INTERVAL)(self ! CheckClaimPublished) + goto(RECOVERY_WAIT_FOR_CLAIM_PUBLISHED) using DATA_WAIT_FOR_CLAIM_TX(d.peer, claimSigned.tx) + + case _ => + context.system.scheduler.scheduleOnce(CHECK_POLL_INTERVAL)(self ! CheckCommitmentPublished) + stay() + } + } + + when(RECOVERY_WAIT_FOR_CLAIM_PUBLISHED) { + case Event(CheckClaimPublished, d: DATA_WAIT_FOR_CLAIM_TX) => + Await.ready(bitcoinClient.getTransaction(d.claimTx.txid.toHex), 30 seconds).value match { + case Some(Success(claimTx)) => + logger.info(s"claim transaction published txid=${claimTx.txid}") + d.peer ! Disconnect(remoteNodeURI.nodeId) + stop() + + case _ => + bitcoinClient.publishTransaction(d.claimTx) + context.system.scheduler.scheduleOnce(CHECK_POLL_INTERVAL)(self ! CheckClaimPublished) + stay + } + } + + /** + * Given a channelId tries to guess the fundingTxId and retrieve the funding transaction + */ + def lookupFundingTx(channelId: ByteVector32): Option[(Transaction, Int)] = { + val candidateFundingTxIds = fundingIds(channelId) + val fundingTx_opt = Await.result(Future.sequence(candidateFundingTxIds.map { case (txId, _) => + getTransaction(txId) + }).map(_.flatten.headOption), 60 seconds) + + fundingTx_opt.map { funding => + (funding, candidateFundingTxIds.find(_._1 == funding.txid).map(_._2).get) + } + } + + /** + * Extracts the funding_txid and output index from channelId, brute forces the ids up to @param limit + */ + def fundingIds(channelId: ByteVector32, limit: Int = 5): Seq[(ByteVector32, Int)] = { + 0 until limit map { i => + (fr.acinq.eclair.toLongId(channelId.reverse, i), i) + } + } + + def getTransaction(txId: ByteVector32): Future[Option[Transaction]] = { + bitcoinClient.getTransaction(txId.toHex).collect { + case tx: Transaction => Some(tx) + }.recover { + case _ => None + } + } + + /** + * Lookup a commitTx spending the fundingTx in the mempool and then in the blocks + */ + def lookForCommitTx(fundingTxId: ByteVector32, fundingOutIndex: Int): Future[Transaction] = { + bitcoinClient.getMempool().map { mempoolTxs => + mempoolTxs.find(_.txIn.exists(_.outPoint == OutPoint(fundingTxId.reverse, fundingOutIndex))).get + }.recoverWith { case _ => + bitcoinClient.lookForSpendingTx(None, fundingTxId.toHex, fundingOutIndex) + } + } + + def scriptPubKeyToAddress(scriptPubKey: ByteVector) = Script.parse(scriptPubKey) match { + case OP_DUP :: OP_HASH160 :: OP_PUSHDATA(pubKeyHash, _) :: OP_EQUALVERIFY :: OP_CHECKSIG :: Nil => + Base58Check.encode(Base58.Prefix.PubkeyAddressTestnet, pubKeyHash) + case OP_HASH160 :: OP_PUSHDATA(scriptHash, _) :: OP_EQUAL :: Nil => + Base58Check.encode(Base58.Prefix.ScriptAddressTestnet, scriptHash) + case OP_0 :: OP_PUSHDATA(pubKeyHash, _) :: Nil if pubKeyHash.length == 20 => Bech32.encodeWitnessAddress("bcrt", 0, pubKeyHash) + case OP_0 :: OP_PUSHDATA(scriptHash, _) :: Nil if scriptHash.length == 32 => Bech32.encodeWitnessAddress("bcrt", 0, scriptHash) + case _ => throw new IllegalArgumentException(s"non standard scriptPubkey=$scriptPubKey") + } +} + +object RecoveryFSM { + + val actorName = "recovery-fsm-actor" + + def props(nodeURI: NodeURI, nodeParams: NodeParams, wallet: EclairWallet, bitcoinJsonRPCClient: BitcoinJsonRPCClient) = Props(new RecoveryFSM(nodeURI, nodeParams, wallet, bitcoinJsonRPCClient)) + + // formatter: off + sealed trait State + case object RECOVERY_WAIT_FOR_CONNECTION extends State + case object RECOVERY_WAIT_FOR_CHANNEL extends State + case object RECOVERY_WAIT_FOR_COMMIT_PUBLISHED extends State + case object RECOVERY_WAIT_FOR_CLAIM_PUBLISHED extends State + + sealed trait Data + case object Nothing extends Data + case class DATA_WAIT_FOR_CONNECTION(remoteNodeId: PublicKey) extends Data + case class DATA_WAIT_FOR_REMOTE_INFO(peer: ActorRef, remoteNodeId: PublicKey) extends Data + case class DATA_WAIT_FOR_REMOTE_PUBLISH(peer: ActorRef, channelReestablish: ChannelReestablish, fundingTx: Transaction, fundingOutIndex: Int) extends Data + case class DATA_WAIT_FOR_CLAIM_TX(peer: ActorRef, claimTx: Transaction) extends Data + + sealed trait Event + case class RecoveryConnect(remote: NodeURI) extends Event + case class ChannelFound(channelId: ByteVector32, reestablish: ChannelReestablish) extends Event + case class SendErrorToRemote(error: Error) extends Event + case object CheckCommitmentPublished extends Event + case object CheckClaimPublished extends Event + // formatter: on + + def recoverFundingKeyFromCommitment(nodeParams: NodeParams, commitTx: Transaction, channelReestablish: ChannelReestablish): PublicKey = { + val (key1, key2) = extractKeysFromWitness(commitTx.txIn.head.witness, channelReestablish) + + if(isOurFundingKey(nodeParams.keyManager, commitTx, key1, channelReestablish)) + key1 + else if(isOurFundingKey(nodeParams.keyManager, commitTx, key2, channelReestablish)) + key2 + else + throw new IllegalArgumentException("key not found, output trimmed?") + } + + def extractKeysFromWitness(witness: ScriptWitness, channelReestablish: ChannelReestablish): (PublicKey, PublicKey) = { + val ScriptWitness(Seq(ByteVector.empty, _, _, redeemScript)) = witness + + Script.parse(redeemScript) match { + case OP_2 :: OP_PUSHDATA(key1, _) :: OP_PUSHDATA(key2, _) :: OP_2 :: OP_CHECKMULTISIG :: Nil => (PublicKey(key1), PublicKey(key2)) + case _ => throw new IllegalArgumentException(s"commitTx redeem script doesn't match, script=$redeemScript") + } + } + + def isOurFundingKey(keyManager: KeyManager, commitTx: Transaction, key: PublicKey, channelReestablish: ChannelReestablish): Boolean = { + val channelKeyPath = KeyManager.channelKeyPath(key) + val paymentBasePoint = keyManager.paymentPoint(channelKeyPath).publicKey + val localPaymentKey = Generators.derivePubKey(paymentBasePoint, channelReestablish.myCurrentPerCommitmentPoint.get) + val toRemoteScriptPubkey = Script.write(Script.pay2wpkh(localPaymentKey)) + + commitTx.txOut.exists(_.publicKeyScript == toRemoteScriptPubkey) + } + +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryPeer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryPeer.scala new file mode 100644 index 0000000000..b9e20e3823 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/recovery/RecoveryPeer.scala @@ -0,0 +1,182 @@ +package fr.acinq.eclair.recovery + +import java.net.InetSocketAddress + +import akka.actor.{ActorRef, ActorSelection, FSM, OneForOneStrategy, PoisonPill, Status, SupervisorStrategy, Terminated} +import akka.event.Logging.MDC +import com.google.common.net.HostAndPort +import fr.acinq.bitcoin.ByteVector32 +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.io.Authenticator.{Authenticated, PendingAuth} +import fr.acinq.eclair.io._ +import fr.acinq.eclair.recovery.RecoveryPeer._ +import fr.acinq.eclair.recovery.RecoveryFSM.{ChannelFound, SendErrorToRemote} +import fr.acinq.eclair.router._ +import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelReestablish, GossipTimestampFilter, LightningMessage, NodeAddress, Ping, Pong, RoutingMessage} +import fr.acinq.eclair.{FSMDiagnosticActorLogging, Features, Logs, NodeParams, SimpleSupervisor, wire} +import scodec.bits.ByteVector + +class RecoveryPeer(val nodeParams: NodeParams, remoteNodeId: PublicKey) extends FSMDiagnosticActorLogging[RecoveryPeer.State, RecoveryPeer.Data] { + + def recoveryFSM: ActorSelection = context.system.actorSelection(context.system / RecoveryFSM.actorName) + + val authenticator = context.system.actorOf(SimpleSupervisor.props(Authenticator.props(nodeParams), "authenticator", SupervisorStrategy.Resume)) + authenticator ! self // register this actor as the receiver of the authentication handshake + + startWith(DISCONNECTED, DisconnectedData(address_opt = None)) + + when(DISCONNECTED) { + // sent by Client after establishing a TCP connection + case Event(p: PendingAuth, _) => + authenticator ! p + stay + + case Event(Peer.Connect(_, Some(address)), d: DisconnectedData) => + val inetAddress = Peer.hostAndPort2InetSocketAddress(address) + context.actorOf(Client.props(nodeParams, self, inetAddress, remoteNodeId, origin_opt = Some(sender()))) + stay using d.copy(address_opt = Some(inetAddress)) + + case Event(Authenticated(_, transport, remoteNodeId1, address, _, origin_opt), d: DisconnectedData) => + require(remoteNodeId == remoteNodeId1, s"invalid nodeid: $remoteNodeId != $remoteNodeId1") + log.debug(s"got authenticated connection to $remoteNodeId@${address.getHostString}:${address.getPort}") + transport ! TransportHandler.Listener(self) + context watch transport + val localInit = nodeParams.overrideFeatures.get(remoteNodeId) match { + case Some((gf, lf)) => wire.Init(globalFeatures = gf, localFeatures = lf) + case None => wire.Init(globalFeatures = nodeParams.globalFeatures, localFeatures = nodeParams.localFeatures) + } + log.info(s"using globalFeatures=${localInit.globalFeatures.toBin} and localFeatures=${localInit.localFeatures.toBin}") + transport ! localInit + + goto(INITIALIZING) using InitializingData(Some(address), transport, origin_opt, localInit) + } + + when(INITIALIZING) { + case Event(remoteInit: wire.Init, d: InitializingData) => + d.transport ! TransportHandler.ReadAck(remoteInit) + log.info(s"peer is using globalFeatures=${remoteInit.globalFeatures.toBin} and localFeatures=${remoteInit.localFeatures.toBin}") + if(!Features.areSupported(remoteInit.localFeatures)) { + log.warning(s"peer has unsupported features, continuing anyway") + } + goto(CONNECTED) using ConnectedData(d.address_opt, d.transport, d.localInit, remoteInit) + + case Event(Terminated(actor), d: InitializingData) if actor == d.transport => + log.warning(s"lost connection to $remoteNodeId") + goto(DISCONNECTED) using DisconnectedData(d.address_opt) + + case Event(Peer.Disconnect(nodeId), d: InitializingData) if nodeId == remoteNodeId => + log.info("disconnecting") + d.transport ! PoisonPill + stay + + case Event(unhandledMsg: LightningMessage, d: InitializingData) => + // we ack unhandled messages because we don't want to block further reads on the connection + d.transport ! TransportHandler.ReadAck(unhandledMsg) + log.warning(s"acking unhandled message $unhandledMsg") + stay + } + + when(CONNECTED) { + case Event(SendErrorToRemote(error), d: ConnectedData) => + log.info(s"recoveryFSM is sending an error to the peer") + d.transport ! error + stay + + case Event(msg: ChannelReestablish, d: ConnectedData) => + d.transport ! TransportHandler.ReadAck(msg) + recoveryFSM ! ChannelFound(msg.channelId, msg) + // when recovering we don't immediately reply channel_reestablish/error + stay + + + case Event(err@wire.Error(channelId, reason), d: ConnectedData) if channelId == CHANNELID_ZERO => + d.transport ! TransportHandler.ReadAck(err) + log.error(s"connection-level error! channelId=$channelId reason=${new String(reason.toArray)}") + d.transport ! wire.Error(err.channelId, UNKNOWN_CHANNEL_MESSAGE) + d.transport ! PoisonPill + goto(DISCONNECTED) using DisconnectedData(None) + + case Event(msg: wire.HasChannelId, d: ConnectedData) => + d.transport ! TransportHandler.ReadAck(msg) + log.info(s"received ${msg.getClass.getSimpleName} from $remoteNodeId") + stay + + case Event(msg: wire.HasTemporaryChannelId, d: ConnectedData) => + d.transport ! TransportHandler.ReadAck(msg) + log.info(s"received ${msg.getClass.getSimpleName} from $remoteNodeId") + stay + + case Event(msg: wire.RoutingMessage, _) => + log.info(s"peer sent us a ${msg.getClass.getSimpleName}") + // ACK and do nothing, we're in recovery mode + sender ! TransportHandler.ReadAck(msg) + stay + + case Event(readAck: TransportHandler.ReadAck, d: ConnectedData) => + // we just forward acks from router to transport + d.transport forward readAck + stay + + case Event(Peer.Disconnect(nodeId), d: ConnectedData) if nodeId == remoteNodeId => + log.info(s"disconnecting") + d.transport ! PoisonPill + stay + + case Event(Terminated(actor), d: ConnectedData) if actor == d.transport => + log.info(s"lost connection to $remoteNodeId") + stop(FSM.Normal) + + case Event(h: Authenticated, d: ConnectedData) => + log.info(s"got new transport while already connected, switching to new transport") + context unwatch d.transport + d.transport ! PoisonPill + self ! h + goto(DISCONNECTED) using DisconnectedData(d.address_opt) + + case Event(unhandledMsg: LightningMessage, d: ConnectedData) => + // we ack unhandled messages because we don't want to block further reads on the connection + d.transport ! TransportHandler.ReadAck(unhandledMsg) + log.warning(s"acking unhandled message $unhandledMsg") + stay + } + + onTransition { + case _ -> CONNECTED => + context.system.eventStream.publish(PeerConnected(self, remoteNodeId)) + case CONNECTED -> DISCONNECTED => + context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId)) + } + + onTermination { + case StopEvent(_, CONNECTED, d: ConnectedData) => + // the transition handler won't be fired if we go directly from CONNECTED to closed + context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId)) + } + + // a failing channel won't be restarted, it should handle its states + override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Stop } + + initialize() + + override def mdc(currentMessage: Any): MDC = Logs.mdc(remoteNodeId_opt = Some(remoteNodeId)) + + +} + +object RecoveryPeer { + + val CHANNELID_ZERO = ByteVector32.Zeroes + + val UNKNOWN_CHANNEL_MESSAGE = ByteVector.view("unknown channel".getBytes()) + + sealed trait Data + case class DisconnectedData(address_opt: Option[InetSocketAddress]) extends Data + case class InitializingData(address_opt: Option[InetSocketAddress], transport: ActorRef, origin_opt: Option[ActorRef], localInit: wire.Init) extends Data + case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, localInit: wire.Init, remoteInit: wire.Init) extends Data + + sealed trait State + case object DISCONNECTED extends State + case object INITIALIZING extends State + case object CONNECTED extends State +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala index 6d23200f09..4e1e060817 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala @@ -57,6 +57,7 @@ class EclairImplSpec extends TestKit(ActorSystem("test")) with fixture.FunSuiteL val switchboard = TestProbe() val paymentInitiator = TestProbe() val server = TestProbe() + val authenticator = TestProbe() val kit = Kit( TestConstants.Alice.nodeParams, system, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 3e2834b4fd..43bfd47ecc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -19,6 +19,7 @@ package fr.acinq.eclair import java.sql.{Connection, DriverManager} import java.util.concurrent.atomic.AtomicLong +import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.Crypto.PrivateKey import fr.acinq.bitcoin.{Block, ByteVector32, Script} import fr.acinq.eclair.NodeParams.BITCOIND @@ -66,6 +67,7 @@ object TestConstants { // This is a function, and not a val! When called will return a new NodeParams def nodeParams = NodeParams( + ConfigFactory.empty(), keyManager = keyManager, blockCount = new AtomicLong(defaultBlockHeight), alias = "alice", @@ -146,6 +148,7 @@ object TestConstants { val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash) def nodeParams = NodeParams( + ConfigFactory.empty(), keyManager = keyManager, blockCount = new AtomicLong(defaultBlockHeight), alias = "bob", diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala index 066a799a99..c205dd21d6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala @@ -20,7 +20,7 @@ import java.util.UUID import akka.testkit.{TestFSMRef, TestKitBase, TestProbe} import fr.acinq.bitcoin.Crypto.PublicKey -import fr.acinq.bitcoin.{ByteVector32, Crypto} +import fr.acinq.bitcoin.{ByteVector32, Crypto, Transaction} import fr.acinq.eclair.TestConstants.{Alice, Bob, TestFeeEstimator} import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.fee.FeeTargets @@ -68,7 +68,7 @@ trait StateTestsHelperMethods extends TestKitBase with fixture.TestSuite with Pa } def reachNormal(setup: SetupFixture, - tags: Set[String] = Set.empty): Unit = { + tags: Set[String] = Set.empty): Transaction = { import setup._ val channelVersion = ChannelVersion.STANDARD val channelFlags = if (tags.contains("channels_public")) ChannelFlags.AnnounceChannel else ChannelFlags.Empty @@ -108,6 +108,7 @@ trait StateTestsHelperMethods extends TestKitBase with fixture.TestSuite with Pa // x2 because alice and bob share the same relayer channelUpdateListener.expectMsgType[LocalChannelUpdate] channelUpdateListener.expectMsgType[LocalChannelUpdate] + fundingTx } def makeCmdAdd(amount: MilliSatoshi, destination: PublicKey, currentBlockHeight: Long): (ByteVector32, CMD_ADD_HTLC) = { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index 6f6afa84aa..791e559aed 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -20,20 +20,24 @@ import java.util.UUID import akka.actor.Status import akka.testkit.{TestActorRef, TestProbe} -import fr.acinq.bitcoin.Crypto.PrivateKey -import fr.acinq.bitcoin.{ByteVector32, ScriptFlags, Transaction} -import fr.acinq.eclair.TestConstants.Alice +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.{ByteVector32, ByteVector64, OutPoint, ScriptFlags, Transaction, TxOut} +import fr.acinq.eclair.TestConstants.{Alice, Bob} +import fr.acinq.eclair.blockchain.fee.FeeratesPerKw import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.fee.FeeratesPerKw import fr.acinq.eclair.channel.Channel.LocalError import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods import fr.acinq.eclair.payment.relay.CommandBuffer +import fr.acinq.eclair.crypto.ShaChain import fr.acinq.eclair.router.Announcements -import fr.acinq.eclair.transactions.Transactions.HtlcSuccessTx +import fr.acinq.eclair.transactions.{CommitmentSpec, Transactions} +import fr.acinq.eclair.transactions.Transactions.{CommitTx, HtlcSuccessTx} import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, LongToBtcAmount, TestConstants, TestkitBaseClass, randomBytes32} +import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, LongToBtcAmount, TestConstants, TestkitBaseClass, UInt64, randomBytes32} import org.scalatest.{Outcome, Tag} +import scodec.bits._ import scala.concurrent.duration._ @@ -266,6 +270,207 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods { } + test("ask the last per-commitment-secret to remote and make it publish its commitment tx") { f => + import f._ + val sender = TestProbe() + + val oldAliceState = alice.stateData.asInstanceOf[DATA_NORMAL] + + // simulate a fulfilled payment to move forward the commitment index + addHtlc(250000000 msat, alice, bob, alice2bob, bob2alice) + sender.send(alice, CMD_SIGN) + sender.expectMsg("ok") + alice2bob.expectMsgType[CommitSig] + alice2bob.forward(bob) + bob2alice.expectMsgType[RevokeAndAck] + bob2alice.forward(alice) + bob2alice.expectMsgType[CommitSig] + bob2alice.forward(alice) + alice2bob.expectMsgType[RevokeAndAck] + alice2bob.forward(bob) + + addHtlc(210000000 msat, alice, bob, alice2bob, bob2alice) + sender.send(alice, CMD_SIGN) + sender.expectMsg("ok") + alice2bob.expectMsgType[CommitSig] + alice2bob.forward(bob) + bob2alice.expectMsgType[RevokeAndAck] + bob2alice.forward(alice) + bob2alice.expectMsgType[CommitSig] + bob2alice.forward(alice) + alice2bob.expectMsgType[RevokeAndAck] + alice2bob.forward(bob) + + addHtlc(210000000 msat, alice, bob, alice2bob, bob2alice) + sender.send(alice, CMD_SIGN) + sender.expectMsg("ok") + alice2bob.expectMsgType[CommitSig] + alice2bob.forward(bob) + bob2alice.expectMsgType[RevokeAndAck] + bob2alice.forward(alice) + bob2alice.expectMsgType[CommitSig] + bob2alice.forward(alice) + alice2bob.expectMsgType[RevokeAndAck] + alice2bob.forward(bob) + + // there have been 3 fully ack'ed and revoked commitments + val effectiveLastCommitmentIndex = 3 + assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.index == effectiveLastCommitmentIndex) + + val mockAliceIndex = 1 // alice will claim to be at this index when reestablishing the channel - IT MUST BE STRICTLY SMALLER THAN THE ACTUAL INDEX + val mockBobIndex = 123 // alice will claim that BOB is at this index when reestablishing the channel + + // the mock state contain "random" data that is not really associated with the channel + // most importantly this data is made in such a way that it will trigger a channel failure from the remote + val mockAliceState = DATA_NORMAL( + commitments = Commitments( + channelVersion = ChannelVersion.STANDARD, + localParams = LocalParams( + nodeId = oldAliceState.commitments.localParams.nodeId, + fundingKeyPath = oldAliceState.commitments.localParams.fundingKeyPath, + dustLimit = 0 sat, + maxHtlcValueInFlightMsat = UInt64(0), + channelReserve = 0 sat, + toSelfDelay = CltvExpiryDelta(0), + htlcMinimum = 0 msat, + maxAcceptedHtlcs = 0, + isFunder = true, + defaultFinalScriptPubKey = oldAliceState.commitments.localParams.defaultFinalScriptPubKey, + globalFeatures = hex"00", + localFeatures = hex"00" + ), + remoteParams = RemoteParams( + Bob.nodeParams.nodeId, + dustLimit = 0 sat, + maxHtlcValueInFlightMsat = UInt64(0), + channelReserve = 0 sat, + htlcMinimum = 0 msat, + toSelfDelay = CltvExpiryDelta(0), + maxAcceptedHtlcs = 0, + fundingPubKey = PublicKey(hex"02184615bf2294acc075701892d7bd8aff28d78f84330e8931102e537c8dfe92a3"), + revocationBasepoint = PublicKey(hex"020beeba2c3015509a16558c35b930bed0763465cf7a9a9bc4555fd384d8d383f6"), + paymentBasepoint = PublicKey(hex"02e63d3b87e5269d96f1935563ca7c197609a35a928528484da1464eee117335c5"), + delayedPaymentBasepoint = PublicKey(hex"033dea641e24e7ae550f7c3a94bd9f23d55b26a649c79cd4a3febdf912c6c08281"), + htlcBasepoint = PublicKey(hex"0274a89988063045d3589b162ac6eea5fa0343bf34220648e92a636b1c2468a434"), + globalFeatures = hex"00", + localFeatures = hex"00" + ), + channelFlags = 1.toByte, + localCommit = LocalCommit( + mockAliceIndex, + spec = CommitmentSpec( + htlcs = Set(), + feeratePerKw = 234, + toLocal = 0 msat, + toRemote = 0 msat + ), + publishableTxs = PublishableTxs( + CommitTx( + input = Transactions.InputInfo( + outPoint = OutPoint(ByteVector32.Zeroes, 0), + txOut = TxOut(0 sat, ByteVector.empty), + redeemScript = ByteVector.empty + ), + tx = Transaction.read("0200000000010163c75c555d712a81998ddbaf9ce1d55b153fc7cb71441ae1782143bb6b04b95d0000000000a325818002bc893c0000000000220020ae8d04088ff67f3a0a9106adb84beb7530097b262ff91f8a9a79b7851b50857f00127a0000000000160014be0f04e9ed31b6ece46ca8c17e1ed233c71da0e9040047304402203b280f9655f132f4baa441261b1b590bec3a6fcd6d7180c929fa287f95d200f80220100d826d56362c65d09b8687ca470a31c1e2bb3ad9a41321ceba355d60b77b79014730440220539e34ab02cced861f9c39f9d14ece41f1ed6aed12443a9a4a88eb2792356be6022023dc4f18730a6471bdf9b640dfb831744b81249ffc50bd5a756ae85d8c6749c20147522102184615bf2294acc075701892d7bd8aff28d78f84330e8931102e537c8dfe92a3210367d50e7eab4a0ab0c6b92aa2dcf6cc55a02c3db157866b27a723b8ec47e1338152ae74f15a20") + ), + htlcTxsAndSigs = List.empty + ) + ), + remoteCommit = RemoteCommit( + mockBobIndex, + spec = CommitmentSpec( + htlcs = Set(), + feeratePerKw = 432, + toLocal = 0 msat, + toRemote = 0 msat + ), + txid = ByteVector32.fromValidHex("b70c3314af259029e7d11191ca0fe6ee407352dfaba59144df7f7ce5cc1c7b51"), + remotePerCommitmentPoint = PublicKey(hex"0286f6253405605640f6c19ea85a51267795163183a17df077050bf680ed62c224") + ), + localChanges = LocalChanges( + proposed = List.empty, + signed = List.empty, + acked = List.empty + ), + remoteChanges = RemoteChanges( + proposed = List.empty, + signed = List.empty, + acked = List.empty + ), + localNextHtlcId = 0, + remoteNextHtlcId = 0, + originChannels = Map(), + remoteNextCommitInfo = Right(PublicKey(hex"0386f6253405605640f6c19ea85a51267795163183a17df077050bf680ed62c224")), + commitInput = Transactions.InputInfo( + outPoint = OutPoint(ByteVector32.Zeroes, 0), + txOut = TxOut(0 sat, ByteVector.empty), + redeemScript = ByteVector.empty + ), + remotePerCommitmentSecrets = ShaChain.init, + channelId = ByteVector32.Zeroes + ), + shortChannelId = oldAliceState.shortChannelId, + buried = oldAliceState.buried, + channelAnnouncement = None, + channelUpdate = ChannelUpdate( + signature = ByteVector64.Zeroes, + chainHash = Alice.nodeParams.chainHash, + shortChannelId = oldAliceState.shortChannelId, + timestamp = 1556526043L, + messageFlags = 0.toByte, + channelFlags = 0.toByte, + cltvExpiryDelta = CltvExpiryDelta(144), + htlcMinimumMsat = 0 msat, + feeBaseMsat = 0 msat, + feeProportionalMillionths = 0, + htlcMaximumMsat = None + ), + localShutdown = None, + remoteShutdown = None + ) + + // we simulate a disconnection + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // alice's state data contains dummy values + alice.setState(OFFLINE, mockAliceState) + + // then we reconnect them + sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)) + sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)) + + // peers exchange channel_reestablish messages + val bobCommitments = bob.stateData.asInstanceOf[HasCommitments].commitments + val bobCurrentPerCommitmentPoint = Bob.keyManager.commitmentPoint(Bob.keyManager.channelKeyPath(bobCommitments.localParams, bobCommitments.channelVersion), bobCommitments.localCommit.index) + val aliceCurrentPerCommitmentPoint = Alice.keyManager.commitmentPoint(Alice.keyManager.channelKeyPath(mockAliceState.commitments.localParams, mockAliceState.commitments.channelVersion), mockAliceIndex) + // that's what we expect from Bob, Alice's per-commitment-secret generated using the latest commitment index + val aliceLatestPerCommitmentSecret = Alice.keyManager.commitmentSecret(Alice.keyManager.channelKeyPath(mockAliceState.commitments.localParams, mockAliceState.commitments.channelVersion), effectiveLastCommitmentIndex - 1) + + // Alice sends the indexes and commitment points according to her (mistaken) view of the commitment, Bob will let her know she's behind + alice2bob.expectMsg(ChannelReestablish(ByteVector32.Zeroes, mockAliceIndex + 1, mockBobIndex, Some(PrivateKey(ByteVector32.Zeroes)), Some(aliceCurrentPerCommitmentPoint))) + bob2alice.expectMsg(ChannelReestablish(oldAliceState.commitments.channelId, effectiveLastCommitmentIndex + 1, effectiveLastCommitmentIndex, Some(aliceLatestPerCommitmentSecret), Some(bobCurrentPerCommitmentPoint))) + + // alice then realizes it has an old state... + bob2alice.forward(alice) + // ... and ask bob to publish its current commitment + val error = alice2bob.expectMsgType[Error] + assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(alice)).getMessage) + + // alice now waits for bob to publish its commitment + awaitCond(alice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) + + // bob is nice and publishes its commitment + val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.publishableTxs.commitTx.tx + sender.send(alice, WatchEventSpent(BITCOIN_FUNDING_SPENT, bobCommitTx)) + + // alice is able to claim its main output + val claimMainOutput = alice2blockchain.expectMsgType[PublishAsap].tx + Transaction.correctlySpends(claimMainOutput, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) + } + test("discover that they have a more recent commit than the one we know") { f => import f._ val sender = TestProbe() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/recovery/RecoveryFSMSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/recovery/RecoveryFSMSpec.scala new file mode 100644 index 0000000000..791559d8bd --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/recovery/RecoveryFSMSpec.scala @@ -0,0 +1,126 @@ +package fr.acinq.eclair.recovery + +import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.eclair.{TestConstants, TestkitBaseClass} +import fr.acinq.eclair.channel.{CMD_SIGN, Channel, DATA_NORMAL, Data, INPUT_DISCONNECTED, INPUT_RECONNECTED, NORMAL, OFFLINE, State} +import fr.acinq.eclair.channel.states.StateTestsHelperMethods +import RecoveryFSM._ +import com.google.common.net.HostAndPort +import fr.acinq.bitcoin.Transaction +import fr.acinq.eclair +import fr.acinq.eclair.blockchain.TestWallet +import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinJsonRPCClient +import fr.acinq.eclair.io.{NodeURI, PeerConnected} +import fr.acinq.eclair.wire.{ChannelReestablish, CommitSig, Init, RevokeAndAck} +import org.json4s.JsonAST +import org.json4s.JsonAST.{JNull, JObject, JString} +import org.mockito.scalatest.{IdiomaticMockito, MockitoSugar} +import org.scalatest.{FunSuite, FunSuiteLike, Outcome} + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ + +class RecoveryFSMSpec extends TestkitBaseClass with StateTestsHelperMethods with IdiomaticMockito { + + type FixtureParam = SetupFixtureFSM + + case class SetupFixtureFSM(alice: TestFSMRef[State, Data, Channel], + bob: TestFSMRef[State, Data, Channel], + bob2alice: TestProbe, + fundingTx: Transaction) + + + override def withFixture(test: OneArgTest): Outcome = { + val setup = init() + import setup._ + within(30 seconds) { + val fundingTx = reachNormal(setup, test.tags) + awaitCond(alice.stateName == NORMAL) + awaitCond(bob.stateName == NORMAL) + withFixture(test.toNoArgTest(SetupFixtureFSM(alice, bob, bob2alice, fundingTx))) + } + } + + test("recover our funding key and channel keypath from the remote commit tx") { f => + import f._ + + val probe = TestProbe() + + val aliceFundingKey = TestConstants.Alice.nodeParams.keyManager.fundingPublicKey(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localParams.fundingKeyPath).publicKey + val remotePublishedCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.publishableTxs.commitTx.tx + + // disconnect the peers to obtain a channel_reestablish on reconnection + probe.send(alice, INPUT_DISCONNECTED) + probe.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + val aliceInit = Init(TestConstants.Alice.nodeParams.globalFeatures, TestConstants.Alice.nodeParams.localFeatures) + val bobInit = Init(TestConstants.Bob.nodeParams.globalFeatures, TestConstants.Bob.nodeParams.localFeatures) + + // reconnect the input in bob's channel, bob will send to alice a channel_reestablish + probe.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)) + val bobAliceReestablish = bob2alice.expectMsgType[ChannelReestablish] + + val (key1, key2) = extractKeysFromWitness(remotePublishedCommitTx.txIn.head.witness, bobAliceReestablish) + assert(aliceFundingKey == key1 || aliceFundingKey == key2) + assert(isOurFundingKey(TestConstants.Alice.nodeParams.keyManager, remotePublishedCommitTx, aliceFundingKey, bobAliceReestablish)) + + val recoveredFundingKey = recoverFundingKeyFromCommitment(TestConstants.Alice.nodeParams, remotePublishedCommitTx, bobAliceReestablish) + assert(recoveredFundingKey == aliceFundingKey) + } + + test("find the funding transaction id from channel id"){ f => + import f._ + + val probe = TestProbe() + val aliceStateData = f.alice.stateData.asInstanceOf[DATA_NORMAL] + val fundingId = aliceStateData.commitments.commitInput.outPoint.txid + val channelId = aliceStateData.commitments.channelId + + // disconnect the peers to obtain a channel_reestablish on reconnection + probe.send(alice, INPUT_DISCONNECTED) + probe.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + val aliceInit = Init(TestConstants.Alice.nodeParams.globalFeatures, TestConstants.Alice.nodeParams.localFeatures) + val bobInit = Init(TestConstants.Bob.nodeParams.globalFeatures, TestConstants.Bob.nodeParams.localFeatures) + + // reconnect the input in bob's channel, bob will send to alice a channel_reestablish + probe.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)) + val bobAliceReestablish = bob2alice.expectMsgType[ChannelReestablish] + + val nodeParams = TestConstants.Alice.nodeParams + val remotePeer = TestProbe() + val remotePeerId = TestConstants.Bob.nodeParams.nodeId + + // given the channel id the recovery FSM guesses several funding tx ids, this mock rpc client replies with the funding transaction + // only if the query is correct, which means when the parameter txid is equal to the funding txid, + val bitcoinRpcClient = new BitcoinJsonRPCClient { + override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JsonAST.JValue] = method match { + case "getrawtransaction" if params.head.asInstanceOf[String] == fundingId.toHex => Future.successful( + JString(Transaction.write(fundingTx).toHex) + ) + case _ => Future.successful(JNull) + } + } + + val recoveryFSM = TestFSMRef(new RecoveryFSM(NodeURI(remotePeerId, HostAndPort.fromHost("localhost")), nodeParams, new TestWallet, bitcoinRpcClient)) + recoveryFSM.setState(RECOVERY_WAIT_FOR_CONNECTION, DATA_WAIT_FOR_CONNECTION(remotePeerId)) + + // skip peer connection + probe.send(recoveryFSM, PeerConnected(remotePeer.ref, remotePeerId)) + awaitCond(recoveryFSM.stateName == RECOVERY_WAIT_FOR_CHANNEL) + + // send a ChannelFound event with channel_reestablish to the recoveryFSM -- the channel has been found + probe.send(recoveryFSM, ChannelFound(channelId, bobAliceReestablish)) + + // the recovery FSM replies with an error asking the remote to publish its commitment + val sendError = remotePeer.expectMsgType[SendErrorToRemote] + assert(sendError.error.toAscii.contains("please publish your local commitment")) + + awaitCond(recoveryFSM.stateName == RECOVERY_WAIT_FOR_COMMIT_PUBLISHED) + } + +} diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala index 638ecad1e3..1d3fdac526 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala @@ -23,6 +23,7 @@ import akka.http.scaladsl.Http import akka.stream.{ActorMaterializer, BindFailedException} import com.typesafe.config.Config import fr.acinq.eclair.api.Service +import fr.acinq.eclair.recovery.RecoveryTool import grizzled.slf4j.Logging import kamon.Kamon @@ -52,6 +53,10 @@ object Boot extends App with Logging { case Success(kit) => startApiServiceIfEnabled(setup.config, kit) plugins.foreach(_.onKit(kit)) + if(setup.config.hasPath("recovery-tool")){ + RecoveryTool.interactiveRecovery(kit) + } + case Failure(t) => onError(t) } } catch { diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/recovery/RecoveryTool.scala b/eclair-node/src/main/scala/fr/acinq/eclair/recovery/RecoveryTool.scala new file mode 100644 index 0000000000..6f0eba5d9e --- /dev/null +++ b/eclair-node/src/main/scala/fr/acinq/eclair/recovery/RecoveryTool.scala @@ -0,0 +1,45 @@ +package fr.acinq.eclair.recovery + +import akka.actor.Props +import com.softwaremill.sttp.okhttp.OkHttpFutureBackend +import fr.acinq.eclair.io.{NodeURI, Peer} +import fr.acinq.eclair.Kit +import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BitcoinJsonRPCClient} +import fr.acinq.eclair.recovery.RecoveryFSM.RecoveryConnect +import grizzled.slf4j.Logging + +import scala.util.{Failure, Random, Success, Try} + +object RecoveryTool extends Logging { + + private lazy val scanner = new java.util.Scanner(System.in).useDelimiter("\\n") + + def interactiveRecovery(appKit: Kit): Unit = { + println(s"\n ### Welcome to the eclair recovery tool ### \n") + val nodeUri = getInput[NodeURI]("Please insert the URI of the target node: ", NodeURI.parse) + println(s"### Attempting channel recovery now - good luck! ###") + + implicit val shttp = OkHttpFutureBackend() + + val bitcoinRpcClient = new BasicBitcoinJsonRPCClient( + user = appKit.nodeParams.config.getString("bitcoind.rpcuser"), + password = appKit.nodeParams.config.getString("bitcoind.rpcpassword"), + host = appKit.nodeParams.config.getString("bitcoind.host"), + port = appKit.nodeParams.config.getInt("bitcoind.rpcport") + ) + + appKit.system.actorOf(RecoveryFSM.props(nodeUri, appKit.nodeParams, appKit.wallet, bitcoinRpcClient), RecoveryFSM.actorName) + } + + private def getInput[T](msg: String, parse: String => T): T = { + do { + print(msg) + Try(parse(scanner.next())) match { + case Success(someT) => return someT + case Failure(thr) => println(s"Error: ${thr.getMessage}") + } + } while (true) + + throw new IllegalArgumentException("Unable to get input") + } +}