@@ -21,6 +21,7 @@ import (
2121 "context"
2222 "fmt"
2323 "os"
24+ "sync"
2425
2526 "github.com/functionstream/function-stream/common/config"
2627 "github.com/functionstream/function-stream/common/model"
@@ -96,8 +97,10 @@ func NewDefaultPebbleStateStoreFactory() (api.StateStoreFactory, error) {
9697func (fact * PebbleStateStoreFactory ) NewStateStore (f * model.Function ) (api.StateStore , error ) {
9798 if f == nil {
9899 return & PebbleStateStore {
99- db : fact .db ,
100- keyPrefix : []byte {},
100+ db : fact .db ,
101+ keyPrefix : []byte {},
102+ iterators : make (map [int64 ]* pebbleIteratorInfo ),
103+ nextIteratorID : 1 ,
101104 }, nil
102105 }
103106 c := & PebbleStateStoreConfig {}
@@ -112,18 +115,28 @@ func (fact *PebbleStateStoreFactory) NewStateStore(f *model.Function) (api.State
112115 keyPrefix = []byte (f .Name )
113116 }
114117 return & PebbleStateStore {
115- db : fact .db ,
116- keyPrefix : keyPrefix ,
118+ db : fact .db ,
119+ keyPrefix : keyPrefix ,
120+ iterators : make (map [int64 ]* pebbleIteratorInfo ),
121+ nextIteratorID : 1 ,
117122 }, nil
118123}
119124
120125func (fact * PebbleStateStoreFactory ) Close () error {
121126 return fact .db .Close ()
122127}
123128
129+ type pebbleIteratorInfo struct {
130+ iter * pebble.Iterator
131+ prefix []byte
132+ }
133+
124134type PebbleStateStore struct {
125- db * pebble.DB
126- keyPrefix []byte
135+ db * pebble.DB
136+ keyPrefix []byte
137+ iterators map [int64 ]* pebbleIteratorInfo
138+ nextIteratorID int64
139+ iteratorMu sync.RWMutex
127140}
128141
129142func (s * PebbleStateStore ) getKey (key string ) []byte {
@@ -432,5 +445,164 @@ func (pb *PebbleStateStore) Merge(ctx context.Context, keyGroup, key, namespace,
432445}
433446
434447func (s * PebbleStateStore ) Close () error {
448+ // Close all active iterators
449+ s .iteratorMu .Lock ()
450+ for id , info := range s .iterators {
451+ if info .iter != nil {
452+ _ = info .iter .Close ()
453+ }
454+ delete (s .iterators , id )
455+ }
456+ s .iteratorMu .Unlock ()
457+ return nil
458+ }
459+
460+ // NewIterator creates a new iterator with the specified prefix and returns its ID.
461+ // The iterator will iterate over all keys that start with the given prefix.
462+ func (s * PebbleStateStore ) NewIterator (prefix []byte ) (int64 , error ) {
463+ s .iteratorMu .Lock ()
464+ defer s .iteratorMu .Unlock ()
465+
466+ // Generate a new iterator ID
467+ id := s .nextIteratorID
468+ s .nextIteratorID ++
469+
470+ // Add key prefix if present
471+ var fullPrefix []byte
472+ if len (s .keyPrefix ) > 0 {
473+ fullPrefix = make ([]byte , 0 , len (s .keyPrefix )+ len (prefix ))
474+ fullPrefix = append (fullPrefix , s .keyPrefix ... )
475+ fullPrefix = append (fullPrefix , prefix ... )
476+ } else {
477+ fullPrefix = make ([]byte , len (prefix ))
478+ copy (fullPrefix , prefix )
479+ }
480+
481+ // Create a new iterator
482+ iter , err := s .db .NewIter (& pebble.IterOptions {
483+ LowerBound : fullPrefix ,
484+ })
485+ if err != nil {
486+ return 0 , fmt .Errorf ("failed to create iterator: %w" , err )
487+ }
488+
489+ // Store prefix for later validation
490+ prefixCopy := make ([]byte , len (prefix ))
491+ copy (prefixCopy , prefix )
492+
493+ // Seek to the prefix
494+ if len (fullPrefix ) > 0 {
495+ iter .SeekGE (fullPrefix )
496+ } else {
497+ // If prefix is empty, seek to the beginning
498+ iter .First ()
499+ }
500+
501+ // Store iterator info
502+ s .iterators [id ] = & pebbleIteratorInfo {
503+ iter : iter ,
504+ prefix : prefixCopy ,
505+ }
506+
507+ return id , nil
508+ }
509+
510+ // HasNext checks if the iterator has a next element.
511+ // It returns true if there is a next element, false otherwise.
512+ func (s * PebbleStateStore ) HasNext (id int64 ) (bool , error ) {
513+ info , exists := s .iterators [id ]
514+ if ! exists {
515+ return false , fmt .Errorf ("iterator with id %d not found" , id )
516+ }
517+
518+ if info .iter == nil {
519+ return false , fmt .Errorf ("iterator with id %d is closed" , id )
520+ }
521+
522+ // Check if iterator is valid
523+ if ! info .iter .Valid () {
524+ return false , nil
525+ }
526+
527+ // If prefix is specified, check if current key still has the prefix
528+ if len (info .prefix ) > 0 {
529+ keyBytes := info .iter .Key ()
530+ // Check if key still has the prefix (considering keyPrefix)
531+ fullPrefix := info .prefix
532+ if len (s .keyPrefix ) > 0 {
533+ fullPrefix = make ([]byte , 0 , len (s .keyPrefix )+ len (info .prefix ))
534+ fullPrefix = append (fullPrefix , s .keyPrefix ... )
535+ fullPrefix = append (fullPrefix , info .prefix ... )
536+ }
537+ if ! bytes .HasPrefix (keyBytes , fullPrefix ) {
538+ return false , nil
539+ }
540+ }
541+
542+ return true , nil
543+ }
544+
545+ // Next returns the value of the next element and advances the iterator.
546+ // It returns the value as a byte slice.
547+ func (s * PebbleStateStore ) Next (id int64 ) ([]byte , error ) {
548+ info , exists := s .iterators [id ]
549+ if ! exists {
550+ return nil , fmt .Errorf ("iterator with id %d not found" , id )
551+ }
552+
553+ if info .iter == nil {
554+ return nil , fmt .Errorf ("iterator with id %d is closed" , id )
555+ }
556+
557+ // Check if iterator is valid
558+ if ! info .iter .Valid () {
559+ return nil , fmt .Errorf ("iterator has no more elements" )
560+ }
561+
562+ // If prefix is specified, verify current key has the prefix
563+ if len (info .prefix ) > 0 {
564+ keyBytes := info .iter .Key ()
565+ fullPrefix := info .prefix
566+ if len (s .keyPrefix ) > 0 {
567+ fullPrefix = make ([]byte , 0 , len (s .keyPrefix )+ len (info .prefix ))
568+ fullPrefix = append (fullPrefix , s .keyPrefix ... )
569+ fullPrefix = append (fullPrefix , info .prefix ... )
570+ }
571+ if ! bytes .HasPrefix (keyBytes , fullPrefix ) {
572+ return nil , fmt .Errorf ("iterator has no more elements" )
573+ }
574+ }
575+
576+ // Get the value
577+ valueBytes := info .iter .Value ()
578+
579+ // Return a copy of the data since iterator data may be invalidated on Next()
580+ result := make ([]byte , len (valueBytes ))
581+ copy (result , valueBytes )
582+
583+ // Advance iterator
584+ info .iter .Next ()
585+
586+ return result , nil
587+ }
588+
589+ // CloseIterator closes the iterator with the specified ID.
590+ func (s * PebbleStateStore ) CloseIterator (id int64 ) error {
591+ s .iteratorMu .Lock ()
592+ defer s .iteratorMu .Unlock ()
593+
594+ info , exists := s .iterators [id ]
595+ if ! exists {
596+ return fmt .Errorf ("iterator with id %d not found" , id )
597+ }
598+
599+ // Close the iterator
600+ if info .iter != nil {
601+ _ = info .iter .Close ()
602+ }
603+
604+ // Remove from map
605+ delete (s .iterators , id )
606+
435607 return nil
436608}
0 commit comments