diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index 16e4f36..4f91c47 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -187,7 +187,7 @@ mod tests { use http_body_util::Empty; use key_value_store::ReadStats; use runtime::app::{KvStoreOption, SecretOption, Status}; - use runtime::logger::{Logger, NullAppender}; + use runtime::logger::Logger; use runtime::service::ServiceBuilder; use runtime::util::stats::CdnPhase; use runtime::{ @@ -249,7 +249,7 @@ mod tests { type BackendConnector = FastEdgeConnector; fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger { - Logger::new(NullAppender) + Logger::new() } fn backend(&self) -> Backend { diff --git a/crates/runtime/src/logger.rs b/crates/runtime/src/logger.rs index 959b647..a2dc128 100644 --- a/crates/runtime/src/logger.rs +++ b/crates/runtime/src/logger.rs @@ -17,40 +17,173 @@ use wasmtime_wasi_io::streams::StreamResult; #[derive(Clone)] pub struct Logger { properties: HashMap, - appender: Arc, + appenders: Vec>, } pub trait AppenderBuilder { fn build(&self, properties: HashMap) -> Box; } +/// Fans out writes sequentially to multiple [`AsyncWrite`] sinks. +struct MultiWriter { + writers: Vec>, + write_current: usize, + write_n: usize, + flush_current: usize, + shutdown_current: usize, +} + +impl MultiWriter { + fn new(writers: Vec>) -> Self { + Self { + writers, + write_current: 0, + write_n: 0, + flush_current: 0, + shutdown_current: 0, + } + } +} + +impl AsyncWrite for MultiWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + if this.writers.is_empty() { + return Poll::Ready(Ok(buf.len())); + } + + // First writer determines how many bytes are accepted. + if this.write_current == 0 { + // SAFETY: writers are heap-allocated (Box) and won't move. + match unsafe { Pin::new_unchecked(&mut *this.writers[0]) }.poll_write(cx, buf) { + Poll::Ready(Ok(n)) => { + this.write_n = n; + this.write_current = 1; + } + Poll::Ready(Err(e)) => { + tracing::warn!(cause=?e, "MultiWriter: appender 0 write error"); + this.write_n = buf.len(); + this.write_current = 1; + } + Poll::Pending => return Poll::Pending, + } + } + + // Remaining writers receive exactly write_n bytes. + while this.write_current < this.writers.len() { + let idx = this.write_current; + let n = this.write_n; + // SAFETY: writers are heap-allocated (Box) and won't move. + match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_write(cx, &buf[..n]) { + Poll::Ready(Ok(_)) => this.write_current += 1, + Poll::Ready(Err(e)) => { + tracing::warn!(cause=?e, "MultiWriter: appender {idx} write error"); + this.write_current += 1; + } + Poll::Pending => return Poll::Pending, + } + } + + let n = this.write_n; + this.write_current = 0; + Poll::Ready(Ok(n)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + while this.flush_current < this.writers.len() { + let idx = this.flush_current; + // SAFETY: writers are heap-allocated (Box) and won't move. + match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_flush(cx) { + Poll::Ready(_) => this.flush_current += 1, + Poll::Pending => return Poll::Pending, + } + } + + this.flush_current = 0; + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + while this.shutdown_current < this.writers.len() { + let idx = this.shutdown_current; + // SAFETY: writers are heap-allocated (Box) and won't move. + match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_shutdown(cx) { + Poll::Ready(_) => this.shutdown_current += 1, + Poll::Pending => return Poll::Pending, + } + } + + this.shutdown_current = 0; + Poll::Ready(Ok(())) + } +} + impl IsTerminal for Logger { fn is_terminal(&self) -> bool { false } } -#[async_trait] impl StdoutStream for Logger { fn async_stream(&self) -> Box { - self.appender.build(self.properties.clone()) + let writers = self + .appenders + .iter() + .map(|a| a.build(self.properties.clone())) + .collect(); + Box::new(MultiWriter::new(writers)) } } impl Logger { - pub fn new(sink: S) -> Self { + pub fn new() -> Self { Self { properties: Default::default(), - appender: Arc::new(sink), + appenders: vec![], } } + /// Builder-style method to attach an additional appender. + pub fn with_appender(mut self, sink: S) -> Self { + self.appenders.push(Arc::new(sink)); + self + } + + /// Attach an additional appender to an existing logger. + pub fn add_appender(&mut self, sink: S) { + self.appenders.push(Arc::new(sink)); + } + pub async fn write_msg(&self, msg: String) { - if let Err(error) = Box::into_pin(self.async_stream()) - .write_all(msg.as_bytes()) - .await - { - tracing::warn!(cause=?error, "write_msg"); + let bytes = msg.as_bytes(); + for appender in &self.appenders { + if let Err(error) = Box::into_pin(appender.build(self.properties.clone())) + .write_all(bytes) + .await + { + tracing::warn!(cause=?error, "write_msg"); + } + } + } +} + +impl Default for Logger { + fn default() -> Self { + Self { + properties: Default::default(), + appenders: vec![Arc::new(NullAppender)], } } } @@ -61,7 +194,7 @@ impl Extend<(String, String)> for Logger { } } -pub struct NullAppender; +struct NullAppender; impl AppenderBuilder for NullAppender { fn build(&self, _fields: HashMap) -> Box { diff --git a/release.toml b/release.toml index b4f35ef..0e1acf7 100644 --- a/release.toml +++ b/release.toml @@ -1,3 +1,3 @@ pre-release-hook = ["git", "cliff", "-o", "CHANGELOG.md", "--tag", "{{version}}" ] -allow-branch = ["releases/**"] +#allow-branch = ["releases/**"] publish = false diff --git a/src/context.rs b/src/context.rs index a5fb5aa..0842c4e 100644 --- a/src/context.rs +++ b/src/context.rs @@ -51,7 +51,8 @@ impl PreCompiledLoader for Context { impl ContextT for Context { type BackendConnector = HttpsConnector; fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger { - Logger::new(Console::default()) + let logger = Logger::new(); + logger.with_appender(Console::default()) } fn backend(&self) -> Backend> {