Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 43 additions & 30 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import scodec.Attempt
import scodec.bits.BitVector
import scodec.codecs._

import java.sql.{ResultSet, Statement}
import java.sql.{ResultSet, Statement, Timestamp}
import java.time.Instant
import java.util.UUID
import javax.sql.DataSource
import scala.concurrent.duration._

class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb with Logging {

Expand All @@ -42,7 +42,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
import lock._

val DB_NAME = "payments"
val CURRENT_VERSION = 5
val CURRENT_VERSION = 6

private val hopSummaryCodec = (("node_id" | CommonCodecs.publicKey) :: ("next_node_id" | CommonCodecs.publicKey) :: ("short_channel_id" | optional(bool, CommonCodecs.shortchannelid))).as[HopSummary]
private val paymentRouteCodec = discriminated[List[HopSummary]].by(byte)
Expand All @@ -62,12 +62,21 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.executeUpdate("ALTER TABLE sent SET SCHEMA payments")
}

def migration56(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN expire_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + expire_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN received_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + received_at * interval '1 millisecond'")

statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN completed_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + completed_at * interval '1 millisecond'")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA payments")

statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at BIGINT NOT NULL, expire_at BIGINT NOT NULL, received_at BIGINT)")
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at BIGINT NOT NULL, completed_at BIGINT)")
statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, expire_at TIMESTAMP WITH TIME ZONE NOT NULL, received_at TIMESTAMP WITH TIME ZONE)")
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at TIMESTAMP WITH TIME ZONE NOT NULL, completed_at TIMESTAMP WITH TIME ZONE)")

statement.executeUpdate("CREATE INDEX sent_parent_id_idx ON payments.sent(parent_id)")
statement.executeUpdate("CREATE INDEX sent_payment_hash_idx ON payments.sent(payment_hash)")
Expand All @@ -76,6 +85,10 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand All @@ -95,7 +108,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.setLong(6, sent.amount.toLong)
statement.setLong(7, sent.recipientAmount.toLong)
statement.setString(8, sent.recipientNodeId.value.toHex)
statement.setLong(9, sent.createdAt)
statement.setTimestamp(9, Timestamp.from(Instant.ofEpochMilli(sent.createdAt)))
statement.setString(10, sent.paymentRequest.map(PaymentRequest.write).orNull)
statement.executeUpdate()
}
Expand All @@ -106,7 +119,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
paymentResult.parts.foreach(p => {
statement.setLong(1, p.timestamp)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(p.timestamp)))
statement.setString(2, paymentResult.paymentPreimage.toHex)
statement.setLong(3, p.feesPaid.toLong)
statement.setBytes(4, paymentRouteCodec.encode(p.route.getOrElse(Nil).map(h => HopSummary(h)).toList).require.toByteArray)
Expand All @@ -121,7 +134,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
statement.setLong(1, paymentResult.timestamp)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(paymentResult.timestamp)))
statement.setBytes(2, paymentFailuresCodec.encode(paymentResult.failures.map(f => FailureSummary(f)).toList).require.toByteArray)
statement.setString(3, paymentResult.id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to mark an outgoing payment as failed but already in final status (id=${paymentResult.id})")
Expand All @@ -134,7 +147,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
rs.getByteVector32FromHexNullable("payment_preimage"),
rs.getMilliSatoshiNullable("fees_msat"),
rs.getBitVectorOpt("payment_route"),
rs.getLongNullable("completed_at"),
rs.getTimestampNullable("completed_at").map(_.getTime),
rs.getBitVectorOpt("failures"))

OutgoingPayment(
Expand All @@ -146,7 +159,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("recipient_amount_msat")),
PublicKey(rs.getByteVectorFromHex("recipient_node_id")),
rs.getLong("created_at"),
rs.getTimestamp("created_at").getTime,
rs.getStringNullable("payment_request").map(PaymentRequest.read),
status
)
Expand Down Expand Up @@ -207,8 +220,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.sent WHERE created_at >= ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map { rs =>
parseOutgoingPayment(rs)
}.toSeq
Expand All @@ -223,8 +236,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.setString(2, preimage.toHex)
statement.setString(3, paymentType)
statement.setString(4, PaymentRequest.write(pr))
statement.setLong(5, pr.timestamp.seconds.toMillis) // BOLT11 timestamp is in seconds
statement.setLong(6, (pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong)).seconds.toMillis)
statement.setTimestamp(5, Timestamp.from(Instant.ofEpochSecond(pr.timestamp))) // BOLT11 timestamp is in seconds
statement.setTimestamp(6, Timestamp.from(Instant.ofEpochSecond(pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong))))
statement.executeUpdate()
}
}
Expand All @@ -234,7 +247,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.received SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update =>
update.setLong(1, amount.toLong)
update.setLong(2, receivedAt)
update.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(receivedAt)))
update.setString(3, paymentHash.toHex)
val updated = update.executeUpdate()
if (updated == 0) {
Expand All @@ -250,8 +263,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
PaymentRequest.read(paymentRequest),
rs.getByteVector32FromHex("payment_preimage"),
rs.getString("payment_type"),
rs.getLong("created_at"),
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getLongNullable("received_at")))
rs.getTimestamp("created_at").getTime,
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getTimestampNullable("received_at").map(_.getTime)))
}

private def buildIncomingPaymentStatus(amount_opt: Option[MilliSatoshi], serializedPaymentRequest_opt: Option[String], receivedAt_opt: Option[Long]): IncomingPaymentStatus = {
Expand All @@ -274,8 +287,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -284,8 +297,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat > 0 AND created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -294,9 +307,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at > ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(3, System.currentTimeMillis)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.setTimestamp(3, Timestamp.from(Instant.now()))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -305,9 +318,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(3, System.currentTimeMillis)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.setTimestamp(3, Timestamp.from(Instant.now()))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand Down Expand Up @@ -366,9 +379,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
val paymentType = rs.getString("payment_type")
val paymentRequest_opt = rs.getStringNullable("payment_request")
val amount_opt = rs.getMilliSatoshiNullable("final_amount")
val createdAt = rs.getLong("created_at")
val completedAt_opt = rs.getLongNullable("completed_at")
val expireAt_opt = rs.getLongNullable("expire_at")
val createdAt = rs.getTimestamp("created_at").getTime
val completedAt_opt = rs.getTimestampNullable("completed_at").map(_.getTime)
val expireAt_opt = rs.getTimestampNullable("expire_at").map(_.getTime)

if (rs.getString("type") == "received") {
val status: IncomingPaymentStatus = buildIncomingPaymentStatus(amount_opt, paymentRequest_opt, completedAt_opt)
Expand Down
Loading