From 2e3d9bd89a2919e49bf87d6d0e97a87750455a0a Mon Sep 17 00:00:00 2001 From: rorp Date: Sun, 16 Apr 2023 17:21:57 -0700 Subject: [PATCH 1/8] Add closedchannels RPC --- contrib/eclair-cli.bash-completion | 2 +- eclair-core/eclair-cli | 1 + .../src/main/scala/fr/acinq/eclair/Eclair.scala | 10 ++++++++++ .../src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala | 2 ++ .../main/scala/fr/acinq/eclair/db/DualDatabases.scala | 5 +++++ .../scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala | 9 +++++++++ .../fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala | 7 +++++++ .../test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala | 2 ++ .../scala/fr/acinq/eclair/api/handlers/Channel.scala | 8 +++++++- 9 files changed, 44 insertions(+), 2 deletions(-) diff --git a/contrib/eclair-cli.bash-completion b/contrib/eclair-cli.bash-completion index 47b76ecd76..7173779230 100644 --- a/contrib/eclair-cli.bash-completion +++ b/contrib/eclair-cli.bash-completion @@ -21,7 +21,7 @@ _eclair-cli() *) # works fine, but is too slow at the moment. # allopts=$($eclaircli help 2>&1 | awk '$1 ~ /^"/ { sub(/,/, ""); print $1}' | sed 's/[":]//g') - allopts="getinfo connect open close forceclose updaterelayfee peers channels channel allnodes allchannels allupdates findroute findroutetonode findroutebetweennodes parseinvoice payinvoice sendtonode getsentinfo createinvoice getinvoice listinvoices listpendinginvoices listreceivedpayments getreceivedinfo audit networkfees channelstats" + allopts="getinfo connect open close forceclose updaterelayfee peers channels channel closedchannels allnodes allchannels allupdates findroute findroutetonode findroutebetweennodes parseinvoice payinvoice sendtonode getsentinfo createinvoice getinvoice listinvoices listpendinginvoices listreceivedpayments getreceivedinfo audit networkfees channelstats" if ! [[ " $allopts " =~ " $prev " ]]; then # prevent double arguments if [[ -z "$cur" || "$cur" =~ ^[a-z] ]]; then diff --git a/eclair-core/eclair-cli b/eclair-core/eclair-cli index c5a9a962e6..6711d03d8c 100755 --- a/eclair-core/eclair-cli +++ b/eclair-core/eclair-cli @@ -45,6 +45,7 @@ and COMMAND is one of the available commands: - forceclose - channel - channels + - closedchannels - allchannels - allupdates - channelstats diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 276fd771e9..f5d8fa124a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -104,6 +104,8 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] + def closedChannels(toRemoteNode_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] def node(nodeId: PublicKey)(implicit timeout: Timeout): Future[Option[Router.PublicNode]] @@ -286,6 +288,14 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { sendToChannel[CMD_GET_CHANNEL_INFO, CommandResponse[CMD_GET_CHANNEL_INFO]](channel, CMD_GET_CHANNEL_INFO(ActorRef.noSender)) } + override def closedChannels(toRemoteNode_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + Future { + appKit.nodeParams.db.channels.listClosedChannels().map { data => + RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data) + } + } + } + override def allChannels()(implicit timeout: Timeout): Future[Iterable[ChannelDesc]] = { (appKit.router ? Router.GetChannels).mapTo[Iterable[ChannelAnnouncement]].map(_.map(c => ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index a771f927cb..9ef7aff275 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -33,6 +33,8 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] + def listClosedChannels(): Seq[PersistentChannelData] + def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 43d9182aa0..f05c3a4011 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -229,6 +229,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.listLocalChannels() } + override def listClosedChannels(): Seq[PersistentChannelData] = { + runAsync(secondary.listLocalChannels()) + primary.listLocalChannels() + } + override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = { runAsync(secondary.addHtlcInfo(channelId, commitmentNumber, paymentHash, cltvExpiry)) primary.addHtlcInfo(channelId, commitmentNumber, paymentHash, cltvExpiry) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 4b003da8a3..a29f2601e8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -228,6 +228,15 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def listClosedChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + withLock { pg => + using(pg.createStatement) { statement => + statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=TRUE") + .mapCodec(channelDataCodec).toSeq + } + } + } + override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("INSERT INTO local.htlc_infos VALUES (?, ?, ?, ?)")) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index c39ee4f68d..62b6cf958d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -169,6 +169,13 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def listClosedChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + using(sqlite.createStatement) { statement => + statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=1") + .mapCodec(channelDataCodec).toSeq + } + } + override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { using(sqlite.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement => statement.setBytes(1, channelId.toArray) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index b11b9a9b41..c111e4b346 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -89,9 +89,11 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) + assert(db.listClosedChannels().isEmpty) db.removeChannel(channel1.channelId) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) + assert(db.listClosedChannels() == List(channel1)) assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala index 0957958994..b61ab518c6 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala @@ -127,6 +127,12 @@ trait Channel { } } + val closedChannels: Route = postRequest("closedchannels") { implicit t => + formFields(nodeIdFormParam.?) { toRemoteNodeId_opt => + complete(eclairApi.closedChannels(toRemoteNodeId_opt)) + } + } + val allChannels: Route = postRequest("allchannels") { implicit t => complete(eclairApi.allChannels()) } @@ -147,6 +153,6 @@ trait Channel { complete(eclairApi.channelBalances()) } - val channelRoutes: Route = open ~ rbfOpen ~ spliceIn ~ spliceOut ~ close ~ forceClose ~ channel ~ channels ~ allChannels ~ allUpdates ~ channelStats ~ channelBalances + val channelRoutes: Route = open ~ rbfOpen ~ spliceIn ~ spliceOut ~ close ~ forceClose ~ channel ~ channels ~ closedChannels ~ allChannels ~ allUpdates ~ channelStats ~ channelBalances } From 85d6af0859572fe8bcc45f592e3b6db68bcc6c47 Mon Sep 17 00:00:00 2001 From: rorp Date: Sun, 16 Apr 2023 17:29:02 -0700 Subject: [PATCH 2/8] Update the release notes --- docs/release-notes/eclair-vnext.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index 319d5eacd5..c7cfedbe52 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -40,6 +40,7 @@ All this data is signed and encrypted so that it can not be read or forged by th - `channel-opened` websocket event was updated to contain the final `channel_id` and be published when a channel is ready to process payments (#2567) - `getsentinfo` can now be used with `--offer` to list payments sent to a specific offer. - `listreceivedpayments` lists payments received by your node (#2607) +- `closedchannels` lists closed channels (#2642) ### Miscellaneous improvements and bug fixes From 1282fb8927b8574e7a5c6fb7a13c83f2fbe04abf Mon Sep 17 00:00:00 2001 From: rorp Date: Wed, 3 May 2023 09:56:48 -0700 Subject: [PATCH 3/8] Add pagination --- docs/release-notes/eclair-vnext.md | 2 +- .../main/scala/fr/acinq/eclair/Eclair.scala | 6 ++--- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 4 ++-- .../fr/acinq/eclair/db/DualDatabases.scala | 6 ++--- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 22 ++++++++++++++----- .../eclair/db/sqlite/SqliteChannelsDb.scala | 22 ++++++++++++++----- .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 4 ++-- .../acinq/eclair/api/handlers/Channel.scala | 6 ++--- 8 files changed, 46 insertions(+), 26 deletions(-) diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index c7cfedbe52..e34acaf223 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -40,7 +40,7 @@ All this data is signed and encrypted so that it can not be read or forged by th - `channel-opened` websocket event was updated to contain the final `channel_id` and be published when a channel is ready to process payments (#2567) - `getsentinfo` can now be used with `--offer` to list payments sent to a specific offer. - `listreceivedpayments` lists payments received by your node (#2607) -- `closedchannels` lists closed channels (#2642) +- `closedchannels` lists closed channels. It accepts `--count` and `--skip` parameters to limit the number of retrieved items as well (#2642) ### Miscellaneous improvements and bug fixes diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index f5d8fa124a..732f37be20 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -104,7 +104,7 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] - def closedChannels(toRemoteNode_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def closedChannels(paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] @@ -288,9 +288,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { sendToChannel[CMD_GET_CHANNEL_INFO, CommandResponse[CMD_GET_CHANNEL_INFO]](channel, CMD_GET_CHANNEL_INFO(ActorRef.noSender)) } - override def closedChannels(toRemoteNode_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + override def closedChannels(paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { Future { - appKit.nodeParams.db.channels.listClosedChannels().map { data => + appKit.nodeParams.db.channels.listClosedChannels(paginated_opt).map { data => RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index 9ef7aff275..569499bde4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -17,7 +17,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.eclair.CltvExpiry +import fr.acinq.eclair.{CltvExpiry, Paginated} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -33,7 +33,7 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] - def listClosedChannels(): Seq[PersistentChannelData] + def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index f05c3a4011..4ae71d9bee 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -229,9 +229,9 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.listLocalChannels() } - override def listClosedChannels(): Seq[PersistentChannelData] = { - runAsync(secondary.listLocalChannels()) - primary.listLocalChannels() + override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { + runAsync(secondary.listClosedChannels(paginated_opt)) + primary.listClosedChannels(paginated_opt) } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index a29f2601e8..9c730739cf 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg import com.zaxxer.hikari.util.IsolationLevel import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.eclair.CltvExpiry +import fr.acinq.eclair.{CltvExpiry, Paginated} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -35,7 +35,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 7 + val CURRENT_VERSION = 8 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -99,6 +99,11 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit )(logger) } + def migration78(statement: Statement): Unit = { + statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local.channels(created_timestamp)") + statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local.channels(closed_timestamp)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") @@ -109,7 +114,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels ((json->'commitments'->'remoteParams'->>'nodeId'))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON local.htlc_infos(channel_id, commitment_number)") - case Some(v@(2 | 3 | 4 | 5 | 6)) => + statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local.channels(created_timestamp)") + statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local.channels(closed_timestamp)") + case Some(v@(2 | 3 | 4 | 5 | 6 | 7)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 3) { migration23(statement) @@ -126,6 +133,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 7) { migration67() } + if (v < 8) { + migration78(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -222,16 +232,16 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => - statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=FALSE") + statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=FALSE ORDER BY created_timestamp") .mapCodec(channelDataCodec).toSeq } } } - override def listClosedChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => - statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=TRUE") + statement.executeQuery(limited("SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC", paginated_opt)) .mapCodec(channelDataCodec).toSeq } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 62b6cf958d..41c3c5fc10 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -23,7 +23,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, TimestampMilli} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -31,7 +31,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -78,12 +78,19 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(statement: Statement): Unit = { + statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local_channels(created_timestamp)") + statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local_channels(closed_timestamp)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") - case Some(v@(1 | 2 | 3)) => + statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local_channels(created_timestamp)") + statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local_channels(closed_timestamp)") + case Some(v@(1 | 2 | 3 | 4)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -94,6 +101,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -164,14 +174,14 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => - statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") + statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0 ORDER BY created_timestamp") .mapCodec(channelDataCodec).toSeq } } - override def listClosedChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => - statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=1") + statement.executeQuery(limited("SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC", paginated_opt)) .mapCodec(channelDataCodec).toSeq } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index c111e4b346..07ebf9d24e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -89,11 +89,11 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) - assert(db.listClosedChannels().isEmpty) + assert(db.listClosedChannels(None).isEmpty) db.removeChannel(channel1.channelId) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) - assert(db.listClosedChannels() == List(channel1)) + assert(db.listClosedChannels(None) == List(channel1)) assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala index b61ab518c6..7ebdfa09cf 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.api.handlers import akka.http.scaladsl.server.{MalformedFormFieldRejection, Route} import akka.util.Timeout import fr.acinq.bitcoin.scalacompat.{Satoshi, Script} -import fr.acinq.eclair.MilliSatoshi +import fr.acinq.eclair.{MilliSatoshi, Paginated} import fr.acinq.eclair.api.Service import fr.acinq.eclair.api.directives.EclairDirectives import fr.acinq.eclair.api.serde.FormParamExtractors._ @@ -128,8 +128,8 @@ trait Channel { } val closedChannels: Route = postRequest("closedchannels") { implicit t => - formFields(nodeIdFormParam.?) { toRemoteNodeId_opt => - complete(eclairApi.closedChannels(toRemoteNodeId_opt)) + withPaginated { paginated_opt => + complete(eclairApi.closedChannels(paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) } } From 3f86a1195eea45ca3ef259f421e913f941a35da8 Mon Sep 17 00:00:00 2001 From: rorp Date: Thu, 4 May 2023 09:49:34 -0700 Subject: [PATCH 4/8] Add --nodeId parameter --- .../main/scala/fr/acinq/eclair/Eclair.scala | 6 +-- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 3 +- .../fr/acinq/eclair/db/DualDatabases.scala | 7 ++-- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 41 ++++++++++++------- .../eclair/db/sqlite/SqliteChannelsDb.scala | 32 +++++++++------ .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 7 +++- .../acinq/eclair/api/handlers/Channel.scala | 4 +- 7 files changed, 62 insertions(+), 38 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 732f37be20..b861b18612 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -104,7 +104,7 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] - def closedChannels(paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] @@ -288,9 +288,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { sendToChannel[CMD_GET_CHANNEL_INFO, CommandResponse[CMD_GET_CHANNEL_INFO]](channel, CMD_GET_CHANNEL_INFO(ActorRef.noSender)) } - override def closedChannels(paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { Future { - appKit.nodeParams.db.channels.listClosedChannels(paginated_opt).map { data => + appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt).map { data => RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index 569499bde4..c742425348 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -17,6 +17,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.{CltvExpiry, Paginated} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -33,7 +34,7 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] - def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] + def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 4ae71d9bee..1359dc9122 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -1,6 +1,7 @@ package fr.acinq.eclair.db import com.google.common.util.concurrent.ThreadFactoryBuilder +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.Databases.{FileBackup, PostgresDatabases, SqliteDatabases} @@ -229,9 +230,9 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.listLocalChannels() } - override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { - runAsync(secondary.listClosedChannels(paginated_opt)) - primary.listClosedChannels(paginated_opt) + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { + runAsync(secondary.listClosedChannels(remoteNodeId_opt, paginated_opt)) + primary.listClosedChannels(remoteNodeId_opt, paginated_opt) } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 9c730739cf..e29f1aca53 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg import com.zaxxer.hikari.util.IsolationLevel import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.eclair.{CltvExpiry, Paginated} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -26,6 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec +import fr.acinq.eclair.{CltvExpiry, Paginated} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -35,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 8 + val CURRENT_VERSION = 7 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -99,11 +100,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit )(logger) } - def migration78(statement: Statement): Unit = { - statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local.channels(created_timestamp)") - statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local.channels(closed_timestamp)") - } - getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") @@ -116,7 +112,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX htlc_infos_idx ON local.htlc_infos(channel_id, commitment_number)") statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local.channels(created_timestamp)") statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local.channels(closed_timestamp)") - case Some(v@(2 | 3 | 4 | 5 | 6 | 7)) => + case Some(v@(2 | 3 | 4 | 5 | 6)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 3) { migration23(statement) @@ -133,9 +129,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 7) { migration67() } - if (v < 8) { - migration78(statement) - } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -238,13 +231,31 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { - withLock { pg => + private val listClosedChannelsSql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" + + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + val rs = withLock { pg => using(pg.createStatement) { statement => - statement.executeQuery(limited("SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC", paginated_opt)) - .mapCodec(channelDataCodec).toSeq + remoteNodeId_opt match { + case None => + statement.executeQuery(limited(listClosedChannelsSql, paginated_opt)) + .mapCodec(channelDataCodec) + case Some(_) => + statement.executeQuery(listClosedChannelsSql) + .mapCodec(channelDataCodec) + } } } + remoteNodeId_opt match { + case None => rs.toSeq + case Some(nodeId) => + val filtered = rs.filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) + } + limited.toSeq + } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 41c3c5fc10..690725b84f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -17,6 +17,7 @@ package fr.acinq.eclair.db.sqlite import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -31,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 5 + val CURRENT_VERSION = 4 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -78,11 +79,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } - def migration45(statement: Statement): Unit = { - statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local_channels(created_timestamp)") - statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local_channels(closed_timestamp)") - } - getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") @@ -90,7 +86,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local_channels(created_timestamp)") statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local_channels(closed_timestamp)") - case Some(v@(1 | 2 | 3 | 4)) => + case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -101,9 +97,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } - if (v < 5) { - migration45(statement) - } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -179,10 +172,23 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } - override def listClosedChannels(paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + private val listClosedChannelsSql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" + + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => - statement.executeQuery(limited("SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC", paginated_opt)) - .mapCodec(channelDataCodec).toSeq + remoteNodeId_opt match { + case None => + statement.executeQuery(limited(listClosedChannelsSql, paginated_opt)) + .mapCodec(channelDataCodec).toSeq + case Some(nodeId) => + val filtered = statement.executeQuery(listClosedChannelsSql) + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) + } + limited.toSeq + } } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 07ebf9d24e..14cc86ad61 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,6 +18,7 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} import fr.acinq.eclair.channel.RealScidStatus import fr.acinq.eclair.db.ChannelsDbSpec.{getPgTimestamp, getTimestamp, testCases} @@ -89,11 +90,13 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) - assert(db.listClosedChannels(None).isEmpty) + assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) - assert(db.listClosedChannels(None) == List(channel1)) + assert(db.listClosedChannels(None, None) == List(channel1)) + assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala index 7ebdfa09cf..01571052fe 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala @@ -129,7 +129,9 @@ trait Channel { val closedChannels: Route = postRequest("closedchannels") { implicit t => withPaginated { paginated_opt => - complete(eclairApi.closedChannels(paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) + formFields(nodeIdFormParam.?) { toRemoteNodeId_opt => + complete(eclairApi.closedChannels(toRemoteNodeId_opt, paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) + } } } From 7e8daaad3365b84cf93175fbd04e6970fe5143df Mon Sep 17 00:00:00 2001 From: rorp Date: Fri, 5 May 2023 09:31:11 -0700 Subject: [PATCH 5/8] Address to the PR comments --- .../main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala | 7 ++----- .../scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala | 2 -- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index e29f1aca53..64fd591315 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -28,7 +28,7 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.{CltvExpiry, Paginated} import grizzled.slf4j.Logging -import scodec.bits.BitVector +import scodec.bits.{BitVector, ByteVector} import java.sql.{Connection, Statement, Timestamp} import java.time.Instant @@ -110,8 +110,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels ((json->'commitments'->'remoteParams'->>'nodeId'))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON local.htlc_infos(channel_id, commitment_number)") - statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local.channels(created_timestamp)") - statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local.channels(closed_timestamp)") case Some(v@(2 | 3 | 4 | 5 | 6)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 3) { @@ -231,9 +229,8 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - private val listClosedChannelsSql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + val listClosedChannelsSql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" val rs = withLock { pg => using(pg.createStatement) { statement => remoteNodeId_opt match { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 690725b84f..e35e76c9d5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -84,8 +84,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") - statement.executeUpdate("CREATE INDEX created_timestamp_idx ON local_channels(created_timestamp)") - statement.executeUpdate("CREATE INDEX closed_timestamp_idx ON local_channels(closed_timestamp)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { From ceeb7671ce5f6cd392bfaaa4dffddcdececec2d5 Mon Sep 17 00:00:00 2001 From: rorp Date: Sun, 7 May 2023 13:53:09 -0700 Subject: [PATCH 6/8] Add from/to parameters --- .../main/scala/fr/acinq/eclair/Eclair.scala | 6 +-- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 4 +- .../fr/acinq/eclair/db/DualDatabases.scala | 8 +-- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 47 +++++++++++----- .../eclair/db/sqlite/SqliteChannelsDb.scala | 53 +++++++++++++------ .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 12 +++-- .../acinq/eclair/api/handlers/Channel.scala | 4 +- 7 files changed, 88 insertions(+), 46 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 5849c686a3..8a397ec2e1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -104,7 +104,7 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] - def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def closedChannels(from: TimestampSecond, to: TimestampSecond, nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] @@ -290,9 +290,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { sendToChannel[CMD_GET_CHANNEL_INFO, CommandResponse[CMD_GET_CHANNEL_INFO]](channel, CMD_GET_CHANNEL_INFO(ActorRef.noSender)) } - override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + override def closedChannels(from: TimestampSecond, to: TimestampSecond, nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { Future { - appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt).map { data => + appKit.nodeParams.db.channels.listClosedChannels(from, to, nodeId_opt, paginated_opt).map { data => RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index c742425348..788d691a5f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.{CltvExpiry, Paginated} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent @@ -34,7 +34,7 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] - def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] + def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 1359dc9122..abba85e97f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -11,7 +11,7 @@ import fr.acinq.eclair.payment._ import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} -import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond} import grizzled.slf4j.Logging import java.io.File @@ -230,9 +230,9 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.listLocalChannels() } - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { - runAsync(secondary.listClosedChannels(remoteNodeId_opt, paginated_opt)) - primary.listClosedChannels(remoteNodeId_opt, paginated_opt) + override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { + runAsync(secondary.listClosedChannels(from, to, remoteNodeId_opt, paginated_opt)) + primary.listClosedChannels(from, to, remoteNodeId_opt, paginated_opt) } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 64fd591315..dcbd8dd427 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -26,9 +26,9 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} import grizzled.slf4j.Logging -import scodec.bits.{BitVector, ByteVector} +import scodec.bits.BitVector import java.sql.{Connection, Statement, Timestamp} import java.time.Instant @@ -36,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 7 + val CURRENT_VERSION = 8 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -100,6 +100,10 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit )(logger) } + def migration78(statement: Statement): Unit = { + statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local.channels (closed_timestamp)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") @@ -110,7 +114,8 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels ((json->'commitments'->'remoteParams'->>'nodeId'))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON local.htlc_infos(channel_id, commitment_number)") - case Some(v@(2 | 3 | 4 | 5 | 6)) => + statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local.channels (closed_timestamp)") + case Some(v@(2 | 3 | 4 | 5 | 6 | 7)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 3) { migration23(statement) @@ -127,6 +132,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 7) { migration67() } + if (v < 8) { + migration78(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -217,32 +225,45 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.setString(1, channelId.toHex) statement.executeUpdate() } + + using(pg.prepareStatement("UPDATE local.channels SET closed_timestamp=? WHERE channel_id=? AND closed_timestamp IS NULL")) { statement => + statement.setTimestamp(1, TimestampSecond.now().toSqlTimestamp) + statement.setString(2, channelId.toHex) + statement.executeUpdate() + } } } override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => - statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=FALSE ORDER BY created_timestamp") + statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=FALSE") .mapCodec(channelDataCodec).toSeq } } } - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { - val listClosedChannelsSql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" + override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + val sql = "SELECT data FROM local.channels WHERE closed_timestamp>=? AND closed_timestamp<=? ORDER BY closed_timestamp" val rs = withLock { pg => - using(pg.createStatement) { statement => remoteNodeId_opt match { case None => - statement.executeQuery(limited(listClosedChannelsSql, paginated_opt)) - .mapCodec(channelDataCodec) + using(pg.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.setTimestamp(1, from.toSqlTimestamp) + statement.setTimestamp(2, to.toSqlTimestamp) + statement.executeQuery() + .mapCodec(channelDataCodec) + } case Some(_) => - statement.executeQuery(listClosedChannelsSql) - .mapCodec(channelDataCodec) + using(pg.prepareStatement(sql)) { statement => + statement.setTimestamp(1, from.toSqlTimestamp) + statement.setTimestamp(2, to.toSqlTimestamp) + statement.executeQuery() + .mapCodec(channelDataCodec) + } } } - } + remoteNodeId_opt match { case None => rs.toSeq case Some(nodeId) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index e35e76c9d5..e83f262f11 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,12 +79,17 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(statement: Statement): Unit = { + statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local_channels (closed_timestamp)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") - case Some(v@(1 | 2 | 3)) => + statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local_channels (closed_timestamp)") + case Some(v@(1 | 2 | 3 | 4)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -161,33 +169,44 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.setBytes(1, channelId.toArray) statement.executeUpdate() } + + using(sqlite.prepareStatement("UPDATE local_channels SET closed_timestamp=? WHERE channel_id=? AND closed_timestamp IS NULL")) { statement => + statement.setLong(1, TimestampSecond.now().toLong) + statement.setBytes(2, channelId.toArray) + statement.executeUpdate() + } } override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => - statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0 ORDER BY created_timestamp") + statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") .mapCodec(channelDataCodec).toSeq } } - private val listClosedChannelsSql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { - using(sqlite.createStatement) { statement => - remoteNodeId_opt match { + override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + val sql = "SELECT data FROM local_channels WHERE closed_timestamp>=? AND closed_timestamp<=? ORDER BY closed_timestamp" + remoteNodeId_opt match { case None => - statement.executeQuery(limited(listClosedChannelsSql, paginated_opt)) - .mapCodec(channelDataCodec).toSeq + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.setLong(1, from.toLong) + statement.setLong(2, to.toLong) + statement.executeQuery().mapCodec(channelDataCodec).toSeq + } case Some(nodeId) => - val filtered = statement.executeQuery(listClosedChannelsSql) - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) + using(sqlite.prepareStatement(sql)) { statement => + statement.setLong(1, from.toLong) + statement.setLong(2, to.toLong) + val filtered = statement.executeQuery() + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) + } + limited.toSeq } - limited.toSeq } - } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 14cc86ad61..2c9c183aa2 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -30,7 +30,7 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, randomBytes32} +import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, TimestampSecond, randomBytes32} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.ByteVector @@ -90,13 +90,15 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) - assert(db.listClosedChannels(None, None).isEmpty) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, None, None).isEmpty) db.removeChannel(channel1.channelId) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) - assert(db.listClosedChannels(None, None) == List(channel1)) - assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, None, None) == List(channel1)) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.now() - 86400, None, None) == Nil) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, Some(channel1.remoteNodeId), None) == List(channel1)) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.now() - 86400, Some(channel1.remoteNodeId), None) == Nil) + assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala index 01571052fe..041b578e66 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala @@ -129,8 +129,8 @@ trait Channel { val closedChannels: Route = postRequest("closedchannels") { implicit t => withPaginated { paginated_opt => - formFields(nodeIdFormParam.?) { toRemoteNodeId_opt => - complete(eclairApi.closedChannels(toRemoteNodeId_opt, paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) + formFields(nodeIdFormParam.?, fromFormParam(), toFormParam()) { (toRemoteNodeId_opt, from, to) => + complete(eclairApi.closedChannels(from, to, toRemoteNodeId_opt, paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) } } } From d01b6f4b79fe6c52b05c5903ae038586d6e73634 Mon Sep 17 00:00:00 2001 From: rorp Date: Fri, 26 May 2023 14:36:25 -0700 Subject: [PATCH 7/8] Respond to the comments --- .../main/scala/fr/acinq/eclair/Eclair.scala | 6 +++--- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 2 +- .../fr/acinq/eclair/db/DualDatabases.scala | 6 +++--- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 8 ++------ .../eclair/db/sqlite/SqliteChannelsDb.scala | 20 ++++--------------- .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 10 ++++------ .../acinq/eclair/api/handlers/Channel.scala | 4 ++-- 7 files changed, 19 insertions(+), 37 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 8a397ec2e1..5849c686a3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -104,7 +104,7 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] - def closedChannels(from: TimestampSecond, to: TimestampSecond, nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] @@ -290,9 +290,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { sendToChannel[CMD_GET_CHANNEL_INFO, CommandResponse[CMD_GET_CHANNEL_INFO]](channel, CMD_GET_CHANNEL_INFO(ActorRef.noSender)) } - override def closedChannels(from: TimestampSecond, to: TimestampSecond, nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { Future { - appKit.nodeParams.db.channels.listClosedChannels(from, to, nodeId_opt, paginated_opt).map { data => + appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt).map { data => RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, state = CLOSED, data = data) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index 788d691a5f..c312c9786d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -34,7 +34,7 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] - def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] + def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index abba85e97f..baa6e62fcb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -230,9 +230,9 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.listLocalChannels() } - override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { - runAsync(secondary.listClosedChannels(from, to, remoteNodeId_opt, paginated_opt)) - primary.listClosedChannels(from, to, remoteNodeId_opt, paginated_opt) + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = { + runAsync(secondary.listClosedChannels(remoteNodeId_opt, paginated_opt)) + primary.listClosedChannels(remoteNodeId_opt, paginated_opt) } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 49526b3384..4d2ccff2fc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -253,21 +253,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } - override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { - val sql = "SELECT data FROM local.channels WHERE closed_timestamp>=? AND closed_timestamp<=? ORDER BY closed_timestamp" + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + val sql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp" val rs = withLock { pg => remoteNodeId_opt match { case None => using(pg.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.setTimestamp(1, from.toSqlTimestamp) - statement.setTimestamp(2, to.toSqlTimestamp) statement.executeQuery() .mapCodec(channelDataCodec) } case Some(_) => using(pg.prepareStatement(sql)) { statement => - statement.setTimestamp(1, from.toSqlTimestamp) - statement.setTimestamp(2, to.toSqlTimestamp) statement.executeQuery() .mapCodec(channelDataCodec) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 05caeb63a8..41809bf71a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 5 + val CURRENT_VERSION = 4 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,17 +79,12 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } - def migration45(statement: Statement): Unit = { - statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local_channels (closed_timestamp)") - } - getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") - statement.executeUpdate("CREATE INDEX local_channels_closed_timestamp_idx ON local_channels (closed_timestamp)") - case Some(v@(1 | 2 | 3 | 4)) => + case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") if (v < 2) { migration12(statement) @@ -100,9 +95,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } - if (v < 5) { - migration45(statement) - } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -186,19 +178,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } - override def listClosedChannels(from: TimestampSecond, to: TimestampSecond, remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { - val sql = "SELECT data FROM local_channels WHERE closed_timestamp>=? AND closed_timestamp<=? ORDER BY closed_timestamp" + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp" remoteNodeId_opt match { case None => using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.setLong(1, from.toLong) - statement.setLong(2, to.toLong) statement.executeQuery().mapCodec(channelDataCodec).toSeq } case Some(nodeId) => using(sqlite.prepareStatement(sql)) { statement => - statement.setLong(1, from.toLong) - statement.setLong(2, to.toLong) val filtered = statement.executeQuery() .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) val limited = paginated_opt match { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index b30c98a555..25cd2ef44c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -91,15 +91,13 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, None, None).isEmpty) + assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, None, None) == List(channel1)) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.now() - 86400, None, None) == Nil) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.now() - 86400, Some(channel1.remoteNodeId), None) == Nil) - assert(db.listClosedChannels(TimestampSecond.min, TimestampSecond.max, Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) + assert(db.listClosedChannels(None, None) == List(channel1)) + assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala index 041b578e66..01571052fe 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/handlers/Channel.scala @@ -129,8 +129,8 @@ trait Channel { val closedChannels: Route = postRequest("closedchannels") { implicit t => withPaginated { paginated_opt => - formFields(nodeIdFormParam.?, fromFormParam(), toFormParam()) { (toRemoteNodeId_opt, from, to) => - complete(eclairApi.closedChannels(from, to, toRemoteNodeId_opt, paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) + formFields(nodeIdFormParam.?) { toRemoteNodeId_opt => + complete(eclairApi.closedChannels(toRemoteNodeId_opt, paginated_opt.orElse(Some(Paginated(count = 10, skip = 0))))) } } } From f82a2c52f1232990ad76b14f46bbd1df67a20a79 Mon Sep 17 00:00:00 2001 From: rorp Date: Thu, 1 Jun 2023 10:55:45 -0700 Subject: [PATCH 8/8] Respond to the comments --- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 40 +++++-------------- .../eclair/db/sqlite/SqliteChannelsDb.scala | 8 +--- 2 files changed, 10 insertions(+), 38 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 4d2ccff2fc..a2f852befc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, Paginated} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -235,12 +235,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.setString(2, channelId.toHex) statement.executeUpdate() } - - using(pg.prepareStatement("UPDATE local.channels SET closed_timestamp=? WHERE channel_id=? AND closed_timestamp IS NULL")) { statement => - statement.setTimestamp(1, TimestampSecond.now().toSqlTimestamp) - statement.setString(2, channelId.toHex) - statement.executeUpdate() - } } } @@ -254,31 +248,15 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { - val sql = "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp" - val rs = withLock { pg => - remoteNodeId_opt match { - case None => - using(pg.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery() - .mapCodec(channelDataCodec) - } - case Some(_) => - using(pg.prepareStatement(sql)) { statement => - statement.executeQuery() - .mapCodec(channelDataCodec) - } - } + val sql = remoteNodeId_opt match { + case None => "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" + case Some(remoteNodeId) => s"SELECT data FROM local.channels WHERE is_closed=TRUE AND remote_node_id = '${remoteNodeId.toHex}' ORDER BY closed_timestamp DESC" + } + withLock { pg => + using(pg.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.executeQuery() + .mapCodec(channelDataCodec).toSeq } - - remoteNodeId_opt match { - case None => rs.toSeq - case Some(nodeId) => - val filtered = rs.filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 41809bf71a..ea07977280 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -162,12 +162,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.setBytes(2, channelId.toArray) statement.executeUpdate() } - - using(sqlite.prepareStatement("UPDATE local_channels SET closed_timestamp=? WHERE channel_id=? AND closed_timestamp IS NULL")) { statement => - statement.setLong(1, TimestampSecond.now().toLong) - statement.setBytes(2, channelId.toArray) - statement.executeUpdate() - } } override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { @@ -179,7 +173,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { - val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp" + val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" remoteNodeId_opt match { case None => using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement =>