From 6eb9166cdee75114a5ce5b0d7da48ffcc2df992a Mon Sep 17 00:00:00 2001 From: Sergey Yedrikov Date: Tue, 24 Oct 2023 22:17:03 -0400 Subject: [PATCH] LOG-4739: Enhance Vector kubernetes_logs to support include_paths_glob_patterns --- .../kubernetes_logs/k8s_paths_provider.rs | 81 +++++++++++++++++-- src/sources/kubernetes_logs/mod.rs | 37 +++++++-- 2 files changed, 103 insertions(+), 15 deletions(-) diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 83ccd2955b5ae..92047e3f21a11 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -16,6 +16,7 @@ use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum; pub struct K8sPathsProvider { pod_state: Store, namespace_state: Store, + include_paths: Vec, exclude_paths: Vec, } @@ -24,11 +25,13 @@ impl K8sPathsProvider { pub fn new( pod_state: Store, namespace_state: Store, + include_paths: Vec, exclude_paths: Vec, ) -> Self { Self { pod_state, namespace_state, + include_paths, exclude_paths, } } @@ -57,7 +60,11 @@ impl PathsProvider for K8sPathsProvider { .flat_map(|pod| { trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name); let paths_iter = list_pod_log_paths(real_glob, pod.as_ref()); - exclude_paths(paths_iter, &self.exclude_paths).collect::>() + filter_paths( + filter_paths(paths_iter, &self.include_paths, true), + &self.exclude_paths, + false + ).collect::>() }) .collect() } @@ -159,7 +166,7 @@ where build_container_exclusion_patterns(dir, excluded_containers).collect(); // Return paths filtered with container exclusion. - exclude_paths(path_iter, exclusion_patterns) + filter_paths(path_iter, exclusion_patterns, false) }) } @@ -175,12 +182,13 @@ fn real_glob(pattern: &str) -> impl Iterator { .flat_map(|paths| paths.into_iter()) } -fn exclude_paths<'a>( +fn filter_paths<'a>( iter: impl Iterator + 'a, patterns: impl AsRef<[glob::Pattern]> + 'a, + include: bool ) -> impl Iterator + 'a { iter.filter(move |path| { - !patterns.as_ref().iter().any(|pattern| { + let m = patterns.as_ref().iter().any(|pattern| { pattern.matches_path_with( path, glob::MatchOptions { @@ -188,7 +196,8 @@ fn exclude_paths<'a>( ..Default::default() }, ) - }) + }); + if include { m } else { !m } }) } @@ -199,7 +208,7 @@ mod tests { use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta}; use super::{ - build_container_exclusion_patterns, exclude_paths, extract_excluded_containers_for_pod, + build_container_exclusion_patterns, filter_paths, extract_excluded_containers_for_pod, extract_pod_logs_directory, list_pod_log_paths, }; @@ -508,7 +517,65 @@ mod tests { .map(|pattern| glob::Pattern::new(pattern).unwrap()) .collect(); let actual_paths: Vec<_> = - exclude_paths(input_paths.into_iter().map(Into::into), &patterns).collect(); + filter_paths(input_paths.into_iter().map(Into::into), &patterns, false).collect(); + let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect(); + assert_eq!( + actual_paths, expected_paths, + "failed for patterns {:?}", + &str_patterns + ) + } + } + + #[test] + fn test_include_paths() { + let cases = vec![ + ( + vec![ + "/var/log/pods/a.log", + "/var/log/pods/b.log", + "/var/log/pods/c.log.foo", + "/var/log/pods/d.logbar", + "/tmp/foo" + ], + vec!["/var/log/pods/*"], + vec![ + "/var/log/pods/a.log", + "/var/log/pods/b.log", + "/var/log/pods/c.log.foo", + "/var/log/pods/d.logbar", + ], + ), + ( + vec![ + "/var/log/pods/a.log", + "/var/log/pods/b.log", + "/var/log/pods/c.log.foo", + "/var/log/pods/d.logbar", + ], + vec!["/tmp/*"], + vec![], + ), + ( + vec![ + "/var/log/pods/a.log", + "/tmp/foo", + ], + vec!["**/*"], + vec![ + "/var/log/pods/a.log", + "/tmp/foo", + ], + ), + ]; + + for (input_paths, str_patterns, expected_paths) in cases { + let patterns: Vec<_> = str_patterns + .iter() + .map(|pattern| glob::Pattern::new(pattern).unwrap()) + .collect(); + let actual_paths: Vec<_> = + filter_paths(input_paths.into_iter().map(Into::into), &patterns, true).collect(); let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect(); assert_eq!( actual_paths, expected_paths, diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index c157e76e2d7ed..3c77c8b6db0ce 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -148,6 +148,10 @@ pub struct Config { #[configurable(derived)] node_annotation_fields: node_metadata_annotator::FieldsSpec, + /// A list of glob patterns to include while reading the files. + #[configurable(metadata(docs::examples = "**/include/**"))] + include_paths_glob_patterns: Vec, + /// A list of glob patterns to exclude from reading the files. #[configurable(metadata(docs::examples = "**/exclude/**"))] exclude_paths_glob_patterns: Vec, @@ -270,6 +274,7 @@ impl Default for Config { pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(), namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(), node_annotation_fields: node_metadata_annotator::FieldsSpec::default(), + include_paths_glob_patterns: default_path_inclusion(), exclude_paths_glob_patterns: default_path_exclusion(), read_from: default_read_from(), ignore_older_secs: None, @@ -524,6 +529,7 @@ struct Source { namespace_label_selector: String, node_selector: String, self_node_name: String, + include_paths: Vec, exclude_paths: Vec, read_from: ReadFrom, ignore_older_secs: Option, @@ -580,6 +586,8 @@ impl Source { let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?; + let include_paths = prepare_include_paths(config)?; + let exclude_paths = prepare_exclude_paths(config)?; let glob_minimum_cooldown = config.glob_minimum_cooldown_ms; @@ -603,6 +611,7 @@ impl Source { namespace_label_selector, node_selector, self_node_name, + include_paths, exclude_paths, read_from: ReadFrom::from(config.read_from), ignore_older_secs: config.ignore_older_secs, @@ -636,6 +645,7 @@ impl Source { namespace_label_selector, node_selector, self_node_name, + include_paths, exclude_paths, read_from, ignore_older_secs, @@ -728,7 +738,7 @@ impl Source { ))); let paths_provider = - K8sPathsProvider::new(pod_state.clone(), ns_state.clone(), exclude_paths); + K8sPathsProvider::new(pod_state.clone(), ns_state.clone(), include_paths, exclude_paths); let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace); let ns_annotator = NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace); @@ -954,6 +964,10 @@ fn default_self_node_name_env_template() -> String { format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned()) } +fn default_path_inclusion() -> Vec { + vec![PathBuf::from("**/*")] +} + fn default_path_exclusion() -> Vec { vec![PathBuf::from("**/*.gz"), PathBuf::from("**/*.tmp")] } @@ -997,11 +1011,18 @@ const fn default_rotate_wait_ms() -> Duration { Duration::from_millis(u64::MAX/1000) } -// This function constructs the patterns we exclude from file watching, created -// from the defaults or user provided configuration. +fn prepare_include_paths(config: &Config) -> crate::Result> { + prepare_glob_patterns(&config.include_paths_glob_patterns, "Including") +} + fn prepare_exclude_paths(config: &Config) -> crate::Result> { - let exclude_paths = config - .exclude_paths_glob_patterns + prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding") +} + +// This function constructs the patterns for file watching, created +// from the defaults or user provided configuration. +fn prepare_glob_patterns(paths: &Vec, op: &str) -> crate::Result> { + let ret = paths .iter() .map(|pattern| { let pattern = pattern @@ -1012,14 +1033,14 @@ fn prepare_exclude_paths(config: &Config) -> crate::Result> { .collect::>>()?; info!( - message = "Excluding matching files.", - exclude_paths = ?exclude_paths + message = format!("{op} matching files."), + ret = ?ret .iter() .map(glob::Pattern::as_str) .collect::>() ); - Ok(exclude_paths) + Ok(ret) } // This function constructs the effective field selector to use, based on