Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 54 additions & 26 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ const (
type ChannelGraph struct {
db kvdb.Backend

// cacheMu guards all caches (rejectCache, chanCache, graphCache). If
// this mutex will be acquired at the same time as the DB mutex then
// the cacheMu MUST be acquired first to prevent deadlock.
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
Expand Down Expand Up @@ -1331,8 +1334,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// will be returned if that outpoint isn't known to be
// a channel. If no error is returned, then a channel
// was successfully pruned.
err = c.delChannelEdge(
edges, edgeIndex, chanIndex, zombieIndex, nodes,
err = c.delChannelEdgeUnsafe(
edges, edgeIndex, chanIndex, zombieIndex,
chanID, false, false,
)
if err != nil && err != ErrEdgeNotFound {
Expand Down Expand Up @@ -1562,10 +1565,6 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
if err != nil {
return err
}
nodes, err := tx.CreateTopLevelBucket(nodeBucket)
Comment thread
ellemouton marked this conversation as resolved.
Outdated
if err != nil {
return err
}

// Scan from chanIDStart to chanIDEnd, deleting every
// found edge.
Expand All @@ -1590,8 +1589,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
}

for _, k := range keys {
err = c.delChannelEdge(
edges, edgeIndex, chanIndex, zombieIndex, nodes,
err = c.delChannelEdgeUnsafe(
edges, edgeIndex, chanIndex, zombieIndex,
k, false, false,
)
if err != nil && err != ErrEdgeNotFound {
Expand Down Expand Up @@ -1734,8 +1733,8 @@ func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
var rawChanID [8]byte
for _, chanID := range chanIDs {
byteOrder.PutUint64(rawChanID[:], chanID)
err := c.delChannelEdge(
edges, edgeIndex, chanIndex, zombieIndex, nodes,
err := c.delChannelEdgeUnsafe(
edges, edgeIndex, chanIndex, zombieIndex,
rawChanID[:], markZombie, strictZombiePruning,
)
if err != nil {
Expand Down Expand Up @@ -2091,6 +2090,9 @@ func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,

var newChanIDs []uint64

c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
Expand Down Expand Up @@ -2143,7 +2145,7 @@ func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
// and we let it be added to the set of IDs to
// query our peer for.
case isZombie && !isStillZombie:
err := c.markEdgeLive(tx, scid)
err := c.markEdgeLiveUnsafe(tx, scid)
Comment thread
ziggie1984 marked this conversation as resolved.
Outdated
if err != nil {
return err
}
Expand Down Expand Up @@ -2355,15 +2357,19 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
// skipped and the result will contain only those edges that exist at the time
// of the query. This can be used to respond to peer queries that are seeking to
// fill in gaps in their view of the channel graph.
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
//
// NOTE: An optional transaction may be provided. If none is provided, then a
// new one will be created.
func (c *ChannelGraph) FetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
[]ChannelEdge, error) {
// TODO(roasbeef): sort cids?

var (
chanEdges []ChannelEdge
cidBytes [8]byte
)

err := kvdb.View(c.db, func(tx kvdb.RTx) error {
fetchChanInfos := func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand Down Expand Up @@ -2425,9 +2431,20 @@ func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
})
}
return nil
}, func() {
chanEdges = nil
})
}

if tx == nil {
err := kvdb.View(c.db, fetchChanInfos, func() {
chanEdges = nil
})
if err != nil {
return nil, err
}

return chanEdges, nil
}

err := fetchChanInfos(tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2473,8 +2490,16 @@ func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
return nil
}

func (c *ChannelGraph) delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
nodes kvdb.RwBucket, chanID []byte, isZombie, strictZombie bool) error {
Comment thread
ellemouton marked this conversation as resolved.
Outdated
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
// cache. It then goes on to delete any policy info and edge info for this
// channel from the DB and finally, if isZombie is true, it will add an entry
// for this channel in the zombie index.
//
// NOTE: this method MUST only be called if the cacheMu has already been
// acquired.
func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
strictZombie bool) error {

edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
Expand Down Expand Up @@ -3612,16 +3637,19 @@ func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.markEdgeLive(nil, chanID)
}

// markEdgeLive clears an edge from the zombie index. This method can be called
// with an existing kvdb.RwTx or the argument can be set to nil in which case a
// new transaction will be created.
func (c *ChannelGraph) markEdgeLive(tx kvdb.RwTx, chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

return c.markEdgeLiveUnsafe(nil, chanID)
}

// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
// called with an existing kvdb.RwTx or the argument can be set to nil in which
// case a new transaction will be created.
//
// NOTE: this method MUST only be called if the cacheMu has already been
// acquired.
func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
dbFn := func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
Expand Down Expand Up @@ -3660,7 +3688,7 @@ func (c *ChannelGraph) markEdgeLive(tx kvdb.RwTx, chanID uint64) error {
// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
if c.graphCache != nil {
edgeInfos, err := c.FetchChanInfos([]uint64{chanID})
edgeInfos, err := c.FetchChanInfos(tx, []uint64{chanID})
if err != nil {
return err
}
Expand Down
Loading