Skip to content

Commit ac7621e

Browse files
committed
Add LeaderScheduleEntry
1 parent 163e114 commit ac7621e

File tree

6 files changed

+41
-51
lines changed

6 files changed

+41
-51
lines changed

entries/epoch.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ type EpochEntry struct {
1414
InitialBlockHeight uint64
1515
InitialView uint64
1616
FinalBlockHeight uint64
17-
CreatedAtBlockTimestampNanoSecs uint64
18-
19-
BadgerKey []byte `pg:",pk,use_zero"`
17+
CreatedAtBlockTimestampNanoSecs int64
18+
SnapshotAtEpochNumber uint64
2019
}
2120

2221
type PGEpochEntry struct {
@@ -33,12 +32,19 @@ type PGEpochUtxoOps struct {
3332

3433
// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
3534
func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
35+
36+
var snapshotAtEpochNumber uint64
37+
// Epochs use data snapshotted from two epochs ago. Epochs 0 and 1 use data from epoch 0.
38+
if epochEntry.EpochNumber >= 2 {
39+
snapshotAtEpochNumber = epochEntry.EpochNumber - 2
40+
}
3641
return EpochEntry{
37-
EpochNumber: epochEntry.EpochNumber,
38-
InitialBlockHeight: epochEntry.InitialBlockHeight,
39-
InitialView: epochEntry.InitialView,
40-
FinalBlockHeight: epochEntry.FinalBlockHeight,
41-
BadgerKey: keyBytes,
42+
EpochNumber: epochEntry.EpochNumber,
43+
InitialBlockHeight: epochEntry.InitialBlockHeight,
44+
InitialView: epochEntry.InitialView,
45+
FinalBlockHeight: epochEntry.FinalBlockHeight,
46+
CreatedAtBlockTimestampNanoSecs: epochEntry.CreatedAtBlockTimestampNanoSecs,
47+
SnapshotAtEpochNumber: snapshotAtEpochNumber,
4248
}
4349
}
4450

@@ -49,8 +55,11 @@ func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param
4955
// We also ensure before this that all entries have the same operation type.
5056
operationType := entries[0].OperationType
5157
var err error
58+
// Core only tracks the current epoch entry and never deletes it.
59+
// In order to track all historical epoch entries, we don't use the badger
60+
// key to uniquely identify them, but rather the epoch number.
5261
if operationType == lib.DbOperationTypeDelete {
53-
err = bulkDeleteEpochEntry(entries, db, operationType)
62+
return errors.Errorf("entries.EpochEntryBatchOperation: Delete operation type not supported")
5463
} else {
5564
err = bulkInsertEpochEntry(entries, db, operationType, params)
5665
}
@@ -76,31 +85,11 @@ func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
7685
query := db.NewInsert().Model(&pgEntrySlice)
7786

7887
if operationType == lib.DbOperationTypeUpsert {
79-
query = query.On("CONFLICT (badger_key) DO UPDATE")
88+
query = query.On("CONFLICT (epoch_number) DO UPDATE")
8089
}
8190

8291
if _, err := query.Returning("").Exec(context.Background()); err != nil {
8392
return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
8493
}
8594
return nil
8695
}
87-
88-
// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database.
89-
func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
90-
// Track the unique entries we've inserted so we don't insert the same entry twice.
91-
uniqueEntries := consumer.UniqueEntries(entries)
92-
93-
// Transform the entries into a list of keys to delete.
94-
keysToDelete := consumer.KeysToDelete(uniqueEntries)
95-
96-
// Execute the delete query.
97-
if _, err := db.NewDelete().
98-
Model(&PGEpochEntry{}).
99-
Where("badger_key IN (?)", bun.In(keysToDelete)).
100-
Returning("").
101-
Exec(context.Background()); err != nil {
102-
return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
103-
}
104-
105-
return nil
106-
}

entries/pkid.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package entries
22

