From ee8b7362b0ed9cf5a4bd79f3204602d11fcb5b78 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 10 Jan 2022 11:04:52 +0800 Subject: [PATCH 01/10] self-contained example & gracefully shutdown reporter Signed-off-by: tison --- Cargo.lock | 1 + Cargo.toml | 1 + examples/simple_trace_report.rs | 61 ++++++++++++++++++++++++++++-- src/context/propagation/encoder.rs | 2 +- src/context/trace_context.rs | 22 +++++++++++ src/reporter/grpc.rs | 48 ++++++++++++++++++----- 6 files changed, 121 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5256ab4..5bfa1aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,6 +612,7 @@ dependencies = [ "prost", "prost-derive", "tokio", + "tokio-stream", "tonic", "tonic-build", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 0840f12..750cf3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ prost-derive = "0.8.0" uuid = { version = "0.8", features = ["serde", "v4"] } base64 = "0.13.0" tokio = { version = "1", features = ["full"] } +tokio-stream = {version = "0.1", features = ["net"]} async-stream = "0.3.2" [build-dependencies] diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index 6bcb39c..82bfcb4 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -1,14 +1,67 @@ +use std::error::Error; + +use tokio; +use tokio_stream::wrappers::TcpListenerStream; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; + use skywalking_rust::context::trace_context::TracingContext; use skywalking_rust::reporter::grpc::Reporter; -use tokio; +use skywalking_rust::skywalking_proto::v3::trace_segment_report_service_server::TraceSegmentReportService; +use skywalking_rust::skywalking_proto::v3::trace_segment_report_service_server::TraceSegmentReportServiceServer; +use skywalking_rust::skywalking_proto::v3::{Commands, SegmentCollection, SegmentObject}; + +#[derive(Default)] +pub struct TraceSegmentReportServer; + +impl TraceSegmentReportServer { + pub fn into_service(self) -> TraceSegmentReportServiceServer { + TraceSegmentReportServiceServer::new(self) + } +} + +#[tonic::async_trait] +impl TraceSegmentReportService for TraceSegmentReportServer { + async fn collect( + &self, + request: Request>, + ) -> Result, Status> { + let mut streams = request.into_inner(); + while let Some(segment) = streams.next().await { + println!("segment: {:?}", segment); + } + Ok(Response::new(Commands::default())) + } + + async fn collect_in_sync( + &self, + request: Request, + ) -> Result, Status> { + println!("request: {:?}", request.into_inner()); + Ok(Response::new(Commands::default())) + } +} #[tokio::main] -async fn main() { - let tx = Reporter::start("http://0.0.0.0:11800").await; +async fn main() -> Result<(), Box> { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(TraceSegmentReportServer::default().into_service()) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .unwrap(); + }); + + let reporter = Reporter::start(format!("https://{}", addr)).await; let mut context = TracingContext::default("service", "instance"); { let span = context.create_entry_span("op1").unwrap(); context.finalize_span(span); } - let _ = tx.send(context).await; + reporter.sender().send(context).await?; + reporter.shutdown().await?; + Ok(()) } diff --git a/src/context/propagation/encoder.rs b/src/context/propagation/encoder.rs index 937e5f7..f928ece 100644 --- a/src/context/propagation/encoder.rs +++ b/src/context/propagation/encoder.rs @@ -25,7 +25,7 @@ pub fn encode_propagation(context: &TracingContext, endpoint: &str, address: &st res += "1-"; res += format!("{}-", encode(context.trace_id.to_string())).as_str(); res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str(); - res += format!("{}-", context.next_span_id.to_string()).as_str(); + res += format!("{}-", context.next_span_id).as_str(); res += format!("{}-", encode(context.service.as_str())).as_str(); res += format!("{}-", encode(context.service_instance.as_str())).as_str(); res += format!("{}-", encode(endpoint)).as_str(); diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs index cea1725..d341bf9 100644 --- a/src/context/trace_context.rs +++ b/src/context/trace_context.rs @@ -21,6 +21,7 @@ use crate::skywalking_proto::v3::{ KeyStringValuePair, Log, RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType, }; +use std::fmt::Formatter; use std::sync::Arc; use super::system_time::UnixTimeStampFetcher; @@ -61,6 +62,14 @@ pub struct Span { time_fetcher: Arc, } +impl std::fmt::Debug for Span { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Span") + .field("span_internal", &self.span_internal) + .finish() + } +} + static SKYWALKING_RUST_COMPONENT_ID: i32 = 11000; impl Span { @@ -148,6 +157,19 @@ pub struct TracingContext { segment_link: Option, } +impl std::fmt::Debug for TracingContext { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TracingContext") + .field("trace_id", &self.trace_id) + .field("trace_segment_id", &self.trace_segment_id) + .field("service", &self.service) + .field("service_instance", &self.service_instance) + .field("next_span_id", &self.next_span_id) + .field("spans", &self.spans) + .finish() + } +} + impl TracingContext { /// Generate a new trace context. Typically called when no context has /// been propagated and a new trace is to be started. diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index c2b7a76..38399c1 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -17,6 +17,7 @@ use crate::context::trace_context::TracingContext; use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient; use crate::skywalking_proto::v3::SegmentObject; +use std::error::Error; use tokio::sync::mpsc; use tonic::transport::Channel; @@ -32,12 +33,13 @@ async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<() } } -pub struct Reporter {} +pub struct Reporter { + tx: mpsc::Sender, + shutdown_tx: mpsc::Sender<()>, +} static CHANNEL_BUF_SIZE: usize = 1024; -pub type ContextReporter = mpsc::Sender; - impl Reporter { /// Open gRPC client stream to send collected trace context. /// This function generates a new async task which watch to arrive new trace context. @@ -51,23 +53,51 @@ impl Reporter { /// use skywalking_rust::reporter::grpc::Reporter; /// /// #[tokio::main] - /// async fn main (){ - /// let tx = Reporter::start("localhost:12800").await; + /// async fn main () -> Result<(), Box> { + /// let reporter = Reporter::start("localhost:12800").await; /// let mut context = TracingContext::default("service", "instance"); - /// tx.send(context).await; + /// reporter.sender().send(context).await?; + /// reporter.shutdown().await?; /// } /// ``` - pub async fn start(address: &str) -> ContextReporter { + pub async fn start(address: impl Into) -> Self { let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel(CHANNEL_BUF_SIZE); - let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap(); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + + let mut reporter = ReporterClient::connect(address.into()).await.unwrap(); tokio::spawn(async move { + loop { + tokio::select! { + message = rx.recv() => { + if let Some(message) = message { + flush(&mut reporter, message.convert_segment_object()).await.unwrap(); + } else { + break; + } + }, + _ = shutdown_rx.recv() => { + break; + } + } + } + rx.close(); while let Some(message) = rx.recv().await { flush(&mut reporter, message.convert_segment_object()) .await .unwrap(); } }); - tx + Self { tx, shutdown_tx } + } + + pub async fn shutdown(self) -> Result<(), Box> { + self.shutdown_tx.send(()).await?; + self.shutdown_tx.closed().await; + Ok(()) + } + + pub fn sender(&self) -> mpsc::Sender { + self.tx.clone() } } From 89848ac0528b8c763868effd6b925fcd6008f256 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 10 Jan 2022 11:07:38 +0800 Subject: [PATCH 02/10] tokio-stream can be a dev-dependencies Signed-off-by: tison --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 750cf3b..db82830 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,12 +27,14 @@ prost-derive = "0.8.0" uuid = { version = "0.8", features = ["serde", "v4"] } base64 = "0.13.0" tokio = { version = "1", features = ["full"] } -tokio-stream = {version = "0.1", features = ["net"]} async-stream = "0.3.2" [build-dependencies] tonic-build = "0.5.2" +[dev-dependencies] +tokio-stream = {version = "0.1", features = ["net"]} + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" From 393c3b86eabc4b54077bca7bd7bb7202ac9ccb4a Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 10 Jan 2022 23:15:25 +0800 Subject: [PATCH 03/10] revert example should not start a server Signed-off-by: tison --- examples/simple_trace_report.rs | 52 +-------------------------------- 1 file changed, 1 insertion(+), 51 deletions(-) diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index 82bfcb4..192b620 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -1,61 +1,11 @@ use std::error::Error; -use tokio; -use tokio_stream::wrappers::TcpListenerStream; -use tokio_stream::StreamExt; -use tonic::{Request, Response, Status, Streaming}; - use skywalking_rust::context::trace_context::TracingContext; use skywalking_rust::reporter::grpc::Reporter; -use skywalking_rust::skywalking_proto::v3::trace_segment_report_service_server::TraceSegmentReportService; -use skywalking_rust::skywalking_proto::v3::trace_segment_report_service_server::TraceSegmentReportServiceServer; -use skywalking_rust::skywalking_proto::v3::{Commands, SegmentCollection, SegmentObject}; - -#[derive(Default)] -pub struct TraceSegmentReportServer; - -impl TraceSegmentReportServer { - pub fn into_service(self) -> TraceSegmentReportServiceServer { - TraceSegmentReportServiceServer::new(self) - } -} - -#[tonic::async_trait] -impl TraceSegmentReportService for TraceSegmentReportServer { - async fn collect( - &self, - request: Request>, - ) -> Result, Status> { - let mut streams = request.into_inner(); - while let Some(segment) = streams.next().await { - println!("segment: {:?}", segment); - } - Ok(Response::new(Commands::default())) - } - - async fn collect_in_sync( - &self, - request: Request, - ) -> Result, Status> { - println!("request: {:?}", request.into_inner()); - Ok(Response::new(Commands::default())) - } -} #[tokio::main] async fn main() -> Result<(), Box> { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; - let addr = listener.local_addr()?; - - tokio::spawn(async move { - tonic::transport::Server::builder() - .add_service(TraceSegmentReportServer::default().into_service()) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .unwrap(); - }); - - let reporter = Reporter::start(format!("https://{}", addr)).await; + let reporter = Reporter::start("http://0.0.0.0:11800").await; let mut context = TracingContext::default("service", "instance"); { let span = context.create_entry_span("op1").unwrap(); From b8fc551a58b0446c63dcb61893f726bfd48f0681 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 10 Jan 2022 23:33:10 +0800 Subject: [PATCH 04/10] adapt e2e test logic Signed-off-by: tison --- tests/e2e/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index cc9f76c..b44de3d 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -125,11 +125,14 @@ struct Opt { #[tokio::main] async fn main() { let opt = Opt::from_args(); - let tx = Reporter::start("http://collector:19876").await; + let reporter = Reporter::start("http://collector:19876").await; + let tx = reporter.sender(); if opt.mode == "consumer" { run_consumer_service([0, 0, 0, 0], tx).await; } else if opt.mode == "producer" { run_producer_service([0, 0, 0, 0], tx).await; } + + reporter.shutdown().await.unwarp(); } From 9b20bb782dd14c9447e1317ec017c798dc6c7bda Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 13 Jan 2022 14:22:58 +0800 Subject: [PATCH 05/10] enable e2e test Signed-off-by: tison --- .github/workflows/{test.yaml => codecov.yaml} | 2 +- .github/workflows/e2e.yml | 25 +++++++++++++++++++ docker-compose.e2e.yml | 14 +++++------ rust-toolchain.toml | 3 +++ src/reporter/grpc.rs | 1 + tests/e2e/Cargo.lock | 4 ++- tests/e2e/Cargo.toml | 6 ++--- tests/e2e/docker/Dockerfile | 13 ++++++---- tests/e2e/docker/Dockerfile.tool | 15 ----------- tests/e2e/rust-toolchain.toml | 3 +++ tests/e2e/src/main.rs | 9 ++++--- 11 files changed, 58 insertions(+), 37 deletions(-) rename .github/workflows/{test.yaml => codecov.yaml} (96%) create mode 100644 .github/workflows/e2e.yml create mode 100644 rust-toolchain.toml delete mode 100644 tests/e2e/docker/Dockerfile.tool create mode 100644 tests/e2e/rust-toolchain.toml diff --git a/.github/workflows/test.yaml b/.github/workflows/codecov.yaml similarity index 96% rename from .github/workflows/test.yaml rename to .github/workflows/codecov.yaml index 91b5a22..2a8a124 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/codecov.yaml @@ -1,4 +1,4 @@ -name: test +name: codecov on: pull_request: push: diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml new file mode 100644 index 0000000..a581102 --- /dev/null +++ b/.github/workflows/e2e.yml @@ -0,0 +1,25 @@ +name: agent-test-tool + +on: + pull_request: + push: + branches: + - master + tags: + - 'v*' + +jobs: + e2e-rust: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - name: Prepare service container + run: docker-compose -f docker-compose.e2e.yml up --build -d + - name: Run e2e + run: | + pip3 install --upgrade pip + pip3 install setuptools + pip3 install -r requirements.txt + python3 tests/e2e/run_e2e.py --expected_file=tests/e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index 3c28ead..c3b9fb4 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -1,12 +1,10 @@ version: "3.7" services: collector: - build: - context: . - dockerfile: ./tests/e2e/docker/Dockerfile.tool + image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc ports: - - 19876:19876 - - 12800:12800 + - "19876:19876" + - "12800:12800" consumer: build: @@ -14,7 +12,7 @@ services: dockerfile: ./tests/e2e/docker/Dockerfile expose: - 8082 - command: cargo run -- --mode consumer + command: --mode consumer depends_on: - collector @@ -23,8 +21,8 @@ services: context: . dockerfile: ./tests/e2e/docker/Dockerfile ports: - - 8081:8081 - command: cargo run -- --mode producer + - "8081:8081" + command: --mode producer depends_on: - collector - consumer \ No newline at end of file diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..98d86f6 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.57.0" +components = ["rustfmt", "clippy"] diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 38399c1..4b0c690 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -58,6 +58,7 @@ impl Reporter { /// let mut context = TracingContext::default("service", "instance"); /// reporter.sender().send(context).await?; /// reporter.shutdown().await?; + /// Ok(()) /// } /// ``` pub async fn start(address: impl Into) -> Self { diff --git a/tests/e2e/Cargo.lock b/tests/e2e/Cargo.lock index 197c8a4..baccba5 100644 --- a/tests/e2e/Cargo.lock +++ b/tests/e2e/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "ansi_term" version = "0.12.1" @@ -671,7 +673,7 @@ dependencies = [ [[package]] name = "skywalking_rust" -version = "0.0.1" +version = "0.1.0" dependencies = [ "async-stream", "base64", diff --git a/tests/e2e/Cargo.toml b/tests/e2e/Cargo.toml index 6a5cf5b..0e3c003 100644 --- a/tests/e2e/Cargo.toml +++ b/tests/e2e/Cargo.toml @@ -2,12 +2,10 @@ name = "e2e" version = "0.1.0" authors = ["Shikugawa "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +edition = "2021" [dependencies] -skywalking_rust = { path = "../../" } +skywalking_rust = { path = "../.." } hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } structopt = "0.3" diff --git a/tests/e2e/docker/Dockerfile b/tests/e2e/docker/Dockerfile index 2ec7258..a11bb48 100644 --- a/tests/e2e/docker/Dockerfile +++ b/tests/e2e/docker/Dockerfile @@ -1,5 +1,8 @@ -FROM rust:1.50.0 -RUN apt update && apt install -y protobuf-compiler -RUN rustup component add rustfmt -COPY . /tmp -WORKDIR tmp/tests/e2e +FROM rust:1.57 as build +WORKDIR /build +COPY . /build/ +RUN cd tests/e2e && cargo build --release --locked + +FROM gcr.io/distroless/cc +COPY --from=build /build/tests/e2e/target/release/e2e /bin/ +ENTRYPOINT ["/bin/e2e"] diff --git a/tests/e2e/docker/Dockerfile.tool b/tests/e2e/docker/Dockerfile.tool deleted file mode 100644 index 74ef01c..0000000 --- a/tests/e2e/docker/Dockerfile.tool +++ /dev/null @@ -1,15 +0,0 @@ -FROM openjdk:8 -WORKDIR /tests -ARG COMMIT_HASH=8db606f3470cce75c1b013ae498ac93b862b75b7 -ADD https://github.com/apache/skywalking-agent-test-tool/archive/${COMMIT_HASH}.tar.gz . -RUN tar -xf ${COMMIT_HASH}.tar.gz --strip 1 -RUN rm ${COMMIT_HASH}.tar.gz -RUN ./mvnw -B -DskipTests package - -FROM openjdk:8 -EXPOSE 19876 12800 -WORKDIR /tests -COPY --from=0 /tests/dist/skywalking-mock-collector.tar.gz /tests -RUN tar -xf skywalking-mock-collector.tar.gz --strip 1 -RUN chmod +x bin/collector-startup.sh -ENTRYPOINT bin/collector-startup.sh diff --git a/tests/e2e/rust-toolchain.toml b/tests/e2e/rust-toolchain.toml new file mode 100644 index 0000000..98d86f6 --- /dev/null +++ b/tests/e2e/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.57.0" +components = ["rustfmt", "clippy"] diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index b44de3d..2a8ec9b 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -7,6 +7,7 @@ use skywalking_rust::context::propagation::encoder::encode_propagation; use skywalking_rust::context::trace_context::TracingContext; use skywalking_rust::reporter::grpc::Reporter; use std::convert::Infallible; +use std::error::Error; use std::net::SocketAddr; use structopt::StructOpt; use tokio::sync::mpsc; @@ -66,7 +67,7 @@ async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender) { }); let addr = SocketAddr::from((host, 8081)); let server = Server::bind(&addr).serve(make_svc); - + println!("starting producer on {:?}...", &addr); if let Err(e) = server.await { eprintln!("server error: {}", e); } @@ -110,6 +111,7 @@ async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender) { let addr = SocketAddr::from((host, 8082)); let server = Server::bind(&addr).serve(make_svc); + println!("starting consumer on {:?}...", &addr); if let Err(e) = server.await { eprintln!("server error: {}", e); } @@ -123,7 +125,7 @@ struct Opt { } #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let opt = Opt::from_args(); let reporter = Reporter::start("http://collector:19876").await; let tx = reporter.sender(); @@ -134,5 +136,6 @@ async fn main() { run_producer_service([0, 0, 0, 0], tx).await; } - reporter.shutdown().await.unwarp(); + reporter.shutdown().await?; + Ok(()) } From 86241d4abc8e692b17e9e4073e497b2f824fd231 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 13 Jan 2022 23:02:36 +0800 Subject: [PATCH 06/10] add healthcheck Signed-off-by: tison --- docker-compose.e2e.yml | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index c3b9fb4..95eb17e 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -1,10 +1,15 @@ -version: "3.7" +version: "3.9" services: collector: image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc ports: - "19876:19876" - "12800:12800" + healthcheck: + test: ["CMD", "curl", "http://0.0.0.0:12800/healthCheck"] + interval: 10s + timeout: 5s + retries: 5 consumer: build: @@ -14,7 +19,8 @@ services: - 8082 command: --mode consumer depends_on: - - collector + collector: + condition: service_healthy producer: build: @@ -24,5 +30,7 @@ services: - "8081:8081" command: --mode producer depends_on: - - collector - - consumer \ No newline at end of file + collector: + condition: service_healthy + consumer: + condition: service_started From a62a35e2f01a752a6f723da57bb7fbfc310efc56 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 13 Jan 2022 23:17:43 +0800 Subject: [PATCH 07/10] e2e main supports healthCheck Signed-off-by: tison --- docker-compose.e2e.yml | 7 ++++++- tests/e2e/src/main.rs | 9 +++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index 95eb17e..98eab9b 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -21,6 +21,11 @@ services: depends_on: collector: condition: service_healthy + healthcheck: + test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ] + interval: 10s + timeout: 5s + retries: 5 producer: build: @@ -33,4 +38,4 @@ services: collector: condition: service_healthy consumer: - condition: service_started + condition: service_healthy diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index 2a8ec9b..6d30358 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -13,6 +13,7 @@ use structopt::StructOpt; use tokio::sync::mpsc; static NOT_FOUND_MSG: &str = "not found"; +static SUCCESS_MSG: &str = "Success"; async fn handle_ping( _req: Request, @@ -46,6 +47,10 @@ async fn producer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/ping") => handle_ping(_req, client, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) @@ -96,6 +101,10 @@ async fn consumer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/pong") => handle_pong(_req, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) From 7bcd3a43247b8289dffbb2990c581dfbde8c5369 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 13 Jan 2022 23:25:58 +0800 Subject: [PATCH 08/10] reconcil docker-compose file Signed-off-by: tison --- docker-compose.e2e.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index 98eab9b..cb4de8a 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -6,7 +6,7 @@ services: - "19876:19876" - "12800:12800" healthcheck: - test: ["CMD", "curl", "http://0.0.0.0:12800/healthCheck"] + test: [ "CMD", "curl", "http://0.0.0.0:12800/healthCheck" ] interval: 10s timeout: 5s retries: 5 @@ -15,8 +15,8 @@ services: build: context: . dockerfile: ./tests/e2e/docker/Dockerfile - expose: - - 8082 + ports: + - "8082:8082" command: --mode consumer depends_on: collector: From bbda7cfc539dfecf71254f2db0ad16fe51caa1c7 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 13 Jan 2022 23:51:27 +0800 Subject: [PATCH 09/10] revrt e2e main healthCheck Adding healthCheck causes hang without explicit reason. Given that the consumer started quickly, I tend to revert the healthCheck for now. Signed-off-by: tison --- docker-compose.e2e.yml | 11 +++-------- tests/e2e/src/main.rs | 9 --------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index cb4de8a..e155ebc 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -15,17 +15,12 @@ services: build: context: . dockerfile: ./tests/e2e/docker/Dockerfile - ports: - - "8082:8082" + expose: + - 8082 command: --mode consumer depends_on: collector: condition: service_healthy - healthcheck: - test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ] - interval: 10s - timeout: 5s - retries: 5 producer: build: @@ -38,4 +33,4 @@ services: collector: condition: service_healthy consumer: - condition: service_healthy + condition: service_started diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index 6d30358..2a8ec9b 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -13,7 +13,6 @@ use structopt::StructOpt; use tokio::sync::mpsc; static NOT_FOUND_MSG: &str = "not found"; -static SUCCESS_MSG: &str = "Success"; async fn handle_ping( _req: Request, @@ -47,10 +46,6 @@ async fn producer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/ping") => handle_ping(_req, client, tx).await, - (&Method::GET, "/healthCheck") => Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(SUCCESS_MSG)) - .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) @@ -101,10 +96,6 @@ async fn consumer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/pong") => handle_pong(_req, tx).await, - (&Method::GET, "/healthCheck") => Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(SUCCESS_MSG)) - .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) From b21e2f465f6808d66773f5fc51cdf586b20da119 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 14 Jan 2022 00:23:26 +0800 Subject: [PATCH 10/10] e2e main healthCheck using image with curl Signed-off-by: tison --- docker-compose.e2e.yml | 9 ++++++--- tests/e2e/docker/Dockerfile | 5 +---- tests/e2e/src/main.rs | 9 +++++++++ 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index e155ebc..875fbaf 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -7,9 +7,8 @@ services: - "12800:12800" healthcheck: test: [ "CMD", "curl", "http://0.0.0.0:12800/healthCheck" ] - interval: 10s + interval: 5s timeout: 5s - retries: 5 consumer: build: @@ -21,6 +20,10 @@ services: depends_on: collector: condition: service_healthy + healthcheck: + test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ] + interval: 5s + timeout: 5s producer: build: @@ -33,4 +36,4 @@ services: collector: condition: service_healthy consumer: - condition: service_started + condition: service_healthy diff --git a/tests/e2e/docker/Dockerfile b/tests/e2e/docker/Dockerfile index a11bb48..ebce92c 100644 --- a/tests/e2e/docker/Dockerfile +++ b/tests/e2e/docker/Dockerfile @@ -2,7 +2,4 @@ FROM rust:1.57 as build WORKDIR /build COPY . /build/ RUN cd tests/e2e && cargo build --release --locked - -FROM gcr.io/distroless/cc -COPY --from=build /build/tests/e2e/target/release/e2e /bin/ -ENTRYPOINT ["/bin/e2e"] +ENTRYPOINT ["/build/tests/e2e/target/release/e2e"] diff --git a/tests/e2e/src/main.rs b/tests/e2e/src/main.rs index 2a8ec9b..6d30358 100644 --- a/tests/e2e/src/main.rs +++ b/tests/e2e/src/main.rs @@ -13,6 +13,7 @@ use structopt::StructOpt; use tokio::sync::mpsc; static NOT_FOUND_MSG: &str = "not found"; +static SUCCESS_MSG: &str = "Success"; async fn handle_ping( _req: Request, @@ -46,6 +47,10 @@ async fn producer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/ping") => handle_ping(_req, client, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG)) @@ -96,6 +101,10 @@ async fn consumer_response( ) -> Result, Infallible> { match (_req.method(), _req.uri().path()) { (&Method::GET, "/pong") => handle_pong(_req, tx).await, + (&Method::GET, "/healthCheck") => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(SUCCESS_MSG)) + .unwrap()), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(NOT_FOUND_MSG))