From fbff9d4ff629d4b40a2fb8abf7128f5ea34a7c06 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 16 Mar 2017 13:46:58 +0300 Subject: [PATCH 01/10] tests: add timeout in 'revoked uncooperative close retribution' test --- lnd_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lnd_test.go b/lnd_test.go index 5dc017980f8..86936318a86 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1467,6 +1467,14 @@ func testRevokedCloseRetribution(net *networkHarness, t *harnessTest) { return bobChannelInfo.Channels[0], nil } + // Wait for Alice to receive the channel edge from the funding manager. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint) + if err != nil { + t.Fatalf("alice didn't see the alice->bob channel before "+ + "timeout: %v", err) + } + // Open up a payment stream to Alice that we'll use to send payment to // Bob. We also create a small helper function to send payments to Bob, // consuming the payment hashes we generated above. From 2942c9ed5246f07069427ec7eaa24df3fd0f6b03 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 16 Mar 2017 14:04:19 +0300 Subject: [PATCH 02/10] linter: fix new warnings --- lnwire/channel_announcement_test.go | 4 +--- lnwire/signature.go | 2 +- networktest.go | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/lnwire/channel_announcement_test.go b/lnwire/channel_announcement_test.go index d52ec51f6a8..36825b0b6e9 100644 --- a/lnwire/channel_announcement_test.go +++ b/lnwire/channel_announcement_test.go @@ -58,9 +58,7 @@ func TestChannelAnnoucementValidation(t *testing.T) { firstBitcoinPrivKey, firstBitcoinPubKey := getKeys("bitcoin-key-1") secondBitcoinPrivKey, secondBitcoinPubKey := getKeys("bitcoin-key-2") - var hash []byte - - hash = chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed()) + hash := chainhash.DoubleHashB(firstNodePubKey.SerializeCompressed()) firstBitcoinSig, _ := firstBitcoinPrivKey.Sign(hash) hash = chainhash.DoubleHashB(secondNodePubKey.SerializeCompressed()) diff --git a/lnwire/signature.go b/lnwire/signature.go index 936027a0433..45f99631b3e 100644 --- a/lnwire/signature.go +++ b/lnwire/signature.go @@ -65,7 +65,7 @@ func deserializeSigFromWire(e **btcec.Signature, b [64]byte) error { // Create a canonical serialized signature. DER format is: // 0x30 0x02 r 0x02 s - sigBytes := make([]byte, 6+rLen+sLen, 6+rLen+sLen) + sigBytes := make([]byte, 6+rLen+sLen) sigBytes[0] = 0x30 // DER signature magic value sigBytes[1] = 4 + rLen + sLen // Length of rest of signature sigBytes[2] = 0x02 // Big integer magic value diff --git a/networktest.go b/networktest.go index 04e21b1bf17..d707c6f2013 100644 --- a/networktest.go +++ b/networktest.go @@ -441,7 +441,7 @@ func (l *lightningNode) lightningNetworkWatcher() { // If this is a open request, then it can be // dispatched if the number of edges seen for // the channel is at least two. - if numEdges, _ := openChans[targetChan]; numEdges >= 2 { + if numEdges := openChans[targetChan]; numEdges >= 2 { close(watchRequest.eventChan) continue } From 72eabf379bac7586cdf56cf6bee5eacb6e74216c Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 16 Mar 2017 15:13:40 +0300 Subject: [PATCH 03/10] gotest: use stable version of metalinter --- gotest.sh | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/gotest.sh b/gotest.sh index 38fd4772a43..1d5ff6c3162 100755 --- a/gotest.sh +++ b/gotest.sh @@ -85,15 +85,18 @@ lint_check() { print "* Run static checks" # Make sure gometalinter is installed and $GOPATH/bin is in your path. - if [ ! -x "$(type -p gometalinter)" ]; then + if [ ! -x "$(type -p gometalinter.v1)" ]; then print "** Install gometalinter" - go get -v github.com/alecthomas/gometalinter - gometalinter --install + go get -u gopkg.in/alecthomas/gometalinter.v1 + gometalinter.v1 --install fi + # Update metalinter if needed. + gometalinter.v1 --install 1>/dev/null + # Automatic checks linter_targets=$(glide novendor | grep -v lnrpc) - test -z "$(gometalinter --disable-all \ + test -z "$(gometalinter.v1 --disable-all \ --enable=gofmt \ --enable=vet \ --enable=golint \ From d6214632b8bfaec599e9a5c2dd649b44d2708bd4 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 16 Mar 2017 15:14:12 +0300 Subject: [PATCH 04/10] gotest: add additional port --- gotest.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gotest.sh b/gotest.sh index 1d5ff6c3162..c408fbbd4f3 100755 --- a/gotest.sh +++ b/gotest.sh @@ -20,10 +20,10 @@ print () { # check_test_ports checks that test lnd ports are not used. check_test_ports() { # Make sure that test lnd ports are not used. - o=$(lsof -i :19555,19556 | sed '1d' | awk '{ printf "%s\n", $2 }') + o=$(lsof -i :19555,19556,19557 | sed '1d' | awk '{ printf "%s\n", $2 }') if [ "$o" != "" ]; then printf "Can't run the lnd tests:\n" - printf "some program is using the test lnd ports (19555 | 19556)\n" + printf "some program is using the test lnd ports (19555 | 19556 | 19557)\n" exit 1 fi } From 6391f6376576583875f1449c17278981ffefea61 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 2 Mar 2017 15:59:26 +0300 Subject: [PATCH 05/10] fundingmanager: fix print info --- fundingmanager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fundingmanager.go b/fundingmanager.go index 8dc30e378cc..1752252aeba 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -492,7 +492,8 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { delay := msg.CsvDelay // TODO(roasbeef): error if funding flow already ongoing - fndgLog.Infof("Recv'd fundingRequest(amt=%v, delay=%v, pendingId=%v) "+ + fndgLog.Infof("Recv'd fundingRequest(amt=%v, "+ + "pushSatoshis=%v, delay=%v, pendingId=%v) "+ "from peer(%x)", amt, msg.PushSatoshis, delay, msg.ChannelID, fmsg.peerAddress.IdentityKey.SerializeCompressed()) From 08f4ece82ed65cc861edc8ad6aa646ae2f19aa86 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 2 Mar 2017 12:41:00 +0300 Subject: [PATCH 06/10] utxonursery: change print -> fatal --- utxonursery_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/utxonursery_test.go b/utxonursery_test.go index f21fcc7b034..df3c6172086 100644 --- a/utxonursery_test.go +++ b/utxonursery_test.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "fmt" "reflect" "testing" @@ -236,7 +235,7 @@ func TestSerializeKidOutput(t *testing.T) { deserializedKid, err := deserializeKidOutput(&b) if err != nil { - fmt.Printf(err.Error()) + t.Fatalf("can't deserialize kid output: %v", err) } if !reflect.DeepEqual(kid, deserializedKid) { From 1059909d865ef616600c3ee9df96afad970add49 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Wed, 1 Mar 2017 20:38:04 +0300 Subject: [PATCH 07/10] lnwire: add message header decode/encode test in this commit lnwire message header encode/decode tests were added, without it newcommer programmer may change the type inside message header and spend hours on debugging of integration test trying to understand why his node can't start and interact properly. --- lnwire/lnwire_test.go | 31 +++++++++++++++++++++++++++++++ lnwire/message.go | 34 ++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/lnwire/lnwire_test.go b/lnwire/lnwire_test.go index c729cf4674f..cf83ec1fe9d 100644 --- a/lnwire/lnwire_test.go +++ b/lnwire/lnwire_test.go @@ -4,10 +4,13 @@ import ( "encoding/hex" "net" + "bytes" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" + "reflect" + "testing" ) // Common variables and functions for the message tests @@ -74,3 +77,31 @@ var ( blue: 255, } ) + +// TestMessageHeaderEncodeDecode test that header is encoded and decoded +// properly. +func TestMessageHeaderEncodeDecode(t *testing.T) { + hdr := &messageHeader{ + magic: wire.BitcoinNet(0), + command: CmdInit, + length: 100, + } + + // Next encode the HDR message into an empty bytes buffer. + var b bytes.Buffer + if _, err := writeMessageHeader(&b, hdr); err != nil { + t.Fatalf("unable to encode ErrorGeneric: %v", err) + } + + // Deserialize the encoded HDR message into a new empty struct. + _, hdr2, err := readMessageHeader(&b) + if err != nil { + t.Fatalf("unable to decode ErrorGeneric: %v", err) + } + + // Assert equality of the two instances. + if !reflect.DeepEqual(hdr, hdr2) { + t.Fatalf("encode/decode error messages don't match %#v vs %#v", + hdr, hdr2) + } +} diff --git a/lnwire/message.go b/lnwire/message.go index 446dd722807..04c9bcde766 100644 --- a/lnwire/message.go +++ b/lnwire/message.go @@ -173,6 +173,25 @@ func readMessageHeader(r io.Reader) (int, *messageHeader, error) { return n, &hdr, nil } +// writeMessageHeader writes a lightning protocol message header to w. +func writeMessageHeader(w io.Writer, hdr *messageHeader) (int, error) { + // Encode the header for the message. This is done to a buffer + // rather than directly to the writer since writeElements doesn't + // return the number of bytes written. + hw := bytes.NewBuffer(make([]byte, 0, MessageHeaderSize)) + if err := writeElements(hw, hdr.magic, hdr.command, hdr.length); err != nil { + return 0, nil + } + + // Write the header first. + n, err := w.Write(hw.Bytes()) + if err != nil { + return n, err + } + + return n, nil +} + // discardInput reads n bytes from reader r in chunks and discards the read // bytes. This is used to skip payloads when various errors occur and helps // prevent rogue nodes from causing massive memory allocation through forging @@ -225,18 +244,13 @@ func WriteMessage(w io.Writer, msg Message, pver uint32, btcnet wire.BitcoinNet) } // Create header for the message. - hdr := messageHeader{magic: btcnet, command: cmd, length: uint32(lenp)} - - // Encode the header for the message. This is done to a buffer - // rather than directly to the writer since writeElements doesn't - // return the number of bytes written. - hw := bytes.NewBuffer(make([]byte, 0, MessageHeaderSize)) - if err := writeElements(hw, hdr.magic, hdr.command, hdr.length); err != nil { - return 0, nil + hdr := &messageHeader{ + magic: btcnet, + command: uint32(cmd), + length: uint32(lenp), } - // Write the header first. - n, err := w.Write(hw.Bytes()) + n, err := writeMessageHeader(w, hdr) totalBytes += n if err != nil { return totalBytes, err From 69be2de6458e8252073524e18c750826197ab897 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Wed, 1 Mar 2017 20:45:40 +0300 Subject: [PATCH 08/10] lnwire: add lnwire.MessageCode type which represent the lnwire command --- lnwire/channel_announcement.go | 4 +- lnwire/channel_update_announcement.go | 4 +- lnwire/close_complete.go | 2 +- lnwire/close_request.go | 2 +- lnwire/commit_sig.go | 2 +- lnwire/error_generic.go | 2 +- lnwire/funding_locked.go | 2 +- lnwire/init_message.go | 2 +- lnwire/lnwire_test.go | 2 +- lnwire/message.go | 108 ++++++++++++++++++-------- lnwire/node_announcement.go | 4 +- lnwire/ping.go | 2 +- lnwire/pong.go | 2 +- lnwire/revoke_and_ack.go | 2 +- lnwire/single_funding_complete.go | 2 +- lnwire/single_funding_request.go | 2 +- lnwire/single_funding_response.go | 6 +- lnwire/single_funding_signcomplete.go | 2 +- lnwire/update_add_htlc.go | 2 +- lnwire/update_fail_htlc.go | 2 +- lnwire/update_fulfill_htlc.go | 2 +- 21 files changed, 101 insertions(+), 57 deletions(-) diff --git a/lnwire/channel_announcement.go b/lnwire/channel_announcement.go index b7ab5e48cb8..d744f0c2469 100644 --- a/lnwire/channel_announcement.go +++ b/lnwire/channel_announcement.go @@ -91,8 +91,8 @@ func (a *ChannelAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *ChannelAnnouncement) Command() uint32 { - return CmdChannelAnnoucmentMessage +func (a *ChannelAnnouncement) Command() MessageCode { + return CmdChannelAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/channel_update_announcement.go b/lnwire/channel_update_announcement.go index fca97b36c5b..6e3d7f828ef 100644 --- a/lnwire/channel_update_announcement.go +++ b/lnwire/channel_update_announcement.go @@ -105,8 +105,8 @@ func (a *ChannelUpdateAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *ChannelUpdateAnnouncement) Command() uint32 { - return CmdChannelUpdateAnnoucmentMessage +func (a *ChannelUpdateAnnouncement) Command() MessageCode { + return CmdChannelUpdateAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/close_complete.go b/lnwire/close_complete.go index 94622be7ed3..0e03026c583 100644 --- a/lnwire/close_complete.go +++ b/lnwire/close_complete.go @@ -64,7 +64,7 @@ func (c *CloseComplete) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CloseComplete) Command() uint32 { +func (c *CloseComplete) Command() MessageCode { return CmdCloseComplete } diff --git a/lnwire/close_request.go b/lnwire/close_request.go index b5ce49dea65..7cf81676e9b 100644 --- a/lnwire/close_request.go +++ b/lnwire/close_request.go @@ -81,7 +81,7 @@ func (c *CloseRequest) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CloseRequest) Command() uint32 { +func (c *CloseRequest) Command() MessageCode { return CmdCloseRequest } diff --git a/lnwire/commit_sig.go b/lnwire/commit_sig.go index f51d35e3a84..7e00ab79c92 100644 --- a/lnwire/commit_sig.go +++ b/lnwire/commit_sig.go @@ -67,7 +67,7 @@ func (c *CommitSig) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *CommitSig) Command() uint32 { +func (c *CommitSig) Command() MessageCode { return CmdCommitSig } diff --git a/lnwire/error_generic.go b/lnwire/error_generic.go index 4476f9f4584..fd4786d0d01 100644 --- a/lnwire/error_generic.go +++ b/lnwire/error_generic.go @@ -98,7 +98,7 @@ func (c *ErrorGeneric) Encode(w io.Writer, pver uint32) error { // the wire. // // This is part of the lnwire.Message interface. -func (c *ErrorGeneric) Command() uint32 { +func (c *ErrorGeneric) Command() MessageCode { return CmdErrorGeneric } diff --git a/lnwire/funding_locked.go b/lnwire/funding_locked.go index da2c5fd3e0e..4c5badc07ec 100644 --- a/lnwire/funding_locked.go +++ b/lnwire/funding_locked.go @@ -71,7 +71,7 @@ func (c *FundingLocked) Encode(w io.Writer, pver uint32) error { // FundingLocked message on the wire. // // This is part of the lnwire.Message interface. -func (c *FundingLocked) Command() uint32 { +func (c *FundingLocked) Command() MessageCode { return CmdFundingLocked } diff --git a/lnwire/init_message.go b/lnwire/init_message.go index b9deddd9ae6..eb7f6746b23 100644 --- a/lnwire/init_message.go +++ b/lnwire/init_message.go @@ -59,7 +59,7 @@ func (msg *Init) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (msg *Init) Command() uint32 { +func (msg *Init) Command() MessageCode { return CmdInit } diff --git a/lnwire/lnwire_test.go b/lnwire/lnwire_test.go index cf83ec1fe9d..4a76a6b12f7 100644 --- a/lnwire/lnwire_test.go +++ b/lnwire/lnwire_test.go @@ -83,7 +83,7 @@ var ( func TestMessageHeaderEncodeDecode(t *testing.T) { hdr := &messageHeader{ magic: wire.BitcoinNet(0), - command: CmdInit, + command: uint32(CmdInit), length: 100, } diff --git a/lnwire/message.go b/lnwire/message.go index 04c9bcde766..b3bf73e3b1a 100644 --- a/lnwire/message.go +++ b/lnwire/message.go @@ -21,50 +21,99 @@ const MessageHeaderSize = 12 // individual limits imposed by messages themselves. const MaxMessagePayload = 1024 * 1024 * 32 // 32MB +// MessageCode represent the unique identifier of the lnwire command. +type MessageCode uint32 + +// String converts message code to the string representation. +func (c MessageCode) String() string { + switch c { + case CmdInit: + return "Init" + case CmdSingleFundingRequest: + return "SingleFundingRequest" + case CmdSingleFundingResponse: + return "SingleFundingResponse" + case CmdSingleFundingComplete: + return "SingleFundingComplete" + case CmdSingleFundingSignComplete: + return "SingleFundingSignComplete" + case CmdFundingLocked: + return "FundingLocked" + case CmdCloseRequest: + return "CloseRequest" + case CmdCloseComplete: + return "CloseComplete" + case CmdUpdateAddHTLC: + return "UpdateAddHTLC" + case CmdUpdateFailHTLC: + return "UpdateFailHTLC" + case CmdUpdateFufillHTLC: + return "UpdateFufillHTLC" + case CmdCommitSig: + return "CommitSig" + case CmdRevokeAndAck: + return "RevokeAndAck" + case CmdErrorGeneric: + return "ErrorGeneric" + case CmdChannelAnnouncement: + return "ChannelAnnouncement" + case CmdChannelUpdateAnnouncement: + return "ChannelUpdateAnnouncement" + case CmdNodeAnnouncement: + return "NodeAnnouncement" + case CmdPing: + return "Ping" + case CmdPong: + return "Pong" + default: + return "" + } +} + // Commands used in lightning message headers which detail the type of message. // TODO(roasbeef): update with latest type numbering from spec const ( - CmdInit = uint32(1) + CmdInit MessageCode = 1 // Commands for opening a channel funded by one party (single funder). - CmdSingleFundingRequest = uint32(100) - CmdSingleFundingResponse = uint32(110) - CmdSingleFundingComplete = uint32(120) - CmdSingleFundingSignComplete = uint32(130) + CmdSingleFundingRequest = 100 + CmdSingleFundingResponse = 110 + CmdSingleFundingComplete = 120 + CmdSingleFundingSignComplete = 130 // Command for locking a funded channel - CmdFundingLocked = uint32(200) + CmdFundingLocked = 200 // Commands for the workflow of cooperatively closing an active channel. - CmdCloseRequest = uint32(300) - CmdCloseComplete = uint32(310) + CmdCloseRequest = 300 + CmdCloseComplete = 310 // Commands for negotiating HTLCs. - CmdUpdateAddHTLC = uint32(1000) - CmdUpdateFufillHTLC = uint32(1010) - CmdUpdateFailHTLC = uint32(1020) + CmdUpdateAddHTLC = 1000 + CmdUpdateFufillHTLC = 1010 + CmdUpdateFailHTLC = 1020 // Commands for modifying commitment transactions. - CmdCommitSig = uint32(2000) - CmdRevokeAndAck = uint32(2010) + CmdCommitSig = 2000 + CmdRevokeAndAck = 2010 // Commands for reporting protocol errors. - CmdErrorGeneric = uint32(4000) + CmdErrorGeneric = 4000 // Commands for discovery service. - CmdChannelAnnoucmentMessage = uint32(5000) - CmdChannelUpdateAnnoucmentMessage = uint32(5010) - CmdNodeAnnoucmentMessage = uint32(5020) + CmdChannelAnnouncement = 5000 + CmdChannelUpdateAnnouncement = 5010 + CmdNodeAnnouncement = 5020 // Commands for connection keep-alive. - CmdPing = uint32(6000) - CmdPong = uint32(6010) + CmdPing = 6000 + CmdPong = 6010 ) // UnknownMessage is an implementation of the error interface that allows the // creation of an error in response to an unknown message. type UnknownMessage struct { - messageType uint32 + messageType MessageCode } // Error returns a human readable string describing the error. @@ -81,14 +130,14 @@ func (u *UnknownMessage) Error() string { type Message interface { Decode(io.Reader, uint32) error Encode(io.Writer, uint32) error - Command() uint32 + Command() MessageCode MaxPayloadLength(uint32) uint32 Validate() error } // makeEmptyMessage creates a new empty message of the proper concrete type // based on the command ID. -func makeEmptyMessage(command uint32) (Message, error) { +func makeEmptyMessage(command MessageCode) (Message, error) { var msg Message switch command { @@ -120,11 +169,11 @@ func makeEmptyMessage(command uint32) (Message, error) { msg = &RevokeAndAck{} case CmdErrorGeneric: msg = &ErrorGeneric{} - case CmdChannelAnnoucmentMessage: + case CmdChannelAnnouncement: msg = &ChannelAnnouncement{} - case CmdChannelUpdateAnnoucmentMessage: + case CmdChannelUpdateAnnouncement: msg = &ChannelUpdateAnnouncement{} - case CmdNodeAnnoucmentMessage: + case CmdNodeAnnouncement: msg = &NodeAnnouncement{} case CmdPing: msg = &Ping{} @@ -184,12 +233,7 @@ func writeMessageHeader(w io.Writer, hdr *messageHeader) (int, error) { } // Write the header first. - n, err := w.Write(hw.Bytes()) - if err != nil { - return n, err - } - - return n, nil + return w.Write(hw.Bytes()) } // discardInput reads n bytes from reader r in chunks and discards the read @@ -291,7 +335,7 @@ func ReadMessage(r io.Reader, pver uint32, btcnet wire.BitcoinNet) (int, Message } // Create struct of appropriate message type based on the command. - command := hdr.command + command := MessageCode(hdr.command) msg, err := makeEmptyMessage(command) if err != nil { discardInput(r, hdr.length) diff --git a/lnwire/node_announcement.go b/lnwire/node_announcement.go index 5f7e9f98da4..54d83cd5653 100644 --- a/lnwire/node_announcement.go +++ b/lnwire/node_announcement.go @@ -159,8 +159,8 @@ func (a *NodeAnnouncement) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (a *NodeAnnouncement) Command() uint32 { - return CmdNodeAnnoucmentMessage +func (a *NodeAnnouncement) Command() MessageCode { + return CmdNodeAnnouncement } // MaxPayloadLength returns the maximum allowed payload size for this message diff --git a/lnwire/ping.go b/lnwire/ping.go index fd75d32e157..01fa05bb02f 100644 --- a/lnwire/ping.go +++ b/lnwire/ping.go @@ -46,7 +46,7 @@ func (p *Pong) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (p *Pong) Command() uint32 { +func (p *Pong) Command() MessageCode { return CmdPong } diff --git a/lnwire/pong.go b/lnwire/pong.go index 2cf2be9b5d1..467af1e0b3c 100644 --- a/lnwire/pong.go +++ b/lnwire/pong.go @@ -45,7 +45,7 @@ func (p *Ping) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (p *Ping) Command() uint32 { +func (p *Ping) Command() MessageCode { return CmdPing } diff --git a/lnwire/revoke_and_ack.go b/lnwire/revoke_and_ack.go index 205d4c46230..31e69a07487 100644 --- a/lnwire/revoke_and_ack.go +++ b/lnwire/revoke_and_ack.go @@ -87,7 +87,7 @@ func (c *RevokeAndAck) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *RevokeAndAck) Command() uint32 { +func (c *RevokeAndAck) Command() MessageCode { return CmdRevokeAndAck } diff --git a/lnwire/single_funding_complete.go b/lnwire/single_funding_complete.go index a86e8f26c92..de54cd318f6 100644 --- a/lnwire/single_funding_complete.go +++ b/lnwire/single_funding_complete.go @@ -99,7 +99,7 @@ func (s *SingleFundingComplete) Encode(w io.Writer, pver uint32) error { // SingleFundingComplete on the wire. // // This is part of the lnwire.Message interface. -func (s *SingleFundingComplete) Command() uint32 { +func (s *SingleFundingComplete) Command() MessageCode { return CmdSingleFundingComplete } diff --git a/lnwire/single_funding_request.go b/lnwire/single_funding_request.go index fa0f8ec03dc..840f90d3471 100644 --- a/lnwire/single_funding_request.go +++ b/lnwire/single_funding_request.go @@ -152,7 +152,7 @@ func (c *SingleFundingRequest) Encode(w io.Writer, pver uint32) error { // SingleFundingRequest on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingRequest) Command() uint32 { +func (c *SingleFundingRequest) Command() MessageCode { return CmdSingleFundingRequest } diff --git a/lnwire/single_funding_response.go b/lnwire/single_funding_response.go index 6f56dc62be2..8eb7bcf5c1b 100644 --- a/lnwire/single_funding_response.go +++ b/lnwire/single_funding_response.go @@ -10,7 +10,7 @@ import ( // SingleFundingResponse is the message Bob sends to Alice after she initiates // the single funder channel workflow via a SingleFundingRequest message. Once -// Alice receives Bob's reponse, then she has all the items neccessary to +// Alice receives Bob's response, then she has all the items necessary to // construct the funding transaction, and both commitment transactions. type SingleFundingResponse struct { // ChannelID serves to uniquely identify the future channel created by @@ -26,7 +26,7 @@ type SingleFundingResponse struct { ChannelDerivationPoint *btcec.PublicKey // CommitmentKey is key the responder to the funding workflow wishes to - // use within their versino of the commitment transaction for any + // use within their version of the commitment transaction for any // delayed (CSV) or immediate outputs to them. CommitmentKey *btcec.PublicKey @@ -126,7 +126,7 @@ func (c *SingleFundingResponse) Encode(w io.Writer, pver uint32) error { // SingleFundingResponse on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingResponse) Command() uint32 { +func (c *SingleFundingResponse) Command() MessageCode { return CmdSingleFundingResponse } diff --git a/lnwire/single_funding_signcomplete.go b/lnwire/single_funding_signcomplete.go index 5ec96a0b4ed..4dfdcd5ffba 100644 --- a/lnwire/single_funding_signcomplete.go +++ b/lnwire/single_funding_signcomplete.go @@ -60,7 +60,7 @@ func (c *SingleFundingSignComplete) Encode(w io.Writer, pver uint32) error { // SingleFundingSignComplete on the wire. // // This is part of the lnwire.Message interface. -func (c *SingleFundingSignComplete) Command() uint32 { +func (c *SingleFundingSignComplete) Command() MessageCode { return CmdSingleFundingSignComplete } diff --git a/lnwire/update_add_htlc.go b/lnwire/update_add_htlc.go index 8079a1dbb60..31ff15a9b43 100644 --- a/lnwire/update_add_htlc.go +++ b/lnwire/update_add_htlc.go @@ -105,7 +105,7 @@ func (c *UpdateAddHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateAddHTLC) Command() uint32 { +func (c *UpdateAddHTLC) Command() MessageCode { return CmdUpdateAddHTLC } diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index 82c76f1c937..08d13078b7a 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -131,7 +131,7 @@ func (c *UpdateFailHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateFailHTLC) Command() uint32 { +func (c *UpdateFailHTLC) Command() MessageCode { return CmdUpdateFailHTLC } diff --git a/lnwire/update_fulfill_htlc.go b/lnwire/update_fulfill_htlc.go index 07699590c8f..12df7b0aeae 100644 --- a/lnwire/update_fulfill_htlc.go +++ b/lnwire/update_fulfill_htlc.go @@ -71,7 +71,7 @@ func (c *UpdateFufillHTLC) Encode(w io.Writer, pver uint32) error { // wire. // // This is part of the lnwire.Message interface. -func (c *UpdateFufillHTLC) Command() uint32 { +func (c *UpdateFufillHTLC) Command() MessageCode { return CmdUpdateFufillHTLC } From 9243a99347c63c529bc29a5fce00709d25e58152 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Wed, 1 Mar 2017 20:49:43 +0300 Subject: [PATCH 09/10] channeldb+retranmission: add retranmission subsystem Issue: #137 In this commit retranmission subsystem and boltdb mesage storage were added. Retransmission subsystem described in details in BOLT #2 (Message Retransmission) section. This subsystem keeps records of all messages that were sent to other peer and waits the ACK message to be received from other side and after that removes all acked messaged from the storage. --- channeldb/error.go | 4 + channeldb/messagestore.go | 174 ++++++++++++++++++++++++++++ channeldb/messagestore_test.go | 108 ++++++++++++++++++ retranmission.go | 203 +++++++++++++++++++++++++++++++++ retransmission_test.go | 197 ++++++++++++++++++++++++++++++++ 5 files changed, 686 insertions(+) create mode 100644 channeldb/messagestore.go create mode 100644 channeldb/messagestore_test.go create mode 100644 retranmission.go create mode 100644 retransmission_test.go diff --git a/channeldb/error.go b/channeldb/error.go index 5c237dbda4f..4683774468b 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -73,4 +73,8 @@ var ( // ErrNodeAliasNotFound is returned when alias for node can't be found. ErrNodeAliasNotFound = fmt.Errorf("alias for node not found") + + // ErrPeerMessagesNotFound is returned when no message have been + // found in the peer bucket or if bucket haven't been created yet. + ErrPeerMessagesNotFound = fmt.Errorf("peer messages not found") ) diff --git a/channeldb/messagestore.go b/channeldb/messagestore.go new file mode 100644 index 00000000000..6d4f1ee5abf --- /dev/null +++ b/channeldb/messagestore.go @@ -0,0 +1,174 @@ +package channeldb + +import ( + "bytes" + "encoding/binary" + + "github.com/boltdb/bolt" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/wire" +) + +var ( + // basePeerBucketKey is the base key for generating peer bucket keys. + // Peer bucket stores top-level bucket that maps: index -> code || msg + // concatenating the message code to the stored data allows the db logic + // to properly parse the wire message without trial and error or needing + // an additional index. + basePeerBucketKey = []byte("peermessagesstore") +) + +// MessageStore represents the storage for lnwire messages, it might be boltd storage, +// local storage, test storage or hybrid one. +// +// NOTE: The original purpose of creating this interface was in separation +// between the retransmission logic and specifics of storage itself. In case of +// such interface we may create the test storage and register the add, +// remove, get actions without the need of population of lnwire messages with +// data. +type MessageStore interface { + // Get returns the sorted set of messages in the order they have been + // added originally and also the array of associated index to this + // messages within the message store. + Get() ([]uint64, []lnwire.Message, error) + + // Add adds new message in the storage with preserving the order and + // returns the index of message within the message store. + Add(msg lnwire.Message) (uint64, error) + + // Remove deletes message with this indexes. + Remove(indexes []uint64) error +} + +// messagesStore represents the boltdb storage for messages inside +// retransmission sub-system. +type messagesStore struct { + // id is a unique slice of bytes identifying a peer. This value is + // typically a peer's identity public key serialized in compressed + // format. + id []byte + db *DB +} + +// NewMessageStore creates new instance of message storage. +func NewMessageStore(id []byte, db *DB) MessageStore { + return &messagesStore{ + id: id, + db: db, + } +} + +// Add adds message to the storage and returns the index which +// corresponds the the message by which it might be removed later. +func (s *messagesStore) Add(msg lnwire.Message) (uint64, error) { + var index uint64 + + err := s.db.Batch(func(tx *bolt.Tx) error { + var err error + + // Get or create the top peer bucket. + peerBucketKey := s.getPeerBucketKey() + peerBucket, err := tx.CreateBucketIfNotExists(peerBucketKey) + if err != nil { + return err + } + + // Generate next index number to add it to the message code + // bucket. + index, err = peerBucket.NextSequence() + if err != nil { + return err + } + indexBytes := make([]byte, 8) + binary.BigEndian.PutUint64(indexBytes, index) + + // Encode the message and place it in the top bucket. + var b bytes.Buffer + _, err = lnwire.WriteMessage(&b, msg, 0, wire.MainNet) + if err != nil { + return err + } + + return peerBucket.Put(indexBytes, b.Bytes()) + }) + + return index, err +} + +// Remove removes the message from storage by index that were assigned to +// message during its addition to the storage. +func (s *messagesStore) Remove(indexes []uint64) error { + return s.db.Batch(func(tx *bolt.Tx) error { + // Get or create the top peer bucket. + peerBucketKey := s.getPeerBucketKey() + peerBucket := tx.Bucket(peerBucketKey) + if peerBucket == nil { + return ErrPeerMessagesNotFound + } + + // Retrieve the messages indexes with this type/code and + // remove them from top peer bucket. + for _, index := range indexes { + var key [8]byte + binary.BigEndian.PutUint64(key[:], index) + + if err := peerBucket.Delete(key[:]); err != nil { + return err + } + } + + return nil + }) +} + +// Get retrieves messages from storage in the order in which they were +// originally added, proper order is needed for retransmission subsystem, as +// far this messages should be resent to remote peer in the same order as they +// were sent originally. +func (s *messagesStore) Get() ([]uint64, []lnwire.Message, error) { + var messages []lnwire.Message + var indexes []uint64 + + if err := s.db.View(func(tx *bolt.Tx) error { + peerBucketKey := s.getPeerBucketKey() + peerBucket := tx.Bucket(peerBucketKey) + if peerBucket == nil { + return nil + } + + // Iterate over messages buckets. + return peerBucket.ForEach(func(k, v []byte) error { + // Skip buckets fields. + if v == nil { + return nil + } + + // Decode the message from and add it to the array. + r := bytes.NewReader(v) + _, msg, _, err := lnwire.ReadMessage(r, 0, wire.MainNet) + if err != nil { + return err + } + + messages = append(messages, msg) + indexes = append(indexes, binary.BigEndian.Uint64(k)) + return nil + }) + }); err != nil { + return nil, nil, err + } + + // If bucket was haven't been created yet or just not contains any + // messages. + if len(messages) == 0 { + return nil, nil, ErrPeerMessagesNotFound + } + + return indexes, messages, nil +} + +// getPeerBucketKey generates the peer bucket boltd key by peer id and base +// peer bucket key. +func (s *messagesStore) getPeerBucketKey() []byte { + return append(basePeerBucketKey[:], s.id[:]...) +} diff --git a/channeldb/messagestore_test.go b/channeldb/messagestore_test.go new file mode 100644 index 00000000000..3dcd8da92dd --- /dev/null +++ b/channeldb/messagestore_test.go @@ -0,0 +1,108 @@ +package channeldb + +import ( + "bytes" + "crypto/sha256" + "testing" + + "reflect" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +// TestRetransmitterMessage tests the ability of message storage to +// add/remove/get messages and also checks that the order in which the +// messages had been added corresponds to the order of messages from Get +// function. +func TestRetransmitterMessageOrder(t *testing.T) { + db, clean, err := makeTestDB() + if err != nil { + t.Fatal(err) + } + defer clean() + s := NewMessageStore([]byte("id"), db) + + var ( + hash1, _ = chainhash.NewHash(bytes.Repeat([]byte("a"), 32)) + hash2, _ = chainhash.NewHash(bytes.Repeat([]byte("b"), 32)) + + chanPoint1 = wire.NewOutPoint(hash1, 0) + chanPoint2 = wire.NewOutPoint(hash2, 1) + + preimage1 = [sha256.Size]byte{0} + preimage2 = [sha256.Size]byte{1} + ) + + // Check that we are receiving the error without messages inside + // the storage. + _, messages, err := s.Get() + if err != nil && err != ErrPeerMessagesNotFound { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 0 { + t.Fatal("wrong length of messages") + } + + msg1 := &lnwire.UpdateFufillHTLC{ + ChannelPoint: *chanPoint1, + ID: 0, + PaymentPreimage: preimage1, + } + + index1, err := s.Add(msg1) + if err != nil { + t.Fatalf("can't add the message to the message store: %v", err) + } + + msg2 := &lnwire.UpdateFufillHTLC{ + ChannelPoint: *chanPoint2, + ID: 1, + PaymentPreimage: preimage2, + } + + index2, err := s.Add(msg2) + if err != nil { + t.Fatalf("can't add the message to the message store: %v", err) + } + + _, messages, err = s.Get() + if err != nil { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 2 { + t.Fatal("wrong length of messages") + } + + m, ok := messages[0].(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatal("wrong type of message") + } + + // Check the order + if !reflect.DeepEqual(m, msg1) { + t.Fatal("wrong order of message") + } + + m, ok = messages[1].(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatal("wrong type of message") + } + + // Check the order + if !reflect.DeepEqual(m, msg2) { + t.Fatal("wrong order of message") + } + + // Remove the messages by index and check that get function return + // non of the messages. + if err := s.Remove([]uint64{index1, index2}); err != nil { + t.Fatalf("can't remove the message: %v", err) + } + + _, messages, err = s.Get() + if err != nil && err != ErrPeerMessagesNotFound { + t.Fatalf("can't get the message: %v", err) + } else if len(messages) != 0 { + t.Fatal("wrong length of messages") + } +} diff --git a/retranmission.go b/retranmission.go new file mode 100644 index 00000000000..cdd1767b83b --- /dev/null +++ b/retranmission.go @@ -0,0 +1,203 @@ +package main + +import ( + "sync" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +// retransmitter represents the retransmission subsystem which is described +// in details in BOLT #2 (Message Retransmission). This subsystem keeps +// records of all messages that were sent to other peer and waits the ACK +// message to be received from other side. The ACK message denotes that the +// previous messages was read. Because communication transports are unreliable +// and may need to be re-established from time to time and reconnection +// introduces doubt as to what has been received such logic is needed to be sure +// that peers are in consistent state in terms of message communication. +type retransmitter struct { + // storage is a message storage which is needed to store the messages + // which weren't acked and restore them after node restarts, in order to + // send them to other side again. + storage channeldb.MessageStore + + // codeToIndex map is used to locate which messages can be deleted from + // the message storage in response to a retrieved ACK message. The + // mapping for items is code -> {index #1 , ... , index #n}. So when + // receiving a new message, we check for the existence of the message + // code in this index bucket, then delete all the messages from the + // top-level bucket that are returned. + codeToIndex map[lnwire.MessageCode][]uint64 + + // messagesToRetransmit list of messages that should be retransmitted + // to other side. + messagesToRetransmit []lnwire.Message + + mutex sync.RWMutex +} + +// newRetransmitter creates new instance of retransmitter. +func newRetransmitter(storage channeldb.MessageStore) (*retransmitter, error) { + + // Retrieve the messages from the message storage with their + // associated indexes. + indexes, messages, err := storage.Get() + if err != channeldb.ErrPeerMessagesNotFound && err != nil { + return nil, err + } + + // Initialize map of code to index map, so than later we can retrieve + // indexes of messages that should be removed. + codeToIndex := make(map[lnwire.MessageCode][]uint64) + for i, message := range messages { + codeToIndex[message.Command()] = append( + codeToIndex[message.Command()], + indexes[i], + ) + } + + return &retransmitter{ + storage: storage, + codeToIndex: codeToIndex, + messagesToRetransmit: messages, + }, nil +} + +// Register adds message that should be acknowledged in the message storage. +func (rt *retransmitter) Register(msg lnwire.Message) error { + switch msg.Command() { + // messages without acknowledgment + case lnwire.CmdCloseComplete, + lnwire.CmdChannelUpdateAnnouncement, + lnwire.CmdChannelAnnouncement, + lnwire.CmdNodeAnnouncement, + lnwire.CmdPing, + lnwire.CmdPong, + lnwire.CmdErrorGeneric, + lnwire.CmdInit: + return nil + default: + // Adds message to storage and returns the message index which + // have been associated with this message within the storage. + index, err := rt.storage.Add(msg) + if err != nil { + return err + } + + // Associate the message index within the message storage + // with message code in order to remove messages by index later. + rt.mutex.Lock() + rt.codeToIndex[msg.Command()] = append( + rt.codeToIndex[msg.Command()], index, + ) + rt.mutex.Unlock() + + return nil + } +} + +// Ack encapsulates the specification logic about which messages should be +// acknowledged by receiving this one. +func (rt *retransmitter) Ack(msg lnwire.Message) error { + switch msg.Command() { + + case lnwire.CmdSingleFundingResponse: + return rt.remove( + lnwire.CmdSingleFundingRequest, + ) + case lnwire.CmdSingleFundingComplete: + return rt.remove( + lnwire.CmdSingleFundingResponse, + ) + case lnwire.CmdSingleFundingSignComplete: + return rt.remove( + lnwire.CmdSingleFundingComplete, + ) + case lnwire.CmdFundingLocked: + return rt.remove( + lnwire.CmdSingleFundingSignComplete, + ) + case lnwire.CmdUpdateAddHTLC, + lnwire.CmdUpdateFailHTLC, + lnwire.CmdUpdateFufillHTLC: + return rt.remove( + lnwire.CmdFundingLocked, + ) + case lnwire.CmdRevokeAndAck: + return rt.remove( + lnwire.CmdUpdateAddHTLC, + lnwire.CmdUpdateFailHTLC, + lnwire.CmdUpdateFufillHTLC, + lnwire.CmdCommitSig, + lnwire.CmdCloseRequest, + lnwire.CmdFundingLocked, + ) + + case lnwire.CmdCommitSig, + lnwire.CmdCloseRequest: + return rt.remove( + lnwire.CmdFundingLocked, + lnwire.CmdRevokeAndAck, + ) + case lnwire.CmdCloseComplete: + return rt.remove( + lnwire.CmdCloseRequest, + ) + case lnwire.CmdPing, + lnwire.CmdPong, + lnwire.CmdChannelUpdateAnnouncement, + lnwire.CmdChannelAnnouncement, + lnwire.CmdNodeAnnouncement, + lnwire.CmdErrorGeneric, + lnwire.CmdInit, + lnwire.CmdSingleFundingRequest: + return nil + + default: + return errors.Errorf("wrong message type: %v", msg.Command()) + } +} + +// remove retrieves the messages storage indexes of the messages that +// corresponds to given types/codes and remove them from the message storage +// thereby acknowledge them. +func (rt *retransmitter) remove(codes ...lnwire.MessageCode) error { + rt.mutex.RLock() + var messagesToRemove []uint64 + for _, code := range codes { + indexes, ok := rt.codeToIndex[code] + if !ok { + continue + } + messagesToRemove = append(messagesToRemove, indexes...) + } + rt.mutex.RUnlock() + + if err := rt.storage.Remove(messagesToRemove); err != nil { + return err + } + + // After successful deletion the messages by index, clean up the code + // to index map. + rt.mutex.Lock() + for _, code := range codes { + delete(rt.codeToIndex, code) + } + rt.mutex.Unlock() + + return nil +} + +// MessagesToRetransmit returns the array of messages, that were not +// acknowledged in previous session with this peer, in the order they have been +// originally added in storage. +func (rt *retransmitter) MessagesToRetransmit() []lnwire.Message { + return rt.messagesToRetransmit +} + +// Flush removes the initialized messages after the have been successfully +// retransmitted. +func (rt *retransmitter) Flush() { + rt.messagesToRetransmit = nil +} diff --git a/retransmission_test.go b/retransmission_test.go new file mode 100644 index 00000000000..6fec2b2a90a --- /dev/null +++ b/retransmission_test.go @@ -0,0 +1,197 @@ +package main + +import ( + "testing" + + "sort" + + "github.com/lightningnetwork/lnd/lnwire" +) + +var ackTestVector = []struct { + name string + send lnwire.Message + recv lnwire.Message +}{ + { + name: "open_channel -> accept_channel", + send: &lnwire.SingleFundingRequest{}, + recv: &lnwire.SingleFundingResponse{}, + }, + { + name: "accept_channel -> funding_created", + send: &lnwire.SingleFundingResponse{}, + recv: &lnwire.SingleFundingComplete{}, + }, + { + name: "funding_created -> funding_signed", + send: &lnwire.SingleFundingComplete{}, + recv: &lnwire.SingleFundingSignComplete{}, + }, + { + name: "funding_signed -> funding_locked", + send: &lnwire.SingleFundingSignComplete{}, + recv: &lnwire.FundingLocked{}, + }, + { + name: "funding_locked -> update_add_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateAddHTLC{}, + }, + { + name: "funding_locked -> update_fulfill_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateFufillHTLC{}, + }, + { + name: "funding_locked -> update_fail_htlc", + send: &lnwire.FundingLocked{}, + recv: &lnwire.UpdateFailHTLC{}, + }, + // TODO(andrew.shvv) uncomment after update_fail_malformed_htlc will + // be included + //{ + // name: "funding_locked -> update_fail_malformed_htlc", + // send: &lnwire.SingleFundingOpenProof{}, + // recv: , + //}, + { + name: "funding_locked -> commitment_signed", + send: &lnwire.FundingLocked{}, + recv: &lnwire.CommitSig{}, + }, + { + name: "funding_locked -> revoke_and_ack", + send: &lnwire.FundingLocked{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "funding_locked -> shutdown", + send: &lnwire.FundingLocked{}, + recv: &lnwire.CloseRequest{}, + }, + { + name: "update_add_htlc -> revoke_and_ack", + send: &lnwire.UpdateAddHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "update_fulfill_htlc -> revoke_and_ack", + send: &lnwire.UpdateFufillHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "update_fail_htlc -> revoke_and_ack", + send: &lnwire.UpdateFailHTLC{}, + recv: &lnwire.RevokeAndAck{}, + }, + // TODO(andrew.shvv) uncomment after update_fail_malformed_htlc will + // be included + //{ + // name: "update_fail_malformed_htlc -> revoke_and_ack", + // send: , + // recv: &lnwire.RevokeAndAck{}, + //}, + { + name: "commitment_signed -> revoke_and_ack", + send: &lnwire.CommitSig{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "revoke_and_ack -> commitment_signed", + send: &lnwire.RevokeAndAck{}, + recv: &lnwire.CommitSig{}, + }, + { + name: "revoke_and_ack -> shutdown", + send: &lnwire.RevokeAndAck{}, + recv: &lnwire.CloseRequest{}, + }, + + { + name: "shutdown -> revoke_and_ack", + send: &lnwire.CloseRequest{}, + recv: &lnwire.RevokeAndAck{}, + }, + { + name: "shutdown -> closing_signed", + send: &lnwire.CloseRequest{}, + recv: &lnwire.CloseComplete{}, + }, +} + +// MockStore map implementation of message storage which not requires the +// messages to be decoded/encoded which means that we shouldn't populate the +// lnwire message with data. +type MockStore struct { + sequence uint64 + messages map[uint64]lnwire.Message +} + +func (s *MockStore) Get() ([]uint64, []lnwire.Message, error) { + indexes := make([]int, len(s.messages)) + messages := make([]lnwire.Message, len(s.messages)) + + i := 0 + for index := range s.messages { + indexes[i] = int(index) + i++ + } + sort.Ints(indexes) + + uindexes := make([]uint64, len(s.messages)) + for i, index := range indexes { + messages[i] = s.messages[uint64(index)] + uindexes[i] = uint64(index) + } + + return uindexes, messages, nil +} +func (s *MockStore) Add(msg lnwire.Message) (uint64, error) { + index := s.sequence + s.messages[index] = msg + s.sequence++ + + return index, nil +} + +func (s *MockStore) Remove(indexes []uint64) error { + for _, index := range indexes { + delete(s.messages, index) + } + return nil +} + +// TestRetransmitterSpecVector tests the behaviour of retransmission +// subsystem which is described in specification. +func TestRetransmitterSpecVector(t *testing.T) { + + s := &MockStore{messages: make(map[uint64]lnwire.Message)} + + rt, err := newRetransmitter(s) + if err != nil { + t.Fatalf("can't init retransmitter: %v", err) + } + + for _, test := range ackTestVector { + if err := rt.Register(test.send); err != nil { + t.Fatalf("can't register message: %v", err) + } + + _, messages, _ := s.Get() + if len(messages) != 1 { + t.Fatalf("test(%v): message(%v) wasn't registered", + test.name, test.send.Command()) + } + + if err := rt.Ack(test.recv); err != nil { + t.Fatalf("can't ack message: %v", err) + } + + _, messages, _ = s.Get() + if len(messages) != 0 { + t.Fatalf("test(%v): message(%v) wasn't acked", + test.name, test.send.Command()) + } + } +} From f8b26240ae06a2088ef7252d5547e64909eb08e7 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Thu, 16 Mar 2017 16:53:51 +0300 Subject: [PATCH 10/10] lnd: add retransmission subsystem Issue: #137 In this commit the retransmission subsystem was included in lnd, now upon peer reconnection we fetch all messages from message storage that were not acked and send them again to remote side. --- peer.go | 101 ++++++++++++++++++++++++++++++++++++++++++------------ server.go | 19 +++++----- 2 files changed, 90 insertions(+), 30 deletions(-) diff --git a/peer.go b/peer.go index f8cde6402d2..88c5e2bb86b 100644 --- a/peer.go +++ b/peer.go @@ -46,6 +46,7 @@ const ( // buffered channel acts as a semaphore to be used for synchronization purposes. type outgoinMsg struct { msg lnwire.Message + persist bool sentChan chan struct{} // MUST be buffered. } @@ -161,6 +162,14 @@ type peer struct { // on both sides. globalSharedFeatures *lnwire.SharedFeatures + // retransmitter is an retransmission subsystem aka message store, which + // stores outgoing messages that were not acked. Messages queue'd + // on-disk and in the situation when the server is unable to send the + // message to the peer due to it being offline this service will take of + // retransmitting the messages that were not acked to the remote upon + // reconnection. + retransmitter *retransmitter + queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -169,7 +178,7 @@ type peer struct { // newPeer creates a new peer from an establish connection object, and a // pointer to the main server. func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, - addr *lnwire.NetAddress, inbound bool) (*peer, error) { + addr *lnwire.NetAddress, inbound bool, db *channeldb.DB) (*peer, error) { nodePub := addr.IdentityKey @@ -205,6 +214,15 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, quit: make(chan struct{}), } + storeID := p.addr.IdentityKey.SerializeCompressed() + rt, err := newRetransmitter(channeldb.NewMessageStore(storeID, db)) + if err != nil { + peerLog.Errorf("unable to initialise retransmitter "+ + "for peerID(%v): %v", p.id, err) + return nil, err + } + p.retransmitter = rt + // Initiate the pending channel identifier properly depending on if this // node is inbound or outbound. This value will be used in an increasing // manner to track pending channels. @@ -285,7 +303,7 @@ func (p *peer) Start() error { return nil } - peerLog.Tracef("peer %v starting", p) + peerLog.Tracef("peer(%v) starting", p) p.wg.Add(2) go p.queueHandler() @@ -313,6 +331,21 @@ func (p *peer) Start() error { "must be init message") } + // If we had the interaction with this peer before than we should + // retrieve the messages that were not acked in previous session and + // sent them again in order to be sure that remote peer is handled them. + messages := p.retransmitter.MessagesToRetransmit() + if len(messages) != 0 { + peerLog.Infof("retransmission subsystem resends %v messages "+ + "to the peer(%v)", len(messages), p) + + for _, message := range messages { + // Sending over sendToPeer will cause block because of + // the usage of peer mutex. + p.queueMsg(message, false, nil) + } + } + p.wg.Add(3) go p.readHandler() go p.channelManager() @@ -350,7 +383,7 @@ func (p *peer) Disconnect() { return } - peerLog.Tracef("Disconnecting %s", p) + peerLog.Tracef("Disconnecting peer(%v)", p) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() @@ -376,7 +409,10 @@ func (p *peer) Disconnect() { // String returns the string representation of this peer. func (p *peer) String() string { - return p.conn.RemoteAddr().String() + return fmt.Sprintf("%x@%v", + p.addr.IdentityKey.SerializeCompressed(), + p.addr.Address) + } // readNextMessage reads, and returns the next message on the wire along with @@ -405,8 +441,8 @@ out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() if err != nil { - peerLog.Infof("unable to read message from %v: %v", - p, err) + peerLog.Errorf("unable to read message from "+ + "peer(%v): %v", p, err) switch err.(type) { // If this is just a message we don't yet recognize, @@ -424,6 +460,12 @@ out: } } + if err := p.retransmitter.Ack(nextMsg); err != nil { + peerLog.Errorf("unable to ack messages for peer(%v):"+ + " %v", p, err) + break out + } + var ( isChanUpdate bool targetChan wire.OutPoint @@ -440,7 +482,7 @@ out: atomic.StoreInt64(&p.pingTime, delay) case *lnwire.Ping: - p.queueMsg(lnwire.NewPong(msg.Nonce), nil) + p.queueMsg(lnwire.NewPong(msg.Nonce), true, nil) case *lnwire.SingleFundingRequest: p.server.fundingMgr.processFundingRequest(msg, p.addr) @@ -492,7 +534,7 @@ out: p.htlcManMtx.Unlock() if !ok { peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", targetChan, p) + "channel %v from peer(%v)", targetChan, p) continue } channel <- nextMsg @@ -502,7 +544,7 @@ out: p.Disconnect() p.wg.Done() - peerLog.Tracef("readHandler for peer %v done", p) + peerLog.Tracef("readHandler for peer(%v) done", p) } // logWireMessage logs the receipt or sending of particular wire message. This @@ -586,6 +628,16 @@ func (p *peer) writeHandler() { atomic.StoreInt64(&p.pingLastSend, now) } + if outMsg.persist { + err := p.retransmitter.Register(outMsg.msg) + if err != nil { + peerLog.Errorf("unable to register "+ + "message in retransmitter for "+ + "peer(%v): %v", p, err) + p.Disconnect() + return + } + } // Write out the message to the socket, closing the // 'sentChan' if it's non-nil, The 'sentChan' allows // callers to optionally synchronize sends with the @@ -679,7 +731,7 @@ out: // Convert the bytes read into a uint64, and queue the // message for sending. nonce := binary.BigEndian.Uint64(pingBuf[:]) - p.queueMsg(lnwire.NewPing(nonce), nil) + p.queueMsg(lnwire.NewPing(nonce), true, nil) case <-p.quit: break out } @@ -695,9 +747,14 @@ func (p *peer) PingTime() int64 { // queueMsg queues a new lnwire.Message to be eventually sent out on the // wire. -func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { +func (p *peer) queueMsg(msg lnwire.Message, persist bool, + doneChan chan struct{}) { select { - case p.outgoingQueue <- outgoinMsg{msg, doneChan}: + case p.outgoingQueue <- outgoinMsg{ + msg: msg, + sentChan: doneChan, + persist: persist, + }: case <-p.quit: return } @@ -802,7 +859,7 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*cha return nil, err } closeReq := lnwire.NewCloseRequest(*chanPoint, closeSig) - p.queueMsg(closeReq, nil) + p.queueMsg(closeReq, true, nil) return txid, nil } @@ -1108,7 +1165,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, peerLog.Errorf("unable to expand revocation window: %v", err) continue } - p.queueMsg(rev, nil) + p.queueMsg(rev, true, nil) } state := &commitmentState{ @@ -1244,7 +1301,7 @@ func (p *peer) sendInitMsg() error { p.server.localFeatures, ) - p.queueMsg(msg, nil) + p.queueMsg(msg, true, nil) return nil } @@ -1278,7 +1335,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { return } - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) state.pendingBatch = append(state.pendingBatch, &pendingPayment{ htlc: htlc, @@ -1308,7 +1365,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) isSettle = true case *lnwire.UpdateFailHTLC: @@ -1329,7 +1386,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // Finally, we send the HTLC message to the peer which // initially created the HTLC. - p.queueMsg(htlc, nil) + p.queueMsg(htlc, true, nil) isSettle = true } @@ -1478,7 +1535,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { peerLog.Errorf("unable to revoke commitment: %v", err) return } - p.queueMsg(nextRevocation, nil) + p.queueMsg(nextRevocation, true, nil) // If we just initiated a state transition, and we were waiting // for a reply from the remote peer, then we don't need to @@ -1574,7 +1631,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { ID: logIndex, PaymentPreimage: preimage, } - p.queueMsg(settleMsg, nil) + p.queueMsg(settleMsg, true, nil) delete(state.htlcsToSettle, htlc.Index) settledPayments[htlc.RHash] = struct{}{} @@ -1604,7 +1661,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { ID: logIndex, Reason: []byte{byte(reason)}, } - p.queueMsg(cancelMsg, nil) + p.queueMsg(cancelMsg, true, nil) delete(state.htlcsToCancel, htlc.Index) cancelledHtlcs[htlc.Index] = struct{}{} @@ -1698,7 +1755,7 @@ func (p *peer) updateCommitTx(state *commitmentState, reply bool) error { ChannelPoint: *state.chanPoint, CommitSig: parsedSig, } - p.queueMsg(commitSig, nil) + p.queueMsg(commitSig, true, nil) // As we've just cleared out a batch, move all pending updates to the // map of cleared HTLCs, clearing out the set of pending updates. diff --git a/server.go b/server.go index 8990d04637f..7e459006ff6 100644 --- a/server.go +++ b/server.go @@ -264,6 +264,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, go s.connMgr.Connect(connReq) } + srvrLog.Infof("Identity key: %x", + s.identityPriv.PubKey().SerializeCompressed()) + return s, nil } @@ -445,7 +448,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound // Now that we've established a connection, create a peer, and // it to the set of currently active peers. - p, err := newPeer(conn, connReq, s, peerAddr, inbound) + p, err := newPeer(conn, connReq, s, peerAddr, inbound, s.chanDB) if err != nil { srvrLog.Errorf("unable to create peer %v", err) if p.connReq != nil { @@ -510,17 +513,17 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.peersMtx.Lock() defer s.peersMtx.Unlock() - srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr()) - nodePub := conn.(*brontide.Conn).RemotePub() + srvrLog.Tracef("Established connection to: %x@%v", + nodePub.SerializeCompressed(), conn.RemoteAddr()) // If we already have an inbound connection from this peer, simply drop // the connection. pubStr := string(nodePub.SerializeCompressed()) if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Established outbound connection to peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) + srvrLog.Errorf("Established outbound connection to peer"+ + "(%x@%v), but already connected, dropping conn", + nodePub.SerializeCompressed(), conn.RemoteAddr()) s.connMgr.Remove(connReq.ID()) conn.Close() return @@ -664,7 +667,7 @@ out: go func(p *peer) { for _, msg := range bMsg.msgs { - p.queueMsg(msg, nil) + p.queueMsg(msg, true, nil) } }(sPeer) } @@ -699,7 +702,7 @@ out: sMsg.errChan <- nil for _, msg := range sMsg.msgs { - targetPeer.queueMsg(msg, nil) + targetPeer.queueMsg(msg, true, nil) } }() case query := <-s.queries: