File tree Expand file tree Collapse file tree 3 files changed +67
-37
lines changed
Expand file tree Collapse file tree 3 files changed +67
-37
lines changed Original file line number Diff line number Diff line change @@ -19,24 +19,32 @@ where
1919 Box :: pin ( async move {
2020 // Using `scan` here because it is able to stop the stream early
2121 // if a failure occurs
22+ let mut is_error = false ;
2223 let mut found_error = None ;
2324 let out: V = stream
24- . scan ( & mut found_error, |error, elem| {
25- match elem {
26- Ok ( elem) => Some ( elem) ,
27- Err ( err) => {
28- * * error = Some ( err) ;
29- // Stop processing the stream on error
30- None
31- }
25+ . take_while ( |elem| {
26+ // Stop processing the stream on `Err`
27+ !is_error
28+ && ( elem. is_ok ( ) || {
29+ is_error = true ;
30+ // Capture first `Err`
31+ true
32+ } )
33+ } )
34+ . filter_map ( |elem| match elem {
35+ Ok ( value) => Some ( value) ,
36+ Err ( err) => {
37+ found_error = Some ( err) ;
38+ None
3239 }
3340 } )
3441 . collect ( )
3542 . await ;
3643
37- match found_error {
38- Some ( err) => Err ( err) ,
39- None => Ok ( out) ,
44+ if is_error {
45+ Err ( found_error. unwrap ( ) )
46+ } else {
47+ Ok ( out)
4048 }
4149 } )
4250 }
Original file line number Diff line number Diff line change @@ -42,22 +42,33 @@ where
4242 Box :: pin ( async move {
4343 // Using `scan` here because it is able to stop the stream early
4444 // if a failure occurs
45+ let mut is_error = false ;
4546 let mut found_error = None ;
46- let out = <T as Product < U > >:: product ( stream. scan ( & mut found_error, |error, elem| {
47- match elem {
48- Ok ( elem) => Some ( elem) ,
49- Err ( err) => {
50- * * error = Some ( err) ;
51- // Stop processing the stream on error
52- None
53- }
54- }
55- } ) )
47+ let out = <T as Product < U > >:: product (
48+ stream
49+ . take_while ( |elem| {
50+ // Stop processing the stream on `Err`
51+ !is_error
52+ && ( elem. is_ok ( ) || {
53+ is_error = true ;
54+ // Capture first `Err`
55+ true
56+ } )
57+ } )
58+ . filter_map ( |elem| match elem {
59+ Ok ( value) => Some ( value) ,
60+ Err ( err) => {
61+ found_error = Some ( err) ;
62+ None
63+ }
64+ } ) ,
65+ )
5666 . await ;
5767
58- match found_error {
59- Some ( err) => Err ( err) ,
60- None => Ok ( out) ,
68+ if is_error {
69+ Err ( found_error. unwrap ( ) )
70+ } else {
71+ Ok ( out)
6172 }
6273 } )
6374 }
Original file line number Diff line number Diff line change @@ -42,22 +42,33 @@ where
4242 Box :: pin ( async move {
4343 // Using `scan` here because it is able to stop the stream early
4444 // if a failure occurs
45+ let mut is_error = false ;
4546 let mut found_error = None ;
46- let out = <T as Sum < U > >:: sum ( stream. scan ( & mut found_error, |error, elem| {
47- match elem {
48- Ok ( elem) => Some ( elem) ,
49- Err ( err) => {
50- * * error = Some ( err) ;
51- // Stop processing the stream on error
52- None
53- }
54- }
55- } ) )
47+ let out = <T as Sum < U > >:: sum (
48+ stream
49+ . take_while ( |elem| {
50+ // Stop processing the stream on `Err`
51+ !is_error
52+ && ( elem. is_ok ( ) || {
53+ is_error = true ;
54+ // Capture first `Err`
55+ true
56+ } )
57+ } )
58+ . filter_map ( |elem| match elem {
59+ Ok ( value) => Some ( value) ,
60+ Err ( err) => {
61+ found_error = Some ( err) ;
62+ None
63+ }
64+ } ) ,
65+ )
5666 . await ;
5767
58- match found_error {
59- Some ( err) => Err ( err) ,
60- None => Ok ( out) ,
68+ if is_error {
69+ Err ( found_error. unwrap ( ) )
70+ } else {
71+ Ok ( out)
6172 }
6273 } )
6374 }
You can’t perform that action at this time.
0 commit comments