Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

### API changes

<insert changes>
- `audit` now accepts `--count` and `--skip` parameters to limit the number of retrieved items (#2474, #2487)

### Miscellaneous improvements and bug fixes

Expand Down
10 changes: 5 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ trait Eclair {

def sendToRoute(amount: MilliSatoshi, recipientAmount_opt: Option[MilliSatoshi], externalId_opt: Option[String], parentId_opt: Option[UUID], invoice: Bolt11Invoice, route: PredefinedRoute, trampolineSecret_opt: Option[ByteVector32] = None, trampolineFees_opt: Option[MilliSatoshi] = None, trampolineExpiryDelta_opt: Option[CltvExpiryDelta] = None, trampolineNodes_opt: Seq[PublicKey] = Nil)(implicit timeout: Timeout): Future[SendPaymentToRouteResponse]

def audit(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[AuditResponse]
def audit(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[AuditResponse]

def networkFees(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[NetworkFee]]

Expand Down Expand Up @@ -419,11 +419,11 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
appKit.nodeParams.db.payments.getIncomingPayment(paymentHash)
}

override def audit(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[AuditResponse] = {
override def audit(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[AuditResponse] = {
Future(AuditResponse(
sent = appKit.nodeParams.db.audit.listSent(from.toTimestampMilli, to.toTimestampMilli),
received = appKit.nodeParams.db.audit.listReceived(from.toTimestampMilli, to.toTimestampMilli),
relayed = appKit.nodeParams.db.audit.listRelayed(from.toTimestampMilli, to.toTimestampMilli)
sent = appKit.nodeParams.db.audit.listSent(from.toTimestampMilli, to.toTimestampMilli, paginated_opt),
received = appKit.nodeParams.db.audit.listReceived(from.toTimestampMilli, to.toTimestampMilli, paginated_opt),
relayed = appKit.nodeParams.db.audit.listRelayed(from.toTimestampMilli, to.toTimestampMilli, paginated_opt)
))
}

Expand Down
8 changes: 4 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.eclair.TimestampMilli
import fr.acinq.eclair.{Paginated, TimestampMilli}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
Expand All @@ -44,11 +44,11 @@ trait AuditDb {

def addPathFindingExperimentMetrics(metrics: PathFindingExperimentMetrics): Unit

def listSent(from: TimestampMilli, to: TimestampMilli): Seq[PaymentSent]
def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent]

def listReceived(from: TimestampMilli, to: TimestampMilli): Seq[PaymentReceived]
def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentReceived]

def listRelayed(from: TimestampMilli, to: TimestampMilli): Seq[PaymentRelayed]
def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed]

def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,19 @@ case class DualAuditDb(primary: AuditDb, secondary: AuditDb) extends AuditDb {
primary.addPathFindingExperimentMetrics(metrics)
}

override def listSent(from: TimestampMilli, to: TimestampMilli): Seq[PaymentSent] = {
runAsync(secondary.listSent(from, to))
primary.listSent(from, to)
override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated]): Seq[PaymentSent] = {
runAsync(secondary.listSent(from, to, paginated_opt))
primary.listSent(from, to, paginated_opt)
}

override def listReceived(from: TimestampMilli, to: TimestampMilli): Seq[PaymentReceived] = {
runAsync(secondary.listReceived(from, to))
primary.listReceived(from, to)
override def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated]): Seq[PaymentReceived] = {
runAsync(secondary.listReceived(from, to, paginated_opt))
primary.listReceived(from, to, paginated_opt)
}

override def listRelayed(from: TimestampMilli, to: TimestampMilli): Seq[PaymentRelayed] = {
runAsync(secondary.listRelayed(from, to))
primary.listRelayed(from, to)
override def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated]): Seq[PaymentRelayed] = {
runAsync(secondary.listRelayed(from, to, paginated_opt))
primary.listRelayed(from, to, paginated_opt)
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[AuditDb.NetworkFee] = {
Expand Down
26 changes: 19 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, TimestampMilli}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli}
import grizzled.slf4j.Logging

import java.sql.{Statement, Timestamp}
Expand Down Expand Up @@ -334,12 +334,12 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def listSent(from: TimestampMilli, to: TimestampMilli): Seq[PaymentSent] =
override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, from.toSqlTimestamp)
statement.setTimestamp(2, to.toSqlTimestamp)
statement.executeQuery()
val result = statement.executeQuery()
.foldLeft(Map.empty[UUID, PaymentSent]) { (sentByParentId, rs) =>
val parentId = UUID.fromString(rs.getString("parent_payment_id"))
val part = PaymentSent.PartialPayment(
Expand All @@ -361,15 +361,19 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
sentByParentId + (parentId -> sent)
}.values.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}
}

