From b78bdd89e984cdd2e27fb1397028576dcbe18c97 Mon Sep 17 00:00:00 2001 From: Heap-Hop Date: Wed, 27 Nov 2024 16:18:17 +0900 Subject: [PATCH 1/2] add example tcp_echo_server_non_blocking --- Cargo.toml | 2 + examples/tcp_echo_server_non_blocking.rs | 54 ++++++++++++++++++++++++ test-programs/Cargo.toml | 1 + 3 files changed, 57 insertions(+) create mode 100644 examples/tcp_echo_server_non_blocking.rs diff --git a/Cargo.toml b/Cargo.toml index 2acea92..7560dbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ wstd-macro.workspace = true anyhow.workspace = true futures-lite.workspace = true serde_json.workspace = true +futures.workspace = true [workspace] members = [ @@ -50,6 +51,7 @@ anyhow = "1" cargo_metadata = "0.18.1" futures-core = "0.3.19" futures-lite = "1.12.0" +futures = "0.3.31" heck = "0.5" http = "1.1" pin-project-lite = "0.2.8" diff --git a/examples/tcp_echo_server_non_blocking.rs b/examples/tcp_echo_server_non_blocking.rs new file mode 100644 index 0000000..1239e21 --- /dev/null +++ b/examples/tcp_echo_server_non_blocking.rs @@ -0,0 +1,54 @@ +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use std::{ + cell::RefCell, + future::Future, + pin::{pin, Pin}, + rc::Rc, + task::Poll, +}; +use wstd::io; +use wstd::iter::AsyncIterator; +use wstd::net::TcpListener; + +type StreamTasks = Rc>>>>>>; + +#[wstd::main] +async fn main() -> io::Result<()> { + let listener = TcpListener::bind("127.0.0.1:8080").await?; + println!("Listening on {}", listener.local_addr()?); + println!("type `nc localhost 8080` to create a TCP client"); + + let stream_tasks: StreamTasks = StreamTasks::default(); + let mut listening_task = pin!(start_listening(listener, stream_tasks.clone())); + + futures::future::poll_fn(|cx| { + if let Poll::Ready(_) = listening_task.as_mut().poll(cx) { + return Poll::Ready(()); + }; + + let mut stream_tasks_ref = stream_tasks.borrow_mut(); + if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next_unpin(cx) { + println!("Task finished: {:?}", res); + println!("Tasks len: {}", stream_tasks_ref.len()); + } + + Poll::Pending + }) + .await; + Ok(()) +} + +async fn start_listening(listener: TcpListener, stream_tasks: StreamTasks) -> io::Result<()> { + let mut incoming = listener.incoming(); + while let Some(stream) = incoming.next().await { + let stream = stream?; + println!("Accepted from: {}", stream.peer_addr()?); + + let stream_task = async move { io::copy(&stream, &stream).await }.boxed_local(); + + stream_tasks.borrow_mut().push(stream_task); + println!("Task added"); + println!("Tasks len: {}", stream_tasks.borrow().len()); + } + Ok(()) +} diff --git a/test-programs/Cargo.toml b/test-programs/Cargo.toml index 633c274..0ee5104 100644 --- a/test-programs/Cargo.toml +++ b/test-programs/Cargo.toml @@ -8,3 +8,4 @@ publish = false futures-lite.workspace = true serde_json.workspace = true wstd.workspace = true +futures.workspace = true From 7bd4d0949a34c05fb48d48918c5c0709aef628b9 Mon Sep 17 00:00:00 2001 From: Heap-Hop Date: Thu, 28 Nov 2024 18:03:02 +0900 Subject: [PATCH 2/2] use futures-concurrency and futures-lite in example tcp_echo_server_non_blocking --- Cargo.toml | 4 ++-- examples/tcp_echo_server_non_blocking.rs | 11 ++++++----- test-programs/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7560dbf..bfa447d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ wstd-macro.workspace = true anyhow.workspace = true futures-lite.workspace = true serde_json.workspace = true -futures.workspace = true +futures-concurrency.workspace = true [workspace] members = [ @@ -51,7 +51,7 @@ anyhow = "1" cargo_metadata = "0.18.1" futures-core = "0.3.19" futures-lite = "1.12.0" -futures = "0.3.31" +futures-concurrency = "7.6.2" heck = "0.5" http = "1.1" pin-project-lite = "0.2.8" diff --git a/examples/tcp_echo_server_non_blocking.rs b/examples/tcp_echo_server_non_blocking.rs index 1239e21..78f3edf 100644 --- a/examples/tcp_echo_server_non_blocking.rs +++ b/examples/tcp_echo_server_non_blocking.rs @@ -1,4 +1,5 @@ -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures_concurrency::future::FutureGroup; +use futures_lite::{FutureExt, StreamExt}; use std::{ cell::RefCell, future::Future, @@ -10,7 +11,7 @@ use wstd::io; use wstd::iter::AsyncIterator; use wstd::net::TcpListener; -type StreamTasks = Rc>>>>>>; +type StreamTasks = Rc>>>>>>; #[wstd::main] async fn main() -> io::Result<()> { @@ -21,13 +22,13 @@ async fn main() -> io::Result<()> { let stream_tasks: StreamTasks = StreamTasks::default(); let mut listening_task = pin!(start_listening(listener, stream_tasks.clone())); - futures::future::poll_fn(|cx| { + futures_lite::future::poll_fn(|cx| { if let Poll::Ready(_) = listening_task.as_mut().poll(cx) { return Poll::Ready(()); }; let mut stream_tasks_ref = stream_tasks.borrow_mut(); - if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next_unpin(cx) { + if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next(cx) { println!("Task finished: {:?}", res); println!("Tasks len: {}", stream_tasks_ref.len()); } @@ -46,7 +47,7 @@ async fn start_listening(listener: TcpListener, stream_tasks: StreamTasks) -> io let stream_task = async move { io::copy(&stream, &stream).await }.boxed_local(); - stream_tasks.borrow_mut().push(stream_task); + stream_tasks.borrow_mut().insert(stream_task); println!("Task added"); println!("Tasks len: {}", stream_tasks.borrow().len()); } diff --git a/test-programs/Cargo.toml b/test-programs/Cargo.toml index 0ee5104..a56373d 100644 --- a/test-programs/Cargo.toml +++ b/test-programs/Cargo.toml @@ -8,4 +8,4 @@ publish = false futures-lite.workspace = true serde_json.workspace = true wstd.workspace = true -futures.workspace = true +futures-concurrency.workspace = true