Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions src/sources/kubernetes_logs/k8s_paths_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
pub struct K8sPathsProvider {
pod_state: Store<Pod>,
namespace_state: Store<Namespace>,
include_paths: Vec<glob::Pattern>,
exclude_paths: Vec<glob::Pattern>,
}

Expand All @@ -24,11 +25,13 @@ impl K8sPathsProvider {
pub fn new(
pod_state: Store<Pod>,
namespace_state: Store<Namespace>,
include_paths: Vec<glob::Pattern>,
exclude_paths: Vec<glob::Pattern>,
) -> Self {
Self {
pod_state,
namespace_state,
include_paths,
exclude_paths,
}
}
Expand Down Expand Up @@ -57,7 +60,11 @@ impl PathsProvider for K8sPathsProvider {
.flat_map(|pod| {
trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
exclude_paths(paths_iter, &self.exclude_paths).collect::<Vec<_>>()
filter_paths(
filter_paths(paths_iter, &self.include_paths, true),
&self.exclude_paths,
false
).collect::<Vec<_>>()
})
.collect()
}
Expand Down Expand Up @@ -159,7 +166,7 @@ where
build_container_exclusion_patterns(dir, excluded_containers).collect();

// Return paths filtered with container exclusion.
exclude_paths(path_iter, exclusion_patterns)
filter_paths(path_iter, exclusion_patterns, false)
})
}

Expand All @@ -175,20 +182,22 @@ fn real_glob(pattern: &str) -> impl Iterator<Item = PathBuf> {
.flat_map(|paths| paths.into_iter())
}

fn exclude_paths<'a>(
fn filter_paths<'a>(
iter: impl Iterator<Item = PathBuf> + 'a,
patterns: impl AsRef<[glob::Pattern]> + 'a,
include: bool
) -> impl Iterator<Item = PathBuf> + 'a {
iter.filter(move |path| {
!patterns.as_ref().iter().any(|pattern| {
let m = patterns.as_ref().iter().any(|pattern| {
pattern.matches_path_with(
path,
glob::MatchOptions {
require_literal_separator: true,
..Default::default()
},
)
})
});
if include { m } else { !m }
})
}

Expand All @@ -199,7 +208,7 @@ mod tests {
use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};

use super::{
build_container_exclusion_patterns, exclude_paths, extract_excluded_containers_for_pod,
build_container_exclusion_patterns, filter_paths, extract_excluded_containers_for_pod,
extract_pod_logs_directory, list_pod_log_paths,
};

Expand Down Expand Up @@ -508,7 +517,65 @@ mod tests {
.map(|pattern| glob::Pattern::new(pattern).unwrap())
.collect();
let actual_paths: Vec<_> =
exclude_paths(input_paths.into_iter().map(Into::into), &patterns).collect();
filter_paths(input_paths.into_iter().map(Into::into), &patterns, false).collect();
let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
assert_eq!(
actual_paths, expected_paths,
"failed for patterns {:?}",
&str_patterns
)
}
}

