Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
Helpers.validateParamsFundee(nodeParams, channelType, localParams.initFeatures, open, remoteNodeId, remoteInit.features) match {
case Left(t) => handleLocalError(t, d, Some(open))
case Right((channelFeatures, remoteShutdownScript)) =>
context.system.eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = false, open.temporaryChannelId, open.feeratePerKw, None))
eventStream.publish(ChannelCreated(self, peer, remoteNodeId, isInitiator = false, open.temporaryChannelId, open.feeratePerKw, None))
val fundingPubkey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey
val channelKeyPath = keyManager.keyPath(localParams, channelConfig)
val minimumDepth = Helpers.minDepthForFunding(nodeParams.channelConf, open.fundingSatoshis)
Expand Down Expand Up @@ -192,7 +192,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
val channelId = toLongId(fundingTx.hash, fundingTxOutputIndex)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
// NB: we don't send a ChannelSignatureSent for the first commit
goto(WAIT_FOR_FUNDING_SIGNED) using DATA_WAIT_FOR_FUNDING_SIGNED(channelId, localParams, remoteParams, fundingTx, fundingTxFee, localSpec, localCommitTx, RemoteCommit(0, remoteSpec, remoteCommitTx.tx.txid, remoteFirstPerCommitmentPoint), open.channelFlags, channelConfig, channelFeatures, fundingCreated) sending fundingCreated
}
Expand Down Expand Up @@ -249,8 +249,8 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
commitInput, ShaChain.init)
peer ! ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
txPublisher ! SetChannelId(remoteNodeId, channelId)
context.system.eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
eventStream.publish(ChannelIdAssigned(self, remoteNodeId, temporaryChannelId, channelId))
eventStream.publish(ChannelSignatureReceived(self, commitments))
// NB: we don't send a ChannelSignatureSent for the first commit
log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
watchFundingTx(commitments)
Expand Down Expand Up @@ -291,7 +291,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
remoteNextCommitInfo = Right(randomKey().publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array
commitInput, ShaChain.init)
val blockHeight = nodeParams.currentBlockHeight
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
watchFundingTx(commitments)
blockchain ! WatchFundingConfirmed(self, commitInput.outPoint.txid, nodeParams.channelConf.minDepthBlocks)
Expand All @@ -302,7 +302,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
def publishFundingTx(): Unit = {
wallet.commit(fundingTx).onComplete {
case Success(true) =>
context.system.eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, fundingTxFee, "funding"))
eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, fundingTxFee, "funding"))
channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelOpened(channelId)))
case Success(false) =>
channelOpenReplyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx"))))
Expand Down Expand Up @@ -351,8 +351,8 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
case Success(_) =>
log.info(s"channelId=${commitments.channelId} was confirmed at blockHeight=$blockHeight txIndex=$txIndex")
blockchain ! WatchFundingLost(self, commitments.commitInput.outPoint.txid, nodeParams.channelConf.minDepthBlocks)
if (!d.commitments.localParams.isInitiator) context.system.eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, 0 sat, "funding"))
context.system.eventStream.publish(TransactionConfirmed(commitments.channelId, remoteNodeId, fundingTx))
if (!d.commitments.localParams.isInitiator) eventStream.publish(TransactionPublished(commitments.channelId, remoteNodeId, fundingTx, 0 sat, "funding"))
eventStream.publish(TransactionConfirmed(commitments.channelId, remoteNodeId, fundingTx))
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, commitments.channelConfig)
val nextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, 1)
val fundingLocked = FundingLocked(commitments.channelId, nextPerCommitmentPoint)
Expand Down Expand Up @@ -400,7 +400,7 @@ trait ChannelOpenSingleFunder extends FundingHandlers with ErrorHandlers {
case Event(FundingLocked(_, nextPerCommitmentPoint, _), d@DATA_WAIT_FOR_FUNDING_LOCKED(commitments, shortChannelId, _)) =>
// used to get the final shortChannelId, used in announcements (if minDepth >= ANNOUNCEMENTS_MINCONF this event will fire instantly)
blockchain ! WatchFundingDeeplyBuried(self, commitments.commitInput.outPoint.txid, ANNOUNCEMENTS_MINCONF)
context.system.eventStream.publish(ShortChannelIdAssigned(self, commitments.channelId, shortChannelId, None))
eventStream.publish(ShortChannelIdAssigned(self, commitments.channelId, shortChannelId, None))
// we create a channel_update early so that we can use it to send payments through this channel, but it won't be propagated to other nodes since the channel is not yet announced
val fees = getRelayFees(nodeParams, remoteNodeId, commitments)
val initialChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, nodeParams.channelConf.expiryDelta, d.commitments.remoteParams.htlcMinimum, fees.feeBase, fees.feeProportionalMillionths, commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ trait CommonHandlers {
case d: PersistentChannelData =>
log.debug("updating database record for channelId={}", d.channelId)
nodeParams.db.channels.addOrUpdateChannel(d)
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
state
case _: TransientChannelData =>
log.error(s"can't store data=${state.stateData} in state=${state.stateName}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ trait ErrorHandlers extends CommonHandlers {
}

val error = Error(d.channelId, cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = true))
eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(cause), isFatal = true))