override def listReceived(from: TimestampMilli, to: TimestampMilli): Seq[PaymentReceived] =
override def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentReceived] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.received WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, from.toSqlTimestamp)
statement.setTimestamp(2, to.toSqlTimestamp)
statement.executeQuery()
val result = statement.executeQuery()
.foldLeft(Map.empty[ByteVector32, PaymentReceived]) { (receivedByHash, rs) =>
val paymentHash = rs.getByteVector32FromHex("payment_hash")
val part = PaymentReceived.PartialPayment(
Expand All @@ -382,10 +386,14 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
receivedByHash + (paymentHash -> received)
}.values.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}
}

override def listRelayed(from: TimestampMilli, to: TimestampMilli): Seq[PaymentRelayed] =
override def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed] =
inTransaction { pg =>
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
statement.setTimestamp(1, from.toSqlTimestamp)
Expand Down Expand Up @@ -413,7 +421,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part))
}
}
relayedByHash.flatMap {
val result = relayedByHash.flatMap {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
Expand All @@ -429,6 +437,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, TimestampMilli}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli}
import grizzled.slf4j.Logging

import java.sql.{Connection, Statement}
Expand Down Expand Up @@ -310,11 +310,11 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
}
}

override def listSent(from: TimestampMilli, to: TimestampMilli): Seq[PaymentSent] =
override def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent] =
using(sqlite.prepareStatement("SELECT * FROM sent WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from.toLong)
statement.setLong(2, to.toLong)
statement.executeQuery()
val result = statement.executeQuery()
.foldLeft(Map.empty[UUID, PaymentSent]) { (sentByParentId, rs) =>
val parentId = UUID.fromString(rs.getString("parent_payment_id"))
val part = PaymentSent.PartialPayment(
Expand All @@ -336,13 +336,17 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
}
sentByParentId + (parentId -> sent)
}.values.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}

override def listReceived(from: TimestampMilli, to: TimestampMilli): Seq[PaymentReceived] =
override def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentReceived] =
using(sqlite.prepareStatement("SELECT * FROM received WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from.toLong)
statement.setLong(2, to.toLong)
statement.executeQuery()
val result = statement.executeQuery()
.foldLeft(Map.empty[ByteVector32, PaymentReceived]) { (receivedByHash, rs) =>
val paymentHash = rs.getByteVector32("payment_hash")
val part = PaymentReceived.PartialPayment(
Expand All @@ -355,9 +359,13 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
}
receivedByHash + (paymentHash -> received)
}.values.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}

override def listRelayed(from: TimestampMilli, to: TimestampMilli): Seq[PaymentRelayed] = {
override def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed] = {
val trampolineByHash = using(sqlite.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from.toLong)
statement.setLong(2, to.toLong)
Expand Down Expand Up @@ -385,7 +393,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
relayedByHash + (paymentHash -> (relayedByHash.getOrElse(paymentHash, Nil) :+ part))
}
}
relayedByHash.flatMap {
val result = relayedByHash.flatMap {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
Expand All @@ -401,6 +409,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
paginated_opt match {
case Some(paginated) => result.slice(paginated.skip, paginated.skip + paginated.count)
case None => result
}
}

override def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee] =
Expand Down
16 changes: 14 additions & 2 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,22 @@ class AuditDbSpec extends AnyFunSuite {
db.add(e11)
db.add(e12)

assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute).toSet == Set(e1, e5, e6))
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute).toList == List(e5, e1, e6))
assert(db.listSent(from = TimestampMilli(100000L), to = TimestampMilli.now() + 1.minute).toList == List(e1))
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute, Some(Paginated(count = 0, skip = 0))).toList == List())
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute, Some(Paginated(count = 2, skip = 0))).toList == List(e5, e1))
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute, Some(Paginated(count = 2, skip = 1))).toList == List(e1, e6))
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute, Some(Paginated(count = 2, skip = 2))).toList == List(e6))
assert(db.listSent(from = TimestampMilli(0L), to = TimestampMilli.now() + 15.minute, Some(Paginated(count = 2, skip = 3))).toList == List())
assert(db.listReceived(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute).toList == List(e2))
assert(db.listReceived(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 0, skip = 0))).toList == List())
assert(db.listReceived(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 2, skip = 0))).toList == List(e2))
assert(db.listReceived(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 2, skip = 1))).toList == List())
assert(db.listRelayed(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute).toList == List(e3, e10, e11, e12))
assert(db.listRelayed(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 0, skip = 0))).toList == List())
assert(db.listRelayed(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 2, skip = 0))).toList == List(e3, e10))
assert(db.listRelayed(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 2, skip = 1))).toList == List(e10, e11))
assert(db.listRelayed(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute, Some(Paginated(count = 2, skip = 4))).toList == List())
assert(db.listNetworkFees(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute).size == 1)
assert(db.listNetworkFees(from = TimestampMilli(0L), to = TimestampMilli.now() + 1.minute).head.txType == "mutual")
}
Expand Down Expand Up @@ -835,7 +847,7 @@ class AuditDbSpec extends AnyFunSuite {
statement.executeUpdate()
}

