Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 154 additions & 22 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
var graphTopLevelBuckets = [][]byte{
nodeBucket,
edgeBucket,
edgeIndexBucket,
Comment thread
ellemouton marked this conversation as resolved.
graphMetaBucket,
}

Expand Down Expand Up @@ -2087,10 +2086,12 @@ func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
// words, we perform a set difference of our set of chan ID's and the ones
// passed in. This method can be used by callers to determine the set of
// channels another peer knows of that we don't.
func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {

var newChanIDs []uint64

err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand All @@ -2108,8 +2109,9 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
// We'll run through the set of chanIDs and collate only the
// set of channel that are unable to be found within our db.
var cidBytes [8]byte
for _, cid := range chanIDs {
byteOrder.PutUint64(cidBytes[:], cid)
for _, info := range chansInfo {
scid := info.ShortChannelID.ToUint64()
byteOrder.PutUint64(cidBytes[:], scid)

// If the edge is already known, skip it.
if v := edgeIndex.Get(cidBytes[:]); v != nil {
Expand All @@ -2118,13 +2120,37 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {

// If the edge is a known zombie, skip it.
if zombieIndex != nil {
isZombie, _, _ := isZombieEdge(zombieIndex, cid)
if isZombie {
isZombie, _, _ := isZombieEdge(
zombieIndex, scid,
)

isStillZombie := isZombieChan(
info.Node1UpdateTimestamp,
info.Node2UpdateTimestamp,
)

switch {
// If the edge is a known zombie and if we
// would still consider it a zombie given the
// latest update timestamps, then we skip this
// channel.
case isZombie && isStillZombie:
continue

// Otherwise, if we have marked it as a zombie
// but the latest update timestamps could bring
// it back from the dead, then we mark it alive,
// and we let it be added to the set of IDs to
// query our peer for.
case isZombie && !isStillZombie:
err := c.markEdgeLive(tx, scid)
Comment thread
ellemouton marked this conversation as resolved.
Outdated
if err != nil {
return err
}
}
}

newChanIDs = append(newChanIDs, cid)
newChanIDs = append(newChanIDs, scid)
}

return nil
Expand All @@ -2135,7 +2161,12 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
// If we don't know of any edges yet, then we'll return the entire set
// of chan IDs specified.
case err == ErrGraphNoEdgesFound:
return chanIDs, nil
ogChanIDs := make([]uint64, len(chansInfo))
for i, info := range chansInfo {
ogChanIDs[i] = info.ShortChannelID.ToUint64()
}

return ogChanIDs, nil

case err != nil:
return nil, err
Expand All @@ -2144,6 +2175,23 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
return newChanIDs, nil
}

// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
// latest received channel updates for the channel.
type ChannelUpdateInfo struct {
// ShortChannelID is the SCID identifier of the channel.
ShortChannelID lnwire.ShortChannelID

// Node1UpdateTimestamp is the timestamp of the latest received update
// from the node 1 channel peer. This will be set to zero time if no
// update has yet been received from this node.
Node1UpdateTimestamp time.Time

// Node2UpdateTimestamp is the timestamp of the latest received update
// from the node 2 channel peer. This will be set to zero time if no
// update has yet been received from this node.
Node2UpdateTimestamp time.Time
}

// BlockChannelRange represents a range of channels for a given block height.
type BlockChannelRange struct {
// Height is the height of the block all of the channels below were
Expand All @@ -2152,17 +2200,20 @@ type BlockChannelRange struct {

// Channels is the list of channels identified by their short ID
// representation known to us that were included in the block height
// above.
Channels []lnwire.ShortChannelID
// above. The list may include channel update timestamp information if
// requested.
Channels []ChannelUpdateInfo
}

// FilterChannelRange returns the channel ID's of all known channels which were
// mined in a block height within the passed range. The channel IDs are grouped
// by their common block height. This method can be used to quickly share with a
// peer the set of channels we know of within a particular range to catch them
// up after a period of time offline.
// up after a period of time offline. If withTimestamps is true then the
// timestamp info of the latest received channel update messages of the channel
// will be included in the response.
func (c *ChannelGraph) FilterChannelRange(startHeight,
endHeight uint32) ([]BlockChannelRange, error) {
endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {

startChanID := &lnwire.ShortChannelID{
BlockHeight: startHeight,
Expand All @@ -2181,7 +2232,7 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())

var channelsPerBlock map[uint32][]lnwire.ShortChannelID
var channelsPerBlock map[uint32][]ChannelUpdateInfo
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
Expand Down Expand Up @@ -2213,14 +2264,60 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
// we'll add it to our returned set.
rawCid := byteOrder.Uint64(k)
cid := lnwire.NewShortChanIDFromInt(rawCid)

chanInfo := ChannelUpdateInfo{
ShortChannelID: cid,
}

if !withTimestamps {
channelsPerBlock[cid.BlockHeight] = append(
channelsPerBlock[cid.BlockHeight],
chanInfo,
)

continue
}

node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)

rawPolicy := edges.Get(node1Key)
if len(rawPolicy) != 0 {
r := bytes.NewReader(rawPolicy)

edge, err := deserializeChanEdgePolicyRaw(r)
if err != nil && !errors.Is(
err, ErrEdgePolicyOptionalFieldNotFound,
) {

return err
}

chanInfo.Node1UpdateTimestamp = edge.LastUpdate
}

rawPolicy = edges.Get(node2Key)
if len(rawPolicy) != 0 {
r := bytes.NewReader(rawPolicy)

edge, err := deserializeChanEdgePolicyRaw(r)
if err != nil && !errors.Is(
err, ErrEdgePolicyOptionalFieldNotFound,
) {

return err
}

chanInfo.Node2UpdateTimestamp = edge.LastUpdate
}

channelsPerBlock[cid.BlockHeight] = append(
channelsPerBlock[cid.BlockHeight], cid,
channelsPerBlock[cid.BlockHeight], chanInfo,
)
}

return nil
}, func() {
channelsPerBlock = make(map[uint32][]lnwire.ShortChannelID)
channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
})

switch {
Expand Down Expand Up @@ -3119,6 +3216,24 @@ func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
return targetNode, err
}

// computeEdgePolicyKeys is a helper function that can be used to compute the
// keys used to index the channel edge policy info for the two nodes of the
// edge. The keys for node 1 and node 2 are returned respectively.
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
var (
node1Key [33 + 8]byte
node2Key [33 + 8]byte
)

copy(node1Key[:], info.NodeKey1Bytes[:])
copy(node2Key[:], info.NodeKey2Bytes[:])

byteOrder.PutUint64(node1Key[33:], info.ChannelID)
byteOrder.PutUint64(node2Key[33:], info.ChannelID)

return node1Key[:], node2Key[:]
}

// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
// the channel identified by the funding outpoint. If the channel can't be
// found, then ErrEdgeNotFound is returned. A struct which houses the general
Expand Down Expand Up @@ -3497,10 +3612,17 @@ func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.markEdgeLive(nil, chanID)
}

// markEdgeLive clears an edge from the zombie index. This method can be called
// with an existing kvdb.RwTx or the argument can be set to nil in which case a
// new transaction will be created.
func (c *ChannelGraph) markEdgeLive(tx kvdb.RwTx, chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
dbFn := func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand All @@ -3518,7 +3640,16 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
}

return zombieIndex.Delete(k[:])
}, func() {})
}

// If the transaction is nil, we'll create a new one. Otherwise, we use
// the existing transaction
var err error
if tx == nil {
err = kvdb.Update(c.db, dbFn, func() {})
} else {
err = dbFn(tx)
}
if err != nil {
return err
}
Expand All @@ -3528,11 +3659,12 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {

// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
edgeInfos, err := c.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}
if c.graphCache != nil {
edgeInfos, err := c.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}

for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
Expand Down
Loading