Conversation
|
Note for self: the latest commit (fix dialer not counting as active substream) is not included in |
gnunicorn
left a comment
There was a problem hiding this comment.
Thanks for that!
Reading through it though, things feel a little all over the place (still) and with replicated code parts, especially around the important connection management and upgrade parts, making this prone to easily breaking it with future changes (or forgetting to copy other parts).
May I suggest, rather than using shared: Arc<Mutex<Shared<T, C>>> and having each user deal with it (hopefully properly), to encapsulate it within a Wrapper and centralize the repeated codebase there?
Something like
struct ConnectionManager {
inner: Arc<Mutex<Shared<T, C>>>
}
impl ConnectionManager {
pub fn remove_substream(connec_id: u64, addr: &Multiaddr)
pub fn poll_incoming(listener: Option<u64>)
pub fn insert_connection( ....
}
core/src/connection_reuse.rs
Outdated
| transport: UpgradedNode<T, C>, | ||
|
|
||
| /// All the connections that were opened, whether successful and/or active or not. | ||
| // TODO: this will grow forever |
There was a problem hiding this comment.
well, maybe use an LRU-Cache instead to prevent boundless growth? Doesn't have to be addressed in this PR though we might want to put this TODO into a ticket otherwise (could be nice for new contributors)...
| muxer: M, | ||
| /// Next incoming substream. | ||
| next_incoming: M::InboundSubstream, | ||
| /// Future of the address of the client. |
core/src/connection_reuse.rs
Outdated
|
|
||
| /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the | ||
| /// functions. | ||
| Poisonned, |
core/src/connection_reuse.rs
Outdated
| }, | ||
| }; | ||
|
|
||
| match mem::replace(&mut *connec, PeerState::Poisonned) { |
There was a problem hiding this comment.
Why are we replacing a potentially just created Pending-State with a Poisoned one immediately and then replace it within every match, too? Why not do a regular match?
There was a problem hiding this comment.
You can't switch from Pending to Active if you use a regular match block.
| type Error = IoError; | ||
|
|
||
| fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||
| loop { |
There was a problem hiding this comment.
this loop is over 110 lines long and contains a lot of complexity, mem::replace (which we immediately overwrite), interdependence and early returns - requiring a huge mental model to keep in your head to follow all that is happening. Any chance we could split this up into easier consumable chunks?
core/src/connection_reuse.rs
Outdated
| Ok(Async::Ready(substream)) | ||
| }, | ||
| Ok(Async::Ready(None)) | Ok(Async::NotReady) => { | ||
| // TODO: will add an element to the list every time |
There was a problem hiding this comment.
comment not true with hashmap anymore.
core/src/connection_reuse.rs
Outdated
|
|
||
| for (addr, state) in shared.connections.iter_mut() { | ||
| match *state { | ||
| PeerState::Active { ref mut next_incoming, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { |
core/src/connection_reuse.rs
Outdated
| /// | ||
| /// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if | ||
| /// one or more connections are matching the `listener` but they are not ready. | ||
| fn poll_incoming<T, C>(shared_arc: &Arc<Mutex<Shared<T, C>>>, shared: &mut Shared<T, C>, listener: Option<u64>) |
There was a problem hiding this comment.
having both shared_arc: &Arc<Mutex<Shared<T, C>>> and shared: &mut Shared<T, C> as parameters feels very odd, and makes me wonder if this isn't supposed to be somewhere else closed to Shared itself.
core/src/connection_reuse.rs
Outdated
| } | ||
|
|
||
| /// Removes one substream from an active connection. Closes the connection if necessary. | ||
| fn remove_one_substream<T, C>(shared: &mut Shared<T, C>, connec_id: u64, addr: &Multiaddr) |
There was a problem hiding this comment.
this should be closer to Shared as well.
| use libp2p_tcp_transport::TcpConfig; | ||
| use std::sync::{atomic, mpsc}; | ||
| use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; | ||
| use std::sync::atomic; |
There was a problem hiding this comment.
are you sure the changes in this file belong into this PR?
| Ok(Async::Ready(Some(inner))) => { | ||
| trace!("New incoming substream from {}", client_addr); | ||
| *num_substreams += 1; | ||
| Ok((inner, connection_id.clone(), client_addr.clone())) |
There was a problem hiding this comment.
connection_id implements Copy.
| ); | ||
| self.outbound = Some(ConnectionReuseDialOut { | ||
| stream: muxer.clone().outbound(), | ||
| connection_id: connection_id.clone(), |
There was a problem hiding this comment.
connection_id implements Copy.
| let first_outbound = muxer.clone().outbound(); | ||
| self.outbound = Some(ConnectionReuseDialOut { | ||
| stream: first_outbound, | ||
| connection_id: connection_id.clone(), |
There was a problem hiding this comment.
connection_id implements Copy.
| // our connection was upgraded, replace it. | ||
| Some(PeerState::Active { | ||
| muxer, | ||
| connection_id: connection_id.clone(), |
There was a problem hiding this comment.
connection_id implements Copy.
| ConnectionReuseSubstream { | ||
| inner: inner.clone(), | ||
| shared: self.shared.clone(), | ||
| connection_id: connection_id.clone(), |
There was a problem hiding this comment.
connection_id implements Copy.
| use transport::{MuxedTransport, Transport, UpgradedNode}; | ||
| use upgrade::ConnectionUpgrade; | ||
|
|
||
| use std::clone::Clone; |
| } | ||
| continue; | ||
| } | ||
| }; |
There was a problem hiding this comment.
Personally I would have found a match easier to read.
| Some(Err(err)) => Err(err), | ||
| None => { | ||
| if found_one { | ||
| Ok(Async::NotReady) |
There was a problem hiding this comment.
If all muxers return Ok(Async::Ready(None)) we would need to register the current task for eventual notification I think.
| }; | ||
|
|
||
| if let Some(new_state) = replace_with { | ||
| mem::replace(&mut *state, new_state); |
There was a problem hiding this comment.
Why not simply *state = new_state?
| }; | ||
|
|
||
| self.shared.insert_connection(self.addr.clone(), state); | ||
| return Ok(Async::NotReady); |
There was a problem hiding this comment.
Here we have just stored the dial future, but we have not polled it yet. By just returning NotReady we may not be woken up again, no?
| { | ||
| if *listener_id == listener { | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Should this not be if *listener_id != listener { continue }?
There was a problem hiding this comment.
yeah, this was a bug, I later fixed. Unfortunately I was unable to push to this again - even just to revert my changes back to the original PR.
If we wanted to go to this route, we should clean up my messed up history mixup first. Sorry about that.
|
attempting a different approach. out on hold. |
|
closing in favor of #446. |
Fix #344
Second rewrite. This time with more manual polling, otherwise we have tons of
Arcs.The PR is not finished as the code should be made cleaner (more comments), and tests should be written.