-
Notifications
You must be signed in to change notification settings - Fork 146
fix(dot/network): fix bugs in notifications protocol handlers; add metrics for inbound/outbound streams #2010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4666518
e03fa95
0692825
71f2937
2950c17
4597344
d873979
07ed907
bed53dc
39de2d7
a07d83d
178f727
6f3fd63
f335df9
67f59cc
941f954
d9bdff4
7f067c7
fa1fb31
d99a4ab
beb2e47
e7a5a15
00091c5
577ff25
3f5a323
f4eb7a5
178434e
c6a9465
3eb1b8e
dea40af
d9f7e36
f072ef3
56362e4
5a5ee4b
9584b4e
c2fa05e
d25196b
0319217
5a66fff
5ec3aa0
e30a399
02c15ac
83beda8
958c0c1
7c81966
4a8732e
dfc5e2c
a7de1d1
0fd0230
912a78b
8629b43
f077036
be2fcd8
7f1f429
a08a742
7d093ba
9aaa1f2
15bccf2
bfbca17
5efe331
0ef20de
9541b72
426a5fa
d8445b3
3a7a426
122c2a4
b3db52a
7ef53e3
d5e6339
99053f9
a8e9cff
b0d01a0
b291a9f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| // Copyright 2021 ChainSafe Systems (ON) | ||
| // SPDX-License-Identifier: LGPL-3.0-only | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| "errors" | ||
| ) | ||
|
|
||
| var ( | ||
| errCannotValidateHandshake = errors.New("failed to validate handshake") | ||
| errMessageTypeNotValid = errors.New("message type is not valid") | ||
| errMessageIsNotHandshake = errors.New("failed to convert message to Handshake") | ||
| errMissingHandshakeMutex = errors.New("outboundHandshakeMutex does not exist") | ||
| errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake") | ||
| errHandshakeTimeout = errors.New("handshake timeout reached") | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| // Copyright 2021 ChainSafe Systems (ON) | ||
| // SPDX-License-Identifier: LGPL-3.0-only | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| libp2pnetwork "github.com/libp2p/go-libp2p-core/network" | ||
| ) | ||
|
|
||
| func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) { | ||
| // we NEED to reset the stream if we ever return from this function, as if we return, | ||
| // the stream will never again be read by us, so we need to tell the remote side we're | ||
| // done with this stream, and they should also forget about it. | ||
| defer s.resetInboundStream(stream) | ||
| s.streamManager.logNewStream(stream) | ||
|
|
||
| peer := stream.Conn().RemotePeer() | ||
| msgBytes := s.bufPool.get() | ||
|
qdm12 marked this conversation as resolved.
|
||
| defer s.bufPool.put(msgBytes) | ||
|
|
||
| for { | ||
| n, err := readStream(stream, msgBytes[:]) | ||
| if err != nil { | ||
| logger.Tracef( | ||
| "failed to read from stream id %s of peer %s using protocol %s: %s", | ||
| stream.ID(), stream.Conn().RemotePeer(), stream.Protocol(), err) | ||
| return | ||
| } | ||
|
|
||
| s.streamManager.logMessageReceived(stream.ID()) | ||
|
|
||
| // decode message based on message type | ||
| msg, err := decoder(msgBytes[:n], peer, isInbound(stream)) // stream should always be inbound if it passes through service.readStream | ||
| if err != nil { | ||
| logger.Tracef("failed to decode message from stream id %s using protocol %s: %s", | ||
| stream.ID(), stream.Protocol(), err) | ||
| continue | ||
| } | ||
|
|
||
| logger.Tracef( | ||
| "host %s received message from peer %s: %s", | ||
| s.host.id(), peer, msg) | ||
|
|
||
| if err = handler(stream, msg); err != nil { | ||
| logger.Tracef("failed to handle message %s from stream id %s: %s", msg, stream.ID(), err) | ||
| return | ||
| } | ||
|
|
||
| s.host.bwc.LogRecvMessage(int64(n)) | ||
| } | ||
| } | ||
|
|
||
| func (s *Service) resetInboundStream(stream libp2pnetwork.Stream) { | ||
| protocolID := stream.Protocol() | ||
| peerID := stream.Conn().RemotePeer() | ||
|
|
||
| s.notificationsMu.Lock() | ||
| defer s.notificationsMu.Unlock() | ||
|
|
||
| for _, prtl := range s.notificationsProtocols { | ||
| if prtl.protocolID != protocolID { | ||
| continue | ||
| } | ||
|
|
||
| prtl.inboundHandshakeData.Delete(peerID) | ||
| break | ||
| } | ||
|
|
||
| logger.Debugf( | ||
| "cleaning up inbound handshake data for protocol=%s, peer=%s", | ||
| stream.Protocol(), | ||
| peerID, | ||
| ) | ||
|
|
||
| _ = stream.Reset() | ||
|
qdm12 marked this conversation as resolved.
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,71 @@ import ( | |
| "github.com/ChainSafe/gossamer/dot/types" | ||
| "github.com/ChainSafe/gossamer/lib/common" | ||
| "github.com/ChainSafe/gossamer/pkg/scale" | ||
|
|
||
| libp2pnetwork "github.com/libp2p/go-libp2p-core/network" | ||
| "github.com/libp2p/go-libp2p-core/peer" | ||
| ) | ||
|
|
||
| // handleLightStream handles streams with the <protocol-id>/light/2 protocol ID | ||
| func (s *Service) handleLightStream(stream libp2pnetwork.Stream) { | ||
| s.readStream(stream, s.decodeLightMessage, s.handleLightMsg) | ||
| } | ||
|
|
||
| func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) { | ||
|
qdm12 marked this conversation as resolved.
|
||
| s.lightRequestMu.RLock() | ||
| defer s.lightRequestMu.RUnlock() | ||
|
|
||
| // check if we are the requester | ||
| if _, ok := s.lightRequest[peer]; ok { | ||
| // if we are, decode the bytes as a LightResponse | ||
| return newLightResponseFromBytes(in) | ||
| } | ||
|
|
||
| // otherwise, decode bytes as LightRequest | ||
| return newLightRequestFromBytes(in) | ||
| } | ||
|
|
||
| func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) { | ||
| defer func() { | ||
| _ = stream.Close() | ||
|
qdm12 marked this conversation as resolved.
|
||
| }() | ||
|
|
||
| lr, ok := msg.(*LightRequest) | ||
| if !ok { | ||
| return nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just to double check, it's all good here if the message is not a request? Maybe should calling layers check that and pass it as a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should never not be a request, just by reasoning of how the substrate networking protocol works (https://docs.rs/sc-network/0.9.0/sc_network/) if it's not a request, then we can ignore it, as it shouldn't be sent over an inbound stream. (in fact we can probably downscore the peer, but there's another issue open for that)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can't change it to be
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for clarifying!!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually I think we could return an error since it's not meant to happen. Or just remove the type assertion check so it panics, up to you. Panicing is fine too in this case I'd say. |
||
| } | ||
|
|
||
| resp := NewLightResponse() | ||
| switch { | ||
| case lr.RemoteCallRequest != nil: | ||
| resp.RemoteCallResponse, err = remoteCallResp(lr.RemoteCallRequest) | ||
| case lr.RemoteHeaderRequest != nil: | ||
| resp.RemoteHeaderResponse, err = remoteHeaderResp(lr.RemoteHeaderRequest) | ||
| case lr.RemoteChangesRequest != nil: | ||
| resp.RemoteChangesResponse, err = remoteChangeResp(lr.RemoteChangesRequest) | ||
| case lr.RemoteReadRequest != nil: | ||
| resp.RemoteReadResponse, err = remoteReadResp(lr.RemoteReadRequest) | ||
| case lr.RemoteReadChildRequest != nil: | ||
| resp.RemoteReadResponse, err = remoteReadChildResp(lr.RemoteReadChildRequest) | ||
| default: | ||
| logger.Warn("ignoring LightRequest without request data") | ||
| return nil | ||
|
qdm12 marked this conversation as resolved.
|
||
| } | ||
|
|
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // TODO(arijit): Remove once we implement the internal APIs. Added to increase code coverage. (#1856) | ||
| logger.Debugf("LightResponse message: %s", resp) | ||
|
|
||
| err = s.host.writeToStream(stream, resp) | ||
| if err != nil { | ||
| logger.Warnf("failed to send LightResponse message to peer %s: %s", stream.Conn().RemotePeer(), err) | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| // Pair is a pair of arbitrary bytes. | ||
| type Pair struct { | ||
| First []byte | ||
|
|
@@ -46,6 +109,12 @@ func NewLightRequest() *LightRequest { | |
| } | ||
| } | ||
|
|
||
| func newLightRequestFromBytes(in []byte) (msg *LightRequest, err error) { | ||
| msg = NewLightRequest() | ||
| err = msg.Decode(in) | ||
| return msg, err | ||
| } | ||
|
|
||
| func newRequest() *request { | ||
| return &request{ | ||
| RemoteCallRequest: *newRemoteCallRequest(), | ||
|
|
@@ -122,6 +191,12 @@ func NewLightResponse() *LightResponse { | |
| } | ||
| } | ||
|
|
||
| func newLightResponseFromBytes(in []byte) (msg *LightResponse, err error) { | ||
| msg = NewLightResponse() | ||
| err = msg.Decode(in) | ||
| return msg, err | ||
| } | ||
|
|
||
| func newResponse() *response { | ||
| return &response{ | ||
| RemoteCallResponse: *newRemoteCallResponse(), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.