Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,19 @@ object Databases {
val sqliteEclair = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "eclair.sqlite")}")
val sqliteNetwork = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "network.sqlite")}")
val sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
val sqliteExt = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "ext.sqlite")}")
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
SqliteUtils.obtainExclusiveLock(sqliteExt) // there should only be one process writing to this file

databaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
databaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair, sqliteExt)
}

def databaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection) = new Databases {
def databaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection, extJdbc: Connection) = new Databases {
override val network = new SqliteNetworkDb(networkJdbc)
override val audit = new SqliteAuditDb(auditJdbc)
override val channels = new SqliteChannelsDb(eclairJdbc)
override val peers = new SqlitePeersDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc, extJdbc)
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
override def backup(file: File): Unit = {
SqliteUtils.using(eclairJdbc.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait PaymentsDb {
def addOutgoingPayment(outgoingPayment: OutgoingPayment)

// updates the status of the payment, if the newStatus is SUCCEEDED you must supply a preimage
def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None)
def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None, failures: Seq[String] = Seq.empty)

def getOutgoingPayment(id: UUID): Option[OutgoingPayment]

Expand Down Expand Up @@ -77,7 +77,7 @@ case class IncomingPayment(paymentHash: ByteVector32, amountMsat: Long, received
* @param completedAt absolute time in seconds since UNIX epoch when the payment succeeded.
* @param status current status of the payment.
*/
case class OutgoingPayment(id: UUID, paymentHash: ByteVector32, preimage:Option[ByteVector32], amountMsat: Long, createdAt: Long, completedAt: Option[Long], status: OutgoingPaymentStatus.Value)
case class OutgoingPayment(id: UUID, paymentHash: ByteVector32, preimage:Option[ByteVector32], amountMsat: Long, createdAt: Long, completedAt: Option[Long], status: OutgoingPaymentStatus.Value, failures: Seq[String])

object OutgoingPaymentStatus extends Enumeration {
val PENDING = Value(1, "PENDING")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,32 @@ import grizzled.slf4j.Logging
import scala.collection.immutable.Queue
import OutgoingPaymentStatus._
import concurrent.duration._
import scala.collection.mutable
import scala.compat.Platform

class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
class SqlitePaymentsDb(sqlite: Connection, extSqlite: Connection) extends PaymentsDb with Logging {

import SqliteUtils.ExtendedResultSet._

val DB_NAME = "payments"
val CURRENT_VERSION = 2
val EXT_DB_NAME = "ext"
val CURRENT_VERSION = 3

using(sqlite.createStatement()) { statement =>
require(getVersion(statement, DB_NAME, CURRENT_VERSION) <= CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // version 2 is "backward compatible" in the sense that it uses separate tables from version 1. There is no migration though
require(getVersion(statement, DB_NAME, CURRENT_VERSION) <= CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // version 3 is "backward compatible" in the sense that it uses separate tables from versions 1 and 2. There is no migration though
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received_payments (payment_hash BLOB NOT NULL PRIMARY KEY, preimage BLOB NOT NULL, payment_request TEXT NOT NULL, received_msat INTEGER, created_at INTEGER NOT NULL, expire_at INTEGER, received_at INTEGER)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent_payments (id TEXT NOT NULL PRIMARY KEY, payment_hash BLOB NOT NULL, preimage BLOB, amount_msat INTEGER NOT NULL, created_at INTEGER NOT NULL, completed_at INTEGER, status VARCHAR NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS payment_hash_idx ON sent_payments(payment_hash)")
setVersion(statement, DB_NAME, CURRENT_VERSION)
}

using(extSqlite.createStatement()) { statement =>
require(getVersion(statement, EXT_DB_NAME, CURRENT_VERSION) <= CURRENT_VERSION, s"incompatible version of $EXT_DB_NAME DB found") // version 3 is "backward compatible" in the sense that it uses separate tables from versions 1 and 2. There is no migration though
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent_payments_failures (id TEXT NOT NULL, failure TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_payments_failures_idx ON sent_payments_failures(id)")
setVersion(statement, EXT_DB_NAME, CURRENT_VERSION)
}

override def addOutgoingPayment(sent: OutgoingPayment): Unit = {
using(sqlite.prepareStatement("INSERT INTO sent_payments (id, payment_hash, amount_msat, created_at, status) VALUES (?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, sent.id.toString)
Expand All @@ -53,10 +62,12 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
val res = statement.executeUpdate()
logger.debug(s"inserted $res payment=${sent.paymentHash} into payment DB")
}
insertFailures(sent.id, sent.failures)
}

override def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None) = {
override def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None, failures: Seq[String] = Seq.empty): Unit = {
require((newStatus == SUCCEEDED && preimage.isDefined) || (newStatus == FAILED && preimage.isEmpty), "Wrong combination of state/preimage")
require((newStatus == SUCCEEDED && failures.isEmpty) || (newStatus == FAILED && failures.nonEmpty), "Wrong combination of state/failures")

using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, preimage, status) = (?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
statement.setLong(1, Platform.currentTime)
Expand All @@ -65,10 +76,11 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
statement.setString(4, id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to update an outgoing payment (id=$id) already in final status with=$newStatus")
}
insertFailures(id, failures)
}

override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = {
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE id = ?")) { statement =>
val res = using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE id = ?")) { statement =>
statement.setString(1, id.toString)
val rs = statement.executeQuery()
if (rs.next()) {
Expand All @@ -79,16 +91,18 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
rs.getLong("amount_msat"),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
OutgoingPaymentStatus.withName(rs.getString("status")),
Nil
))
} else {
None
}
}
res.map(op => op.copy(failures = selectFailures(op.id)))
}

override def getOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = {
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE payment_hash = ?")) { statement =>
val res = using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE payment_hash = ?")) { statement =>
statement.setBytes(1, paymentHash.toArray)
val rs = statement.executeQuery()
var q: Queue[OutgoingPayment] = Queue()
Expand All @@ -100,15 +114,17 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
rs.getLong("amount_msat"),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
OutgoingPaymentStatus.withName(rs.getString("status")),
Nil
)
}
q
}
res.map(op => op.copy(failures = selectFailures(op.id)))
}

override def listOutgoingPayments(): Seq[OutgoingPayment] = {
using(sqlite.createStatement()) { statement =>
val res = using(sqlite.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments")
var q: Queue[OutgoingPayment] = Queue()
while (rs.next()) {
Expand All @@ -119,11 +135,13 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
rs.getLong("amount_msat"),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
OutgoingPaymentStatus.withName(rs.getString("status")),
Nil
)
}
q
}
res.map(op => op.copy(failures = selectFailures(op.id)))
}

override def addPaymentRequest(pr: PaymentRequest, preimage: ByteVector32): Unit = {
Expand Down Expand Up @@ -225,4 +243,26 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
}
}

private def insertFailures(id: UUID, failures: Traversable[String]): Unit = {
failures.filter(_.nonEmpty).foreach { failure =>
using(extSqlite.prepareStatement("INSERT INTO sent_payments_failures (id, failure) VALUES (?, ?)")) { statement =>
statement.setString(1, id.toString)
statement.setString(2, failure)
val res = statement.executeUpdate()
}
}
}

private def selectFailures(id: UUID): Seq[String] = {
val res = new mutable.ListBuffer[String]
using(extSqlite.prepareStatement("SELECT failure FROM sent_payments_failures WHERE id = ?")) { statement =>
statement.setString(1, id.toString)
val rs = statement.executeQuery()
while (rs.next()) {
res += rs.getString("failure")
}
}
res.toList
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
when(WAITING_FOR_REQUEST) {
case Event(c: SendPaymentToRoute, WaitingForRequest) =>
val send = SendPayment(c.amountMsat, c.paymentHash, c.hops.last, finalCltvExpiry = c.finalCltvExpiry, maxAttempts = 1)
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING, Seq.empty))
router ! FinalizeRoute(c.hops)
goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, send, failures = Nil)

