Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait PaymentsDb {
def addOutgoingPayment(outgoingPayment: OutgoingPayment)

// updates the status of the payment, if the newStatus is SUCCEEDED you must supply a preimage
def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None)
def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32], fee: MilliSatoshi)

def getOutgoingPayment(id: UUID): Option[OutgoingPayment]

Expand Down Expand Up @@ -75,11 +75,12 @@ case class IncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, rece
* @param paymentHash payment_hash
* @param preimage the preimage of the payment_hash, known if the outgoing payment was successful
* @param amount amount of the payment, in milli-satoshis
* @param fee off-chain fees paid, in milli-satoshis
* @param createdAt absolute time in seconds since UNIX epoch when the payment was created.
* @param completedAt absolute time in seconds since UNIX epoch when the payment succeeded.
Comment thread
araspitzu marked this conversation as resolved.
* @param status current status of the payment.
*/
case class OutgoingPayment(id: UUID, paymentHash: ByteVector32, preimage:Option[ByteVector32], amount: MilliSatoshi, createdAt: Long, completedAt: Option[Long], status: OutgoingPaymentStatus.Value)
case class OutgoingPayment(id: UUID, paymentHash: ByteVector32, preimage:Option[ByteVector32], amount: MilliSatoshi, fee: MilliSatoshi, createdAt: Long, completedAt: Option[Long], status: OutgoingPaymentStatus.Value)

object OutgoingPaymentStatus extends Enumeration {
val PENDING = Value(1, "PENDING")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package fr.acinq.eclair.db.sqlite

import java.sql.Connection
import java.sql.{Connection, Statement}
import java.util.UUID
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.db.sqlite.SqliteUtils._
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment, OutgoingPaymentStatus, PaymentsDb}
import fr.acinq.eclair.payment.PaymentRequest
import fr.acinq.eclair._
import grizzled.slf4j.Logging
import scala.collection.immutable.Queue
import OutgoingPaymentStatus._
Expand All @@ -34,14 +35,37 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
import SqliteUtils.ExtendedResultSet._

val DB_NAME = "payments"
val CURRENT_VERSION = 2
val CURRENT_VERSION = 3

