Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/http-service/src/executor/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod tests {
use http_body_util::Empty;
use key_value_store::ReadStats;
use runtime::app::{KvStoreOption, SecretOption, Status};
use runtime::logger::{Logger, NullAppender};
use runtime::logger::Logger;
use runtime::service::ServiceBuilder;
use runtime::util::stats::CdnPhase;
use runtime::{
Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
type BackendConnector = FastEdgeConnector;

fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger {
Logger::new(NullAppender)
Logger::new()
}

fn backend(&self) -> Backend<FastEdgeConnector> {
Expand Down
155 changes: 144 additions & 11 deletions crates/runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,173 @@ use wasmtime_wasi_io::streams::StreamResult;
#[derive(Clone)]
pub struct Logger {
properties: HashMap<String, String>,
appender: Arc<dyn AppenderBuilder + Send + Sync>,
appenders: Vec<Arc<dyn AppenderBuilder + Send + Sync>>,
}

pub trait AppenderBuilder {
fn build(&self, properties: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync>;
}

/// Fans out writes sequentially to multiple [`AsyncWrite`] sinks.
struct MultiWriter {
writers: Vec<Box<dyn AsyncWrite + Send + Sync>>,
write_current: usize,
write_n: usize,
flush_current: usize,
shutdown_current: usize,
}

impl MultiWriter {
fn new(writers: Vec<Box<dyn AsyncWrite + Send + Sync>>) -> Self {
Self {
writers,
write_current: 0,
write_n: 0,
flush_current: 0,
shutdown_current: 0,
}
}
}

impl AsyncWrite for MultiWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();

if this.writers.is_empty() {
return Poll::Ready(Ok(buf.len()));
}

// First writer determines how many bytes are accepted.
if this.write_current == 0 {
// SAFETY: writers are heap-allocated (Box) and won't move.
match unsafe { Pin::new_unchecked(&mut *this.writers[0]) }.poll_write(cx, buf) {
Poll::Ready(Ok(n)) => {
this.write_n = n;
this.write_current = 1;
}
Poll::Ready(Err(e)) => {
tracing::warn!(cause=?e, "MultiWriter: appender 0 write error");
this.write_n = buf.len();
this.write_current = 1;
}
Poll::Pending => return Poll::Pending,
}
}

// Remaining writers receive exactly write_n bytes.
while this.write_current < this.writers.len() {
let idx = this.write_current;
let n = this.write_n;
// SAFETY: writers are heap-allocated (Box) and won't move.
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_write(cx, &buf[..n]) {
Poll::Ready(Ok(_)) => this.write_current += 1,
Poll::Ready(Err(e)) => {
tracing::warn!(cause=?e, "MultiWriter: appender {idx} write error");
this.write_current += 1;
}
Poll::Pending => return Poll::Pending,
}
}

let n = this.write_n;
this.write_current = 0;
Poll::Ready(Ok(n))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();

while this.flush_current < this.writers.len() {
let idx = this.flush_current;
// SAFETY: writers are heap-allocated (Box) and won't move.
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_flush(cx) {
Poll::Ready(_) => this.flush_current += 1,
Poll::Pending => return Poll::Pending,
}
}

this.flush_current = 0;
Poll::Ready(Ok(()))
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();

while this.shutdown_current < this.writers.len() {
let idx = this.shutdown_current;
// SAFETY: writers are heap-allocated (Box) and won't move.
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_shutdown(cx) {
Poll::Ready(_) => this.shutdown_current += 1,
Poll::Pending => return Poll::Pending,
}
}

this.shutdown_current = 0;
Poll::Ready(Ok(()))
}
}

impl IsTerminal for Logger {
fn is_terminal(&self) -> bool {
false
}
}

#[async_trait]
impl StdoutStream for Logger {
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
self.appender.build(self.properties.clone())
let writers = self
.appenders
.iter()
.map(|a| a.build(self.properties.clone()))
.collect();
Box::new(MultiWriter::new(writers))
}
}

impl Logger {
pub fn new<S: AppenderBuilder + Sync + Send + 'static>(sink: S) -> Self {
pub fn new() -> Self {
Self {
properties: Default::default(),
appender: Arc::new(sink),
appenders: vec![],
}
}

/// Builder-style method to attach an additional appender.
pub fn with_appender<S: AppenderBuilder + Sync + Send + 'static>(mut self, sink: S) -> Self {
self.appenders.push(Arc::new(sink));
self
}

/// Attach an additional appender to an existing logger.
pub fn add_appender<S: AppenderBuilder + Sync + Send + 'static>(&mut self, sink: S) {
self.appenders.push(Arc::new(sink));
}

pub async fn write_msg(&self, msg: String) {
if let Err(error) = Box::into_pin(self.async_stream())
.write_all(msg.as_bytes())
.await
{
tracing::warn!(cause=?error, "write_msg");
let bytes = msg.as_bytes();
for appender in &self.appenders {
if let Err(error) = Box::into_pin(appender.build(self.properties.clone()))
.write_all(bytes)
.await
{
tracing::warn!(cause=?error, "write_msg");
}
}
}
}

impl Default for Logger {
fn default() -> Self {
Self {
properties: Default::default(),
appenders: vec![Arc::new(NullAppender)],
}
}
}
Expand All @@ -61,7 +194,7 @@ impl Extend<(String, String)> for Logger {
}
}

pub struct NullAppender;
struct NullAppender;

impl AppenderBuilder for NullAppender {
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync> {
Expand Down
2 changes: 1 addition & 1 deletion release.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pre-release-hook = ["git", "cliff", "-o", "CHANGELOG.md", "--tag", "{{version}}" ]
allow-branch = ["releases/**"]
#allow-branch = ["releases/**"]
publish = false
3 changes: 2 additions & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ impl PreCompiledLoader<u64> for Context {
impl ContextT for Context {
type BackendConnector = HttpsConnector<HttpConnector>;
fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger {
Logger::new(Console::default())
let logger = Logger::new();
logger.with_appender(Console::default())
}

fn backend(&self) -> Backend<HttpsConnector<HttpConnector>> {
Expand Down
Loading