d match {
case dd: PersistentChannelData if Closing.nothingAtStake(dd) => goto(CLOSED)
Expand Down Expand Up @@ -120,7 +120,7 @@ trait ErrorHandlers extends CommonHandlers {
def handleRemoteError(e: Error, d: ChannelData) = {
// see BOLT 1: only print out data verbatim if is composed of printable ASCII characters
log.error(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}")
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, RemoteError(e), isFatal = true))
eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, RemoteError(e), isFatal = true))

d match {
case _: DATA_CLOSING => stay() // nothing to do, there is already a spending tx published
Expand Down Expand Up @@ -221,7 +221,7 @@ trait ErrorHandlers extends CommonHandlers {
log.warning(s"they published their current commit in txid=${commitTx.txid}")
require(commitTx.txid == d.commitments.remoteCommit.txid, "txid mismatch")

context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "remote-commit"))
eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "remote-commit"))
val remoteCommitPublished = Closing.RemoteClose.claimCommitTxOutputs(keyManager, d.commitments, d.commitments.remoteCommit, commitTx, nodeParams.currentBlockHeight, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
val nextData = d match {
case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished))
Expand All @@ -234,7 +234,7 @@ trait ErrorHandlers extends CommonHandlers {

def handleRemoteSpentFuture(commitTx: Transaction, d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) = {
log.warning(s"they published their future commit (because we asked them to) in txid=${commitTx.txid}")
context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "future-remote-commit"))
eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "future-remote-commit"))
val remotePerCommitmentPoint = d.remoteChannelReestablish.myCurrentPerCommitmentPoint
val remoteCommitPublished = RemoteCommitPublished(
commitTx = commitTx,
Expand All @@ -253,7 +253,7 @@ trait ErrorHandlers extends CommonHandlers {
val remoteCommit = waitingForRevocation.nextRemoteCommit
require(commitTx.txid == remoteCommit.txid, "txid mismatch")

context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "next-remote-commit"))
eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, commitTx, Closing.commitTxFee(d.commitments.commitInput, commitTx, d.commitments.localParams.isInitiator), "next-remote-commit"))
val remoteCommitPublished = Closing.RemoteClose.claimCommitTxOutputs(keyManager, d.commitments, remoteCommit, commitTx, nodeParams.currentBlockHeight, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
val nextData = d match {
case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished))
Expand Down Expand Up @@ -287,7 +287,7 @@ trait ErrorHandlers extends CommonHandlers {
Closing.RevokedClose.claimCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.db.channels, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) match {
case Some(revokedCommitPublished) =>
log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx")
context.system.eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.commitInput, tx, d.commitments.localParams.isInitiator), "revoked-commit"))
eventStream.publish(TransactionPublished(d.channelId, remoteNodeId, tx, Closing.commitTxFee(d.commitments.commitInput, tx, d.commitments.localParams.isInitiator), "revoked-commit"))
val exc = FundingTxSpent(d.channelId, tx)
val error = Error(d.channelId, exc.getMessage)

