diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 27ed7ba5284..378391ec69f 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -729,6 +729,14 @@ func TestEdgeInfoUpdates(t *testing.T) { // Create an edge and add it to the db. edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2) + + // Make sure inserting the policy at this point, before the edge info + // is added, will fail. + if err := graph.UpdateEdgePolicy(edge1); err != ErrEdgeNotFound { + t.Fatalf("expected ErrEdgeNotFound, got: %v", err) + } + + // Add the edge info. if err := graph.AddChannelEdge(edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index 921e55c546c..76d0cb257c4 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), if err == nil && shouldFail { t.Fatal("error wasn't received on migration stage") } else if err != nil && !shouldFail { - t.Fatal("error was received on migration stage") + t.Fatalf("error was received on migration stage: %v", err) } // afterMigration usually used for checking the database state and diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 647502c1a4b..5cded9fe4d8 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -323,29 +323,56 @@ func migrateEdgePolicies(tx *bbolt.Tx) error { return nil } - // checkKey gets the policy from the database with a low-level call + // checkEdge gets the policy from the database with a low-level call // so that it is still possible to distinguish between unknown and // not present. - checkKey := func(channelId uint64, keyBytes []byte) error { + checkEdge := func(channelId uint64, nodeA, nodeB [33]byte) error { var channelID [8]byte byteOrder.PutUint64(channelID[:], channelId) - _, err := fetchChanEdgePolicy(edges, - channelID[:], keyBytes, nodes) - - if err == ErrEdgeNotFound { - log.Tracef("Adding unknown edge policy present for node %x, channel %v", - keyBytes, channelId) + // To fix buggy DBs where an edge was added without a + // corresponding node, first check that the nodes exist in the + // graph, add them if not. + for _, nodePub := range [][33]byte{nodeA, nodeB} { + b := nodes.Get(nodePub[:]) + if b == nil { + log.Tracef("Adding missing node %x", nodePub[:]) + node := &LightningNode{ + PubKeyBytes: nodePub, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, node) + if err != nil { + return fmt.Errorf("unable to add "+ + "node %x", nodePub[:]) + } + } + } - err := putChanEdgePolicyUnknown(edges, channelId, keyBytes) - if err != nil { + // Check both policies, adding an "unknown" policy if not + // found. + for _, nodePub := range [][33]byte{nodeA, nodeB} { + _, err := fetchChanEdgePolicy(edges, + channelID[:], nodePub[:], nodes) + + if err == ErrEdgeNotFound { + log.Tracef("Adding unknown edge policy "+ + "present for node %x, channel %v\n", + nodePub[:], channelId) + + err := putChanEdgePolicyUnknown( + edges, channelId, nodePub[:], + ) + if err != nil { + return err + } + + } else if err != nil { return err } - - return nil } - return err + return nil } // Iterate over all channels and check both edge policies. @@ -353,22 +380,54 @@ func migrateEdgePolicies(tx *bbolt.Tx) error { infoReader := bytes.NewReader(edgeInfoBytes) edgeInfo, err := deserializeChanEdgeInfo(infoReader) if err != nil { - return err + return fmt.Errorf("unable to parse edge info: %v", err) } - for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:], - edgeInfo.NodeKey2Bytes[:]} { + return checkEdge( + edgeInfo.ChannelID, edgeInfo.NodeKey1Bytes, + edgeInfo.NodeKey2Bytes, + ) + }) - if err := checkKey(edgeInfo.ChannelID, key); err != nil { - return err - } + if err != nil { + return fmt.Errorf("unable to update edge policies: %v", err) + } + + // Finally we iterate over all edge policies and delete those not + // having any correspnding edge info. + var keysToRemove [][]byte + err = edges.ForEach(func(edgeKey, v []byte) error { + // Skip buckets. + if v == nil { + return nil + } + + if len(edgeKey) != 33+8 { + return fmt.Errorf("malformed edge key: %x", edgeKey) + } + + var chanID [8]byte + copy(chanID[:], edgeKey[33:]) + + edgeInfoBytes := edgeIndex.Get(chanID[:]) + + // If no edge info is found, we schedule if for deletion. + if edgeInfoBytes == nil { + keysToRemove = append(keysToRemove, edgeKey) } return nil - }) + }) if err != nil { - return fmt.Errorf("unable to update edge policies: %v", err) + return fmt.Errorf("unable to iterate edge policies: %v", err) + } + + log.Infof("Removing %d policies from edge bucket.", len(keysToRemove)) + for _, key := range keysToRemove { + if err := edges.Delete(key); err != nil { + return err + } } log.Infof("Migration of edge policies complete!") diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index ed2829c0e55..b6815182329 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -5,8 +5,10 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + prand "math/rand" "reflect" "testing" + "time" "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" @@ -14,6 +16,475 @@ import ( "github.com/go-errors/errors" ) +func createTestEdge(t *testing.T, db *DB) (*ChannelEdgeInfo, + *ChannelEdgePolicy, *ChannelEdgePolicy) { + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + + height := uint32(prand.Int31()) + edgeInfo, chanID := createEdge(height, 0, 0, 0, node1, node2) + + edge1 := &ChannelEdgePolicy{ + SigBytes: testSig.Serialize(), + ChannelID: chanID.ToUint64(), + LastUpdate: time.Unix(433453, 0), + Flags: 0, + TimeLockDelta: 99, + MinHTLC: 2342135, + FeeBaseMSat: 4352345, + FeeProportionalMillionths: 3452352, + Node: node2, + ExtraOpaqueData: []byte("new unknown feature2"), + db: db, + } + edge2 := &ChannelEdgePolicy{ + SigBytes: testSig.Serialize(), + ChannelID: chanID.ToUint64(), + LastUpdate: time.Unix(124234, 0), + Flags: 1, + TimeLockDelta: 99, + MinHTLC: 2342135, + FeeBaseMSat: 4352345, + FeeProportionalMillionths: 90392423, + Node: node1, + ExtraOpaqueData: []byte("new unknown feature1"), + db: db, + } + + return &edgeInfo, edge1, edge2 +} + +// TestMigrateEdgePolices checks that we properly migrate edge polices, +// regardless of the state of the original database. +func TestMigrateEdgePolicies(t *testing.T) { + t.Parallel() + + // A channel policy can be one of three types: + // 1) in the db + // 2) not in the db (we will migrate away from this) + // 3) in the db, but marked as "unknown" (this is the new format we'll + // migrate to) + type policyStatus int + const ( + existing policyStatus = iota + nonExisting + unknown + ) + + // We generate all combinations of previous database states an edge can + // be in, to make sure we are able to migrate them all without + // problems. + type testCase struct { + policy1Status policyStatus + policy2Status policyStatus + node1Exists bool + node2Exists bool + edgeInfoExists bool + } + + var tests []testCase + for _, pol1 := range []policyStatus{existing, nonExisting, unknown} { + for _, pol2 := range []policyStatus{existing, nonExisting, unknown} { + for _, node1 := range []bool{true, false} { + for _, node2 := range []bool{true, false} { + for _, e := range []bool{true, false} { + tests = append(tests, testCase{ + policy1Status: pol1, + policy2Status: pol2, + node1Exists: node1, + node2Exists: node2, + edgeInfoExists: e, + }) + } + } + } + } + } + + // helper method to set the node's policy to a specific state + // in the db. + setPolicy := func(tx *bbolt.Tx, node [33]byte, channelID uint64, + status policyStatus) error { + + edges := tx.Bucket(edgeBucket) + if edges == nil { + t.Fatalf("edge bucket did not exist") + } + var edgeKey [33 + 8]byte + copy(edgeKey[:], node[:]) + byteOrder.PutUint64(edgeKey[33:], channelID) + + switch status { + + // We keep it, assert it is already there, and not unknown. + case existing: + pol := edges.Get(edgeKey[:]) + if pol == nil || len(pol) < 10 { + return fmt.Errorf("unexpected policy: %v", pol) + } + + // We delete the policy from the DB. + case nonExisting: + err := edges.Delete(edgeKey[:]) + if err != nil { + return fmt.Errorf("unable to delete key: %v", + err) + } + + // We replace the policy in the DB with an unknown policy. + case unknown: + err := edges.Put(edgeKey[:], unknownPolicy) + if err != nil { + return fmt.Errorf("unable to put edgepolicy: "+ + "%v", err) + } + } + return nil + } + + // helper method to add or delete a node in the db. + setNode := func(tx *bbolt.Tx, node [33]byte, exists bool) error { + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return fmt.Errorf("node bucket not found") + } + + switch exists { + + // It should already exist. + case true: + nodeBytes := nodes.Get(node[:]) + if nodeBytes == nil { + return fmt.Errorf("couldn't find node") + } + + // Delete the node from the db. + case false: + err := nodes.Delete(node[:]) + if err != nil { + return fmt.Errorf("unable to delete node") + } + } + + return nil + } + + // helper method to add or delete the ede info from the db. + setInfo := func(tx *bbolt.Tx, edge *ChannelEdgeInfo, + exists bool) error { + + edges := tx.Bucket(edgeBucket) + if edges == nil { + t.Fatalf("edge bucket did not exist") + } + + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + t.Fatalf("edgeIndex bucket did not exist") + } + + chanIndex := edges.Bucket(channelPointBucket) + if chanIndex == nil { + return fmt.Errorf("channe index not found") + } + + var chanKey [8]byte + binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) + var op bytes.Buffer + err := writeOutpoint(&op, &edge.ChannelPoint) + if err != nil { + return err + } + + switch exists { + + // Check edge exists. + case true: + e := edgeIndex.Get(chanKey[:]) + if e == nil { + return fmt.Errorf("edge not found") + } + + c := chanIndex.Get(op.Bytes()) + if c == nil { + return fmt.Errorf("channel not in chan index") + } + + // Delete edge. + case false: + err := edgeIndex.Delete(chanKey[:]) + if err != nil { + return fmt.Errorf("unable to delete edge: %v", + err) + } + + err = chanIndex.Delete(op.Bytes()) + if err != nil { + return fmt.Errorf("unable to delete from chan "+ + "index: %v", err) + } + } + + return nil + } + + // helper method that asserts the policy with the original status is in + // the expected state after migration. + assertMigratedPolicy := func(tx *bbolt.Tx, node [33]byte, + channelID uint64, status policyStatus) error { + + edges := tx.Bucket(edgeBucket) + if edges == nil { + return fmt.Errorf("edge bucket did not exist") + } + + var edgeKey [33 + 8]byte + copy(edgeKey[:], node[:]) + byteOrder.PutUint64(edgeKey[33:], channelID) + + edgeBytes := edges.Get(edgeKey[:]) + + // Policy statuses should should be either complete or unknown. + switch status { + + // Existing policy should be here still, and not unknown. + case existing: + if edgeBytes == nil || len(edgeBytes) < 10 { + return fmt.Errorf("expected existing policy " + + "to be present") + } + + // Otherwise it should be unknown. + case nonExisting: + fallthrough + case unknown: + if !bytes.Equal(edgeBytes, unknownPolicy) { + return fmt.Errorf("expected policy to " + + "be unknown") + } + } + return nil + } + + // Run through all test cases. + for _, test := range tests { + var chanID uint64 + var node1Bytes []byte + var node2Bytes []byte + + // beforeMigrationFunc will take the db into the state set by + // this particular test case. + beforeMigrationFunc := func(db *DB) { + graph := db.ChannelGraph() + + // We being by adding the edge, the two nodes and the + // two channel edge policies to the DB. + info, policy1, policy2 := createTestEdge(t, db) + if err := graph.AddChannelEdge(info); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + if err := graph.UpdateEdgePolicy(policy1); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + if err := graph.UpdateEdgePolicy(policy2); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + // Then we delete or modify the information in the + // database according to the testcase. + err := db.Update(func(tx *bbolt.Tx) error { + + // Set the edge policies. + err := setPolicy( + tx, info.NodeKey1Bytes, + policy1.ChannelID, test.policy1Status, + ) + if err != nil { + return err + } + + err = setPolicy( + tx, info.NodeKey2Bytes, + policy2.ChannelID, test.policy2Status, + ) + if err != nil { + return err + } + + // Set the nodes. + err = setNode( + tx, info.NodeKey1Bytes, + test.node1Exists, + ) + if err != nil { + return err + } + + err = setNode( + tx, info.NodeKey2Bytes, + test.node2Exists, + ) + if err != nil { + return err + } + + // And finally set the edge info. + err = setInfo(tx, info, test.edgeInfoExists) + if err != nil { + return err + } + + return nil + }) + if err != nil { + t.Fatalf("unable to update db: %v", err) + } + + chanID = info.ChannelID + node1Bytes = info.NodeKey1Bytes[:] + node2Bytes = info.NodeKey2Bytes[:] + } + + // afterMigrationFunc asserts that the db is migrated to the + // expected format. + afterMigrationFunc := func(db *DB) { + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration 'migrateEdgePolicies' " + + "wasn't applied") + } + + graph := db.ChannelGraph() + + // If we were migrating from a DB having the edge info + // present, it SHOULD still be present. If the original + // DB didn't have the edge info, it should not be + // there. + shouldExist := test.edgeInfoExists + _, _, found, err := graph.HasChannelEdge(chanID) + if err != nil { + t.Fatalf("unable to query for edge: %v", err) + } + + if shouldExist != found { + t.Fatalf("expected to find edge: %v ", + test.edgeInfoExists) + } + + // If the edge existed, we should be able to properly + // fetch policies and nodes. + if shouldExist { + e, _, _, err := graph.FetchChannelEdgesByID( + chanID, + ) + if err != nil { + t.Fatalf("unable to fetch edge: %v", + err) + } + + if e == nil { + t.Fatalf("expected edge info to be " + + "present") + } + + err = db.View(func(tx *bbolt.Tx) error { + err := assertMigratedPolicy( + tx, e.NodeKey1Bytes, + e.ChannelID, test.policy1Status, + ) + if err != nil { + return err + } + return assertMigratedPolicy( + tx, e.NodeKey2Bytes, + e.ChannelID, test.policy2Status, + ) + + }) + if err != nil { + t.Fatal(err) + } + + // Both nodes should be there. + var numNodes int + err = graph.ForEachNode(nil, + func(*bbolt.Tx, *LightningNode) error { + numNodes++ + return nil + }) + if err != nil { + t.Fatalf("unable to terate nodes: %v", + err) + } + + if numNodes != 2 { + t.Fatalf("expected 2 nodes, found %v", + numNodes) + } + + return + } + + // If the edge shouldn't exist, make sure there are no + // traces of this edge in the DB. Fetching the edge + // should fail with ErrEdgeNotFound. + _, _, _, err = graph.FetchChannelEdgesByID( + chanID, + ) + if err != ErrEdgeNotFound { + t.Fatalf("expected ErrEdgeNotFound, got %v", + err) + } + + // Inspect the DB to make sure no data for this + // channel ID exists. + err = db.View(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return fmt.Errorf("edge bucket did " + + "not exist") + } + + for _, node := range [][]byte{node1Bytes, node2Bytes} { + var edgeKey [33 + 8]byte + copy(edgeKey[:], node) + byteOrder.PutUint64( + edgeKey[33:], chanID, + ) + + edgeBytes := edges.Get(edgeKey[:]) + if edgeBytes != nil { + t.Fatalf("expected to not "+ + "find edge policy for "+ + "chanID %v", chanID) + } + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrateEdgePolicies, + false) + } +} + // TestPaymentStatusesMigration checks that already completed payments will have // their payment statuses set to Completed after the migration. func TestPaymentStatusesMigration(t *testing.T) { diff --git a/routing/router.go b/routing/router.go index f69d817a0bc..4ec40e9b09b 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1123,8 +1123,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { } r.rejectMtx.RUnlock() - channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - // We make sure to hold the mutex for this channel ID, // such that no other goroutine is concurrently doing // database accesses for the same channel ID. @@ -1140,6 +1138,15 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { } + // If the channel doesn't exist in our database, we cannot + // apply the updated policy. + if !exists { + return newErrf(ErrIgnored, "Ignoring update "+ + "(flags=%v|%v) for unknown chan_id=%v", + msg.MessageFlags, msg.ChannelFlags, + msg.ChannelID) + } + // As edges are directional edge node has a unique policy for // the direction of the edge they control. Therefore we first // check if we already have the most up to date information for @@ -1172,36 +1179,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { } } - if !exists && !r.cfg.AssumeChannelValid { - // Before we can update the channel information, we'll - // ensure that the target channel is still open by - // querying the utxo-set for its existence. - chanPoint, fundingTxOut, err := r.fetchChanPoint( - &channelID, - ) - if err != nil { - r.rejectMtx.Lock() - r.rejectCache[msg.ChannelID] = struct{}{} - r.rejectMtx.Unlock() - - return errors.Errorf("unable to fetch chan "+ - "point for chan_id=%v: %v", - msg.ChannelID, err) - } - _, err = r.cfg.Chain.GetUtxo( - chanPoint, fundingTxOut.PkScript, - channelID.BlockHeight, - ) - if err != nil { - r.rejectMtx.Lock() - r.rejectCache[msg.ChannelID] = struct{}{} - r.rejectMtx.Unlock() - - return errors.Errorf("unable to fetch utxo for "+ - "chan_id=%v: %v", msg.ChannelID, err) - } - } - // Now that we know this isn't a stale update, we'll apply the // new edge policy to the proper directional edge within the // channel graph. diff --git a/routing/router_test.go b/routing/router_test.go index 2ccaf4dad31..802a9900617 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1025,6 +1025,76 @@ func TestIgnoreNodeAnnouncement(t *testing.T) { } } +// TestIgnoreChannelEdgePolicyForUnknownChannel checks that a router will +// ignore a channel policy for a channel not in the graph. +func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, + basicGraphFilePath) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + var pub1 [33]byte + copy(pub1[:], priv1.PubKey().SerializeCompressed()) + + var pub2 [33]byte + copy(pub2[:], priv2.PubKey().SerializeCompressed()) + + // Add the edge between the two unknown nodes to the graph, and check + // that the nodes are found after the fact. + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + 10000, 500) + if err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + fundingBlock := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{fundingTx}, + } + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1Bytes: pub1, + NodeKey2Bytes: pub2, + BitcoinKey1Bytes: pub1, + BitcoinKey2Bytes: pub2, + AuthProof: nil, + } + edgePolicy := &channeldb.ChannelEdgePolicy{ + SigBytes: testSig.Serialize(), + ChannelID: edge.ChannelID, + LastUpdate: testTime, + TimeLockDelta: 10, + MinHTLC: 1, + FeeBaseMSat: 10, + FeeProportionalMillionths: 10000, + } + + // Attempt to update the edge. This should be ignored, since the edge + // is not yet added to the router. + err = ctx.router.UpdateEdge(edgePolicy) + if !IsError(err, ErrIgnored) { + t.Fatalf("expected to get ErrIgnore, instead got: %v", err) + } + + // Add the edge. + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatalf("expected to be able to add edge to the channel graph,"+ + " even though the vertexes were unknown: %v.", err) + } + + // Now updating the edge policy should succeed. + if err := ctx.router.UpdateEdge(edgePolicy); err != nil { + t.Fatalf("unable to update edge policy: %v", err) + } +} + // TestAddEdgeUnknownVertexes tests that if an edge is added that contains two // vertexes which we don't know of, the edge should be available for use // regardless. This is due to the fact that we don't actually need node