case Event(c: SendPayment, WaitingForRequest) =>
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.amountMsat, c.assistedRoutes, routeParams = c.routeParams)
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.amountMsat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING, Seq.empty))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, c, failures = Nil)
}

Expand All @@ -70,8 +70,9 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
goto(WAITING_FOR_PAYMENT_COMPLETE) using WaitingForComplete(s, c, cmd, failures, sharedSecrets, ignoreNodes, ignoreChannels, hops)

case Event(Status.Failure(t), WaitingForRoute(s, c, failures)) =>
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ LocalFailure(t)))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
val paymentFailed = PaymentFailed(id, c.paymentHash, failures = failures :+ LocalFailure(t))
reply(s, paymentFailed)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, failures = paymentFailed.errorMessages)
stop(FSM.Normal)
}

Expand All @@ -89,8 +90,9 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
case Success(e@Sphinx.DecryptedFailurePacket(nodeId, failureMessage)) if nodeId == c.targetNodeId =>
// if destination node returns an error, we fail the payment immediately
log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ RemoteFailure(hops, e)))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
val paymentFailed = PaymentFailed(id, c.paymentHash, failures = failures :+ RemoteFailure(hops, e))
reply(s, paymentFailed)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, failures = paymentFailed.errorMessages)
stop(FSM.Normal)
case res if failures.size + 1 >= c.maxAttempts =>
// otherwise we never try more than maxAttempts, no matter the kind of error returned
Expand All @@ -103,8 +105,9 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
UnreadableRemoteFailure(hops)
}
log.warning(s"too many failed attempts, failing the payment")
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ failure))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
val paymentFailed = PaymentFailed(id, c.paymentHash, failures = failures :+ failure)
reply(s, paymentFailed)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, failures = paymentFailed.errorMessages)
stop(FSM.Normal)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}")
Expand Down Expand Up @@ -169,8 +172,9 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis

