diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index 7b7547a5be..e90465e9bf 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -19,6 +19,7 @@ package fr.acinq.eclair.channel.fsm import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, actorRefAdapter} import akka.actor.{Actor, ActorContext, ActorRef, FSM, OneForOneStrategy, PossiblyHarmful, Props, SupervisorStrategy, typed} +import akka.event.EventStream import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, Transaction} @@ -160,7 +161,7 @@ object Channel { } -class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val remoteNodeId: PublicKey, val blockchain: typed.ActorRef[ZmqWatcher.Command], val relayer: ActorRef, val txPublisherFactory: Channel.TxPublisherFactory, val origin_opt: Option[ActorRef] = None)(implicit val ec: ExecutionContext = ExecutionContext.Implicits.global) +class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val remoteNodeId: PublicKey, val blockchain: typed.ActorRef[ZmqWatcher.Command], val relayer: ActorRef, val txPublisherFactory: Channel.TxPublisherFactory, val origin_opt: Option[ActorRef] = None, eventStream_opt: Option[EventStream] = None)(implicit val ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[ChannelState, ChannelData] with FSMDiagnosticActorLogging[ChannelState, ChannelData] with ChannelOpenSingleFunder @@ -175,6 +176,9 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val // we pass these to helpers classes so that they have the logging context implicit def implicitLog: akka.event.DiagnosticLoggingAdapter = diagLog + // this allows overriding the default eventstream in tests + val eventStream = eventStream_opt.getOrElse(context.system.eventStream) + // we assume that the peer is the channel's parent val peer = context.parent // noinspection ActorMutableStateInspection @@ -188,11 +192,11 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val val txPublisher = txPublisherFactory.spawnTxPublisher(context, remoteNodeId) // this will be used to detect htlc timeouts - context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight]) + eventStream.subscribe(self, classOf[CurrentBlockHeight]) // the constant delay by which we delay processing of blocks (it will be smoothened among all channels) private val blockProcessingDelay = Random.nextLong(nodeParams.channelConf.maxBlockProcessingDelay.toMillis + 1).millis // this will be used to make sure the current commitment fee is up-to-date - context.system.eventStream.subscribe(self, classOf[CurrentFeerates]) + eventStream.subscribe(self, classOf[CurrentFeerates]) /* 8888888 888b 888 8888888 88888888888 @@ -209,7 +213,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val when(WAIT_FOR_INIT_INTERNAL)(handleExceptions { case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, localParams, remote, remoteInit, channelFlags, channelConfig, channelType), Nothing) => - context.system.eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = true, temporaryChannelId, initialFeeratePerKw, Some(fundingTxFeeratePerKw))) + eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = true, temporaryChannelId, initialFeeratePerKw, Some(fundingTxFeeratePerKw))) activeConnection = remote txPublisher ! SetChannelId(remoteNodeId, temporaryChannelId) val fundingPubKey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey @@ -248,7 +252,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case Event(INPUT_RESTORED(data), _) => log.debug("restoring channel") - context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data)) + eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data)) txPublisher ! SetChannelId(remoteNodeId, data.channelId) data match { // NB: order matters! @@ -299,7 +303,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case normal: DATA_NORMAL => watchFundingTx(data.commitments) - context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None)) + eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None)) // we check the configuration because the values for channel_update may have changed while eclair was down val fees = getRelayFees(nodeParams, remoteNodeId, data.commitments) @@ -355,7 +359,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Commitments.sendAdd(d.commitments, c, nodeParams.currentBlockHeight, nodeParams.onChainFeeConf) match { case Right((commitments1, add)) => if (c.commit) self ! CMD_SIGN() - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate)) } @@ -370,7 +374,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Commitments.sendFulfill(d.commitments, c) match { case Right((commitments1, fulfill)) => if (c.commit) self ! CMD_SIGN() - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fulfill case Left(cause) => // we acknowledge the command right away in case of failure @@ -390,7 +394,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Commitments.sendFail(d.commitments, c, nodeParams.privateKey) match { case Right((commitments1, fail)) => if (c.commit) self ! CMD_SIGN() - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail case Left(cause) => // we acknowledge the command right away in case of failure @@ -401,7 +405,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Commitments.sendFailMalformed(d.commitments, c) match { case Right((commitments1, fail)) => if (c.commit) self ! CMD_SIGN() - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail case Left(cause) => // we acknowledge the command right away in case of failure @@ -424,7 +428,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Commitments.sendFee(d.commitments, c, nodeParams.onChainFeeConf) match { case Right((commitments1, fee)) => if (c.commit) self ! CMD_SIGN() - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fee case Left(cause) => handleCommandError(cause, c) } @@ -459,7 +463,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val log.info("updating channel_update aboveReserve={}", Helpers.aboveReserve(commitments1)) self ! BroadcastChannelUpdate(AboveReserve) } - context.system.eventStream.publish(ChannelSignatureSent(self, commitments1)) + eventStream.publish(ChannelSignatureSent(self, commitments1)) // we expect a quick response from our peer startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.channelConf.revocationTimeout) handleCommandSuccess(c, d.copy(commitments = commitments1)).storing().sending(commit).acking(commitments1.localChanges.signed) @@ -481,9 +485,9 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val } if (d.commitments.availableBalanceForSend != commitments1.availableBalanceForSend) { // we send this event only when our balance changes - context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) + eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1)) } - context.system.eventStream.publish(ChannelSignatureReceived(self, commitments1)) + eventStream.publish(ChannelSignatureReceived(self, commitments1)) stay() using d.copy(commitments = commitments1) storing() sending revocation case Left(cause) => handleLocalError(cause, d, Some(commit)) } @@ -620,7 +624,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val val channelUpdate = if (shortChannelId != d.shortChannelId) { log.info(s"short channel id changed, probably due to a chain reorg: old=${d.shortChannelId} new=$shortChannelId") // we need to re-announce this shortChannelId - context.system.eventStream.publish(ShortChannelIdAssigned(self, d.channelId, shortChannelId, Some(d.shortChannelId))) + eventStream.publish(ShortChannelIdAssigned(self, d.channelId, shortChannelId, Some(d.shortChannelId))) // we re-announce the channelUpdate for the same reason Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments)) } else d.channelUpdate @@ -816,7 +820,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val log.debug(s"adding paymentHash=${htlc.paymentHash} cltvExpiry=${htlc.cltvExpiry} to htlcs db for commitNumber=$nextCommitNumber") nodeParams.db.channels.addHtlcInfo(d.channelId, nextCommitNumber, htlc.paymentHash, htlc.cltvExpiry) } - context.system.eventStream.publish(ChannelSignatureSent(self, commitments1)) + eventStream.publish(ChannelSignatureSent(self, commitments1)) // we expect a quick response from our peer startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.channelConf.revocationTimeout) handleCommandSuccess(c, d.copy(commitments = commitments1)).storing().sending(commit).acking(commitments1.localChanges.signed) @@ -832,7 +836,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case Right((commitments1, revocation)) => // we always reply with a revocation log.debug("received a new sig:\n{}", Commitments.specs2String(commitments1)) - context.system.eventStream.publish(ChannelSignatureReceived(self, commitments1)) + eventStream.publish(ChannelSignatureReceived(self, commitments1)) if (commitments1.hasNoPendingHtlcsOrFeeUpdate) { if (d.commitments.localParams.isInitiator) { // we are the channel initiator, need to initiate the negotiation by sending the first closing_signed @@ -1139,7 +1143,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case Event(WatchTxConfirmedTriggered(blockHeight, _, tx), d: DATA_CLOSING) => log.info(s"txid=${tx.txid} has reached mindepth, updating closing state") - context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, tx)) + eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, tx)) // first we check if this tx belongs to one of the current local/remote commits, update it and update the channel data val d1 = d.copy( localCommitPublished = d.localCommitPublished.map(localCommitPublished => { @@ -1158,7 +1162,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val ) // if the local commitment tx just got confirmed, let's send an event telling when we will get the main output refund if (d1.localCommitPublished.exists(_.commitTx.txid == tx.txid)) { - context.system.eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.remoteParams.toSelfDelay.toInt)) + eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.remoteParams.toSelfDelay.toInt)) } // we may need to fail some htlcs in case a commitment tx was published and they have reached the timeout threshold val timedOutHtlcs = Closing.isClosingTypeAlreadyKnown(d1) match { @@ -1191,14 +1195,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val Closing .onChainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx) .map(add => (add, d.commitments.originChannels.get(add.id).collect { case o: Origin.Local => o.id })) // we resolve the payment id if this was a local payment - .collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = add.amountMsat, add.paymentHash)) } + .collect { case (add, Some(id)) => eventStream.publish(PaymentSettlingOnChain(id, amount = add.amountMsat, add.paymentHash)) } // then let's see if any of the possible close scenarios can be considered done val closingType_opt = Closing.isClosed(d1, Some(tx)) // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay() (note that we don't store the state) closingType_opt match { case Some(closingType) => log.info(s"channel closed (type=${closingType_opt.map(c => EventType.Closed(c).label).getOrElse("UnknownYet")})") - context.system.eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments)) + eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments)) goto(CLOSED) using d1 storing() case None => stay() using d1 storing() @@ -1581,7 +1585,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case d: PersistentChannelData => Some(d.commitments) case _: TransientChannelData => None } - context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) + eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) } if (nextState == CLOSED) { @@ -1628,9 +1632,9 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val emitEvent_opt.foreach { case EmitLocalChannelUpdate(d) => log.info("emitting channel_update={} enabled={} ", d.channelUpdate, d.channelUpdate.channelFlags.isEnabled) - context.system.eventStream.publish(LocalChannelUpdate(self, d.channelId, d.shortChannelId, d.commitments.remoteParams.nodeId, d.channelAnnouncement, d.channelUpdate, d.commitments)) + eventStream.publish(LocalChannelUpdate(self, d.channelId, d.shortChannelId, d.commitments.remoteParams.nodeId, d.channelAnnouncement, d.channelUpdate, d.commitments)) case EmitLocalChannelDown(d) => - context.system.eventStream.publish(LocalChannelDown(self, d.channelId, d.shortChannelId, d.commitments.remoteParams.nodeId)) + eventStream.publish(LocalChannelDown(self, d.channelId, d.shortChannelId, d.commitments.remoteParams.nodeId)) } // When we change our channel update parameters (e.g. relay fees), we want to advertise it to other actors. @@ -1752,7 +1756,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val log.warning(s"${cause.getMessage} while processing cmd=${c.getClass.getSimpleName} in state=$stateName") val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo replyTo ! RES_ADD_FAILED(c, cause, channelUpdate) - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = false)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = false)) stay() } @@ -1763,7 +1767,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val case hasReplyTo: HasReplyToCommand => if (hasReplyTo.replyTo == ActorRef.noSender) Some(sender()) else Some(hasReplyTo.replyTo) } replyTo_opt.foreach(replyTo => replyTo ! RES_FAILURE(c, cause)) - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = false)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = false)) stay() } @@ -1834,7 +1838,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val private def maybeEmitChannelUpdateChangedEvent(newUpdate: ChannelUpdate, oldUpdate_opt: Option[ChannelUpdate], d: DATA_NORMAL): Unit = { if (oldUpdate_opt.isEmpty || !Announcements.areSameIgnoreFlags(newUpdate, oldUpdate_opt.get)) { - context.system.eventStream.publish(ChannelUpdateParametersChanged(self, d.channelId, newUpdate.shortChannelId, d.commitments.remoteNodeId, newUpdate)) + eventStream.publish(ChannelUpdateParametersChanged(self, d.channelId, newUpdate.shortChannelId, d.commitments.remoteNodeId, newUpdate)) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunder.scala index 932f8016a1..88b1091d32 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunder.scala @@ -77,7 +77,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { Helpers.validateParamsFundee(nodeParams, channelType, localParams.initFeatures, open, remoteNodeId, remoteInit.features) match { case Left(t) => handleLocalError(t, d, Some(open)) case Right((channelFeatures, remoteShutdownScript)) => - context.system.eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = false, open.temporaryChannelId, open.feeratePerKw, None)) + eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = false, open.temporaryChannelId, open.feeratePerKw, None)) val fundingPubkey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey val channelKeyPath = keyManager.keyPath(localParams, channelConfig) val minimumDepth = Helpers.minDepthForFunding(nodeParams.channelConf, open.fundingSatoshis) @@ -192,7 +192,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { val channelId = toLongId(fundingTx.hash, fundingTxOutputIndex) peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages txPublisher ! SetChannelId(remoteNodeId, channelId) - context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId)) + eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId)) // NB: we don't send a ChannelSignatureSent for the first commit goto(WAIT_FOR_FUNDING_SIGNED) using DATA_WAIT_FOR_FUNDING_SIGNED(channelId, localParams, remoteParams, fundingTx, fundingTxFee, localSpec, localCommitTx, RemoteCommit(0, remoteSpec, remoteCommitTx.tx.txid, remoteFirstPerCommitmentPoint), open.channelFlags, channelConfig, channelFeatures, fundingCreated) sending fundingCreated } @@ -249,8 +249,8 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { commitInput, ShaChain.init) peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages txPublisher ! SetChannelId(remoteNodeId, channelId) - context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId)) - context.system.eventStream.publish(ChannelSignatureReceived(self, commitments)) + eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId)) + eventStream.publish(ChannelSignatureReceived(self, commitments)) // NB: we don't send a ChannelSignatureSent for the first commit log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}") watchFundingTx(commitments) @@ -291,7 +291,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { remoteNextCommitInfo = Right(randomKey().publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array commitInput, ShaChain.init) val blockHeight = nodeParams.currentBlockHeight - context.system.eventStream.publish(ChannelSignatureReceived(self, commitments)) + eventStream.publish(ChannelSignatureReceived(self, commitments)) log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}") watchFundingTx(commitments) blockchain ! WatchFundingConfirmed(self, commitInput.outPoint.txid, nodeParams.channelConf.minDepthBlocks) @@ -302,7 +302,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { def publishFundingTx(): Unit = { wallet.commit(fundingTx).onComplete { case Success(true) => - context.system.eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, fundingTxFee, "funding")) + eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, fundingTxFee, "funding")) channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelOpened(channelId))) case Success(false) => channelOpenReplyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx")))) @@ -351,8 +351,8 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { case Success(_) => log.info(s"channelId=${commitments.channelId} was confirmed at blockHeight=$blockHeight txIndex=$txIndex") blockchain ! WatchFundingLost(self, commitments.commitInput.outPoint.txid, nodeParams.channelConf.minDepthBlocks) - if (!d.commitments.localParams.isInitiator) context.system.eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, 0 sat, "funding")) - context.system.eventStream.publish(TransactionConfirmed(commitments.channelId, remoteNodeId, fundingTx)) + if (!d.commitments.localParams.isInitiator) eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, 0 sat, "funding")) + eventStream.publish(TransactionConfirmed(commitments.channelId, remoteNodeId, fundingTx)) val channelKeyPath = keyManager.keyPath(d.commitments.localParams, commitments.channelConfig) val nextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, 1) val fundingLocked = FundingLocked(commitments.channelId, nextPerCommitmentPoint) @@ -400,7 +400,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers { case Event(FundingLocked(_, nextPerCommitmentPoint, _), d@DATA_WAIT_FOR_FUNDING_LOCKED(commitments, shortChannelId, _)) => // used to get the final shortChannelId, used in announcements (if minDepth >= ANNOUNCEMENTS_MINCONF this event will fire instantly) blockchain ! WatchFundingDeeplyBuried(self, commitments.commitInput.outPoint.txid, ANNOUNCEMENTS_MINCONF) - context.system.eventStream.publish(ShortChannelIdAssigned(self, commitments.channelId, shortChannelId, None)) + eventStream.publish(ShortChannelIdAssigned(self, commitments.channelId, shortChannelId, None)) // we create a channel_update early so that we can use it to send payments through this channel, but it won't be propagated to other nodes since the channel is not yet announced val fees = getRelayFees(nodeParams, remoteNodeId, commitments) val initialChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, nodeParams.channelConf.expiryDelta, d.commitments.remoteParams.htlcMinimum, fees.feeBase, fees.feeProportionalMillionths, commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala index 3fd98cd9a3..e2f7edd47b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala @@ -52,7 +52,7 @@ trait CommonHandlers { case d: PersistentChannelData => log.debug("updating database record for channelId={}", d.channelId) nodeParams.db.channels.addOrUpdateChannel(d) - context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d)) + eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d)) state case _: TransientChannelData => log.error(s"can't store data=${state.stateData} in state=${state.stateName}") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 096f3bd610..f9d0c707b3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -80,7 +80,7 @@ trait ErrorHandlers extends CommonHandlers { } val error = Error(d.channelId, cause.getMessage) - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = true)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = true)) d match { case dd: PersistentChannelData if Closing.nothingAtStake(dd) => goto(CLOSED) @@ -120,7 +120,7 @@ trait ErrorHandlers extends CommonHandlers { def handleRemoteError(e: Error, d: ChannelData) = { // see BOLT 1: only print out data verbatim if is composed of printable ASCII characters log.error(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}") - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, RemoteError(e), isFatal = true)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, RemoteError(e), isFatal = true)) d match { case _: DATA_CLOSING => stay() // nothing to do, there is already a spending tx published @@ -221,7 +221,7 @@ trait ErrorHandlers extends CommonHandlers { log.warning(s"they published their current commit in txid=${commitTx.txid}") require(commitTx.txid == d.commitments.remoteCommit.txid, "txid mismatch") - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "remote-commit")) + eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "remote-commit")) val remoteCommitPublished = Closing.RemoteClose.claimCommitTxOutputs(keyManager, d.commitments, d.commitments.remoteCommit, commitTx, nodeParams.currentBlockHeight, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) val nextData = d match { case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished)) @@ -234,7 +234,7 @@ trait ErrorHandlers extends CommonHandlers { def handleRemoteSpentFuture(commitTx: Transaction, d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) = { log.warning(s"they published their future commit (because we asked them to) in txid=${commitTx.txid}") - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "future-remote-commit")) + eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "future-remote-commit")) val remotePerCommitmentPoint = d.remoteChannelReestablish.myCurrentPerCommitmentPoint val remoteCommitPublished = RemoteCommitPublished( commitTx = commitTx, @@ -253,7 +253,7 @@ trait ErrorHandlers extends CommonHandlers { val remoteCommit = waitingForRevocation.nextRemoteCommit require(commitTx.txid == remoteCommit.txid, "txid mismatch") - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "next-remote-commit")) + eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "next-remote-commit")) val remoteCommitPublished = Closing.RemoteClose.claimCommitTxOutputs(keyManager, d.commitments, remoteCommit, commitTx, nodeParams.currentBlockHeight, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) val nextData = d match { case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished)) @@ -287,7 +287,7 @@ trait ErrorHandlers extends CommonHandlers { Closing.RevokedClose.claimCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.db.channels, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) match { case Some(revokedCommitPublished) => log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx") - context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.commitInput, tx, d.commitments.localParams.isInitiator), "revoked-commit")) + eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.commitInput, tx, d.commitments.localParams.isInitiator), "revoked-commit")) val exc = FundingTxSpent(d.channelId, tx) val error = Error(d.channelId, exc.getMessage) @@ -301,7 +301,7 @@ trait ErrorHandlers extends CommonHandlers { case None => // the published tx was neither their current commitment nor a revoked one log.error(s"couldn't identify txid=${tx.txid}, something very bad is going on!!!") - context.system.eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team.")) + eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team.")) goto(ERR_INFORMATION_LEAK) } } @@ -326,7 +326,7 @@ trait ErrorHandlers extends CommonHandlers { def handleInformationLeak(tx: Transaction, d: PersistentChannelData) = { // this is never supposed to happen !! log.error(s"our funding tx ${d.commitments.commitInput.outPoint.txid} was spent by txid=${tx.txid}!!") - context.system.eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team.")) + eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team.")) val exc = FundingTxSpent(d.channelId, tx) val error = Error(d.channelId, exc.getMessage) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/FundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/FundingHandlers.scala index f478859c2f..098114167e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/FundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/FundingHandlers.scala @@ -108,7 +108,7 @@ trait FundingHandlers extends CommonHandlers { val error = Error(d.channelId, exc.getMessage) // NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want: // implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) goto(CLOSED) sending error } @@ -116,7 +116,7 @@ trait FundingHandlers extends CommonHandlers { log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE") val exc = FundingTxTimedout(d.channelId) val error = Error(d.channelId, exc.getMessage) - context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) + eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) goto(CLOSED) sending error } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 2e2ecdcde1..81f3f7787d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.router import akka.Done import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, typed} -import akka.event.DiagnosticLoggingAdapter +import akka.event.{DiagnosticLoggingAdapter, EventStream} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} @@ -50,7 +50,7 @@ import scala.util.{Random, Try} /** * Created by PM on 24/05/2016. */ -class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { +class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None, eventStream_opt: Option[EventStream] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] { import Router._ @@ -59,9 +59,12 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm // we pass these to helpers classes so that they have the logging context implicit def implicitLog: DiagnosticLoggingAdapter = diagLog - context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate]) - context.system.eventStream.subscribe(self, classOf[LocalChannelDown]) - context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged]) + // this allows overriding the default eventstream in tests + val eventStream = eventStream_opt.getOrElse(context.system.eventStream) + + eventStream.subscribe(self, classOf[LocalChannelUpdate]) + eventStream.subscribe(self, classOf[LocalChannelDown]) + eventStream.subscribe(self, classOf[AvailableBalanceChanged]) startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval) startTimerWithFixedDelay(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour) @@ -187,6 +190,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm sender() ! updates stay() + case Event(PrintChannelUpdates, d) => + println("public:") + d.channels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } + println("private:") + d.privateChannels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") } + println("---------------------------------------------------") + stay() + case Event(GetRouterData, d) => sender() ! d stay() @@ -558,6 +569,7 @@ object Router { case object GetChannels case object GetChannelsMap case object GetChannelUpdates + case object PrintChannelUpdates // @formatter:on // @formatter:off @@ -604,7 +616,7 @@ object Router { channels: SortedMap[ShortChannelId, PublicChannel], stash: Stash, rebroadcast: Rebroadcast, - awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done + awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done privateChannels: Map[ShortChannelId, PrivateChannel], excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graph: DirectedGraph, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index ed0cce7918..1e74f5d1d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -81,17 +81,20 @@ object Validation { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors import nodeParams.db.{network => db} import r.c - d0.awaiting.get(c) match { - case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement + // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement + // (the other ones have already been acknowledged as duplicates) + d0.awaiting.getOrElse(c, Seq.empty).headOption match { + case Some(origin: RemoteGossip) => origin.peerConnection ! TransportHandler.ReadAck(c) + case Some(LocalGossip) => () // there is nothing to ack if it was a local gossip case _ => () } - val remoteOrigins_opt = d0.awaiting.get(c) - Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first + val remoteOrigins = d0.awaiting.getOrElse(c, Set.empty).collect { case rg: RemoteGossip => rg } + Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins.headOption.map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first log.debug("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size) val publicChannel_opt = r match { case ValidateResult(c, Left(t)) => log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))) None case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) => val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId) @@ -103,12 +106,12 @@ object Validation { } if (fundingOutputIsInvalid) { log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))) None } else { watcher ! WatchExternalChannelSpent(ctx.self, tx.txid, outputIndex, c.shortChannelId) log.debug("added channel channelId={}", c.shortChannelId) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil)) db.addChannel(c, tx.txid, capacity) @@ -118,24 +121,29 @@ object Validation { val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures()) ctx.self ! nodeAnn } - // public channels that haven't yet been announced are considered as private channels - val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta) - Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt)) + // maybe this previously was a local unannounced channel + val privateChannel_opt = d0.privateChannels.get(c.shortChannelId) + Some(PublicChannel(c, + tx.txid, + capacity, + update_1_opt = privateChannel_opt.flatMap(_.update_1_opt), + update_2_opt = privateChannel_opt.flatMap(_.update_2_opt), + meta_opt = privateChannel_opt.map(_.meta))) } case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) => if (fundingTxStatus.spendingTxConfirmed) { log.debug("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid) // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))) } else { log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid) - remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c)))) + remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))) } // there may be a record if we have just restarted db.removeChannel(c.shortChannelId) None } - // we also reprocess node and channel_update announcements related to channels that were just analyzed + // we also reprocess node and channel_update announcements related to the channel that was just analyzed val reprocessUpdates = d0.stash.updates.view.filterKeys(u => u.shortChannelId == c.shortChannelId) val reprocessNodes = d0.stash.nodes.view.filterKeys(n => isRelatedTo(c, n.nodeId)) // and we remove the reprocessed messages from the stash @@ -145,12 +153,13 @@ object Validation { publicChannel_opt match { case Some(pc) => - // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update - // right after the channel_announcement, channel_updates will be moved from private to public at that time val d1 = d0.copy( channels = d0.channels + (c.shortChannelId -> pc), privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before - rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue + rebroadcast = d0.rebroadcast.copy( + channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers + updates = d0.rebroadcast.updates ++ (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet).map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin])).toMap // those updates are only defined if this was a previously an unannounced local channel, we broadcast them + ), // we also add the newly validated channels to the rebroadcast queue stash = stash1, awaiting = awaiting1) // we only reprocess updates and nodes if validation succeeded @@ -419,7 +428,7 @@ object Validation { case Some(c) => // channel wasn't announced but here is the announcement, we will process it *before* the channel_update watcher ! ValidateRequest(ctx.self, c) - val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin + val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip))) // no origin // maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks) db.removeFromPruned(c.shortChannelId) handleChannelUpdate(d1, db, routerConf, Left(lcu)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index b77d9fa3af..ad38f3e92c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -161,7 +161,7 @@ object TestConstants { routerConf = RouterConf( watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, - routerBroadcastInterval = 5 seconds, + routerBroadcastInterval = 1 day, // "disables" rebroadcast requestNodeAnnouncements = true, encodingType = EncodingType.COMPRESSED_ZLIB, channelRangeChunkSize = 20, @@ -299,7 +299,7 @@ object TestConstants { routerConf = RouterConf( watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, - routerBroadcastInterval = 5 seconds, + routerBroadcastInterval = 1 day, // "disables" rebroadcast requestNodeAnnouncements = true, encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = 20, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index d7364eba51..154d34c46c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -18,6 +18,7 @@ package fr.acinq.eclair.channel.states import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{ActorContext, ActorRef} +import akka.event.EventStream import akka.testkit.{TestFSMRef, TestKitBase, TestProbe} import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -118,9 +119,14 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { val bob2blockchain = TestProbe() val alice2relayer = TestProbe() val bob2relayer = TestProbe() + val aliceEventStream = new EventStream(system) + val bobEventStream = new EventStream(system) val channelUpdateListener = TestProbe() - system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) - system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) + // TODO: clean this up later, was to minimize changes + aliceEventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) + aliceEventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) + bobEventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate]) + bobEventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown]) val router = TestProbe() val finalNodeParamsA = nodeParamsA .modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(5000 sat) @@ -132,8 +138,8 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { .modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(5000 sat) .modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(10000 sat) .modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(10000 sat) - val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref) - val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) + val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref), eventStream_opt = Some(aliceEventStream)), alicePeer.ref) + val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain), eventStream_opt = Some(bobEventStream)), bobPeer.ref) SetupFixture(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, bob2blockchain, router, alice2relayer, bob2relayer, channelUpdateListener, wallet, alicePeer, bobPeer) } @@ -179,7 +185,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { (aliceParams, bobParams, channelType) } - def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Unit = { + def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Transaction = { import setup._ @@ -231,6 +237,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase { // x2 because alice and bob share the same relayer channelUpdateListener.expectMsgType[LocalChannelUpdate] channelUpdateListener.expectMsgType[LocalChannelUpdate] + fundingTx } def localOrigin(replyTo: ActorRef): Origin.LocalHot = Origin.LocalHot(replyTo, UUID.randomUUID()) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala index b03b8d00e3..6a4a62dc72 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala @@ -82,7 +82,7 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS test("recv FundingSigned with valid signature") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) bob2alice.expectMsgType[FundingSigned] bob2alice.forward(alice) awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala index 8b7a8c7480..0aa3c68c92 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingConfirmedStateSpec.scala @@ -54,7 +54,7 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF within(30 seconds) { val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) alice ! INPUT_INIT_FUNDER(ByteVector32.Zeroes, TestConstants.fundingSatoshis, pushMsat, TestConstants.feeratePerKw, TestConstants.feeratePerKw, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType) alice2blockchain.expectMsgType[TxPublisher.SetChannelId] bob ! INPUT_INIT_FUNDEE(ByteVector32.Zeroes, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType) @@ -83,8 +83,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF import f._ // we create a new listener that registers after alice has published the funding tx val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) // make bob send a FundingLocked msg val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get bob ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx) @@ -102,8 +102,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF import f._ // we create a new listener that registers after alice has published the funding tx val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed]) val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get alice ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx) assert(listener.expectMsgType[TransactionConfirmed].tx === fundingTx) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index 59981ba15b..48ae5e623f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -74,7 +74,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with val initialState = alice.stateData.asInstanceOf[DATA_NORMAL] val sender = TestProbe() val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) val h = randomBytes32() val add = CMD_ADD_HTLC(sender.ref, 50000000 msat, h, CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, localOrigin(sender.ref)) alice ! add @@ -881,7 +881,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with bob2alice.expectMsgType[UpdateFulfillHtlc] // we listen to channel_update events val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate]) // actual test starts here // when signing the fulfill, bob will have its main output go above reserve in alice's commitment tx @@ -896,7 +896,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv CMD_SIGN (after CMD_UPDATE_FEE)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged]) alice ! CMD_UPDATE_FEE(FeeratePerKw(654564 sat)) alice2bob.expectMsgType[UpdateFee] alice ! CMD_SIGN() @@ -2734,7 +2734,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill @@ -2767,7 +2767,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign @@ -2800,7 +2800,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) // actual test begins: // * Bob receives the HTLC pre-image and wants to fulfill diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala index c467c5d6ca..b04bf217d7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/OfflineStateSpec.scala @@ -527,7 +527,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with crossSign(alice, bob, alice2bob, bob2alice) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) + bob.underlyingActor.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred]) val initialState = bob.stateData.asInstanceOf[DATA_NORMAL] val initialCommitTx = initialState.commitments.localCommit.commitTxAndRemoteSig.commitTx.tx diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 6fcaca1652..0566ec8ba9 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -89,15 +89,19 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with bob2blockchain.expectMsgType[WatchFundingConfirmed] awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED) awaitCond(bob.stateName == WAIT_FOR_FUNDING_CONFIRMED) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + alice.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + alice.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + bob.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + bob.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) withFixture(test.toNoArgTest(FixtureParam(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, alice2relayer, bob2relayer, channelUpdateListener, eventListener, Nil))) } } else { within(30 seconds) { reachNormal(setup, test.tags) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) - system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + alice.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + alice.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) + bob.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished]) + bob.underlyingActor.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed]) val bobCommitTxs: List[CommitTxAndRemoteSig] = (for (amt <- List(100000000 msat, 200000000 msat, 300000000 msat)) yield { val (r, htlc) = addHtlc(amt, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) @@ -368,8 +372,8 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.channelFeatures === channelFeatures) val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed]) - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) // alice sends an htlc to bob val (_, htlca1) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice) @@ -473,7 +477,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (local commit with htlcs only signed by local)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val aliceCommitTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx // alice sends an htlc val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice) @@ -522,7 +526,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (local commit with fail not acked by remote)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val (_, htlc) = addHtlc(25000000 msat, alice, bob, alice2bob, bob2alice) crossSign(alice, bob, alice2bob, bob2alice) failHtlc(htlc.id, bob, alice, bob2alice, alice2bob) @@ -603,7 +607,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with test("recv WatchTxConfirmedTriggered (remote commit with htlcs only signed by local in next remote commit)") { f => import f._ val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) + alice.underlyingActor.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain]) val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx // alice sends an htlc val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index 3c2c2cff05..12e4b3cf27 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -122,7 +122,6 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi import com.softwaremill.quicklens._ val nodeParams = Alice.nodeParams .modify(_.nodeKeyManager).setTo(testNodeKeyManager) - .modify(_.routerConf.routerBroadcastInterval).setTo(1 day) // "disable" auto rebroadcast val router = system.actorOf(Router.props(nodeParams, watcher.ref)) // we announce channels peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala new file mode 100644 index 0000000000..0f4572285d --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -0,0 +1,120 @@ +package fr.acinq.eclair.router + +import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingDeeplyBuriedTriggered +import fr.acinq.eclair.channel.DATA_NORMAL +import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.router.Router.{GossipOrigin, LocalGossip} +import fr.acinq.eclair.wire.protocol.AnnouncementSignatures +import fr.acinq.eclair.{BlockHeight, TestKitBaseClass} +import org.scalatest.funsuite.FixtureAnyFunSuiteLike +import org.scalatest.{Outcome, Tag} + +/** + * This test checks the integration between Channel and Router (events, etc.) + */ +class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { + + case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String]) + + implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging + + override def withFixture(test: OneArgTest): Outcome = { + val channels = init(tags = test.tags) + val router: TestFSMRef[Router.State, Router.Data, Router] = TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None, eventStream_opt = Some(channels.alice.underlyingActor.eventStream))) + val rebroadcastListener = TestProbe() + // router publishes on the system's eventstream + system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast]) + withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags))) + } + + test("private local channel") { f => + import f._ + + reachNormal(channels, testTags) + + awaitCond(router.stateData.privateChannels.size == 1) + + { + // only the local channel_update is known + val pc = router.stateData.privateChannels.values.head + assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) + } + + val peerConnection = TestProbe() + val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate + router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) + + awaitCond { + // only the local channel_update is known + val pc = router.stateData.privateChannels.values.head + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + + // manual rebroadcast + router ! Router.TickBroadcast + rebroadcastListener.expectNoMessage() + + } + + test("public local channel", Tag(ChannelStateTestsTags.ChannelsPublic)) { f => + import f._ + + val fundingTx = reachNormal(channels, testTags) + + awaitCond(router.stateData.privateChannels.size == 1) + + { + val pc = router.stateData.privateChannels.values.head + // only the local channel_update is known + assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined) + } + + val peerConnection = TestProbe() + val aliceChannelUpdate = channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelUpdate + val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate + router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate) + + awaitCond { + val pc = router.stateData.privateChannels.values.head + // both channel_updates are known + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + val privateChannel = router.stateData.privateChannels.values.head + + // funding tx reaches 6 blocks, announcements are exchanged + channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) + channels.alice2bob.expectMsgType[AnnouncementSignatures] + channels.alice2bob.forward(channels.bob) + + channels.bob ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null) + channels.bob2alice.expectMsgType[AnnouncementSignatures] + channels.bob2alice.forward(channels.alice) + + // router gets notified and attempts to validate the local channel + val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest] + vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent))) + + awaitCond { + router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1 + } + + awaitCond { + val pc = router.stateData.channels.values.head + // both channel updates are preserved + pc.update_1_opt.isDefined && pc.update_2_opt.isDefined + } + + // manual rebroadcast + router ! Router.TickBroadcast + rebroadcastListener.expectMsg(Router.Rebroadcast( + channels = Map(vr.ann -> Set[GossipOrigin](LocalGossip)), + updates = Map(aliceChannelUpdate -> Set[GossipOrigin](LocalGossip), bobChannelUpdate -> Set.empty[GossipOrigin]), // broadcast the channel_updates (they were previously unannounced) + nodes = Map(router.underlyingActor.stateData.nodes.values.head -> Set[GossipOrigin](LocalGossip)), // new node_announcement + )) + + } + +} diff --git a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala index d05ea1f218..404d3a2c7f 100644 --- a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala +++ b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.router import akka.actor.ActorSystem import akka.actor.typed.scaladsl.adapter.actorRefAdapter -import akka.testkit.{TestKit, TestProbe} +import akka.testkit.{TestFSMRef, TestKit, TestProbe} import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.bitcoin.scalacompat.{Block, SatoshiLong, Transaction, TxOut} @@ -32,9 +32,6 @@ import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol.Color import org.scalatest.funsuite.AnyFunSuiteLike -import scodec.bits._ - -import scala.concurrent.duration._ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike { @@ -131,12 +128,14 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab)) peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab)) - // we have to wait 2 times the broadcast interval because there is an additional per-peer delay - val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second - peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) - peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) - peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty)) - peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty)) + // manual rebroadcast + front1 ! Router.TickBroadcast + peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + front2 ! Router.TickBroadcast + peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty)) + front3 ! Router.TickBroadcast + peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty)) } test("aggregate gossip") { @@ -149,9 +148,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike val system2 = ActorSystem("front-system-2") val system3 = ActorSystem("front-system-3") - val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router)) - val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router)) - val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router)) + val front1 = { + implicit val system: ActorSystem = system1 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } + val front2 = { + implicit val system: ActorSystem = system2 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } + val front3 = { + implicit val system: ActorSystem = system3 + TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router)) + } val peerConnection1a = TestProbe("peerconn-1a") val peerConnection1b = TestProbe("peerconn-1b") @@ -182,7 +190,6 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc)) peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc)) - watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab)) @@ -207,12 +214,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b)) peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b)) - // we have to wait 2 times the broadcast interval because there is an additional per-peer delay - val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second - peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) - peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a)))) + awaitCond(front1.stateData.nodes.size == 2) + awaitCond(front2.stateData.nodes.size == 2) + awaitCond(front3.stateData.nodes.size == 2) + + // manual rebroadcast + front1 ! Router.TickBroadcast + peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + front2 ! Router.TickBroadcast + peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + front3 ! Router.TickBroadcast + peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a)))) } test("do not forward duplicate gossip") {