assert(db.listRelayed(0 unixms, 40 unixms) == Nil)
assert(db.listRelayed(0 unixms, 40 unixms, None) == Nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ trait Node {
}

val audit: Route = postRequest("audit") { implicit t =>
formFields(fromFormParam(), toFormParam()) { (from, to) =>
complete(eclairApi.audit(from, to))
withPaginated { paginated_opt =>
formFields(fromFormParam(), toFormParam()) { (from, to) =>
complete(eclairApi.audit(from, to, paginated_opt))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM
val eclair = mock[Eclair]
val mockService = new MockService(eclair)
val auditResponse = AuditResponse(Seq.empty, Seq.empty, Seq.empty)
eclair.audit(any, any)(any[Timeout]) returns Future.successful(auditResponse)
eclair.audit(any, any, any)(any[Timeout]) returns Future.successful(auditResponse)

Post("/audit") ~>
addCredentials(BasicHttpCredentials("", mockApi().password)) ~>
Expand All @@ -1100,7 +1100,7 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM
// The default is to get data for the last day.
val from = TimestampSecond.now() - 1.day
val to = TimestampSecond.now()
eclair.audit(argThat[TimestampSecond](t => from - 1.minute <= t && t <= from + 1.minute), argThat[TimestampSecond](t => to - 1.minute <= t && t <= to + 1.minute))(any[Timeout]).wasCalled(once)
eclair.audit(argThat[TimestampSecond](t => from - 1.minute <= t && t <= from + 1.minute), argThat[TimestampSecond](t => to - 1.minute <= t && t <= to + 1.minute), None)(any[Timeout]).wasCalled(once)
}

Post("/audit", FormData("from" -> TimestampSecond.min.toLong.toString, "to" -> TimestampSecond.max.toLong.toString)) ~>
Expand All @@ -1109,7 +1109,7 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM
check {
assert(handled)
assert(status == OK)
eclair.audit(TimestampSecond.min, TimestampSecond.max)(any[Timeout]).wasCalled(once)
eclair.audit(TimestampSecond.min, TimestampSecond.max, None)(any[Timeout]).wasCalled(once)
}

Post("/audit", FormData("from" -> 123456.toString, "to" -> 654321.toString)) ~>
Expand All @@ -1118,8 +1118,18 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM
check {
assert(handled)
assert(status == OK)
eclair.audit(123456 unixsec, 654321 unixsec)(any[Timeout]).wasCalled(once)
eclair.audit(123456 unixsec, 654321 unixsec, None)(any[Timeout]).wasCalled(once)
}

Post("/audit", FormData("from" -> 123456.toString, "to" -> 654321.toString, "count" -> 1.toString, "skip" -> 2.toString)) ~>
addCredentials(BasicHttpCredentials("", mockApi().password)) ~>
Route.seal(mockService.audit) ~>
check {
assert(handled)
assert(status == OK)
eclair.audit(123456 unixsec, 654321 unixsec, Some(Paginated(count = 1, skip = 2)))(any[Timeout]).wasCalled(once)
}

}

test("the websocket should return typed objects") {
Expand Down