Expand All @@ -301,7 +301,7 @@ trait ErrorHandlers extends CommonHandlers {
case None =>
// the published tx was neither their current commitment nor a revoked one
log.error(s"couldn't identify txid=${tx.txid}, something very bad is going on!!!")
context.system.eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team."))
eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team."))
goto(ERR_INFORMATION_LEAK)
}
}
Expand All @@ -326,7 +326,7 @@ trait ErrorHandlers extends CommonHandlers {
def handleInformationLeak(tx: Transaction, d: PersistentChannelData) = {
// this is never supposed to happen !!
log.error(s"our funding tx ${d.commitments.commitInput.outPoint.txid} was spent by txid=${tx.txid}!!")
context.system.eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team."))
eventStream.publish(NotifyNodeOperator(NotificationsLogger.Error, s"funding tx ${d.commitments.commitInput.outPoint.txid} of channel ${d.channelId} was spent by an unknown transaction, indicating that your DB has lost data or your node has been breached: please contact the dev team."))
val exc = FundingTxSpent(d.channelId, tx)
val error = Error(d.channelId, exc.getMessage)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ trait FundingHandlers extends CommonHandlers {
val error = Error(d.channelId, exc.getMessage)
// NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want:
// implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true))
eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true))
goto(CLOSED) sending error
}

def handleFundingTimeout(d: PersistentChannelData) = {
log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE")
val exc = FundingTxTimedout(d.channelId)
val error = Error(d.channelId, exc.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true))
eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true))
goto(CLOSED) sending error
}

Expand Down
24 changes: 18 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fr.acinq.eclair.router
import akka.Done
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, typed}
import akka.event.DiagnosticLoggingAdapter
import akka.event.{DiagnosticLoggingAdapter, EventStream}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
Expand Down Expand Up @@ -50,7 +50,7 @@ import scala.util.{Random, Try}
/**
* Created by PM on 24/05/2016.
*/
class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] {
class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], initialized: Option[Promise[Done]] = None, eventStream_opt: Option[EventStream] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] {

import Router._

Expand All @@ -59,9 +59,12 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
// we pass these to helpers classes so that they have the logging context
implicit def implicitLog: DiagnosticLoggingAdapter = diagLog

context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
// this allows overriding the default eventstream in tests
val eventStream = eventStream_opt.getOrElse(context.system.eventStream)

eventStream.subscribe(self, classOf[LocalChannelUpdate])
eventStream.subscribe(self, classOf[LocalChannelDown])
eventStream.subscribe(self, classOf[AvailableBalanceChanged])
Comment on lines +62 to +67
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit dirty: we only override the event stream for subscriptions. It's enough for testing, and it was not possible to update all references in Validation, RouteCalculation, Syncing etc. due to how the handlers are implemented. We could migrate them to the same technique as what we used for the channel fsm.


startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval)
startTimerWithFixedDelay(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour)
Expand Down Expand Up @@ -187,6 +190,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
sender() ! updates
stay()

case Event(PrintChannelUpdates, d) =>
println("public:")
d.channels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
println("private:")
d.privateChannels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
println("---------------------------------------------------")
stay()

case Event(GetRouterData, d) =>
sender() ! d
stay()
Expand Down Expand Up @@ -558,6 +569,7 @@ object Router {
case object GetChannels
case object GetChannelsMap
case object GetChannelUpdates
case object PrintChannelUpdates
// @formatter:on

// @formatter:off
Expand Down Expand Up @@ -604,7 +616,7 @@ object Router {
channels: SortedMap[ShortChannelId, PublicChannel],
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
Expand Down
Loading