Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ var (
// at the base level of this bucket several prefixed keys are stored which
// house channel meta-data such as total satoshis sent, number of updates
// etc. These fields are stored at this top level rather than within a
// node's channel bucket in orer to facilitate sequential prefix scans
// node's channel bucket in order to facilitate sequential prefix scans
// to gather stats such as total satoshis received.
openChannelBucket = []byte("ocb")

// chanIDBucket is a thrid-level bucket stored within a node's ID bucket
// chanIDBucket is a third-level bucket stored within a node's ID bucket
// in the open channel bucket. The resolution path looks something like:
// ocb -> nodeID -> cib. This bucket contains a series of keys with no
// values, these keys are the channel ID's of all the active channels
Expand Down
98 changes: 92 additions & 6 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,32 @@ import (
)

const (
dbName = "channel.db"
dbName = "channel.db"
dbFilePermission = 0600
)

// Migration is a function which takes a prior outdated version of the database
// instances and mutates the key/bucket structure to arrive at a more up-to-date
// version of the database.
type migration func(tx *bolt.Tx) error

type version struct {
number uint32
migration migration
}

var (
// DBVersions is storing all versions of database. If current version of
// database don't match with latest version this list will be used for
// retrieving all migration function that are need to apply to the
// current db.
DBVersions = []version{
{
number: 1,
migration: nil, // The base DB version requires no migration
},
}

// Big endian is the preferred byte order, due to cursor scans over integer
// keys iterating in order.
byteOrder = binary.BigEndian
Expand All @@ -32,9 +54,9 @@ var bufPool = &sync.Pool{
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
store *bolt.DB

store *bolt.DB
netParams *chaincfg.Params
dbPath string
}

// Open opens an existing channeldb created under the passed namespace with
Expand All @@ -49,12 +71,16 @@ func Open(dbPath string, netParams *chaincfg.Params) (*DB, error) {
}
}

bdb, err := bolt.Open(path, 0600, nil)
bdb, err := bolt.Open(path, dbFilePermission, nil)
if err != nil {
return nil, err
}

return &DB{store: bdb, netParams: netParams}, nil
return &DB{
store: bdb,
netParams: netParams,
dbPath: dbPath,
}, nil
}

// Wipe completely deletes all saved state within all used buckets within the
Expand Down Expand Up @@ -103,7 +129,7 @@ func createChannelDB(dbPath string) error {
}

path := filepath.Join(dbPath, dbName)
bdb, err := bolt.Open(path, 0600, nil)
bdb, err := bolt.Open(path, dbFilePermission, nil)
if err != nil {
return err
}
Expand All @@ -125,6 +151,10 @@ func createChannelDB(dbPath string) error {
return err
}

if _, err := tx.CreateBucket(metaBucket); err != nil {
return err
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -268,3 +298,59 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {

return channels, err
}

// SyncVersions function is used for safe db version synchronization. It applies
// migration functions to the current database and recovers the previous
// state of db if at least one error/panic appeared during migration.
func (d *DB) SyncVersions(versions []version) error {
meta, err := d.FetchMeta(nil)
if err != nil {
return err
}

latestVersion := getLatestDBVersion(versions)

if meta.dbVersionNumber < latestVersion {
migrations := getMigrationsToApply(versions, meta.dbVersionNumber)

return d.store.Update(func(tx *bolt.Tx) error {
for _, migration := range migrations {
if migration == nil {
continue
}

if err := migration(tx); err != nil {
return err
}
}

meta.dbVersionNumber = latestVersion
if err := d.PutMeta(meta, tx); err != nil {
return err
}

return nil
})

}

return nil
}

func getLatestDBVersion(versions []version) uint32 {
return versions[len(versions)-1].number
}

// getMigrationsToApply retrieves the migration function that should be
// applied to the database.
func getMigrationsToApply(versions []version, version uint32) []migration {
migrations := make([]migration, 0, len(versions))

for _, v := range versions {
if v.number > version {
migrations = append(migrations, v.migration)
}
}

return migrations
}
1 change: 1 addition & 0 deletions channeldb/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ var (
ErrDuplicateInvoice = fmt.Errorf("invoice with payment hash already exists")

ErrNodeNotFound = fmt.Errorf("link node with target identity not found")
ErrMetaNotFound = fmt.Errorf("unable to locate meta information")
)
90 changes: 90 additions & 0 deletions channeldb/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package channeldb

import (
"github.com/boltdb/bolt"
)

var (
// metaBucket stores all the meta information concerning the state of
// the database.
metaBucket = []byte("metadata")

// dbVersionKey is a boltdb key and it's used for storing/retrieveing
// current database version.
dbVersionKey = []byte("dbp")
)

// Meta structure holds the database meta information.
type Meta struct {
dbVersionNumber uint32
}

// FetchMeta fetches the meta data from boltdb and returns filled meta
// structure. If transaction object is specified then it will be used rather
// than initiation creation of new one.
func (d *DB) FetchMeta(tx *bolt.Tx) (*Meta, error) {
meta := &Meta{}
fetchMeta := func(tx *bolt.Tx) error {
if metaBucket := tx.Bucket(metaBucket); metaBucket != nil {
fetchDbVersion(metaBucket, meta)
return nil
} else {
return ErrMetaNotFound
}
}

var err error

if tx == nil {
err = d.store.View(fetchMeta)
} else {
err = fetchMeta(tx)
}

if err != nil {
return nil, err
}

return meta, nil
}

// PutMeta gets as input meta structure and put it into boltdb. If transaction
// object is specified then it will be used rather than initiation creation of
// new one.
func (d *DB) PutMeta(meta *Meta, tx *bolt.Tx) error {
putMeta := func(tx *bolt.Tx) error {
metaBucket := tx.Bucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
}

if err := putDbVersion(metaBucket, meta); err != nil {
return err
}

return nil
}

if tx == nil {
return d.store.Update(putMeta)
} else {
return putMeta(tx)
}
}

func putDbVersion(metaBucket *bolt.Bucket, meta *Meta) error {
scratch := make([]byte, 4)
byteOrder.PutUint32(scratch, meta.dbVersionNumber)
if err := metaBucket.Put(dbVersionKey, scratch); err != nil {
return err
}
return nil
}

func fetchDbVersion(metaBucket *bolt.Bucket, meta *Meta) {
if data := metaBucket.Get(dbVersionKey); data != nil {
meta.dbVersionNumber = byteOrder.Uint32(data)
} else {
meta.dbVersionNumber = getLatestDBVersion(DBVersions)
}
}
Loading