Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use sqlx::PgPool;
use tokio::{sync::broadcast::Receiver, task::JoinSet, time::sleep};
use tokio::{sync::broadcast::Receiver, task::JoinSet, time::interval};
use tokio_util::sync::CancellationToken;

use tracing::debug;
Expand All @@ -15,18 +15,36 @@ use crate::enterprise::{

use super::ActivityLogStreamReconfigurationNotification;

// check if enterprise features are enabled every minute
const ENTERPRISE_CHECK_PERIOD_SECS: u64 = 60;

#[instrument(skip_all)]
pub async fn run_activity_log_stream_manager(
pool: PgPool,
notification: ActivityLogStreamReconfigurationNotification,
activity_log_messages_rx: Receiver<Bytes>,
) -> anyhow::Result<()> {
info!("Starting activity log stream manager");

let mut enterprise_check_timer = interval(Duration::from_secs(ENTERPRISE_CHECK_PERIOD_SECS));

// initialize enterprise features status
let mut enterprise_features_enabled = is_enterprise_enabled();

loop {
let mut handles = JoinSet::<()>::new();
let cancel_token = Arc::new(CancellationToken::new());
if is_enterprise_enabled() {

// check if activity log streams can be started
if enterprise_features_enabled {
info!("Starting all configured activity log streams");
let streams = ActivityLogStream::all(&pool).await?;
debug!("Found {} configured activity log streams", streams.len());

// spawn all configured streaming tasks in the background
for activity_log_stream in streams {
if let Ok(config) = ActivityLogStreamConfig::from(&activity_log_stream) {
debug!("Starting activity log stream with config: {config:?}");
match config {
ActivityLogStreamConfig::VectorHttp(stream_config) => {
let http_config = HttpActivityLogStreamConfig::from_vector(
Expand Down Expand Up @@ -60,27 +78,42 @@ pub async fn run_activity_log_stream_manager(
}
}
} else {
debug!("Activity log stream manager cannot start streams, license needs enterprise features enabled.");
info!("Activity log stream manager cannot start streams, license needs enterprise features enabled.");
}
// wait for next configs update or if license expired

// wait for one of the following:
// - stream config update
// - enterprise features got disabled/enabled
// - streaming task terminated early
loop {
tokio::select! {
_ = notification.notified() => {
debug!(
info!(
"Activity log stream manager configuration refresh notification received, reloading streaming tasks."
);
break;
}
_ = sleep(std::time::Duration::from_secs(60)) => {
if !is_enterprise_enabled() {
debug!("Activity log stream manager will reload, detected license enterprise features are not enabled");
_ = enterprise_check_timer.tick() => {
// check if enterprise features status has changed
let current_enterprise_features_enabled = is_enterprise_enabled();
if current_enterprise_features_enabled != enterprise_features_enabled {
warn!("Activity log stream manager will reload, detected license enterprise features status has changed");
enterprise_features_enabled = current_enterprise_features_enabled;
break;
}
}
task_output = handles.join_next(), if !handles.is_empty() => {
error!("Activity log streaming task has terminated early with result: {task_output:?}, reloading activity log stream manager");
break;
}
}
}
}

// trigger all spawned tasks to stop
cancel_token.cancel();
// wait for all tasks to actually stop
handles.join_all().await;

debug!("All activity log streaming tasks closed.");
}
}