Rework swarm and allow interrupting a dial#366
Conversation
|
Added another commit that should fix #318 |
|
Pushed another commit that should fix #320 |
|
Added a fix for #332 |
Signed-off-by: Pierre Krieger <pierre.krieger1708@gmail.com>
gnunicorn
left a comment
There was a problem hiding this comment.
Thanks!
There are a few things that are unclear to me and I'd like to have addressed.
| /// dialing fails or the handler has been called with the resulting future. | ||
| /// | ||
| /// The returned future is filled with the output of `then`. | ||
| pub(crate) fn dial_then<Du, F>(&self, multiaddr: Multiaddr, transport: Du, then: F) |
There was a problem hiding this comment.
I don't understand this change. With dial_then returning a future, can't the same be achieved through dial_then(..).then(|v| v) rather than doing that complex fn-management here. Am I missing something?
There was a problem hiding this comment.
The future returned by dial and dial_then doesn't get automatically processed as part of the swarm_future.
Before this change, if you call unique_connec.dial() (or get_or_dial()) and drop the Future returned by dial(), then the unique_connec will not update its state. By putting this logic as part of the swarm_future, we know that it's going to be the case.
There was a problem hiding this comment.
Is this not generally useful? Why pub(crate)?
There was a problem hiding this comment.
It's true that it's useful, but in my opinion this is a very weird API and I'm not sure we want to expose it.
core/src/swarm.rs
Outdated
| let _ = tx.send(then(val)); | ||
| }); | ||
| let mut then = Box::new(move |val: Result<(), IoError>| { | ||
| if let Some(then) = then.take() { |
There was a problem hiding this comment.
Maybe use .expect here to reduce complexity? There is not reasonable explanation the else should ever be reached, or?
| Err((err, _)) => { | ||
| debug!("Error in listener: {:?}", err); | ||
| break | ||
| for n in (0 .. shared.listeners.len()).rev() { |
There was a problem hiding this comment.
There is a lot of complexity and nuance hidden in these loops: taking each listener, starting from the last so we can efficiently remove and push items onto the vector, sometimes dropping the listener or replacing it with the remaining, while shuffling the order (although I am not sure this is intended or just a side-effect).
I had to read it three times, and wrote some code to understand what was happening before realizing all nuance and things that happen here.
This making these loops quite hard to understand and fragile to future changes. Is there any way we can reduce the complexity of these loops or at least add inline comments to make sure then next reader doesn't accidentally break it?
core/src/transport/interruptible.rs
Outdated
| } | ||
|
|
||
| impl<T> Interruptible<T> { | ||
| /// Internal function that builds a `Interruptible`. |
| }); | ||
|
|
||
| let dial_fut = dial_fut | ||
| .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported")) |
There was a problem hiding this comment.
how are we sure the only reason this failed is because of multi-addr support-problems? Either explain via comment or make a more concrete matching of the error to not cover up internal errors by replacing the message indiscriminately, please.
There was a problem hiding this comment.
Yes, that's the design of the dial() and dial_then() methods. They can only Err if the multiaddress is not supported. Any other error is in the produced future.
See #367
| task_local! { | ||
| static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) | ||
| } | ||
| tasks_waiting.insert(TASK_ID.with(|&k| k), task::current()); |
There was a problem hiding this comment.
Clever. A pity that tasks do not come with an ID out of the box.
| } | ||
| } | ||
|
|
||
| // TODO: stronger typing |
There was a problem hiding this comment.
All the boxes within Shared :-/
| .map_err(|(err, _)| err) | ||
| .then(move |val| { | ||
| *inner.lock() = UniqueConnecInner::Empty; | ||
| drop(cleaner); // Make sure that `cleaner` gets called there. |
There was a problem hiding this comment.
Since cleaner is moved here, would it not be sufficient to declare cleaner = Arc::downgrade(&self.inner); above and put the Cleaner::drop body in here? Why a custom Drop impl instead?
There was a problem hiding this comment.
In case the user drops the future.
then is only reached if you execute the future, but not if you just destroy it.
cc #344
Removes several
Arcs, but most notably allows interrupting current dialing attempts.This change will also later allow more control, like removing listeners.