using(sqlite.createStatement()) { statement =>
require(getVersion(statement, DB_NAME, CURRENT_VERSION) <= CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // version 2 is "backward compatible" in the sense that it uses separate tables from version 1. There is no migration though
def migration12(statement: Statement): Unit = {
// version 2 is "backward compatible" in the sense that it uses separate tables from version 1. There is no migration though
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received_payments (payment_hash BLOB NOT NULL PRIMARY KEY, preimage BLOB NOT NULL, payment_request TEXT NOT NULL, received_msat INTEGER, created_at INTEGER NOT NULL, expire_at INTEGER, received_at INTEGER)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent_payments (id TEXT NOT NULL PRIMARY KEY, payment_hash BLOB NOT NULL, preimage BLOB, amount_msat INTEGER NOT NULL, created_at INTEGER NOT NULL, completed_at INTEGER, status VARCHAR NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS payment_hash_idx ON sent_payments(payment_hash)")
setVersion(statement, DB_NAME, CURRENT_VERSION)
}

def migration23(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE sent_payments ADD COLUMN fee_msat INTEGER DEFAULT 0 NOT NULL")
}

using(sqlite.createStatement()) { statement =>
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 => // previous version let's migrate
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received_payments (payment_hash BLOB NOT NULL PRIMARY KEY, preimage BLOB NOT NULL, payment_request TEXT NOT NULL, received_msat INTEGER, created_at INTEGER NOT NULL, expire_at INTEGER, received_at INTEGER)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent_payments (id TEXT NOT NULL PRIMARY KEY, payment_hash BLOB NOT NULL, preimage BLOB, amount_msat INTEGER NOT NULL, created_at INTEGER NOT NULL, completed_at INTEGER, status VARCHAR NOT NULL, fee_msat INTEGER DEFAULT 0 NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS payment_hash_idx ON sent_payments(payment_hash)")
case unknownVersion =>
throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
}

override def addOutgoingPayment(sent: OutgoingPayment): Unit = {
Expand All @@ -56,20 +80,21 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
}
}

override def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32] = None) = {
require((newStatus == SUCCEEDED && preimage.isDefined) || (newStatus == FAILED && preimage.isEmpty), "Wrong combination of state/preimage")
override def updateOutgoingPayment(id: UUID, newStatus: OutgoingPaymentStatus.Value, preimage: Option[ByteVector32], fee: MilliSatoshi): Unit = {
require((newStatus == SUCCEEDED && preimage.isDefined) || (newStatus != SUCCEEDED && preimage.isEmpty && fee == 0.msat), "Wrong combination of state/preimage")

using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, preimage, status) = (?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
using(sqlite.prepareStatement("UPDATE sent_payments SET (completed_at, preimage, status, fee_msat) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
statement.setLong(1, Platform.currentTime)
statement.setBytes(2, if (preimage.isEmpty) null else preimage.get.toArray)
statement.setString(3, newStatus.toString)
statement.setString(4, id.toString)
statement.setLong(4, fee.toLong)
statement.setString(5, id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to update an outgoing payment (id=$id) already in final status with=$newStatus")
}
}

override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = {
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE id = ?")) { statement =>
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status, fee_msat FROM sent_payments WHERE id = ?")) { statement =>
statement.setString(1, id.toString)
val rs = statement.executeQuery()
if (rs.next()) {
Expand All @@ -78,6 +103,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
rs.getByteVector32("payment_hash"),
rs.getByteVector32Nullable("preimage"),
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("fee_msat")),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
Expand All @@ -89,7 +115,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
}

override def getOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = {
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments WHERE payment_hash = ?")) { statement =>
using(sqlite.prepareStatement("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status, fee_msat FROM sent_payments WHERE payment_hash = ?")) { statement =>
statement.setBytes(1, paymentHash.toArray)
val rs = statement.executeQuery()
var q: Queue[OutgoingPayment] = Queue()
Expand All @@ -99,6 +125,7 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {
rs.getByteVector32("payment_hash"),
rs.getByteVector32Nullable("preimage"),
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("fee_msat")),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
Expand All @@ -110,14 +137,15 @@ class SqlitePaymentsDb(sqlite: Connection) extends PaymentsDb with Logging {

override def listOutgoingPayments(): Seq[OutgoingPayment] = {
using(sqlite.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status FROM sent_payments")
val rs = statement.executeQuery("SELECT id, payment_hash, preimage, amount_msat, created_at, completed_at, status, fee_msat FROM sent_payments")
var q: Queue[OutgoingPayment] = Queue()
while (rs.next()) {
q = q :+ OutgoingPayment(
UUID.fromString(rs.getString("id")),
rs.getByteVector32("payment_hash"),
rs.getByteVector32Nullable("preimage"),
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("fee_msat")),
rs.getLong("created_at"),
getNullableLong(rs, "completed_at"),
OutgoingPaymentStatus.withName(rs.getString("status"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
when(WAITING_FOR_REQUEST) {
case Event(c: SendPaymentToRoute, WaitingForRequest) =>
val send = SendPayment(c.paymentHash, c.hops.last, c.finalPayload, maxAttempts = 1)
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.finalPayload.amount, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.finalPayload.amount, fee = 0 msat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
router ! FinalizeRoute(c.hops)
goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, send, failures = Nil)

case Event(c: SendPayment, WaitingForRequest) =>
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.assistedRoutes, routeParams = c.routeParams)
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.finalPayload.amount, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
paymentsDb.addOutgoingPayment(OutgoingPayment(id, c.paymentHash, None, c.finalPayload.amount, fee = 0 msat, Platform.currentTime, None, OutgoingPaymentStatus.PENDING))
goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, c, failures = Nil)
}

Expand All @@ -68,17 +68,18 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis

case Event(Status.Failure(t), WaitingForRoute(s, c, failures)) =>
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ LocalFailure(t)))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
stop(FSM.Normal)
}

when(WAITING_FOR_PAYMENT_COMPLETE) {
case Event("ok", _) => stay()

case Event(fulfill: UpdateFulfillHtlc, WaitingForComplete(s, c, cmd, _, _, _, _, hops)) =>
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, preimage = Some(fulfill.paymentPreimage))
val feesPaid = cmd.amount - c.finalPayload.amount
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, preimage = Some(fulfill.paymentPreimage), fee = feesPaid)
reply(s, PaymentSucceeded(id, cmd.amount, c.paymentHash, fulfill.paymentPreimage, hops))
context.system.eventStream.publish(PaymentSent(id, c.finalPayload.amount, cmd.amount - c.finalPayload.amount, cmd.paymentHash, fulfill.paymentPreimage, fulfill.channelId))
context.system.eventStream.publish(PaymentSent(id, c.finalPayload.amount, feesPaid, cmd.paymentHash, fulfill.paymentPreimage, fulfill.channelId))
stop(FSM.Normal)

case Event(fail: UpdateFailHtlc, WaitingForComplete(s, c, _, failures, sharedSecrets, ignoreNodes, ignoreChannels, hops)) =>
Expand All @@ -87,7 +88,7 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
// if destination node returns an error, we fail the payment immediately
log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ RemoteFailure(hops, e)))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
stop(FSM.Normal)
case res if failures.size + 1 >= c.maxAttempts =>
// otherwise we never try more than maxAttempts, no matter the kind of error returned
Expand All @@ -101,7 +102,7 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis
}
log.warning(s"too many failed attempts, failing the payment")
reply(s, PaymentFailed(id, c.paymentHash, failures = failures :+ failure))
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
stop(FSM.Normal)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}")
Expand Down Expand Up @@ -166,7 +167,7 @@ class PaymentLifecycle(nodeParams: NodeParams, id: UUID, router: ActorRef, regis

case Event(Status.Failure(t), WaitingForComplete(s, c, _, failures, _, ignoreNodes, ignoreChannels, hops)) =>
if (failures.size + 1 >= c.maxAttempts) {
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
paymentsDb.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
reply(s, PaymentFailed(id, c.paymentHash, failures :+ LocalFailure(t)))
stop(FSM.Normal)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
context.system.eventStream.publish(PaymentFailed(id, paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! Status.Failure(addFailed)
Expand All @@ -163,7 +163,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
context.system.eventStream.publish(PaymentSent(id, add.amountMsat, feesPaid, add.paymentHash, fulfill.paymentPreimage, fulfill.channelId))
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, Some(fulfill.paymentPreimage))
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, Some(fulfill.paymentPreimage), fee = MilliSatoshi(0))
context.system.eventStream.publish(PaymentSucceeded(id, add.amountMsat, add.paymentHash, fulfill.paymentPreimage, Nil)) //
case Local(_, Some(sender)) =>
sender ! fulfill
Expand All @@ -178,7 +178,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! fail
Expand All @@ -192,7 +192,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED, preimage = None, fee = MilliSatoshi(0))
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! fail
Expand Down
Loading