diff --git a/cmd/soroban-rpc/internal/config/config.go b/cmd/soroban-rpc/internal/config/config.go index 00eb9e62d7..8ac6341bd7 100644 --- a/cmd/soroban-rpc/internal/config/config.go +++ b/cmd/soroban-rpc/internal/config/config.go @@ -24,4 +24,5 @@ type LocalConfig struct { LedgerEntryStorageTimeout time.Duration LedgerRetentionWindow int CheckpointFrequency uint32 + MaxEventsLimit uint } diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index e9aadfdfcd..a9e8d08444 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -16,6 +16,7 @@ import ( "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db" + "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ingest" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/methods" ) @@ -103,9 +104,16 @@ func MustNew(cfg config.LocalConfig) *Daemon { logger.Fatalf("could not open database: %v", err) } + ledgerRetentionWindow := uint32(cfg.LedgerRetentionWindow) + eventStore, err := events.NewMemoryStore(cfg.NetworkPassphrase, ledgerRetentionWindow) + if err != nil { + logger.Fatalf("could not create event store: %v", err) + } + ingestService, err := ingest.NewService(ingest.Config{ Logger: logger, - DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, uint32(cfg.LedgerRetentionWindow)), + DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, ledgerRetentionWindow), + EventStore: eventStore, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: historyArchive, LedgerBackend: core, @@ -134,13 +142,14 @@ func MustNew(cfg config.LocalConfig) *Daemon { handler, err := internal.NewJSONRPCHandler(internal.HandlerParams{ AccountStore: methods.AccountStore{Client: hc}, - EventStore: methods.EventStore{Client: hc}, + EventStore: eventStore, FriendbotURL: cfg.FriendbotURL, NetworkPassphrase: cfg.NetworkPassphrase, Logger: logger, TransactionProxy: transactionProxy, CoreClient: &stellarcore.Client{URL: cfg.StellarCoreURL}, LedgerEntryReader: db.NewLedgerEntryReader(dbConn), + MaxEventsLimit: cfg.MaxEventsLimit, }) if err != nil { logger.Fatalf("could not create handler: %v", err) diff --git a/cmd/soroban-rpc/internal/events/cursor.go b/cmd/soroban-rpc/internal/events/cursor.go new file mode 100644 index 0000000000..2c8c58d84b --- /dev/null +++ b/cmd/soroban-rpc/internal/events/cursor.go @@ -0,0 +1,101 @@ +package events + +import ( + "fmt" + "math" + "strconv" + "strings" + + "github.com/stellar/go/toid" +) + +// Cursor represents the position of a Soroban event. +// Soroban events are sorted in ascending order by +// ledger sequence, transaction index, operation index, +// and event index. +type Cursor struct { + // Ledger is the sequence of the ledger which emitted the event. + Ledger uint32 + // Tx is the index of the transaction within the ledger which emitted the event. + Tx uint32 + // Op is the index of the operation within the transaction which emitted the event. + Op uint32 + // Event is the index of the event within in the operation which emitted the event. + Event uint32 +} + +// String returns a string representation of this cursor +func (c Cursor) String() string { + return fmt.Sprintf( + "%019d-%010d", + toid.New(int32(c.Ledger), int32(c.Tx), int32(c.Op)).ToInt64(), + c.Event, + ) +} + +// ParseCursor parses the given string and returns the corresponding cursor +func ParseCursor(input string) (Cursor, error) { + parts := strings.SplitN(input, "-", 2) + if len(parts) != 2 { + return Cursor{}, fmt.Errorf("invalid event id %s", input) + } + + // Parse the first part (toid) + idInt, err := strconv.ParseInt(parts[0], 10, 64) //lint:ignore gomnd + if err != nil { + return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err) + } + parsed := toid.Parse(idInt) + + // Parse the second part (event order) + eventOrder, err := strconv.ParseInt(parts[1], 10, 64) //lint:ignore gomnd + if err != nil { + return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err) + } + + return Cursor{ + Ledger: uint32(parsed.LedgerSequence), + Tx: uint32(parsed.TransactionOrder), + Op: uint32(parsed.OperationOrder), + Event: uint32(eventOrder), + }, nil +} + +func cmp(a, b uint32) int { + if a < b { + return -1 + } + if a > b { + return 1 + } + return 0 +} + +// Cmp compares two cursors. +// 0 is returned if the c is equal to other. +// 1 is returned if c is greater than other. +// -1 is returned if c is less than other. +func (c Cursor) Cmp(other Cursor) int { + if c.Ledger == other.Ledger { + if c.Tx == other.Tx { + if c.Op == other.Op { + return cmp(c.Event, other.Event) + } + return cmp(c.Op, other.Op) + } + return cmp(c.Tx, other.Tx) + } + return cmp(c.Ledger, other.Ledger) +} + +var ( + // MinCursor is the smallest possible cursor + MinCursor = Cursor{} + // MaxCursor is the largest possible cursor + MaxCursor = Cursor{ + Ledger: math.MaxUint32, + Tx: math.MaxUint32, + Op: math.MaxUint32, + Event: math.MaxUint32, + } +) diff --git a/cmd/soroban-rpc/internal/events/cursor_test.go b/cmd/soroban-rpc/internal/events/cursor_test.go new file mode 100644 index 0000000000..946305ce33 --- /dev/null +++ b/cmd/soroban-rpc/internal/events/cursor_test.go @@ -0,0 +1,85 @@ +package events + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseCursor(t *testing.T) { + for _, cursor := range []Cursor{ + { + Ledger: math.MaxInt32, + Tx: 1048575, + Op: 4095, + Event: math.MaxInt32, + }, + { + Ledger: 0, + Tx: 0, + Op: 0, + Event: 0, + }, + { + Ledger: 123, + Tx: 10, + Op: 5, + Event: 1, + }, + } { + parsed, err := ParseCursor(cursor.String()) + assert.NoError(t, err) + assert.Equal(t, cursor, parsed) + } +} + +func TestCursorCmp(t *testing.T) { + for _, testCase := range []struct { + a Cursor + b Cursor + expected int + }{ + {MinCursor, MaxCursor, -1}, + {MinCursor, MinCursor, 0}, + {MaxCursor, MaxCursor, 0}, + { + Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, + 0, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 7, Tx: 2, Op: 3, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 7, Op: 3, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 2, Op: 7, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 7}, + -1, + }, + } { + a := testCase.a + b := testCase.b + expected := testCase.expected + + if got := a.Cmp(b); got != expected { + t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) + } + a, b = b, a + expected *= -1 + if got := a.Cmp(b); got != expected { + t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) + } + } +} diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index 2b288ec52c..17097882b6 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "math" "sort" "sync" @@ -12,63 +11,10 @@ import ( "github.com/stellar/go/xdr" ) -// Cursor represents the position of a Soroban event. -// Soroban events are sorted in ascending order by -// ledger sequence, transaction index, operation index, -// and event index. -type Cursor struct { - // Ledger is the sequence of the ledger which emitted the event. - Ledger uint32 - // Tx is the index of the transaction within the ledger which emitted the event. - Tx uint32 - // Op is the index of the operation within the transaction which emitted the event. - Op uint32 - // Event is the index of the event within in the operation which emitted the event. - Event uint32 -} - -func cmp(a, b uint32) int { - if a < b { - return -1 - } - if a > b { - return 1 - } - return 0 -} - -// Cmp compares two cursors. -// 0 is returned if the c is equal to other. -// 1 is returned if c is greater than other. -// -1 is returned if c is less than other. -func (c Cursor) Cmp(other Cursor) int { - if c.Ledger == other.Ledger { - if c.Tx == other.Tx { - if c.Op == other.Op { - return cmp(c.Event, other.Event) - } - return cmp(c.Op, other.Op) - } - return cmp(c.Tx, other.Tx) - } - return cmp(c.Ledger, other.Ledger) -} - -var ( - // MinCursor is the smallest possible cursor - MinCursor = Cursor{} - // MaxCursor is the largest possible cursor - MaxCursor = Cursor{ - Ledger: math.MaxUint32, - Tx: math.MaxUint32, - Op: math.MaxUint32, - Event: math.MaxUint32, - } -) - type bucket struct { - ledgerSeq uint32 - events []event + ledgerSeq uint32 + ledgerCloseTimestamp int64 + events []event } type event struct { @@ -89,6 +35,12 @@ func (e event) cursor(ledgerSeq uint32) Cursor { // MemoryStore is an in-memory store of soroban events. type MemoryStore struct { + // networkPassphrase is an immutable string containing the + // Stellar network passphrase. + // Accessing networkPassphrase does not need to be protected + // by the lock + networkPassphrase string + // lock protects the mutable fields below lock sync.RWMutex // buckets is a circular buffer where each cell represents // all events occurring within a specific ledger. @@ -104,12 +56,13 @@ type MemoryStore struct { // will be included in the MemoryStore. If the MemoryStore // is full, any events from new ledgers will evict // older entries outside the retention window. -func NewMemoryStore(retentionWindow uint32) (*MemoryStore, error) { +func NewMemoryStore(networkPassphrase string, retentionWindow uint32) (*MemoryStore, error) { if retentionWindow == 0 { return nil, errors.New("retention window must be positive") } return &MemoryStore{ - buckets: make([]bucket, 0, retentionWindow), + networkPassphrase: networkPassphrase, + buckets: make([]bucket, 0, retentionWindow), }, nil } @@ -133,7 +86,7 @@ type Range struct { // remaining events in the range). Note that a read lock is held for the // entire duration of the Scan function so f should be written in a way // to minimize latency. -func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) bool) error { +func (m *MemoryStore) Scan(eventRange Range, f func(xdr.ContractEvent, Cursor, int64) bool) error { m.lock.RLock() defer m.lock.RUnlock() @@ -146,12 +99,13 @@ func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) b i := ((curLedger - minLedger) + m.start) % uint32(len(m.buckets)) events := seek(m.buckets[i].events, eventRange.Start) for ; curLedger == m.buckets[i].ledgerSeq; curLedger++ { + timestamp := m.buckets[i].ledgerCloseTimestamp for _, event := range events { cur := event.cursor(curLedger) if eventRange.End.Cmp(cur) <= 0 { return nil } - if !f(cur, event.contents) { + if !f(event.contents, cur, timestamp) { return nil } } @@ -177,8 +131,10 @@ func (m *MemoryStore) validateRange(eventRange *Range) error { return errors.New("start is before oldest ledger") } } - max := Cursor{Ledger: min.Ledger + uint32(len(m.buckets))} + if eventRange.Start.Cmp(max) >= 0 { + return errors.New("start is after newest ledger") + } if eventRange.End.Cmp(max) > 0 { if eventRange.ClampEnd { eventRange.End = max @@ -207,31 +163,51 @@ func seek(events []event, cursor Cursor) []event { // IngestEvents adds new events from the given ledger into the store. // As a side effect, events which fall outside the retention window are // removed from the store. -func (m *MemoryStore) IngestEvents(txReader *ingest.LedgerTransactionReader) error { - events, err := readEvents(txReader) +func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error { + // no need to acquire the lock because the networkPassphrase field + // is immutable + events, err := readEvents(m.networkPassphrase, ledgerCloseMeta) if err != nil { return err } - ledgerSequence := txReader.GetSequence() - return m.append(ledgerSequence, events) + ledgerSequence := ledgerCloseMeta.LedgerSequence() + ledgerCloseTime := int64(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime) + return m.append(ledgerSequence, ledgerCloseTime, events) } -func readEvents(txReader *ingest.LedgerTransactionReader) ([]event, error) { - var events []event +func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (events []event, err error) { + var txReader *ingest.LedgerTransactionReader + txReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) + if err != nil { + return + } + defer func() { + closeErr := txReader.Close() + if err == nil { + err = closeErr + } + }() + for { - tx, err := txReader.Read() + var tx ingest.LedgerTransaction + tx, err = txReader.Read() if err == io.EOF { + err = nil break } if err != nil { - return nil, err + return } + if !tx.Result.Successful() { + continue + } for i := range tx.Envelope.Operations() { opIndex := uint32(i) - opEvents, err := tx.GetOperationEvents(opIndex) + var opEvents []xdr.ContractEvent + opEvents, err = tx.GetOperationEvents(opIndex) if err != nil { - return nil, err + return } for eventIndex, opEvent := range opEvents { events = append(events, event{ @@ -243,11 +219,11 @@ func readEvents(txReader *ingest.LedgerTransactionReader) ([]event, error) { } } } - return events, nil + return events, err } // append adds new events to the circular buffer. -func (m *MemoryStore) append(sequence uint32, events []event) error { +func (m *MemoryStore) append(sequence uint32, ledgerCloseTimestamp int64, events []event) error { m.lock.Lock() defer m.lock.Unlock() @@ -259,17 +235,16 @@ func (m *MemoryStore) append(sequence uint32, events []event) error { } } + nextBucket := bucket{ + ledgerCloseTimestamp: ledgerCloseTimestamp, + ledgerSeq: sequence, + events: events, + } if length < uint32(cap(m.buckets)) { - m.buckets = append(m.buckets, bucket{ - ledgerSeq: sequence, - events: events, - }) + m.buckets = append(m.buckets, nextBucket) } else { index := (m.start + length) % uint32(len(m.buckets)) - m.buckets[index] = bucket{ - ledgerSeq: sequence, - events: events, - } + m.buckets[index] = nextBucket m.start++ } diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index 015398973f..8fc134642a 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -9,28 +9,37 @@ import ( ) var ( - ledger5Events = []event{ + ledger5CloseTime = ledgerCloseTime(5) + ledger5Events = []event{ newEvent(1, 0, 0, 100), newEvent(1, 0, 1, 200), newEvent(2, 0, 0, 300), newEvent(2, 1, 0, 400), } - ledger6Events []event = nil - ledger7Events = []event{ + ledger6CloseTime = ledgerCloseTime(6) + ledger6Events []event = nil + ledger7CloseTime = ledgerCloseTime(7) + ledger7Events = []event{ newEvent(1, 0, 0, 500), } - ledger8Events = []event{ + ledger8CloseTime = ledgerCloseTime(8) + ledger8Events = []event{ newEvent(1, 0, 0, 600), newEvent(2, 0, 0, 700), newEvent(2, 0, 1, 800), newEvent(2, 0, 2, 900), newEvent(2, 1, 0, 1000), } - ledger9Events = []event{ + ledger9CloseTime = ledgerCloseTime(9) + ledger9Events = []event{ newEvent(1, 0, 0, 1100), } ) +func ledgerCloseTime(seq uint32) int64 { + return int64(seq)*25 + 100 +} + func newEvent(txIndex, opIndex, eventIndex, val uint32) event { v := xdr.Uint32(val) return event{ @@ -74,77 +83,28 @@ func eventsAreEqual(t *testing.T, a, b []event) { } } -func TestCursorCmp(t *testing.T) { - for _, testCase := range []struct { - a Cursor - b Cursor - expected int - }{ - {MinCursor, MaxCursor, -1}, - {MinCursor, MinCursor, 0}, - {MaxCursor, MaxCursor, 0}, - { - Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, - Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, - 0, - }, - { - Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, - Cursor{Ledger: 7, Tx: 2, Op: 3, Event: 4}, - -1, - }, - { - Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, - Cursor{Ledger: 5, Tx: 7, Op: 3, Event: 4}, - -1, - }, - { - Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, - Cursor{Ledger: 5, Tx: 2, Op: 7, Event: 4}, - -1, - }, - { - Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, - Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 7}, - -1, - }, - } { - a := testCase.a - b := testCase.b - expected := testCase.expected - - if got := a.Cmp(b); got != expected { - t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) - } - a, b = b, a - expected *= -1 - if got := a.Cmp(b); got != expected { - t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) - } - } -} - func TestAppend(t *testing.T) { - m, err := NewMemoryStore(3) + m, err := NewMemoryStore("unit-tests", 3) require.NoError(t, err) // test appending first bucket of events - require.NoError(t, m.append(5, ledger5Events)) + require.NoError(t, m.append(5, ledger5CloseTime, ledger5Events)) require.Equal(t, uint32(5), m.buckets[m.start].ledgerSeq) + require.Equal(t, ledger5CloseTime, m.buckets[m.start].ledgerCloseTimestamp) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) require.Equal(t, 1, len(m.buckets)) // the next bucket of events must follow the previous bucket (ledger 5) require.EqualError( - t, m.append(10, ledger5Events), + t, m.append(10, 100, ledger5Events), "events not contiguous: expected ledger sequence 6 but received 10", ) require.EqualError( - t, m.append(4, ledger5Events), + t, m.append(4, 100, ledger5Events), "events not contiguous: expected ledger sequence 6 but received 4", ) require.EqualError( - t, m.append(5, nil), + t, m.append(5, 100, nil), "events not contiguous: expected ledger sequence 6 but received 5", ) // check that none of the calls above modified our buckets @@ -152,51 +112,59 @@ func TestAppend(t *testing.T) { require.Equal(t, 1, len(m.buckets)) // append ledger 6 events, now we have two buckets filled - require.NoError(t, m.append(6, ledger6Events)) + require.NoError(t, m.append(6, ledger6CloseTime, ledger6Events)) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(6), m.buckets[(m.start+1)%uint32(len(m.buckets))].ledgerSeq) + require.Equal(t, ledger6CloseTime, m.buckets[(m.start+1)%uint32(len(m.buckets))].ledgerCloseTimestamp) require.Equal(t, 2, len(m.buckets)) // the next bucket of events must follow the previous bucket (ledger 6) require.EqualError( - t, m.append(10, ledger5Events), + t, m.append(10, 100, ledger5Events), "events not contiguous: expected ledger sequence 7 but received 10", ) require.EqualError( - t, m.append(5, ledger5Events), + t, m.append(5, 100, ledger5Events), "events not contiguous: expected ledger sequence 7 but received 5", ) require.EqualError( - t, m.append(6, nil), + t, m.append(6, 100, nil), "events not contiguous: expected ledger sequence 7 but received 6", ) // append ledger 7 events, now we have all three buckets filled - require.NoError(t, m.append(7, ledger7Events)) + require.NoError(t, m.append(7, ledger7CloseTime, ledger7Events)) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(7), m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerSeq) + require.Equal(t, ledger7CloseTime, m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerCloseTimestamp) require.Equal(t, 3, len(m.buckets)) // append ledger 8 events, but all buckets are full, so we need to evict ledger 5 - require.NoError(t, m.append(8, ledger8Events)) + require.NoError(t, m.append(8, ledger8CloseTime, ledger8Events)) eventsAreEqual(t, ledger6Events, m.buckets[m.start].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(8), m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerSeq) + require.Equal(t, ledger8CloseTime, m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerCloseTimestamp) require.Equal(t, 3, len(m.buckets)) // append ledger 9 events, but all buckets are full, so we need to evict ledger 6 - require.NoError(t, m.append(9, ledger9Events)) + require.NoError(t, m.append(9, ledger9CloseTime, ledger9Events)) eventsAreEqual(t, ledger7Events, m.buckets[m.start].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger9Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(9), m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerSeq) + require.Equal(t, ledger9CloseTime, m.buckets[(m.start+2)%uint32(len(m.buckets))].ledgerCloseTimestamp) require.Equal(t, 3, len(m.buckets)) } func TestScanRangeValidation(t *testing.T) { - m, err := NewMemoryStore(4) + m, err := NewMemoryStore("unit-tests", 4) require.NoError(t, err) - assertNoCalls := func(cursor Cursor, contractEvent xdr.ContractEvent) bool { + assertNoCalls := func(contractEvent xdr.ContractEvent, cursor Cursor, timestamp int64) bool { t.Fatalf("unexpected call") return true } @@ -257,7 +225,7 @@ func TestScanRangeValidation(t *testing.T) { End: Cursor{Ledger: 3}, ClampEnd: true, }, - "start is not before end", + "start is after newest ledger", }, { Range{ @@ -266,7 +234,7 @@ func TestScanRangeValidation(t *testing.T) { End: Cursor{Ledger: 3}, ClampEnd: false, }, - "start is not before end", + "start is after newest ledger", }, { Range{ @@ -275,7 +243,7 @@ func TestScanRangeValidation(t *testing.T) { End: Cursor{Ledger: 10}, ClampEnd: true, }, - "start is not before end", + "start is after newest ledger", }, { Range{ @@ -284,7 +252,7 @@ func TestScanRangeValidation(t *testing.T) { End: Cursor{Ledger: 10}, ClampEnd: false, }, - "end is after latest ledger", + "start is after newest ledger", }, { Range{ @@ -320,13 +288,13 @@ func TestScanRangeValidation(t *testing.T) { } func createStore(t *testing.T) *MemoryStore { - m, err := NewMemoryStore(4) + m, err := NewMemoryStore("unit-tests", 4) require.NoError(t, err) - require.NoError(t, m.append(5, ledger5Events)) - require.NoError(t, m.append(6, nil)) - require.NoError(t, m.append(7, ledger7Events)) - require.NoError(t, m.append(8, ledger8Events)) + require.NoError(t, m.append(5, ledger5CloseTime, ledger5Events)) + require.NoError(t, m.append(6, ledger6CloseTime, nil)) + require.NoError(t, m.append(7, ledger7CloseTime, ledger7Events)) + require.NoError(t, m.append(8, ledger8CloseTime, ledger8Events)) return m } @@ -461,7 +429,8 @@ func TestScan(t *testing.T) { for _, input := range genEquivalentInputs(testCase.input) { var events []event iterateAll := true - f := func(cursor Cursor, contractEvent xdr.ContractEvent) bool { + f := func(contractEvent xdr.ContractEvent, cursor Cursor, ledgerCloseTimestamp int64) bool { + require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp) events = append(events, event{ contents: contractEvent, txIndex: cursor.Tx, diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 41a3b881b2..be8107f9ff 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/go/xdr" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db" + "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events" ) const ( @@ -22,6 +23,7 @@ const ( type Config struct { Logger *log.Entry DB db.ReadWriter + EventStore *events.MemoryStore NetworkPassPhrase string Archive historyarchive.ArchiveInterface LedgerBackend backends.LedgerBackend @@ -33,6 +35,7 @@ func NewService(cfg Config) (*Service, error) { o := Service{ logger: cfg.Logger, db: cfg.DB, + eventStore: cfg.EventStore, ledgerBackend: cfg.LedgerBackend, networkPassPhrase: cfg.NetworkPassPhrase, timeout: cfg.Timeout, @@ -51,6 +54,7 @@ func NewService(cfg Config) (*Service, error) { type Service struct { logger *log.Entry db db.ReadWriter + eventStore *events.MemoryStore ledgerBackend backends.LedgerBackend timeout time.Duration networkPassPhrase string @@ -190,8 +194,11 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { } func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error { - ledgerWriter := tx.LedgerWriter() - if err := ledgerWriter.InsertLedger(ledgerCloseMeta); err != nil { + if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil { + return err + } + + if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { return err } return nil diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index c60314c9aa..63058be6b2 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/support/log" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db" + "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/methods" ) @@ -39,13 +40,14 @@ func (h Handler) Close() { type HandlerParams struct { AccountStore methods.AccountStore - EventStore methods.EventStore + EventStore *events.MemoryStore FriendbotURL string TransactionProxy *methods.TransactionProxy CoreClient *stellarcore.Client LedgerEntryReader db.LedgerEntryReader Logger *log.Entry NetworkPassphrase string + MaxEventsLimit uint } // NewJSONRPCHandler constructs a Handler instance @@ -53,7 +55,7 @@ func NewJSONRPCHandler(params HandlerParams) (Handler, error) { bridge := jhttp.NewBridge(handler.Map{ "getHealth": methods.NewHealthCheck(), "getAccount": methods.NewAccountHandler(params.AccountStore), - "getEvents": methods.NewGetEventsHandler(params.EventStore), + "getEvents": methods.NewGetEventsHandler(params.EventStore, params.MaxEventsLimit), "getNetwork": methods.NewGetNetworkHandler(params.NetworkPassphrase, params.FriendbotURL, params.CoreClient), "getLedgerEntry": methods.NewGetLedgerEntryHandler(params.Logger, params.LedgerEntryReader), "getTransactionStatus": methods.NewGetTransactionStatusHandler(params.TransactionProxy), diff --git a/cmd/soroban-rpc/internal/methods/get_events.go b/cmd/soroban-rpc/internal/methods/get_events.go index c3eafaa47b..0e55e0c680 100644 --- a/cmd/soroban-rpc/internal/methods/get_events.go +++ b/cmd/soroban-rpc/internal/methods/get_events.go @@ -5,25 +5,17 @@ import ( "encoding/hex" "encoding/json" "fmt" - "io" - "net/http" - "strconv" - "strings" "time" "github.com/creachadair/jrpc2" "github.com/creachadair/jrpc2/code" "github.com/creachadair/jrpc2/handler" - "github.com/stellar/go/clients/horizonclient" - "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/support/errors" - "github.com/stellar/go/toid" "github.com/stellar/go/xdr" -) -// maxLedgerRange is the maximum allowed value of endLedger-startLedger -// Just guessed 4320 as it is ~6hrs -const maxLedgerRange = 4320 + "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events" +) type EventInfo struct { EventType string `json:"type"` @@ -42,19 +34,18 @@ type EventInfoValue struct { type GetEventsRequest struct { StartLedger int32 `json:"startLedger,string"` - EndLedger int32 `json:"endLedger,string"` Filters []EventFilter `json:"filters"` Pagination *PaginationOptions `json:"pagination,omitempty"` } -func (g *GetEventsRequest) Valid() error { - // Validate start & end ledger - // Validate the ledger range min/max - if g.EndLedger < g.StartLedger { - return errors.New("endLedger must be after or the same as startLedger") +func (g *GetEventsRequest) Valid(maxLimit uint) error { + // Validate start + // Validate the paging limit (if it exists) + if g.StartLedger <= 0 { + return errors.New("startLedger must be positive") } - if g.EndLedger-g.StartLedger > maxLedgerRange { - return fmt.Errorf("endLedger must be less than %d ledgers after startLedger", maxLedgerRange) + if g.Pagination != nil && g.Pagination.Limit > maxLimit { + return fmt.Errorf("limit must not exceed %d", maxLimit) } // Validate filters @@ -118,7 +109,6 @@ func (e *EventFilter) Valid() error { return nil } -// TODO: Implement this more efficiently (ideally do it in the real data backend) func (e *EventFilter) Matches(event xdr.ContractEvent) bool { return e.matchesEventType(event) && e.matchesContractIDs(event) && e.matchesTopics(event) } @@ -259,223 +249,94 @@ type PaginationOptions struct { Limit uint `json:"limit,omitempty"` } -type EventStore struct { - Client TransactionClient -} - -type TransactionClient interface { - Transactions(request horizonclient.TransactionRequest) (horizon.TransactionsPage, error) -} - -// TODO: Extract this to a new package 'eventid' -// Build a lexically order-able id for this event record. This is -// based on Horizon's db2/history.Effect.ID method. -type EventID struct { - *toid.ID - EventOrder int32 -} - -// String returns a string representation of this id -func (id EventID) String() string { - return fmt.Sprintf( - "%019d-%010d", - id.ToInt64(), - id.EventOrder+1, - ) -} - -func (id *EventID) Parse(input string) error { - parts := strings.SplitN(input, "-", 2) - if len(parts) != 2 { - return fmt.Errorf("invalid event id %s", input) - } - - // Parse the first part (toid) - idInt, err := strconv.ParseInt(parts[0], 10, 64) //lint:ignore gomnd - if err != nil { - return errors.Wrapf(err, "invalid event id %s", input) - } - parsed := toid.Parse(idInt) - - // Parse the second part (event order) - eventOrder, err := strconv.ParseInt(parts[1], 10, 64) //lint:ignore gomnd - if err != nil { - return errors.Wrapf(err, "invalid event id %s", input) - } - - id.ID = &parsed - // Subtract one to go from the id to the - id.EventOrder = int32(eventOrder) - 1 - - return nil +type eventScanner interface { + Scan(eventRange events.Range, f func(xdr.ContractEvent, events.Cursor, int64) bool) error } -//lint:ignore gocyclo -func (a EventStore) GetEvents(request GetEventsRequest) ([]EventInfo, error) { - if err := request.Valid(); err != nil { - return nil, err - } - - finish := toid.AfterLedger(request.EndLedger) - - var results []EventInfo - - // TODO: Use a more efficient backend here. For now, we stream all ledgers in - // the range from horizon, and filter them. This sucks. - cursor := EventID{ - ID: toid.New(request.StartLedger, int32(0), 0), - EventOrder: 0, - } - if request.Pagination != nil && request.Pagination.Cursor != "" { - if err := cursor.Parse(request.Pagination.Cursor); err != nil { - return nil, errors.Wrap(err, "invalid pagination cursor") +func getEvents(scanner eventScanner, request GetEventsRequest, maxLimit uint) ([]EventInfo, error) { + if err := request.Valid(maxLimit); err != nil { + return nil, &jrpc2.Error{ + Code: code.InvalidParams, + Message: err.Error(), } } - err := a.ForEachTransaction(cursor.ID, finish, func(transaction horizon.Transaction) error { - // parse the txn paging-token, to get the transactionIndex - pagingTokenInt, err := strconv.ParseInt(transaction.PagingToken(), 10, 64) //lint:ignore gomnd - if err != nil { - return errors.Wrapf(err, "invalid paging token %s", transaction.PagingToken()) - } - pagingToken := toid.Parse(pagingTokenInt) - - // For the first txn, we might have to skip some events to get the first - // after the cursor. - operationCursor := cursor.OperationOrder - eventCursor := cursor.EventOrder - cursor.OperationOrder = 0 - cursor.EventOrder = 0 - if pagingToken.ToInt64() > cursor.ToInt64() { - // This transaction is after the cursor, so we need to reset the cursor - operationCursor = 0 - eventCursor = 0 - } - var meta xdr.TransactionMeta - if err := xdr.SafeUnmarshalBase64(transaction.ResultMetaXdr, &meta); err != nil { - // Invalid meta back. Eek! - return err + start := events.Cursor{Ledger: uint32(request.StartLedger)} + limit := maxLimit + if request.Pagination != nil { + if request.Pagination.Cursor != "" { + var err error + start, err = events.ParseCursor(request.Pagination.Cursor) + if err != nil { + return nil, errors.Wrap(err, "invalid pagination cursor") + } + // increment event index because, when paginating, + // we start with the item right after the cursor + start.Event++ } - - v3, ok := meta.GetV3() - if !ok { - return nil + if request.Pagination.Limit > 0 { + limit = request.Pagination.Limit } + } - ledger := transaction.Ledger - ledgerClosedAt := transaction.LedgerCloseTime.Format(time.RFC3339) - - for operationIndex, operationEvents := range v3.Events { - if int32(operationIndex) < operationCursor { - continue - } - for eventIndex, event := range operationEvents.Events { - if int32(eventIndex) < eventCursor { - continue - } - if request.Matches(event) { - info, err := eventInfoForEvent( - toid.New(ledger, pagingToken.TransactionOrder, int32(operationIndex)), - int32(eventIndex), - ledgerClosedAt, - event, - ) - if err != nil { - return err - } - results = append(results, info) - - // Check if we've gotten "limit" events - if request.Pagination != nil && request.Pagination.Limit > 0 && uint(len(results)) >= request.Pagination.Limit { - return io.EOF - } - } + type entry struct { + cursor events.Cursor + ledgerCloseTimestamp int64 + event xdr.ContractEvent + } + var found []entry + err := scanner.Scan( + events.Range{ + Start: start, + ClampStart: false, + End: events.MaxCursor, + ClampEnd: true, + }, + func(event xdr.ContractEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool { + if request.Matches(event) { + found = append(found, entry{cursor, ledgerCloseTimestamp, event}) } + return uint(len(found)) < limit + }, + ) + if err != nil { + return nil, &jrpc2.Error{ + Code: code.InvalidRequest, + Message: err.Error(), } - return nil - }) - if err == io.EOF { - err = nil } - return results, err -} -// ForEachTransaction runs f for each transaction in a range from start -// (inclusive) to finish (exclusive). If f returns any error, -// ForEachTransaction stops immediately and returns that error. -func (a EventStore) ForEachTransaction(start, finish *toid.ID, f func(transaction horizon.Transaction) error) error { - delay := 10 * time.Millisecond - cursor := toid.New(start.LedgerSequence, start.TransactionOrder, 0) - for { - transactions, err := a.Client.Transactions(horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: cursor.String(), - Limit: 200, - IncludeFailed: false, - }) + var results []EventInfo + for _, entry := range found { + info, err := eventInfoForEvent( + entry.event, + entry.cursor, + time.Unix(entry.ledgerCloseTimestamp, 0).UTC().Format(time.RFC3339), + ) if err != nil { - hErr := horizonclient.GetError(err) - if hErr != nil && hErr.Response != nil && (hErr.Response.StatusCode == http.StatusTooManyRequests || hErr.Response.StatusCode >= 500) { - // rate-limited, or horizon server-side error, we can retry. - - // exponential backoff, to not hammer Horizon - delay *= 2 - - if delay > time.Second { - return err - } - - // retry - time.Sleep(delay) - continue - } else { - // Unknown error, bail. - return err - } - } - - for _, transaction := range transactions.Embedded.Records { - pt, err := strconv.ParseInt(transaction.PagingToken(), 10, 64) //lint:ignore gomnd - if err != nil { - return errors.Wrapf(err, "invalid paging token %s", transaction.PagingToken()) - } - if pt >= finish.ToInt64() { - // Done! - return nil - } - id := toid.Parse(pt) - cursor = &id - - if err := f(transaction); err != nil { - return err - } - } - - if len(transactions.Embedded.Records) < 200 { - // Did not return "limit" transactions, and the query is open-ended, so this must be the end. - return nil + return nil, errors.Wrap(err, "could not parse event") } + results = append(results, info) } + return results, nil } -func eventInfoForEvent(opID *toid.ID, eventIndex int32, ledgerClosedAt string, event xdr.ContractEvent) (EventInfo, error) { +func eventInfoForEvent(event xdr.ContractEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) { v0, ok := event.Body.GetV0() if !ok { return EventInfo{}, errors.New("unknown event version") } - eventType := EventTypeContract - if event.Type == xdr.ContractEventTypeSystem { - eventType = "system" + var eventType string + switch event.Type { + case xdr.ContractEventTypeSystem: + eventType = EventTypeSystem + case xdr.ContractEventTypeContract: + eventType = EventTypeContract + default: + return EventInfo{}, errors.New("unknown event type") } - // Build a lexically order-able id for this event record. This is - // based on Horizon's db2/history.Effect.ID method. - id := EventID{ - ID: opID, - EventOrder: eventIndex, - }.String() - // base64-xdr encode the topic topic := make([]string, 0, 4) for _, segment := range v0.Topics { @@ -494,28 +355,22 @@ func eventInfoForEvent(opID *toid.ID, eventIndex int32, ledgerClosedAt string, e return EventInfo{ EventType: eventType, - Ledger: opID.LedgerSequence, + Ledger: int32(cursor.Ledger), LedgerClosedAt: ledgerClosedAt, ContractID: hex.EncodeToString((*event.ContractId)[:]), - ID: id, - PagingToken: id, + ID: cursor.String(), + PagingToken: cursor.String(), Topic: topic, Value: EventInfoValue{XDR: data}, }, nil } // NewGetEventsHandler returns a json rpc handler to fetch and filter events -func NewGetEventsHandler(store EventStore) jrpc2.Handler { +func NewGetEventsHandler(eventsStore *events.MemoryStore, maxLimit uint) jrpc2.Handler { return handler.New(func(ctx context.Context, request GetEventsRequest) ([]EventInfo, error) { - response, err := store.GetEvents(request) + response, err := getEvents(eventsStore, request, maxLimit) if err != nil { - if herr, ok := err.(*horizonclient.Error); ok { - return response, (&jrpc2.Error{ - Code: code.InvalidRequest, - Message: herr.Problem.Title, - }).WithData(herr.Problem.Extras) - } - return response, err + return nil, err } return response, nil }) diff --git a/cmd/soroban-rpc/internal/methods/get_events_test.go b/cmd/soroban-rpc/internal/methods/get_events_test.go index eb75d90904..ea82c73a38 100644 --- a/cmd/soroban-rpc/internal/methods/get_events_test.go +++ b/cmd/soroban-rpc/internal/methods/get_events_test.go @@ -3,19 +3,17 @@ package methods import ( "encoding/json" "fmt" - "io" - "net/http" "strings" "testing" "time" - "github.com/stellar/go/clients/horizonclient" - "github.com/stellar/go/protocols/horizon" - "github.com/stellar/go/support/render/problem" - "github.com/stellar/go/toid" - "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events" ) func TestTopicFilterMatches(t *testing.T) { @@ -224,47 +222,47 @@ func topicFilterToString(t TopicFilter) string { func TestGetEventsRequestValid(t *testing.T) { assert.NoError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{}, Pagination: nil, - }).Valid()) + }).Valid(1000)) assert.EqualError(t, (&GetEventsRequest{ StartLedger: 1, - EndLedger: 0, Filters: []EventFilter{}, - Pagination: nil, - }).Valid(), "endLedger must be after or the same as startLedger") + Pagination: &PaginationOptions{Limit: 1001}, + }).Valid(1000), "limit must not exceed 1000") assert.EqualError(t, (&GetEventsRequest{ StartLedger: 0, - EndLedger: 4321, Filters: []EventFilter{}, Pagination: nil, - }).Valid(), "endLedger must be less than 4320 ledgers after startLedger") + }).Valid(1000), "startLedger must be positive") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: -100, + Filters: []EventFilter{}, + Pagination: nil, + }).Valid(1000), "startLedger must be positive") + + assert.EqualError(t, (&GetEventsRequest{ + StartLedger: 1, Filters: []EventFilter{ {}, {}, {}, {}, {}, {}, }, Pagination: nil, - }).Valid(), "maximum 5 filters per request") + }).Valid(1000), "maximum 5 filters per request") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ {EventType: "foo"}, }, Pagination: nil, - }).Valid(), "filter 1 invalid: if set, type must be either 'system' or 'contract'") + }).Valid(1000), "filter 1 invalid: if set, type must be either 'system' or 'contract'") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ {ContractIDs: []string{ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", @@ -276,20 +274,18 @@ func TestGetEventsRequestValid(t *testing.T) { }}, }, Pagination: nil, - }).Valid(), "filter 1 invalid: maximum 5 contract IDs per filter") + }).Valid(1000), "filter 1 invalid: maximum 5 contract IDs per filter") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ {ContractIDs: []string{"a"}}, }, Pagination: nil, - }).Valid(), "filter 1 invalid: contract ID 1 invalid") + }).Valid(1000), "filter 1 invalid: contract ID 1 invalid") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ { Topics: []TopicFilter{ @@ -298,22 +294,20 @@ func TestGetEventsRequestValid(t *testing.T) { }, }, Pagination: nil, - }).Valid(), "filter 1 invalid: maximum 5 topics per filter") + }).Valid(1000), "filter 1 invalid: maximum 5 topics per filter") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ {Topics: []TopicFilter{ {}, }}, }, Pagination: nil, - }).Valid(), "filter 1 invalid: topic 1 invalid: topic must have at least one segment") + }).Valid(1000), "filter 1 invalid: topic 1 invalid: topic must have at least one segment") assert.EqualError(t, (&GetEventsRequest{ - StartLedger: 0, - EndLedger: 0, + StartLedger: 1, Filters: []EventFilter{ {Topics: []TopicFilter{ { @@ -326,281 +320,65 @@ func TestGetEventsRequestValid(t *testing.T) { }}, }, Pagination: nil, - }).Valid(), "filter 1 invalid: topic 1 invalid: topic cannot have more than 4 segments") -} - -func TestEventStoreForEachTransaction(t *testing.T) { - t.Run("empty", func(t *testing.T) { - client := &mockTransactionClient{} - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(horizon.TransactionsPage{}, nil).Once() - - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - found := []horizon.Transaction{} - - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - found = append(found, tx) - return nil - }) - assert.NoError(t, err) - - client.AssertExpectations(t) - assert.Equal(t, []horizon.Transaction{}, found) - }) - - t.Run("one", func(t *testing.T) { - client := &mockTransactionClient{} - var result horizon.TransactionsPage - result.Embedded.Records = []horizon.Transaction{ - {ID: "1", PT: toid.New(1, 0, 0).String()}, - } - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: result.Embedded.Records[0].PagingToken(), - Limit: 200, - IncludeFailed: false, - }).Return(result, nil).Once() - - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - - found := []horizon.Transaction{} - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - found = append(found, tx) - return nil - }) - assert.NoError(t, err) - - assert.Equal(t, 1, called) - client.AssertExpectations(t) - // Should find all transactions - assert.Equal(t, result.Embedded.Records, found) - }) - - t.Run("retry", func(t *testing.T) { - client := &mockTransactionClient{} - var result horizon.TransactionsPage - result.Embedded.Records = []horizon.Transaction{ - {ID: "1", PT: toid.New(1, 0, 0).String()}, - } - - // Return an error the first time - client.On("Transactions", mock.Anything).Return(horizon.TransactionsPage{}, horizonclient.Error{ - Response: &http.Response{ - StatusCode: http.StatusTooManyRequests, - }, - }).Once() - - // Then success on the retry - client.On("Transactions", mock.Anything).Return(result, nil).Once() - - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - found := []horizon.Transaction{} - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - found = append(found, tx) - return nil - }) - assert.NoError(t, err) - - assert.Equal(t, 1, called) - client.AssertExpectations(t) - // Should find all transactions - assert.Equal(t, result.Embedded.Records, found) - }) - - t.Run("timeout", func(t *testing.T) { - client := &mockTransactionClient{} - var result horizon.TransactionsPage - result.Embedded.Records = []horizon.Transaction{ - {ID: "1", PT: toid.New(1, 0, 0).String()}, - } - - // Return an error every time - client.On("Transactions", mock.Anything).Return(horizon.TransactionsPage{}, horizonclient.Error{ - Response: &http.Response{StatusCode: http.StatusTooManyRequests}, - Problem: problem.P{Title: "too many requests"}, - }).Times(7) - - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - return nil - }) - assert.EqualError(t, err, "horizon error: \"too many requests\" - check horizon.Error.Problem for more information") - - assert.Equal(t, 0, called) - client.AssertExpectations(t) - }) - - t.Run("cuts off after f error", func(t *testing.T) { - client := &mockTransactionClient{} - var result horizon.TransactionsPage - result.Embedded.Records = []horizon.Transaction{ - {ID: "1", PT: toid.New(1, 0, 0).String()}, - {ID: "2", PT: toid.New(2, 0, 0).String()}, - } - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: result.Embedded.Records[0].PagingToken(), - Limit: 200, - IncludeFailed: false, - }).Return(result, nil).Once() - - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - - found := []horizon.Transaction{} - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - found = append(found, tx) - return io.EOF - }) - assert.EqualError(t, err, "EOF") - - assert.Equal(t, 1, called) - client.AssertExpectations(t) - // Should find just the first record - assert.Equal(t, result.Embedded.Records[:1], found) - }) - - t.Run("pagination within a ledger", func(t *testing.T) { - client := &mockTransactionClient{} - result := []horizon.Transaction{} - for i := 0; i < 210; i++ { - result = append( - result, - horizon.Transaction{ID: fmt.Sprintf("%d", i), PT: toid.New(1, int32(i), 0).String()}, - ) - } - pages := []horizon.TransactionsPage{{}, {}} - pages[0].Embedded.Records = result[:200] - pages[1].Embedded.Records = result[200:] - start := toid.New(1, 0, 0) - finish := toid.New(2, 0, 0) - - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: start.String(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[0], nil).Once() - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: pages[0].Embedded.Records[199].PagingToken(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[1], nil).Once() - - found := []horizon.Transaction{} - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - found = append(found, tx) - return nil - }) - assert.NoError(t, err) - - assert.Equal(t, 210, called) - client.AssertExpectations(t) - // Should find all transactions - assert.Equal(t, result, found) - }) - - t.Run("pagination across ledgers", func(t *testing.T) { - client := &mockTransactionClient{} - result := []horizon.Transaction{} - // Create two full pages, split across two ledgers - for ledger := 1; ledger < 3; ledger++ { - for i := 0; i < 180; i++ { - result = append( - result, - horizon.Transaction{ - ID: fmt.Sprintf("%d-%d", ledger, i), - PT: toid.New(int32(ledger), int32(i), 0).String(), - }, - ) - } - } - pages := []horizon.TransactionsPage{{}, {}} - pages[0].Embedded.Records = result[:200] - pages[1].Embedded.Records = result[200:] - start := toid.New(1, 0, 0) - finish := toid.New(3, 0, 0) - - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: start.String(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[0], nil).Once() - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: pages[0].Embedded.Records[199].PagingToken(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[1], nil).Once() - - found := []horizon.Transaction{} - called := 0 - err := EventStore{Client: client}.ForEachTransaction(start, finish, func(tx horizon.Transaction) error { - called++ - found = append(found, tx) - return nil - }) - assert.NoError(t, err) - - assert.Equal(t, 360, called) - client.AssertExpectations(t) - // Should find all transactions - assert.Equal(t, result, found) - }) + }).Valid(1000), "filter 1 invalid: topic 1 invalid: topic cannot have more than 4 segments") } -func TestEventStoreGetEvents(t *testing.T) { - now := time.Now() +func TestGetEvents(t *testing.T) { + now := time.Now().UTC() counter := xdr.ScSymbol("COUNTER") counterScVal := xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter} counterXdr, err := xdr.MarshalBase64(counterScVal) assert.NoError(t, err) t.Run("empty", func(t *testing.T) { - client := &mockTransactionClient{} - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(horizon.TransactionsPage{}, nil).Once() - - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) + _, err = getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, - }) + }, 1000) + assert.EqualError(t, err, "[-32600] event store is empty") + }) + + t.Run("startLedger validation", func(t *testing.T) { + contractID := xdr.Hash([32]byte{}) + store, err := events.NewMemoryStore("unit-tests", 100) assert.NoError(t, err) + var txMeta []xdr.TransactionMeta + txMeta = append(txMeta, transactionMetaWithEvents( + []xdr.ContractEvent{ + contractEvent( + contractID, + xdr.ScVec{xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }}, + xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, + ), + }, + )) + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(2, now.Unix(), txMeta...))) - client.AssertExpectations(t) - assert.Equal(t, []EventInfo(nil), events) + _, err = getEvents(store, GetEventsRequest{ + StartLedger: 1, + }, 1000) + assert.EqualError(t, err, "[-32600] start is before oldest ledger") + + _, err = getEvents(store, GetEventsRequest{ + StartLedger: 3, + }, 1000) + assert.EqualError(t, err, "[-32600] start is after newest ledger") }) t.Run("no filtering returns all", func(t *testing.T) { - client := &mockTransactionClient{} - results := []horizon.Transaction{} contractID := xdr.Hash([32]byte{}) + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) + var txMeta []xdr.TransactionMeta for i := 0; i < 10; i++ { - meta := transactionMetaWithEvents(t, + txMeta = append(txMeta, transactionMetaWithEvents( []xdr.ContractEvent{ contractEvent( contractID, @@ -614,34 +392,23 @@ func TestEventStoreGetEvents(t *testing.T) { }, ), }, - ) - results = append(results, horizon.Transaction{ - ID: fmt.Sprintf("%d", i), - PT: toid.New(1, int32(i), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: meta, - }) + )) } - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + result, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, - }) + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) - expected := []EventInfo{} - for i, tx := range results { - id := EventID{ID: toid.New(tx.Ledger, int32(i), 0), EventOrder: 0}.String() + var expected []EventInfo + for i := range txMeta { + id := events.Cursor{ + Ledger: 1, + Tx: uint32(i + 1), + Op: 0, + Event: 0, + }.String() value, err := xdr.MarshalBase64(xdr.ScVal{ Type: xdr.ScValTypeScvSymbol, Sym: &counter, @@ -649,8 +416,8 @@ func TestEventStoreGetEvents(t *testing.T) { assert.NoError(t, err) expected = append(expected, EventInfo{ EventType: EventTypeContract, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), + Ledger: 1, + LedgerClosedAt: now.Format(time.RFC3339), ContractID: "0000000000000000000000000000000000000000000000000000000000000000", ID: id, PagingToken: id, @@ -660,18 +427,19 @@ func TestEventStoreGetEvents(t *testing.T) { }, }) } - assert.Equal(t, expected, events) + assert.Equal(t, expected, result) }) t.Run("filtering by contract id", func(t *testing.T) { - client := &mockTransactionClient{} - results := []horizon.Transaction{} + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) + var txMeta []xdr.TransactionMeta contractIds := []xdr.Hash{ xdr.Hash([32]byte{}), xdr.Hash([32]byte{1}), } for i := 0; i < 5; i++ { - meta := transactionMetaWithEvents(t, + txMeta = append(txMeta, transactionMetaWithEvents( []xdr.ContractEvent{ contractEvent( contractIds[i%len(contractIds)], @@ -685,53 +453,38 @@ func TestEventStoreGetEvents(t *testing.T) { }, ), }, - ) - results = append(results, horizon.Transaction{ - ID: fmt.Sprintf("%d", i), - PT: toid.New(1, int32(i), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: meta, - }) + )) } - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, Filters: []EventFilter{ {ContractIDs: []string{contractIds[0].HexString()}}, }, - }) + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) expectedIds := []string{ - EventID{ID: toid.New(1, int32(0), 0), EventOrder: 0}.String(), - EventID{ID: toid.New(1, int32(2), 0), EventOrder: 0}.String(), - EventID{ID: toid.New(1, int32(4), 0), EventOrder: 0}.String(), + events.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 0}.String(), + events.Cursor{Ledger: 1, Tx: 3, Op: 0, Event: 0}.String(), + events.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String(), } eventIds := []string{} - for _, event := range events { + for _, event := range results { eventIds = append(eventIds, event.ID) } assert.Equal(t, expectedIds, eventIds) }) t.Run("filtering by topic", func(t *testing.T) { - client := &mockTransactionClient{} - results := []horizon.Transaction{} + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) + var txMeta []xdr.TransactionMeta contractID := xdr.Hash([32]byte{}) for i := 0; i < 10; i++ { number := xdr.Int64(i) - meta := transactionMetaWithEvents(t, + txMeta = append(txMeta, transactionMetaWithEvents( []xdr.ContractEvent{ // Generate a unique topic like /counter/4 for each event so we can check contractEvent( @@ -743,28 +496,13 @@ func TestEventStoreGetEvents(t *testing.T) { xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, ), }, - ) - results = append(results, horizon.Transaction{ - ID: fmt.Sprintf("%d", i), - PT: toid.New(1, int32(i), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: meta, - }) + )) } - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) number := xdr.Int64(4) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, Filters: []EventFilter{ {Topics: []TopicFilter{ []SegmentFilter{ @@ -773,12 +511,10 @@ func TestEventStoreGetEvents(t *testing.T) { }, }}, }, - }) + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) - tx := results[4] - id := EventID{ID: toid.New(tx.Ledger, int32(4), 0), EventOrder: 0}.String() + id := events.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String() assert.NoError(t, err) value, err := xdr.MarshalBase64(xdr.ScVal{ Type: xdr.ScValTypeScvU63, @@ -788,8 +524,8 @@ func TestEventStoreGetEvents(t *testing.T) { expected := []EventInfo{ { EventType: EventTypeContract, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), + Ledger: 1, + LedgerClosedAt: now.Format(time.RFC3339), ContractID: "0000000000000000000000000000000000000000000000000000000000000000", ID: id, PagingToken: id, @@ -797,102 +533,63 @@ func TestEventStoreGetEvents(t *testing.T) { Value: EventInfoValue{XDR: value}, }, } - assert.Equal(t, expected, events) + assert.Equal(t, expected, results) }) t.Run("filtering by both contract id and topic", func(t *testing.T) { - client := &mockTransactionClient{} + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) contractID := xdr.Hash([32]byte{}) otherContractID := xdr.Hash([32]byte{1}) number := xdr.Int64(1) - results := []horizon.Transaction{ + txMeta := []xdr.TransactionMeta{ // This matches neither the contract id nor the topic - { - ID: fmt.Sprintf("%d", 0), - PT: toid.New(1, int32(0), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - otherContractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), + transactionMetaWithEvents([]xdr.ContractEvent{ + contractEvent( + otherContractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, }, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, ), - }, + }), // This matches the contract id but not the topic - { - ID: fmt.Sprintf("%d", 1), - PT: toid.New(1, int32(1), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), + transactionMetaWithEvents([]xdr.ContractEvent{ + contractEvent( + contractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, }, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, ), - }, + }), // This matches the topic but not the contract id - { - ID: fmt.Sprintf("%d", 2), - PT: toid.New(1, int32(2), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - otherContractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), + transactionMetaWithEvents([]xdr.ContractEvent{ + contractEvent( + otherContractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, }, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, ), - }, + }), // This matches both the contract id and the topic - { - ID: fmt.Sprintf("%d", 3), - PT: toid.New(1, int32(3), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), + transactionMetaWithEvents([]xdr.ContractEvent{ + contractEvent( + contractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, }, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, ), - }, + }), } - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, Filters: []EventFilter{ { ContractIDs: []string{contractID.HexString()}, @@ -904,12 +601,10 @@ func TestEventStoreGetEvents(t *testing.T) { }, }, }, - }) + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) - tx := results[3] - id := EventID{ID: toid.New(tx.Ledger, int32(3), 0), EventOrder: 0}.String() + id := events.Cursor{Ledger: 1, Tx: 4, Op: 0, Event: 0}.String() value, err := xdr.MarshalBase64(xdr.ScVal{ Type: xdr.ScValTypeScvU63, U63: &number, @@ -918,8 +613,8 @@ func TestEventStoreGetEvents(t *testing.T) { expected := []EventInfo{ { EventType: EventTypeContract, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), + Ledger: 1, + LedgerClosedAt: now.Format(time.RFC3339), ContractID: contractID.HexString(), ID: id, PagingToken: id, @@ -927,64 +622,47 @@ func TestEventStoreGetEvents(t *testing.T) { Value: EventInfoValue{XDR: value}, }, } - assert.Equal(t, expected, events) + assert.Equal(t, expected, results) }) t.Run("filtering by event type", func(t *testing.T) { - client := &mockTransactionClient{} + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) contractID := xdr.Hash([32]byte{}) - results := []horizon.Transaction{ - { - ID: fmt.Sprintf("%d", 0), - PT: toid.New(1, int32(0), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - ), - systemEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, - ), + txMeta := []xdr.TransactionMeta{ + transactionMetaWithEvents([]xdr.ContractEvent{ + contractEvent( + contractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, }, + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, ), - }, + systemEvent( + contractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, + }, + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter}, + ), + }), } - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, Filters: []EventFilter{ {EventType: EventTypeSystem}, }, - }) + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) - tx := results[0] - id := EventID{ID: toid.New(tx.Ledger, int32(0), 0), EventOrder: 1}.String() + id := events.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 1}.String() expected := []EventInfo{ { EventType: EventTypeSystem, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), + Ledger: 1, + LedgerClosedAt: now.Format(time.RFC3339), ContractID: contractID.HexString(), ID: id, PagingToken: id, @@ -992,123 +670,76 @@ func TestEventStoreGetEvents(t *testing.T) { Value: EventInfoValue{XDR: counterXdr}, }, } - assert.Equal(t, expected, events) + assert.Equal(t, expected, results) }) - t.Run("pagination", func(t *testing.T) { - client := &mockTransactionClient{} + t.Run("with limit", func(t *testing.T) { + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) contractID := xdr.Hash([32]byte{}) - results := []horizon.Transaction{} + var txMeta []xdr.TransactionMeta for i := 0; i < 180; i++ { number := xdr.Int64(i) - results = append(results, horizon.Transaction{ - ID: fmt.Sprintf("%d", i), - PT: toid.New(1, int32(i), 0).String(), - Ledger: 1, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - }, - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), - }, - ), - }) - } - for i := 180; i < 210; i++ { - number := xdr.Int64(i) - results = append(results, horizon.Transaction{ - ID: fmt.Sprintf("%d", i), - PT: toid.New(2, int32(i-180), 0).String(), - Ledger: 2, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, - []xdr.ContractEvent{ - contractEvent( - contractID, - xdr.ScVec{ - xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - }, + txMeta = append(txMeta, transactionMetaWithEvents( + []xdr.ContractEvent{ + contractEvent( + contractID, + xdr.ScVec{ xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, - ), - }, - ), - }) + }, + xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number}, + ), + }, + )) } - pages := []horizon.TransactionsPage{{}, {}} - pages[0].Embedded.Records = results[:200] - pages[1].Embedded.Records = results[200:] - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(1, 0, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[0], nil).Once() - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(2, 19, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(pages[1], nil).Once() - - // Find one on the second page - number := xdr.Int64(205) - numberScVal := xdr.ScVal{Type: xdr.ScValTypeScvU63, U63: &number} - numberXdr, err := xdr.MarshalBase64(numberScVal) - assert.NoError(t, err) + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 2, - Filters: []EventFilter{ - {Topics: []TopicFilter{ - []SegmentFilter{ - {scval: &numberScVal}, - }, - }}, - }, - }) + Filters: []EventFilter{}, + Pagination: &PaginationOptions{Limit: 10}, + }, 1000) assert.NoError(t, err) - client.AssertExpectations(t) - tx := results[205] - id := EventID{ID: toid.New(tx.Ledger, int32(25), 0), EventOrder: 0}.String() - expected := []EventInfo{ - { + var expected []EventInfo + for i := 0; i < 10; i++ { + id := events.Cursor{ + Ledger: 1, + Tx: uint32(i + 1), + Op: 0, + Event: 0, + }.String() + value, err := xdr.MarshalBase64(txMeta[i].MustV3().Events[0].Events[0].Body.MustV0().Data) + assert.NoError(t, err) + expected = append(expected, EventInfo{ EventType: EventTypeContract, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), - ContractID: contractID.HexString(), + Ledger: 1, + LedgerClosedAt: now.Format(time.RFC3339), + ContractID: "0000000000000000000000000000000000000000000000000000000000000000", ID: id, PagingToken: id, - Topic: []string{numberXdr}, - Value: EventInfoValue{XDR: numberXdr}, - }, + Topic: []string{value}, + Value: EventInfoValue{ + XDR: value, + }, + }) } - assert.Equal(t, expected, events) + assert.Equal(t, expected, results) }) t.Run("starting cursor in the middle of operations and events", func(t *testing.T) { - client := &mockTransactionClient{} + store, err := events.NewMemoryStore("unit-tests", 100) + assert.NoError(t, err) contractID := xdr.Hash([32]byte{}) - results := []horizon.Transaction{} datas := []xdr.ScSymbol{ // ledger/transaction/operation/event - xdr.ScSymbol("5/2/0/0"), - xdr.ScSymbol("5/2/0/1"), - xdr.ScSymbol("5/2/1/0"), - xdr.ScSymbol("5/2/1/1"), + xdr.ScSymbol("5/1/0/0"), + xdr.ScSymbol("5/1/0/1"), + xdr.ScSymbol("5/1/1/0"), + xdr.ScSymbol("5/1/1/1"), } - results = append(results, horizon.Transaction{ - ID: "l5/t2", - PT: toid.New(5, int32(2), 0).String(), - Ledger: 5, - LedgerCloseTime: now, - ResultMetaXdr: transactionMetaWithEvents(t, + txMeta := []xdr.TransactionMeta{ + transactionMetaWithEvents( []xdr.ContractEvent{ contractEvent( contractID, @@ -1142,54 +773,130 @@ func TestEventStoreGetEvents(t *testing.T) { ), }, ), - }) - page := horizon.TransactionsPage{} - page.Embedded.Records = results - client.On("Transactions", horizonclient.TransactionRequest{ - Order: horizonclient.Order("asc"), - Cursor: toid.New(5, 2, 0).String(), - Limit: 200, - IncludeFailed: false, - }).Return(page, nil).Once() + } + assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...))) - id := EventID{ID: toid.New(5, 2, 1), EventOrder: 1}.String() - events, err := EventStore{Client: client}.GetEvents(GetEventsRequest{ + id := events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0}.String() + results, err := getEvents(store, GetEventsRequest{ StartLedger: 1, - EndLedger: 6, Pagination: &PaginationOptions{ Cursor: id, + Limit: 2, }, - }) + }, 1000) assert.NoError(t, err) - expectedXdr, err := xdr.MarshalBase64(xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &datas[len(datas)-1]}) - assert.NoError(t, err) - client.AssertExpectations(t) - tx := results[0] - expected := []EventInfo{ - { + var expected []EventInfo + expectedIDs := []string{ + events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 1}.String(), + events.Cursor{Ledger: 5, Tx: 1, Op: 1, Event: 0}.String(), + } + symbols := datas[1:3] + for i, id := range expectedIDs { + expectedXdr, err := xdr.MarshalBase64(xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &symbols[i]}) + assert.NoError(t, err) + expected = append(expected, EventInfo{ EventType: EventTypeContract, - Ledger: tx.Ledger, - LedgerClosedAt: tx.LedgerCloseTime.Format(time.RFC3339), + Ledger: 5, + LedgerClosedAt: now.Format(time.RFC3339), ContractID: contractID.HexString(), ID: id, PagingToken: id, Topic: []string{counterXdr}, Value: EventInfoValue{XDR: expectedXdr}, - }, + }) } - assert.Equal(t, expected, events) + assert.Equal(t, expected, results) }) } -func transactionMetaWithEvents(t *testing.T, events ...[]xdr.ContractEvent) string { - operationEvents := []xdr.OperationEvents{} +func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta { + var txProcessing []xdr.TransactionResultMetaV2 + var phases []xdr.TransactionPhase + + for _, item := range txMeta { + var operations []xdr.Operation + for range item.MustV3().Events { + operations = append(operations, + xdr.Operation{ + Body: xdr.OperationBody{ + Type: xdr.OperationTypeInvokeHostFunction, + InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{ + Function: xdr.HostFunction{ + Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract, + InvokeArgs: &xdr.ScVec{}, + }, + }, + }, + }) + } + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Operations: operations, + }, + }, + } + txHash, err := network.HashTransactionInEnvelope(envelope, "unit-tests") + if err != nil { + panic(err) + } + + txProcessing = append(txProcessing, xdr.TransactionResultMetaV2{ + TxApplyProcessing: item, + Result: xdr.TransactionResultPairV2{ + TransactionHash: txHash, + }, + }) + components := []xdr.TxSetComponent{ + { + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + envelope, + }, + }, + }, + } + phases = append(phases, xdr.TransactionPhase{ + V: 0, + V0Components: &components, + }) + } + + return xdr.LedgerCloseMeta{ + V: 2, + V2: &xdr.LedgerCloseMetaV2{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: xdr.Hash{}, + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: xdr.TimePoint(closeTimestamp), + }, + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + PreviousLedgerHash: xdr.Hash{}, + Phases: phases, + }, + }, + TxProcessing: txProcessing, + }, + } +} +func transactionMetaWithEvents(events ...[]xdr.ContractEvent) xdr.TransactionMeta { + var operationEvents []xdr.OperationEvents for _, e := range events { operationEvents = append(operationEvents, xdr.OperationEvents{ Events: e, }) } - meta, err := xdr.MarshalBase64(xdr.TransactionMeta{ + return xdr.TransactionMeta{ V: 3, Operations: &[]xdr.OperationMeta{}, V3: &xdr.TransactionMetaV3{ @@ -1201,9 +908,7 @@ func transactionMetaWithEvents(t *testing.T, events ...[]xdr.ContractEvent) stri }, Events: operationEvents, }, - }) - assert.NoError(t, err) - return meta + } } func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent { @@ -1233,12 +938,3 @@ func systemEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.Con }, } } - -type mockTransactionClient struct { - mock.Mock -} - -func (m *mockTransactionClient) Transactions(request horizonclient.TransactionRequest) (horizon.TransactionsPage, error) { - args := m.Called(request) - return args.Get(0).(horizon.TransactionsPage), args.Error(1) -} diff --git a/cmd/soroban-rpc/internal/test/integration.go b/cmd/soroban-rpc/internal/test/integration.go index fd2694e475..53be69cea0 100644 --- a/cmd/soroban-rpc/internal/test/integration.go +++ b/cmd/soroban-rpc/internal/test/integration.go @@ -115,6 +115,7 @@ func (i *Test) launchDaemon() { LedgerRetentionWindow: 17280, // Needed when Core is run with ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true CheckpointFrequency: 8, + MaxEventsLimit: 10000, } i.daemon = daemon.MustNew(config) i.server = httptest.NewServer(i.daemon) diff --git a/cmd/soroban-rpc/main.go b/cmd/soroban-rpc/main.go index b098582073..b3fd96e2d1 100644 --- a/cmd/soroban-rpc/main.go +++ b/cmd/soroban-rpc/main.go @@ -21,7 +21,7 @@ import ( func main() { var endpoint, horizonURL, stellarCoreURL, binaryPath, configPath, friendbotURL, networkPassphrase, dbPath, captivecoreStoragePath string - var captiveCoreHTTPPort, ledgerEntryStorageTimeoutMinutes uint + var captiveCoreHTTPPort, ledgerEntryStorageTimeoutMinutes, maxEventsLimit uint var checkpointFrequency uint32 var useDB bool var historyArchiveURLs []string @@ -198,6 +198,14 @@ func main() { " the default value is 17280 which corresponds to about 24 hours of ledgers", ConfigKey: &ledgerRetentionWindow, }, + { + Name: "max-events-limit", + ConfigKey: &maxEventsLimit, + OptType: types.Uint, + Required: false, + FlagDefault: uint(10000), + Usage: "Maximum amount of events allowed in a single getEvents response", + }, } cmd := &cobra.Command{ Use: "soroban-rpc", @@ -231,6 +239,7 @@ func main() { SQLiteDBPath: dbPath, CheckpointFrequency: checkpointFrequency, LedgerRetentionWindow: ledgerRetentionWindow, + MaxEventsLimit: maxEventsLimit, } exitCode := daemon.Run(config, endpoint) os.Exit(exitCode)