Skip to content
8 changes: 8 additions & 0 deletions channeldb/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,14 @@ func TestEdgeInfoUpdates(t *testing.T) {

// Create an edge and add it to the db.
edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)

// Make sure inserting the policy at this point, before the edge info
// is added, will fail.
if err := graph.UpdateEdgePolicy(edge1); err != ErrEdgeNotFound {
t.Fatalf("expected ErrEdgeNotFound, got: %v", err)
}

// Add the edge info.
if err := graph.AddChannelEdge(edgeInfo); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion channeldb/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
if err == nil && shouldFail {
t.Fatal("error wasn't received on migration stage")
} else if err != nil && !shouldFail {
t.Fatal("error was received on migration stage")
t.Fatalf("error was received on migration stage: %v", err)
}

// afterMigration usually used for checking the database state and
Expand Down
101 changes: 80 additions & 21 deletions channeldb/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,52 +323,111 @@ func migrateEdgePolicies(tx *bbolt.Tx) error {
return nil
}

// checkKey gets the policy from the database with a low-level call
// checkEdge gets the policy from the database with a low-level call
// so that it is still possible to distinguish between unknown and
// not present.
checkKey := func(channelId uint64, keyBytes []byte) error {
checkEdge := func(channelId uint64, nodeA, nodeB [33]byte) error {
var channelID [8]byte
byteOrder.PutUint64(channelID[:], channelId)

_, err := fetchChanEdgePolicy(edges,
channelID[:], keyBytes, nodes)

if err == ErrEdgeNotFound {
log.Tracef("Adding unknown edge policy present for node %x, channel %v",
keyBytes, channelId)
// To fix buggy DBs where an edge was added without a
// corresponding node, first check that the nodes exist in the
// graph, add them if not.
for _, nodePub := range [][33]byte{nodeA, nodeB} {
b := nodes.Get(nodePub[:])
if b == nil {
log.Tracef("Adding missing node %x", nodePub[:])
node := &LightningNode{
PubKeyBytes: nodePub,
HaveNodeAnnouncement: false,
Comment thread
joostjager marked this conversation as resolved.
Outdated
}
err := addLightningNode(tx, node)
if err != nil {
return fmt.Errorf("unable to add "+
"node %x", nodePub[:])
}
}
}

err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
if err != nil {
// Check both policies, adding an "unknown" policy if not
// found.
for _, nodePub := range [][33]byte{nodeA, nodeB} {
_, err := fetchChanEdgePolicy(edges,
channelID[:], nodePub[:], nodes)

if err == ErrEdgeNotFound {
log.Tracef("Adding unknown edge policy "+
"present for node %x, channel %v\n",
nodePub[:], channelId)

err := putChanEdgePolicyUnknown(
edges, channelId, nodePub[:],
)
if err != nil {
return err
}

} else if err != nil {
return err
}

return nil
}

return err
return nil
}

// Iterate over all channels and check both edge policies.
err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
infoReader := bytes.NewReader(edgeInfoBytes)
edgeInfo, err := deserializeChanEdgeInfo(infoReader)
if err != nil {
return err
return fmt.Errorf("unable to parse edge info: %v", err)
}

for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
edgeInfo.NodeKey2Bytes[:]} {
return checkEdge(
edgeInfo.ChannelID, edgeInfo.NodeKey1Bytes,
edgeInfo.NodeKey2Bytes,
)
})

if err := checkKey(edgeInfo.ChannelID, key); err != nil {
return err
}
if err != nil {
return fmt.Errorf("unable to update edge policies: %v", err)
}

// Finally we iterate over all edge policies and delete those not
// having any correspnding edge info.
var keysToRemove [][]byte
err = edges.ForEach(func(edgeKey, v []byte) error {
// Skip buckets.
if v == nil {
return nil
}

if len(edgeKey) != 33+8 {
return fmt.Errorf("malformed edge key: %x", edgeKey)
}

var chanID [8]byte
copy(chanID[:], edgeKey[33:])

edgeInfoBytes := edgeIndex.Get(chanID[:])

// If no edge info is found, we schedule if for deletion.
if edgeInfoBytes == nil {
keysToRemove = append(keysToRemove, edgeKey)
}

return nil
})

})
if err != nil {
return fmt.Errorf("unable to update edge policies: %v", err)
return fmt.Errorf("unable to iterate edge policies: %v", err)
}

log.Infof("Removing %d policies from edge bucket.", len(keysToRemove))
for _, key := range keysToRemove {
if err := edges.Delete(key); err != nil {
return err
}
}

log.Infof("Migration of edge policies complete!")
Expand Down
Loading