#[test]
fn test_include_paths() {
let cases = vec![
(
vec![
"/var/log/pods/a.log",
"/var/log/pods/b.log",
"/var/log/pods/c.log.foo",
"/var/log/pods/d.logbar",
"/tmp/foo"
],
vec!["/var/log/pods/*"],
vec![
"/var/log/pods/a.log",
"/var/log/pods/b.log",
"/var/log/pods/c.log.foo",
"/var/log/pods/d.logbar",
],
),
(
vec![
"/var/log/pods/a.log",
"/var/log/pods/b.log",
"/var/log/pods/c.log.foo",
"/var/log/pods/d.logbar",
],
vec!["/tmp/*"],
vec![],
),
(
vec![
"/var/log/pods/a.log",
"/tmp/foo",
],
vec!["**/*"],
vec![
"/var/log/pods/a.log",
"/tmp/foo",
],
),
];

for (input_paths, str_patterns, expected_paths) in cases {
let patterns: Vec<_> = str_patterns
.iter()
.map(|pattern| glob::Pattern::new(pattern).unwrap())
.collect();
let actual_paths: Vec<_> =
filter_paths(input_paths.into_iter().map(Into::into), &patterns, true).collect();
let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
assert_eq!(
actual_paths, expected_paths,
Expand Down
37 changes: 29 additions & 8 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ pub struct Config {
#[configurable(derived)]
node_annotation_fields: node_metadata_annotator::FieldsSpec,

/// A list of glob patterns to include while reading the files.
#[configurable(metadata(docs::examples = "**/include/**"))]
include_paths_glob_patterns: Vec<PathBuf>,

/// A list of glob patterns to exclude from reading the files.
#[configurable(metadata(docs::examples = "**/exclude/**"))]
exclude_paths_glob_patterns: Vec<PathBuf>,
Expand Down Expand Up @@ -270,6 +274,7 @@ impl Default for Config {
pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
include_paths_glob_patterns: default_path_inclusion(),
exclude_paths_glob_patterns: default_path_exclusion(),
read_from: default_read_from(),
ignore_older_secs: None,
Expand Down Expand Up @@ -524,6 +529,7 @@ struct Source {
namespace_label_selector: String,
node_selector: String,
self_node_name: String,
include_paths: Vec<glob::Pattern>,
exclude_paths: Vec<glob::Pattern>,
read_from: ReadFrom,
ignore_older_secs: Option<u64>,
Expand Down Expand Up @@ -580,6 +586,8 @@ impl Source {

let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;

let include_paths = prepare_include_paths(config)?;

let exclude_paths = prepare_exclude_paths(config)?;

let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
Expand All @@ -603,6 +611,7 @@ impl Source {
namespace_label_selector,
node_selector,
self_node_name,
include_paths,
exclude_paths,
read_from: ReadFrom::from(config.read_from),
ignore_older_secs: config.ignore_older_secs,
Expand Down Expand Up @@ -636,6 +645,7 @@ impl Source {
namespace_label_selector,
node_selector,
self_node_name,
include_paths,
exclude_paths,
read_from,
ignore_older_secs,
Expand Down Expand Up @@ -728,7 +738,7 @@ impl Source {
)));

let paths_provider =
K8sPathsProvider::new(pod_state.clone(), ns_state.clone(), exclude_paths);
K8sPathsProvider::new(pod_state.clone(), ns_state.clone(), include_paths, exclude_paths);
let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
let ns_annotator =
NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
Expand Down Expand Up @@ -954,6 +964,10 @@ fn default_self_node_name_env_template() -> String {
format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
}

fn default_path_inclusion() -> Vec<PathBuf> {
vec![PathBuf::from("**/*")]
}

fn default_path_exclusion() -> Vec<PathBuf> {
vec![PathBuf::from("**/*.gz"), PathBuf::from("**/*.tmp")]
}
Expand Down Expand Up @@ -997,11 +1011,18 @@ const fn default_rotate_wait_ms() -> Duration {
Duration::from_millis(u64::MAX/1000)
}

// This function constructs the patterns we exclude from file watching, created
// from the defaults or user provided configuration.
fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
}

fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
let exclude_paths = config
.exclude_paths_glob_patterns
prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
}

// This function constructs the patterns for file watching, created
// from the defaults or user provided configuration.
fn prepare_glob_patterns(paths: &Vec<PathBuf>, op: &str) -> crate::Result<Vec<glob::Pattern>> {
let ret = paths
.iter()
.map(|pattern| {
let pattern = pattern
Expand All @@ -1012,14 +1033,14 @@ fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
.collect::<crate::Result<Vec<_>>>()?;

info!(
message = "Excluding matching files.",
exclude_paths = ?exclude_paths
message = format!("{op} matching files."),
ret = ?ret
.iter()
.map(glob::Pattern::as_str)
.collect::<Vec<_>>()
);

Ok(exclude_paths)
Ok(ret)
}

// This function constructs the effective field selector to use, based on
Expand Down