33
import (
4-
"bytes"
54
"context"
65
"github.com/deso-protocol/core/lib"
76
"github.com/deso-protocol/state-consumer/consumer"
@@ -131,7 +130,6 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT
131130
func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
132131
// We check before we call this function that there is at least one operation type.
133132
// We also ensure before this that all entries have the same operation type.
134-
glog.Infof("PkidBatchOperation: Putting %d entries into the database", len(entries))
135133
operationType := entries[0].OperationType
136134
var err error
137135
if operationType == lib.DbOperationTypeDelete {
@@ -149,21 +147,23 @@ func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib
149147
func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
150148
// Track the unique entries we've inserted so we don't insert the same entry twice.
151149
uniqueEntries := consumer.UniqueEntries(entries)
150+
151+
uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
152+
uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)
152153
// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
153-
// we will need to create an
154+
// we will need to filter the uniqueEntries by the appropriate prefix and then convert
155+
// the entries to the appropriate PG struct.
154156
// Create a new array to hold the bun struct.
155-
pgEntrySlice := make([]*PGLeaderScheduleEntry, len(uniqueEntries))
157+
pgEntrySlice := make([]*PGLeaderScheduleEntry, len(uniqueLeaderScheduleEntries))
156158

157159
// Loop through the entries and convert them to PGPostEntry.
158-
for ii, entry := range uniqueEntries {
159-
if bytes.Equal(entry.KeyBytes[0:1], lib.Prefixes.PrefixSnapshotLeaderSchedule) {
160-
leaderScheduleEntry := LeaderScheduleEncoderToPGStruct(entry.Encoder.(*lib.PKID), entry.KeyBytes, params)
161-
if leaderScheduleEntry == nil {
162-
glog.Errorf("bulkInsertPkid: Error converting LeaderScheduleEntry to PG struct")
163-
continue
164-
}
165-
pgEntrySlice[ii] = &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry}
160+
for ii, entry := range uniqueLeaderScheduleEntries {
161+
leaderScheduleEntry := LeaderScheduleEncoderToPGStruct(entry.Encoder.(*lib.PKID), entry.KeyBytes, params)
162+
if leaderScheduleEntry == nil {
163+
glog.Errorf("bulkInsertPkid: Error converting LeaderScheduleEntry to PG struct")
164+
continue
166165
}
166+
pgEntrySlice[ii] = &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry}
167167
}
168168

169169
query := db.NewInsert().Model(&pgEntrySlice)
@@ -185,12 +185,12 @@ func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l
185185

186186
// Transform the entries into a list of keys to delete.
187187
keysToDelete := consumer.KeysToDelete(uniqueEntries)
188-
keysToDelete = consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)
188+
leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)
189189

190190
// Execute the delete query.
191191
if _, err := db.NewDelete().
192192
Model(&LeaderScheduleEntry{}).
193-
Where("badger_key IN (?)", bun.In(keysToDelete)).
193+
Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
194194
Returning("").
195195
Exec(context.Background()); err != nil {
196196
return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")

migrations/initial_migrations/20240129000003_create_epoch_entry_table.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@ import (
1212
func createEpochEntryTable(db *bun.DB, tableName string) error {
1313
_, err := db.Exec(strings.Replace(`
1414
CREATE TABLE {tableName} (
15-
epoch_number BIGINT NOT NULL,
15+
epoch_number BIGINT PRIMARY KEY NOT NULL,
1616
initial_block_height BIGINT NOT NULL,
1717
initial_view BIGINT NOT NULL,
1818
final_block_height BIGINT NOT NULL,
1919
created_at_block_timestamp_nano_secs BIGINT NOT NULL,
20-
21-
badger_key BYTEA PRIMARY KEY
20+
snapshot_at_epoch_number BIGINT NOT NULL
2221
);
22+
23+
CREATE INDEX {tableName}_epoch_number_idx ON {tableName} (epoch_number);
24+
CREATE INDEX {tableName}_initial_block_height_idx ON {tableName} (initial_block_height);
25+
CREATE INDEX {tableName}_final_block_height_idx ON {tableName} (final_block_height);
26+
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
2327
`, "{tableName}", tableName, -1))
2428
// TODO: What other fields do we need indexed?
2529
return err

migrations/initial_migrations/20240215000001_create_leader_schedule.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ func createLeaderScheduleTable(db *bun.DB, tableName string) error {
1919
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
2020
CREATE INDEX {tableName}_snapshot_at_epoch_number_leader_index_idx ON {tableName} (snapshot_at_epoch_number, leader_index);
2121
`, "{tableName}", tableName, -1))
22-
// TODO: What other fields do we need indexed?
2322
return err
2423
}
2524

migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ func init() {
2222
comment on column locked_stake_entry.badger_key is E'@omit';
2323
comment on column yield_curve_point.badger_key is E'@omit';
2424
comment on column locked_balance_entry.badger_key is E'@omit';
25-
comment on column epoch_entry.badger_key is E'@omit';
2625
comment on table transaction_partition_34 is E'@omit';
2726
comment on table transaction_partition_35 is E'@omit';
2827
comment on table transaction_partition_36 is E'@omit';

migrations/post_sync_migrations/20240215000002_create_leader_schedule_fk_comments.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ import (
66
"github.com/uptrace/bun"
77
)
88

9-
// TODO: revisit access group relationships when we refactor the messaging app to use the graphql API.
109
func init() {
1110
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
1211
_, err := db.Exec(`
13-
comment on table leader_schedule_entry is E'@foreignKey (validator_pkid) references account (pkid)|@foreignFieldName leaderScheduleEntries|@fieldName leaderAccount\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName leaderScheduleEntries|@fieldName validatorEntry';
12+
comment on table leader_schedule_entry is E'@foreignKey (validator_pkid) references account (pkid)|@foreignFieldName leaderScheduleEntries|@fieldName leaderAccount\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName leaderScheduleEntries|@fieldName validatorEntry\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName leaderScheduleEntries|@fieldName epochEntryBySnapshot';
1413
comment on column leader_schedule_entry.badger_key is E'@omit';
1514
`)
1615
if err != nil {

0 commit comments

Comments
 (0)