case Event(Status.Failure(t), WaitingForComplete(s, c, _, failures, _, ignoreNodes, ignoreChannels, hops)) =>
if (failures.size + 1 >= c.maxAttempts) {
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
reply(s, PaymentFailed(id, c.paymentHash, failures :+ LocalFailure(t)))
val paymentFailed = PaymentFailed(id, c.paymentHash, failures :+ LocalFailure(t))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, failures = paymentFailed.errorMessages)
reply(s, paymentFailed)
stop(FSM.Normal)
} else {
log.info(s"received an error message from local, trying to use a different channel (failure=${t.getMessage})")
Expand Down Expand Up @@ -213,11 +217,21 @@ object PaymentLifecycle {

sealed trait PaymentResult
case class PaymentSucceeded(id: UUID, amountMsat: Long, paymentHash: ByteVector32, paymentPreimage: ByteVector32, route: Seq[Hop]) extends PaymentResult // note: the amount includes fees
sealed trait PaymentFailure
case class LocalFailure(t: Throwable) extends PaymentFailure
case class RemoteFailure(route: Seq[Hop], e: Sphinx.DecryptedFailurePacket) extends PaymentFailure
case class UnreadableRemoteFailure(route: Seq[Hop]) extends PaymentFailure
case class PaymentFailed(id: UUID, paymentHash: ByteVector32, failures: Seq[PaymentFailure]) extends PaymentResult
sealed trait PaymentFailure {
def errorMessage: String
}
case class LocalFailure(t: Throwable) extends PaymentFailure {
override def errorMessage: String = t.getMessage
}
case class RemoteFailure(route: Seq[Hop], e: Sphinx.DecryptedFailurePacket) extends PaymentFailure {
override def errorMessage: String = e.failureMessage.message
}
case class UnreadableRemoteFailure(route: Seq[Hop]) extends PaymentFailure {
override def errorMessage: String = "unreadable remote failure"
}
case class PaymentFailed(id: UUID, paymentHash: ByteVector32, failures: Seq[PaymentFailure]) extends PaymentResult {
def errorMessages: Seq[String] = failures.map(_.errorMessage).filter(_.nonEmpty)
}

sealed trait Data
case object WaitingForRequest extends Data
Expand Down
Loading