diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index d84df20e702..a63df88ee43 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -151,6 +151,18 @@ dependencies = [
"futures-core",
]
+[[package]]
+name = "async-speed-limit"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "481ce9cb6a828f4679495f7376cb6779978d925dd9790b99b48d1bbde6d0f00b"
+dependencies = [
+ "futures-core",
+ "futures-io",
+ "futures-timer",
+ "pin-project-lite",
+]
+
[[package]]
name = "async-stream"
version = "0.3.3"
@@ -1422,6 +1434,12 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
+[[package]]
+name = "futures-timer"
+version = "3.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
+
[[package]]
name = "futures-util"
version = "0.3.24"
@@ -3244,12 +3262,14 @@ name = "quickwit-common"
version = "0.3.1"
dependencies = [
"anyhow",
+ "async-speed-limit",
"colored",
"env_logger",
"home",
"itertools",
"num_cpus",
"once_cell",
+ "pin-project-lite",
"pnet",
"prometheus",
"rand 0.8.5",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index ae7739031ea..7b20c387ee7 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -31,6 +31,7 @@ anyhow = "1"
arc-swap = "1.4"
assert-json-diff = "2"
assert_cmd = "2"
+async-speed-limit = "0.4"
async-trait = "0.1"
atty = "0.2"
azure_core = "0.5.0"
@@ -90,6 +91,7 @@ openssl-probe = "0.1.4"
opentelemetry = { version = "0.18", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11.0"
+pin-project-lite = "0.2.9"
pnet = { version = "0.31.0", features = ["std"] }
predicates = "2"
prometheus = { version = "0.13", features = ["process"] }
diff --git a/quickwit/quickwit-actors/src/actor.rs b/quickwit/quickwit-actors/src/actor.rs
index 89208333b92..44f4b41ac9e 100644
--- a/quickwit/quickwit-actors/src/actor.rs
+++ b/quickwit/quickwit-actors/src/actor.rs
@@ -32,13 +32,14 @@ use tokio::sync::{oneshot, watch};
use tracing::{debug, error};
use crate::actor_state::{ActorState, AtomicState};
-use crate::progress::{Progress, ProtectedZoneGuard};
use crate::registry::ActorRegistry;
use crate::scheduler::{Callback, ScheduleEvent, Scheduler};
use crate::spawn_builder::SpawnBuilder;
#[cfg(any(test, feature = "testsuite"))]
use crate::Universe;
-use crate::{AskError, Command, KillSwitch, Mailbox, QueueCapacity, SendError};
+use crate::{
+ AskError, Command, KillSwitch, Mailbox, Progress, ProtectedZoneGuard, QueueCapacity, SendError,
+};
/// The actor exit status represents the outcome of the execution of an actor,
/// after the end of the execution.
diff --git a/quickwit/quickwit-actors/src/lib.rs b/quickwit/quickwit-actors/src/lib.rs
index 9f42fd3b454..2bba844c4f6 100644
--- a/quickwit/quickwit-actors/src/lib.rs
+++ b/quickwit/quickwit-actors/src/lib.rs
@@ -36,10 +36,8 @@ mod actor_state;
pub mod channel_with_priority;
mod command;
mod envelope;
-mod kill_switch;
mod mailbox;
mod observation;
-mod progress;
mod registry;
mod scheduler;
mod spawn_builder;
@@ -52,9 +50,8 @@ mod universe;
pub use actor::{Actor, ActorExitStatus, Handler};
pub use actor_handle::{ActorHandle, Health, Supervisable};
pub use command::Command;
-pub use kill_switch::KillSwitch;
pub use observation::{Observation, ObservationType};
-pub use progress::{Progress, ProtectedZoneGuard};
+use quickwit_common::{KillSwitch, Progress, ProtectedZoneGuard};
pub(crate) use scheduler::Scheduler;
use thiserror::Error;
pub use universe::Universe;
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 9d0e6c5ff9b..6e26e52f7fb 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -11,12 +11,14 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
anyhow = { workspace = true }
+async-speed-limit = { workspace = true }
colored = { workspace = true }
env_logger = { workspace = true }
home = { workspace = true }
itertools = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
+pin-project-lite = { workspace = true }
pnet = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs
new file mode 100644
index 00000000000..46a5a2ddc84
--- /dev/null
+++ b/quickwit/quickwit-common/src/io.rs
@@ -0,0 +1,430 @@
+// Copyright (C) 2022 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// This file contains code copied from the Resource trait
+// in async-speed-limit from the TiKV project.
+// https://github.com/tikv/async-speed-limit/blob/master/src/io.rs
+//
+// Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0.
+
+// We are simply porting the logic to tokio here and adding the functionality to
+// plug some metrics.
+
+use std::future::Future;
+use std::io;
+use std::io::IoSlice;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use async_speed_limit::clock::StandardClock;
+use async_speed_limit::limiter::Consume;
+use async_speed_limit::Limiter;
+use once_cell::sync::Lazy;
+use pin_project_lite::pin_project;
+use prometheus::{IntCounter, IntCounterVec};
+use tokio::io::AsyncWrite;
+
+use crate::metrics::new_counter_vec;
+use crate::{KillSwitch, Progress, ProtectedZoneGuard};
+
+// Max 1MB at a time.
+const MAX_NUM_BYTES_WRITTEN_AT_ONCE: usize = 1 << 20;
+
+fn truncate_bytes(bytes: &[u8]) -> &[u8] {
+ let num_bytes = bytes.len().min(MAX_NUM_BYTES_WRITTEN_AT_ONCE);
+ &bytes[..num_bytes]
+}
+
+struct IoMetrics {
+ write_bytes: IntCounterVec,
+}
+
+impl Default for IoMetrics {
+ fn default() -> Self {
+ let write_bytes = new_counter_vec(
+ "write_bytes",
+ "Number of bytes written by a given component.",
+ "quickwit",
+ &["index", "component"],
+ );
+ Self { write_bytes }
+ }
+}
+
+static IO_METRICS: Lazy<IoMetrics> = Lazy::new(IoMetrics::default);
+
+/// Parameter used in `async_speed_limit`.
+///
+/// The default value is good and does not need to be tweaked.
+/// We use a smaller value in unit tests to get reasonably accurate throttling over very
+/// short periods of time.
+///
+/// For more details, please refer to `async_speed_limit` documentation.
+const REFILL_DURATION: Duration = if cfg!(test) {
+ Duration::from_millis(10)
+} else {
+ // Default value in async_speed_limit
+ Duration::from_millis(100)
+};
+
+#[derive(Clone)]
+pub struct IoControls {
+ throughput_limiter: Limiter,
+ bytes_counter: IntCounter,
+ progress: Progress,
+ kill_switch: KillSwitch,
+}
+
+impl Default for IoControls {
+ fn default() -> Self {
+ let default_bytes_counter =
+ IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
+ IoControls {
+ throughput_limiter: Limiter::new(f64::INFINITY),
+ progress: Progress::default(),
+ kill_switch: KillSwitch::default(),
+ bytes_counter: default_bytes_counter,
+ }
+ }
+}
+
+impl IoControls {
+ #[must_use]
+ pub fn progress(&self) -> &Progress {
+ &self.progress
+ }
+
+ pub fn kill(&self) {
+ self.kill_switch.kill();
+ }
+
+ pub fn num_bytes(&self) -> u64 {
+ self.bytes_counter.get()
+ }
+
+ pub fn check_if_alive(&self) -> io::Result<ProtectedZoneGuard> {
+ if self.kill_switch.is_dead() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Directory kill switch was activated.",
+ ));
+ }
+ let guard = self.progress.protect_zone();
+ Ok(guard)
+ }
+
+ pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self {
+ self.bytes_counter = IO_METRICS
+ .write_bytes
+ .with_label_values(&[index, component]);
+ self
+ }
+
+ pub fn set_throughput_limit(mut self, throughput: f64) -> Self {
+ self.throughput_limiter = Limiter::builder(throughput).refill(REFILL_DURATION).build();
+ self
+ }
+
+ pub fn set_bytes_counter(mut self, bytes_counter: IntCounter) -> Self {
+ self.bytes_counter = bytes_counter;
+ self
+ }
+
+ pub fn set_progress(mut self, progress: Progress) -> Self {
+ self.progress = progress;
+ self
+ }
+
+ pub fn set_kill_switch(mut self, kill_switch: KillSwitch) -> Self {
+ self.kill_switch = kill_switch;
+ self
+ }
+ fn consume_blocking(&self, num_bytes: usize) -> io::Result<()> {
+ let _guard = self.check_if_alive()?;
+ self.throughput_limiter.blocking_consume(num_bytes);
+ self.bytes_counter.inc_by(num_bytes as u64);
+ Ok(())
+ }
+}
+
+pin_project! {
+ pub struct ControlledWrite<A: IoControlsAccess, W> {
+ #[pin]
+ underlying_wrt: W,
+ waiter: Option<Consume<StandardClock, ()>>,
+ io_controls_access: A,
+ }
+}
+
+impl<A: IoControlsAccess, W> ControlledWrite<A, W> {
+ // This function was copied from TiKV's `async-speed-limit`.
+ // Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0.
+ /// Wraps a poll function with a delay after it.
+ ///
+ /// This method calls the given `poll` function until it is fulfilled. After
+ /// that, the result is saved into this `Resource` instance (therefore
+ /// different `poll_***` calls should not be interleaving), while returning
+ /// `Pending` until the limiter has completely consumed the result.
+ #[allow(dead_code)]
+ pub(crate) fn poll_limited(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ poll: impl FnOnce(Pin<&mut W>, &mut Context<'_>) -> Poll<io::Result<usize>>,
+ ) -> Poll<io::Result<usize>> {
+ let this = self.project();
+
+ let _protect_guard = match this
+ .io_controls_access
+ .apply(|io_controls| io_controls.check_if_alive())
+ {
+ Ok(protect_guard) => protect_guard,
+ Err(io_err) => {
+ return Poll::Ready(Err(io_err));
+ }
+ };
+
+ if let Some(waiter) = this.waiter {
+ let res = Pin::new(waiter).poll(cx);
+ if res.is_pending() {
+ return Poll::Pending;
+ }
+ *this.waiter = None;
+ }
+
+ let res: Poll<io::Result<usize>> = poll(this.underlying_wrt, cx);
+ if let Poll::Ready(obj) = &res {
+ let len = *obj.as_ref().unwrap_or(&0);
+ if len > 0 {
+ let waiter = this.io_controls_access.apply(|io_controls| {
+ io_controls.bytes_counter.inc_by(len as u64);
+ io_controls.throughput_limiter.consume(len)
+ });
+ *this.waiter = Some(waiter)
+ }
+ }
+ res
+ }
+}
+
+/// Quirky spec: truncates the list of bufs, and keep as many leftmost elements
+/// as possible, within the constraint of not exceeding `max_len` bytes.
+///
+/// Please keep this function private
+fn quirky_truncate_slices<'a, 'b>(bufs: &'b [IoSlice<'a>], max_len: usize) -> &'b [IoSlice<'a>] {
+ if bufs.is_empty() {
+ return bufs;
+ }
+ let mut cumulated_len = bufs[0].len();
+ for (i, buf) in bufs.iter().enumerate().skip(1) {
+ cumulated_len += buf.len();
+ if cumulated_len > max_len {
+ return &bufs[..i];
+ }
+ }
+ bufs
+}
+
+impl<A: IoControlsAccess, W: AsyncWrite> AsyncWrite for ControlledWrite<A, W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let buf = truncate_bytes(buf);
+ // The shadowing is on purpose.
+ self.poll_limited(cx, |r, cx| r.poll_write(cx, buf))
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ if bufs.is_empty() {
+ return Poll::Ready(Ok(0));
+ }
+ // The shadowing is on purpose.
+ let bufs = quirky_truncate_slices(bufs, MAX_NUM_BYTES_WRITTEN_AT_ONCE);
+ self.poll_limited(cx, |r, cx| r.poll_write_vectored(cx, bufs))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.project().underlying_wrt.poll_flush(cx)
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.project().underlying_wrt.poll_shutdown(cx)
+ }
+}
+
+pub trait IoControlsAccess: Sized {
+ fn wrap_write<W>(self, wrt: W) -> ControlledWrite<Self, W> {
+ ControlledWrite {
+ underlying_wrt: wrt,
+ waiter: None,
+ io_controls_access: self,
+ }
+ }
+
+ fn apply<F, R>(&self, f: F) -> R
+ where F: Fn(&IoControls) -> R;
+}
+
+impl IoControlsAccess for IoControls {
+ fn apply<F, R>(&self, f: F) -> R
+ where F: Fn(&IoControls) -> R {
+ f(self)
+ }
+}
+
+impl<A, W> ControlledWrite<A, W>
+where A: IoControlsAccess
+{
+ pub fn underlying_wrt(&mut self) -> &mut W {
+ &mut self.underlying_wrt
+ }
+
+ fn check_if_alive(&self) -> io::Result<ProtectedZoneGuard> {
+ self.io_controls_access
+ .apply(|io_controls| io_controls.check_if_alive())
+ }
+}
+
+impl<A, W: io::Write> io::Write for ControlledWrite<A, W>
+where A: IoControlsAccess
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let buf = truncate_bytes(buf);
+ let written_num_bytes = self.underlying_wrt.write(buf)?;
+ self.io_controls_access
+ .apply(|io_controls| io_controls.consume_blocking(written_num_bytes))?;
+ Ok(written_num_bytes)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ // We voluntarily avoid to check the kill switch on flush.
+ // This is because the `RAMDirectory` currently panics if flush
+ // is not called before `Drop`.
+ let _guard = self.check_if_alive();
+ self.underlying_wrt.flush()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::{IoSlice, Write};
+ use std::time::Duration;
+
+ use tokio::io::{sink, AsyncWriteExt};
+ use tokio::time::Instant;
+
+ use crate::io::{IoControls, IoControlsAccess};
+
+ #[tokio::test]
+ async fn test_controlled_writer_limited_async() {
+ let io_controls = IoControls::default().set_throughput_limit(2_000_000f64);
+ let mut controlled_write = io_controls.clone().wrap_write(sink());
+ let buf = vec![44u8; 1_000];
+ let start = Instant::now();
+ // We write 200 KB
+ for _ in 0..200 {
+ controlled_write.write_all(&buf).await.unwrap();
+ }
+ controlled_write.flush().await.unwrap();
+ let elapsed = start.elapsed();
+ assert!(elapsed >= Duration::from_millis(50));
+ assert!(elapsed <= Duration::from_millis(150));
+ assert_eq!(io_controls.num_bytes(), 200_000u64);
+ }
+
+ #[tokio::test]
+ async fn test_controlled_writer_no_limit_async() {
+ let io_controls = IoControls::default();
+ let mut controlled_write = io_controls.clone().wrap_write(sink());
+ let buf = vec![44u8; 1_000];
+ let start = Instant::now();
+ // We write 2MB
+ for _ in 0..2_000 {
+ controlled_write.write_all(&buf).await.unwrap();
+ }
+ controlled_write.flush().await.unwrap();
+ let elapsed = start.elapsed();
+ assert!(elapsed <= Duration::from_millis(5));
+ assert_eq!(io_controls.num_bytes(), 2_000_000u64);
+ }
+
+ #[test]
+ fn test_controlled_writer_limited_sync() {
+ let io_controls = IoControls::default().set_throughput_limit(2_000_000f64);
+ let mut controlled_write = io_controls.clone().wrap_write(std::io::sink());
+ let buf = vec![44u8; 1_000];
+ let start = Instant::now();
+ // We write 200 KB
+ for _ in 0..200 {
+ controlled_write.write_all(&buf).unwrap();
+ }
+ controlled_write.flush().unwrap();
+ let elapsed = start.elapsed();
+ assert!(elapsed >= Duration::from_millis(50));
+ assert!(elapsed <= Duration::from_millis(150));
+ assert_eq!(io_controls.num_bytes(), 200_000u64);
+ }
+
+ #[test]
+ fn test_controlled_writer_no_limit_sync() {
+ let io_controls = IoControls::default();
+ let mut controlled_write = io_controls.clone().wrap_write(std::io::sink());
+ let buf = vec![44u8; 1_000];
+ let start = Instant::now();
+ // We write 2MB
+ for _ in 0..2_000 {
+ controlled_write.write_all(&buf).unwrap();
+ }
+ controlled_write.flush().unwrap();
+ let elapsed = start.elapsed();
+ assert!(elapsed <= Duration::from_millis(5));
+ assert_eq!(io_controls.num_bytes(), 2_000_000u64);
+ }
+
+ #[test]
+ fn test_truncate_io_slices_one_slice_too_long_corner_case() {
+ let one_slice = IoSlice::new(&b"abcdef"[..]);
+ assert_eq!(super::quirky_truncate_slices(&[one_slice], 2).len(), 1);
+ }
+
+ #[test]
+ fn test_truncate_io_empty() {
+ assert_eq!(super::quirky_truncate_slices(&[], 2).len(), 0);
+ }
+
+ #[test]
+ fn test_truncate_io_slices() {
+ let slices = &[
+ IoSlice::new(&b"abc"[..]),
+ IoSlice::new(&b"defg"[..]),
+ IoSlice::new(&b"hi"[..]),
+ ];
+ assert_eq!(super::quirky_truncate_slices(slices, 0).len(), 1);
+ assert_eq!(super::quirky_truncate_slices(slices, 6).len(), 1);
+ assert_eq!(super::quirky_truncate_slices(slices, 7).len(), 2);
+ assert_eq!(super::quirky_truncate_slices(slices, 9).len(), 3);
+ assert_eq!(super::quirky_truncate_slices(slices, 10).len(), 3);
+ }
+}
diff --git a/quickwit/quickwit-actors/src/kill_switch.rs b/quickwit/quickwit-common/src/kill_switch.rs
similarity index 100%
rename from quickwit/quickwit-actors/src/kill_switch.rs
rename to quickwit/quickwit-common/src/kill_switch.rs
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 55dd6c1cfad..b13f732da09 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -21,8 +21,11 @@ mod checklist;
mod coolid;
pub mod fs;
+pub mod io;
+mod kill_switch;
pub mod metrics;
pub mod net;
+mod progress;
pub mod rand;
pub mod runtimes;
pub mod uri;
@@ -35,6 +38,8 @@ pub use checklist::{
print_checklist, run_checklist, ChecklistError, BLUE_COLOR, GREEN_COLOR, RED_COLOR,
};
pub use coolid::new_coolid;
+pub use kill_switch::KillSwitch;
+pub use progress::{Progress, ProtectedZoneGuard};
use tracing::{error, info};
 pub fn chunk_range(range: Range<usize>, chunk_size: usize) -> impl Iterator<Item = Range<usize>> {
diff --git a/quickwit/quickwit-actors/src/progress.rs b/quickwit/quickwit-common/src/progress.rs
similarity index 100%
rename from quickwit/quickwit-actors/src/progress.rs
rename to quickwit/quickwit-common/src/progress.rs
diff --git a/quickwit/quickwit-config/src/index_config.rs b/quickwit/quickwit-config/src/index_config.rs
index d2e4ae4d40a..b9e0ed9b0c9 100644
--- a/quickwit/quickwit-config/src/index_config.rs
+++ b/quickwit/quickwit-config/src/index_config.rs
@@ -72,6 +72,18 @@ pub struct IndexingResources {
pub __num_threads_deprecated: IgnoredAny, // DEPRECATED
#[serde(default = "IndexingResources::default_heap_size")]
pub heap_size: Byte,
+ /// Sets the maximum write IO throughput for the merge pipeline,
+ /// in bytes per secs. On hardware where IO is limited, this parameter can help limiting
+ /// the impact of merges on indexing.
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub max_merge_write_throughput: Option<Byte>,
+ /// Sets the maximum write IO throughput for the janitor, in bytes per secs.
+ /// On hardware where IO is limited, this parameter can help limiting
+ /// the impact on indexing.
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub max_janitor_write_throughput: Option<Byte>,
}
impl PartialEq for IndexingResources {
@@ -88,8 +100,8 @@ impl IndexingResources {
#[cfg(any(test, feature = "testsuite"))]
pub fn for_test() -> Self {
Self {
- __num_threads_deprecated: IgnoredAny,
heap_size: Byte::from_bytes(20_000_000), // 20MB
+ ..Default::default()
}
}
}
@@ -97,8 +109,10 @@ impl IndexingResources {
impl Default for IndexingResources {
fn default() -> Self {
Self {
- __num_threads_deprecated: IgnoredAny,
heap_size: Self::default_heap_size(),
+ max_merge_write_throughput: None,
+ max_janitor_write_throughput: None,
+ __num_threads_deprecated: IgnoredAny,
}
}
}
@@ -646,8 +660,8 @@ mod tests {
assert_eq!(
index_config.indexing_settings.resources,
IndexingResources {
- __num_threads_deprecated: serde::de::IgnoredAny,
- heap_size: Byte::from_bytes(3_000_000_000)
+ heap_size: Byte::from_bytes(3_000_000_000),
+ ..Default::default()
}
);
assert_eq!(
diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs
index 8c7a70765fd..3bf7373cedc 100644
--- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs
+++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs
@@ -19,6 +19,7 @@
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::io::IoControls;
use quickwit_common::runtimes::RuntimeType;
use tokio::runtime::Handle;
use tracing::instrument;
@@ -73,14 +74,25 @@ impl Handler for IndexSerializer {
batch_builder: IndexedSplitBatchBuilder,
 ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
- let splits: Vec<IndexedSplit> = {
- let _protect = ctx.protect_zone();
- batch_builder
- .splits
- .into_iter()
- .map(|split_builder| split_builder.finalize())
- .collect::<anyhow::Result<_>>()?
- };
+ let mut splits: Vec<IndexedSplit> = Vec::with_capacity(batch_builder.splits.len());
+ for split_builder in batch_builder.splits {
+ // TODO Consider & test removing this protect guard.
+ //
+ // In theory the controlled directory should be sufficient.
+ let _protect_guard = ctx.protect_zone();
+ if let Some(controlled_directory) = &split_builder.controlled_directory_opt {
+ let io_controls = IoControls::default()
+ .set_progress(ctx.progress().clone())
+ .set_kill_switch(ctx.kill_switch().clone())
+ .set_index_and_component(
+ &split_builder.split_attrs.pipeline_id.index_id,
+ "index_serializer",
+ );
+ controlled_directory.set_io_controls(io_controls);
+ }
+ let split = split_builder.finalize()?;
+ splits.push(split);
+ }
let indexed_split_batch = IndexedSplitBatch {
batch_parent_span: batch_builder.batch_parent_span,
splits,
diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs
index fbf5b67a324..33c6f8466e6 100644
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -28,6 +28,7 @@ use fail::fail_point;
use fnv::FnvHashMap;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::io::IoControls;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::IndexingSettings;
use quickwit_doc_mapper::{DocMapper, SortBy, QUICKWIT_TOKENIZER_MANAGER};
@@ -86,14 +87,19 @@ impl IndexerState {
.settings(self.index_settings.clone())
.schema(self.schema.clone())
.tokenizers(QUICKWIT_TOKENIZER_MANAGER.clone());
+
+ let io_controls = IoControls::default()
+ .set_progress(ctx.progress().clone())
+ .set_kill_switch(ctx.kill_switch().clone())
+ .set_index_and_component(&self.pipeline_id.index_id, "indexer");
+
let indexed_split = IndexedSplitBuilder::new_in_dir(
self.pipeline_id.clone(),
partition_id,
last_delete_opstamp,
self.indexing_directory.scratch_directory().clone(),
index_builder,
- ctx.progress().clone(),
- ctx.kill_switch().clone(),
+ io_controls,
)?;
info!(
split_id = indexed_split.split_id(),
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index 1abaee9d2bf..21aa4a4bcb2 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -23,9 +23,10 @@ use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{
- create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, KillSwitch,
- Mailbox, QueueCapacity, Supervisable,
+ create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox,
+ QueueCapacity, Supervisable,
};
+use quickwit_common::KillSwitch;
use quickwit_config::{IndexingSettings, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index cea7d8028f7..4c70882212e 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -23,14 +23,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use quickwit_actors::{
- create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox,
- Observation, QueueCapacity, Supervisable,
+ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, Observation,
+ Supervisable,
};
use quickwit_common::fs::get_cache_directory_path;
use quickwit_config::{
build_doc_mapper, IndexerConfig, SourceConfig, SourceParams, VecSourceParams,
};
-use quickwit_doc_mapper::DocMapper;
use quickwit_ingest_api::QUEUES_DIR_NAME;
use quickwit_metastore::{IndexMetadata, Metastore, MetastoreError};
use quickwit_proto::{ServiceError, ServiceErrorCode};
@@ -41,7 +40,6 @@ use tracing::{error, info};
use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::MergePlanner;
-use crate::merge_policy::MergePolicy;
use crate::models::{
DetachPipeline, IndexingDirectory, IndexingPipelineId, Observe, ObservePipeline,
ShutdownPipeline, ShutdownPipelines, SpawnMergePipeline, SpawnPipeline, SpawnPipelines,
@@ -287,15 +285,23 @@ impl IndexingService {
&index_metadata.indexing_settings,
)
.map_err(IndexingServiceError::InvalidParams)?;
+
+ let merge_pipeline_params = MergePipelineParams {
+ pipeline_id: pipeline_id.clone(),
+ doc_mapper: doc_mapper.clone(),
+ indexing_directory: indexing_directory.clone(),
+ metastore: self.metastore.clone(),
+ split_store: split_store.clone(),
+ merge_policy,
+ merge_max_io_num_bytes_per_sec: index_metadata
+ .indexing_settings
+ .resources
+ .max_merge_write_throughput,
+ max_concurrent_split_uploads: self.max_concurrent_split_uploads,
+ };
+
let merge_planner_mailbox = self
- .get_or_create_merge_pipeline(
- ctx,
- &pipeline_id,
- doc_mapper.clone(),
- indexing_directory.clone(),
- split_store.clone(),
- merge_policy,
- )
+ .get_or_create_merge_pipeline(merge_pipeline_params, ctx)
.await?;
let max_concurrent_split_uploads_index = (self.max_concurrent_split_uploads / 2).max(1);
@@ -422,34 +428,17 @@ impl IndexingService {
async fn get_or_create_merge_pipeline(
&mut self,
+ merge_pipeline_params: MergePipelineParams,
+ ctx: &ActorContext<Self>,
- pipeline_id: &IndexingPipelineId,
- doc_mapper: Arc<dyn DocMapper>,
- indexing_directory: IndexingDirectory,
- split_store: IndexingSplitStore,
- merge_policy: Arc<dyn MergePolicy>,
+ ) -> Result<Mailbox<MergePlanner>, IndexingServiceError> {
- let merge_pipeline_id = MergePipelineId::from(pipeline_id);
+ let merge_pipeline_id = MergePipelineId::from(&merge_pipeline_params.pipeline_id);
if let Some(merge_pipeline_mailbox_handle) =
self.merge_pipeline_handles.get(&merge_pipeline_id)
{
return Ok(merge_pipeline_mailbox_handle.mailbox.clone());
}
-
- let (merge_planner_mailbox, merge_planner_inbox) =
- create_mailbox::<MergePlanner>("MergePlanner".to_string(), QueueCapacity::Unbounded);
- let merge_pipeline_params = MergePipelineParams {
- pipeline_id: pipeline_id.clone(),
- doc_mapper,
- indexing_directory,
- metastore: self.metastore.clone(),
- split_store,
- merge_policy,
- max_concurrent_split_uploads: self.max_concurrent_split_uploads,
- merge_planner_mailbox: merge_planner_mailbox.clone(),
- merge_planner_inbox,
- };
let merge_pipeline = MergePipeline::new(merge_pipeline_params);
+ let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline);
let merge_pipeline_mailbox_handle = MergePipelineHandle {
mailbox: merge_planner_mailbox.clone(),
diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
index eb1b5d9a936..4c859106af4 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -28,6 +28,7 @@ use async_trait::async_trait;
use fail::fail_point;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::io::IoControls;
use quickwit_common::runtimes::RuntimeType;
use quickwit_directories::UnionDirectory;
use quickwit_doc_mapper::fast_field_reader::timestamp_field_reader;
@@ -53,6 +54,7 @@ pub struct MergeExecutor {
pipeline_id: IndexingPipelineId,
 metastore: Arc<dyn Metastore>,
 doc_mapper: Arc<dyn DocMapper>,
+ io_controls: IoControls,
 merge_packager_mailbox: Mailbox<Packager>,
}
@@ -219,78 +221,6 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 {
combine_partition_ids_aux(splits.iter().map(|split| split.partition_id))
}
-async fn merge_split_directories(
- union_index_meta: IndexMeta,
- split_directories: Vec<Box<dyn Directory>>,
- delete_tasks: Vec<DeleteTask>,
- doc_mapper_opt: Option<Arc<dyn DocMapper>>,
- output_path: &Path,
- ctx: &ActorContext<MergeExecutor>,
-) -> anyhow::Result<ControlledDirectory> {
- let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?;
- // This directory is here to receive the merged split, as well as the final meta.json file.
- let output_directory = ControlledDirectory::new(
- Box::new(MmapDirectory::open(output_path)?),
- ctx.progress().clone(),
- ctx.kill_switch().clone(),
- );
- let mut directory_stack: Vec<Box<dyn Directory>> = vec![
- output_directory.box_clone(),
- Box::new(shadowing_meta_json_directory),
- ];
- directory_stack.extend(split_directories.into_iter());
- let union_directory = UnionDirectory::union_of(directory_stack);
- let union_index = open_index(union_directory)?;
-
- ctx.record_progress();
- let _protect_guard = ctx.protect_zone();
-
- let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?;
- let num_delete_tasks = delete_tasks.len();
- if num_delete_tasks > 0 {
- let doc_mapper = doc_mapper_opt
- .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?;
- for delete_task in delete_tasks {
- let delete_query = delete_task
- .delete_query
- .expect("A delete task must have a delete query.");
- let search_request = SearchRequest {
- index_id: delete_query.index_id,
- query: delete_query.query,
- start_timestamp: delete_query.start_timestamp,
- end_timestamp: delete_query.end_timestamp,
- search_fields: delete_query.search_fields,
- ..Default::default()
- };
- debug!(
- "Delete all documents matched by query `{:?}`",
- search_request
- );
- let query = doc_mapper.query(union_index.schema(), &search_request)?;
- index_writer.delete_query(query)?;
- }
- debug!("commit-delete-operations");
- index_writer.commit()?;
- }
-
- let segment_ids: Vec<SegmentId> = union_index
- .searchable_segment_metas()?
- .into_iter()
- .map(|segment_meta| segment_meta.id())
- .collect();
-
- // A merge is useless if there is no delete and only one segment.
- if num_delete_tasks == 0 && segment_ids.len() <= 1 {
- return Ok(output_directory);
- }
-
- debug!(segment_ids=?segment_ids,"merging-segments");
- // TODO it would be nice if tantivy could let us run the merge in the current thread.
- index_writer.merge(&segment_ids).wait()?;
-
- Ok(output_directory)
-}
-
pub fn merge_split_attrs(
merge_split_id: String,
pipeline_id: &IndexingPipelineId,
@@ -335,12 +265,14 @@ impl MergeExecutor {
pipeline_id: IndexingPipelineId,
 metastore: Arc<dyn Metastore>,
 doc_mapper: Arc<dyn DocMapper>,
+ io_controls: IoControls,
 merge_packager_mailbox: Mailbox<Packager>,
) -> Self {
MergeExecutor {
pipeline_id,
metastore,
doc_mapper,
+ io_controls,
merge_packager_mailbox,
}
}
@@ -356,15 +288,16 @@ impl MergeExecutor {
let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
// TODO it would be nice if tantivy could let us run the merge in the current thread.
fail_point!("before-merge-split");
- let controlled_directory = merge_split_directories(
- union_index_meta,
- split_directories,
- Vec::new(),
- None,
- merge_scratch_directory.path(),
- ctx,
- )
- .await?;
+ let controlled_directory = self
+ .merge_split_directories(
+ union_index_meta,
+ split_directories,
+ Vec::new(),
+ None,
+ merge_scratch_directory.path(),
+ ctx,
+ )
+ .await?;
fail_point!("after-merge-split");
// This will have the side effect of deleting the directory containing the downloaded
@@ -413,15 +346,16 @@ impl MergeExecutor {
);
let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
- let controlled_directory = merge_split_directories(
- union_index_meta,
- split_directories,
- delete_tasks,
- Some(self.doc_mapper.clone()),
- merge_scratch_directory.path(),
- ctx,
- )
- .await?;
+ let controlled_directory = self
+ .merge_split_directories(
+ union_index_meta,
+ split_directories,
+ delete_tasks,
+ Some(self.doc_mapper.clone()),
+ merge_scratch_directory.path(),
+ ctx,
+ )
+ .await?;
// This will have the side effect of deleting the directory containing the downloaded split.
let mut merged_index = Index::open(controlled_directory.clone())?;
@@ -482,6 +416,82 @@ impl MergeExecutor {
};
Ok(Some(indexed_split))
}
+
+ async fn merge_split_directories(
+ &self,
+ union_index_meta: IndexMeta,
+ split_directories: Vec>,
+ delete_tasks: Vec,
+ doc_mapper_opt: Option>,
+ output_path: &Path,
+ ctx: &ActorContext,
+ ) -> anyhow::Result {
+ let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?;
+
+ // This directory is here to receive the merged split, as well as the final meta.json file.
+ let output_directory = ControlledDirectory::new(
+ Box::new(MmapDirectory::open(output_path)?),
+ self.io_controls
+ .clone()
+ .set_kill_switch(ctx.kill_switch().clone())
+ .set_progress(ctx.progress().clone()),
+ );
+ let mut directory_stack: Vec> = vec![
+ output_directory.box_clone(),
+ Box::new(shadowing_meta_json_directory),
+ ];
+ directory_stack.extend(split_directories.into_iter());
+ let union_directory = UnionDirectory::union_of(directory_stack);
+ let union_index = open_index(union_directory)?;
+
+ ctx.record_progress();
+ let _protect_guard = ctx.protect_zone();
+
+ let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?;
+ let num_delete_tasks = delete_tasks.len();
+ if num_delete_tasks > 0 {
+ let doc_mapper = doc_mapper_opt
+ .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?;
+ for delete_task in delete_tasks {
+ let delete_query = delete_task
+ .delete_query
+ .expect("A delete task must have a delete query.");
+ let search_request = SearchRequest {
+ index_id: delete_query.index_id,
+ query: delete_query.query,
+ start_timestamp: delete_query.start_timestamp,
+ end_timestamp: delete_query.end_timestamp,
+ search_fields: delete_query.search_fields,
+ ..Default::default()
+ };
+ debug!(
+ "Delete all documents matched by query `{:?}`",
+ search_request
+ );
+ let query = doc_mapper.query(union_index.schema(), &search_request)?;
+ index_writer.delete_query(query)?;
+ }
+ debug!("commit-delete-operations");
+ index_writer.commit()?;
+ }
+
+ let segment_ids: Vec<SegmentId> = union_index
+ .searchable_segment_metas()?
+ .into_iter()
+ .map(|segment_meta| segment_meta.id())
+ .collect();
+
+ // A merge is useless if there is no delete and only one segment.
+ if num_delete_tasks == 0 && segment_ids.len() <= 1 {
+ return Ok(output_directory);
+ }
+
+ debug!(segment_ids=?segment_ids,"merging-segments");
+ // TODO it would be nice if tantivy could let us run the merge in the current thread.
+ index_writer.merge(&segment_ids).wait()?;
+
+ Ok(output_directory)
+ }
}
fn open_index<T: Into<Box<dyn Directory>>>(directory: T) -> tantivy::Result<Index> {
@@ -567,6 +577,7 @@ mod tests {
pipeline_id,
metastore,
test_sandbox.doc_mapper(),
+ IoControls::default(),
merge_packager_mailbox,
);
let universe = Universe::new();
@@ -707,6 +718,7 @@ mod tests {
pipeline_id,
metastore,
test_sandbox.doc_mapper(),
+ IoControls::default(),
merge_packager_mailbox,
);
let universe = Universe::new();
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 7187de3a31b..a04c8f97b79 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -21,10 +21,13 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+use byte_unit::Byte;
use quickwit_actors::{
- Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, KillSwitch, Mailbox,
- Supervisable,
+ create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox,
+ Mailbox, QueueCapacity, Supervisable,
};
+use quickwit_common::io::IoControls;
+use quickwit_common::KillSwitch;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError, SplitState};
use tokio::join;
@@ -60,6 +63,8 @@ struct Spawn {
pub struct MergePipeline {
params: MergePipelineParams,
+ merge_planner_mailbox: Mailbox,
+ merge_planner_inbox: Inbox,
previous_generations_statistics: MergeStatistics,
statistics: MergeStatistics,
handles: Option,
@@ -88,15 +93,23 @@ impl Actor for MergePipeline {
impl MergePipeline {
pub fn new(params: MergePipelineParams) -> Self {
+ let (merge_planner_mailbox, merge_planner_inbox) =
+ create_mailbox::("MergePlanner".to_string(), QueueCapacity::Unbounded);
Self {
params,
previous_generations_statistics: Default::default(),
handles: None,
kill_switch: KillSwitch::default(),
statistics: MergeStatistics::default(),
+ merge_planner_inbox,
+ merge_planner_mailbox,
}
}
+ pub fn merge_planner_mailbox(&self) -> &Mailbox {
+ &self.merge_planner_mailbox
+ }
+
fn supervisables(&self) -> Vec<&dyn Supervisable> {
if let Some(handles) = &self.handles {
let supervisables: Vec<&dyn Supervisable> = vec![
@@ -203,7 +216,7 @@ impl MergePipeline {
let merge_publisher = Publisher::new(
PublisherType::MergePublisher,
self.params.metastore.clone(),
- Some(self.params.merge_planner_mailbox.clone()),
+ Some(self.merge_planner_mailbox.clone()),
None,
);
let (merge_publisher_mailbox, merge_publisher_handler) = ctx
@@ -238,10 +251,31 @@ impl MergePipeline {
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);
+ let max_merge_write_throughput: f64 = self
+ .params
+ .merge_max_io_num_bytes_per_sec
+ .as_ref()
+ .map(|bytes_per_sec| bytes_per_sec.get_bytes() as f64)
+ .unwrap_or(f64::INFINITY);
+
+ let split_downloader_io_controls = IoControls::default()
+ .set_throughput_limit(max_merge_write_throughput)
+ .set_index_and_component(
+ self.params.pipeline_id.index_id.as_str(),
+ "split_downloader_merge",
+ );
+
+ // The merge and split download share the same throughput limiter.
+ // This is how cloning the `IoControls` works.
+ let merge_executor_io_controls = split_downloader_io_controls
+ .clone()
+ .set_index_and_component(self.params.pipeline_id.index_id.as_str(), "merger");
+
let merge_executor = MergeExecutor::new(
self.params.pipeline_id.clone(),
self.params.metastore.clone(),
self.params.doc_mapper.clone(),
+ merge_executor_io_controls,
merge_packager_mailbox,
);
let (merge_executor_mailbox, merge_executor_handler) = ctx
@@ -253,6 +287,7 @@ impl MergePipeline {
scratch_directory: self.params.indexing_directory.scratch_directory().clone(),
split_store: self.params.split_store.clone(),
executor_mailbox: merge_executor_mailbox,
+ io_controls: split_downloader_io_controls,
};
let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx
.spawn_actor()
@@ -270,8 +305,8 @@ impl MergePipeline {
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_mailboxes(
- self.params.merge_planner_mailbox.clone(),
- self.params.merge_planner_inbox.clone(),
+ self.merge_planner_mailbox.clone(),
+ self.merge_planner_inbox.clone(),
)
.spawn(merge_planner);
@@ -400,8 +435,7 @@ pub struct MergePipelineParams {
pub split_store: IndexingSplitStore,
pub merge_policy: Arc,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
- pub merge_planner_mailbox: Mailbox,
- pub merge_planner_inbox: Inbox,
+ pub merge_max_io_num_bytes_per_sec: Option,
}
#[cfg(test)]
@@ -409,7 +443,7 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;
- use quickwit_actors::{create_mailbox, ActorExitStatus, QueueCapacity, Universe};
+ use quickwit_actors::{ActorExitStatus, Universe};
use quickwit_doc_mapper::default_doc_mapper_for_test;
use quickwit_metastore::MockMetastore;
use quickwit_storage::RamStorage;
@@ -435,8 +469,6 @@ mod tests {
};
let storage = Arc::new(RamStorage::default());
let split_store = IndexingSplitStore::create_without_local_store(storage.clone());
- let (merge_planner_mailbox, merge_planner_inbox) =
- create_mailbox("MergePlanner".to_string(), QueueCapacity::Unbounded);
let pipeline_params = MergePipelineParams {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
@@ -445,8 +477,7 @@ mod tests {
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
- merge_planner_mailbox,
- merge_planner_inbox,
+ merge_max_io_num_bytes_per_sec: None,
};
let pipeline = MergePipeline::new(pipeline_params);
let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline);
diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
index 516214a3216..c2e1b23eb40 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
@@ -21,6 +21,7 @@ use std::path::Path;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::io::IoControls;
use quickwit_metastore::SplitMetadata;
use tantivy::Directory;
use tracing::{debug, info, instrument};
@@ -35,6 +36,7 @@ pub struct MergeSplitDownloader {
pub scratch_directory: ScratchDirectory,
pub split_store: IndexingSplitStore,
pub executor_mailbox: Mailbox,
+ pub io_controls: IoControls,
}
impl Actor for MergeSplitDownloader {
@@ -107,10 +109,15 @@ impl MergeSplitDownloader {
);
return Err(ActorExitStatus::Killed);
}
+ let io_controls = self
+ .io_controls
+ .clone()
+ .set_progress(ctx.progress().clone())
+ .set_kill_switch(ctx.kill_switch().clone());
let _protect_guard = ctx.protect_zone();
let tantivy_dir = self
.split_store
- .fetch_and_open_split(split.split_id(), download_directory)
+ .fetch_and_open_split(split.split_id(), download_directory, &io_controls)
.await
.map_err(|error| {
let split_id = split.split_id();
@@ -165,6 +172,7 @@ mod tests {
scratch_directory,
split_store,
executor_mailbox: merge_executor_mailbox,
+ io_controls: IoControls::default(),
};
let (merge_split_downloader_mailbox, merge_split_downloader_handler) =
universe.spawn_builder().spawn(merge_split_downloader);
diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs
index 8402c615bc8..b1d40f1b2d4 100644
--- a/quickwit/quickwit-indexing/src/actors/packager.rs
+++ b/quickwit/quickwit-indexing/src/actors/packager.rs
@@ -134,14 +134,6 @@ impl Handler for Packager {
split_ids=?split_ids,
"start-packaging-splits"
);
- for split in &batch.splits {
- if let Some(controlled_directory) = &split.controlled_directory_opt {
- controlled_directory.set_progress_and_kill_switch(
- ctx.progress().clone(),
- ctx.kill_switch().clone(),
- );
- }
- }
fail_point!("packager:before");
let mut packaged_splits = Vec::new();
for split in batch.splits {
diff --git a/quickwit/quickwit-indexing/src/controlled_directory.rs b/quickwit/quickwit-indexing/src/controlled_directory.rs
index 828f5379b0e..27e3ded6d35 100644
--- a/quickwit/quickwit-indexing/src/controlled_directory.rs
+++ b/quickwit/quickwit-indexing/src/controlled_directory.rs
@@ -18,12 +18,14 @@
// along with this program. If not, see .
use std::io::{BufWriter, IntoInnerError};
+use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
use arc_swap::ArcSwap;
-use quickwit_actors::{KillSwitch, Progress, ProtectedZoneGuard};
+use quickwit_common::io::{ControlledWrite, IoControls, IoControlsAccess};
+use quickwit_common::ProtectedZoneGuard;
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use tantivy::directory::{
AntiCallToken, FileHandle, TerminatingWrite, WatchCallback, WatchHandle, WritePtr,
@@ -43,36 +45,24 @@ const BUFFER_NUM_BYTES: usize = 8_192;
/// - in the future, record a writing speed, possibly introduce some throttling, etc.
#[derive(Clone)]
pub struct ControlledDirectory {
- inner: Inner,
+ underlying: Arc,
+ io_controls: HotswappableIoControls,
}
impl ControlledDirectory {
- pub fn new(
- directory: Box,
- progress: Progress,
- kill_switch: KillSwitch,
- ) -> ControlledDirectory {
+ pub fn new(directory: Box, io_controls: IoControls) -> ControlledDirectory {
ControlledDirectory {
- inner: Inner {
- controls: Arc::new(ArcSwap::new(Arc::new(Controls {
- progress,
- kill_switch,
- }))),
- underlying: directory.into(),
- },
+ underlying: directory.into(),
+ io_controls: HotswappableIoControls::new(io_controls),
}
}
- fn check_if_alive(&self) -> io::Result {
- self.inner.controls.load().check_if_alive()
+ pub fn check_if_alive(&self) -> io::Result {
+ self.io_controls.load().check_if_alive()
}
- pub fn set_progress_and_kill_switch(&self, progress: Progress, kill_switch: KillSwitch) {
- progress.record_progress();
- self.inner.controls.store(Arc::new(Controls {
- progress,
- kill_switch,
- }));
+ pub fn set_io_controls(&self, io_controls: IoControls) {
+ self.io_controls.store(Arc::new(io_controls));
}
}
@@ -82,77 +72,11 @@ impl fmt::Debug for ControlledDirectory {
}
}
-#[derive(Clone)]
-struct Controls {
- progress: Progress,
- kill_switch: KillSwitch,
-}
-
-impl Controls {
- fn check_if_alive(&self) -> io::Result {
- if self.kill_switch.is_dead() {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "Directory kill switch was activated.",
- ));
- }
- let guard = self.progress.protect_zone();
- Ok(guard)
- }
-}
-
-#[derive(Clone)]
-struct Inner {
- controls: Arc>,
- underlying: Arc,
-}
-
-struct ControlledWrite {
- controls: Arc>,
- underlying_wrt: Box,
-}
-
-impl ControlledWrite {
- fn check_if_alive(&self) -> io::Result {
- self.controls.load().check_if_alive()
- }
-}
-
-impl io::Write for ControlledWrite {
- fn write(&mut self, buf: &[u8]) -> io::Result {
- let _guard = self.check_if_alive()?;
- self.underlying_wrt.write(buf)
- }
-
- fn flush(&mut self) -> io::Result<()> {
- // We voluntarily avoid to check the kill switch on flush.
- // This is because the RAMDirectory currently panics if flush
- // is not called before Drop.
- let _guard = self.check_if_alive();
- self.underlying_wrt.flush()
- }
-
- fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result {
- let _guard = self.check_if_alive()?;
- self.underlying_wrt.write_vectored(bufs)
- }
-
- fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
- let _guard = self.check_if_alive()?;
- self.underlying_wrt.write_all(buf)
- }
-
- fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
- let _guard = self.check_if_alive()?;
- self.underlying_wrt.write_fmt(fmt)
- }
-}
-
impl Directory for ControlledDirectory {
fn get_file_handle(&self, path: &Path) -> Result, OpenReadError> {
self.check_if_alive()
.map_err(|io_err| OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?;
- self.inner.underlying.get_file_handle(path)
+ self.underlying.get_file_handle(path)
}
fn delete(&self, path: &Path) -> Result<(), DeleteError> {
@@ -161,13 +85,13 @@ impl Directory for ControlledDirectory {
io_error: Arc::new(io_error),
filepath: path.to_path_buf(),
})?;
- self.inner.underlying.delete(path)
+ self.underlying.delete(path)
}
fn exists(&self, path: &Path) -> Result {
self.check_if_alive()
.map_err(|io_err| OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?;
- self.inner.underlying.exists(path)
+ self.underlying.exists(path)
}
fn open_write(&self, path: &Path) -> Result {
@@ -175,50 +99,84 @@ impl Directory for ControlledDirectory {
.map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?;
let underlying_wrt: Box = self
- .inner
.underlying
.open_write(path)?
.into_inner()
.map_err(IntoInnerError::into_error)
.map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?;
- let controls = self.inner.controls.clone();
- let controlled_wrt = ControlledWrite {
- controls,
- underlying_wrt,
- };
+ let controlled_wrt = self.io_controls.clone().wrap_write(underlying_wrt);
Ok(BufWriter::with_capacity(
BUFFER_NUM_BYTES,
- Box::new(controlled_wrt),
+ Box::new(AdoptedControlledWrite(controlled_wrt)),
))
}
fn atomic_read(&self, path: &Path) -> Result, OpenReadError> {
self.check_if_alive()
.map_err(|io_err| OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?;
- self.inner.underlying.atomic_read(path)
+ self.underlying.atomic_read(path)
}
fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
self.check_if_alive()?;
- self.inner.underlying.atomic_write(path, data)
+ self.underlying.atomic_write(path, data)
}
fn watch(&self, watch_callback: WatchCallback) -> tantivy::Result {
self.check_if_alive()?;
- self.inner.underlying.watch(watch_callback)
+ self.underlying.watch(watch_callback)
}
fn sync_directory(&self) -> io::Result<()> {
self.check_if_alive()?;
- self.inner.underlying.sync_directory()
+ self.underlying.sync_directory()
+ }
+}
+
+#[derive(Clone)]
+struct HotswappableIoControls(Arc>);
+
+impl Deref for HotswappableIoControls {
+ type Target = ArcSwap;
+
+ fn deref(&self) -> &Self::Target {
+ &*self.0
+ }
+}
+
+impl HotswappableIoControls {
+ pub fn new(io_controls: IoControls) -> Self {
+ Self(Arc::new(ArcSwap::new(Arc::new(io_controls))))
}
}
-impl TerminatingWrite for ControlledWrite {
+impl IoControlsAccess for HotswappableIoControls {
+ fn apply(&self, f: F) -> R
+ where F: Fn(&IoControls) -> R {
+ let guard = self.0.load();
+ f(&**guard)
+ }
+}
+
+// Wrapper to work around the orphan rule. (hence the word "Adopted").
+struct AdoptedControlledWrite(ControlledWrite>);
+
+impl io::Write for AdoptedControlledWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result {
+ self.0.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+}
+
+impl TerminatingWrite for AdoptedControlledWrite {
#[inline]
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
- self.underlying_wrt.flush()?;
- self.underlying_wrt.terminate_ref(token)
+ let underlying_wrt = self.0.underlying_wrt();
+ underlying_wrt.flush()?;
+ underlying_wrt.terminate_ref(token)
}
}
@@ -233,25 +191,36 @@ mod tests {
#[test]
fn test_records_progress_on_write() -> anyhow::Result<()> {
let directory = RamDirectory::default();
- let progress = Progress::default();
+ let io_controls = IoControls::default();
let controlled_directory =
- ControlledDirectory::new(Box::new(directory), progress.clone(), KillSwitch::default());
+ ControlledDirectory::new(Box::new(directory), io_controls.clone());
+ let progress = io_controls.progress().clone();
assert!(progress.registered_activity_since_last_call());
assert!(!progress.registered_activity_since_last_call());
let mut wrt = controlled_directory.open_write(Path::new("test"))?;
assert!(progress.registered_activity_since_last_call());
// We use a large buffer to force the buf writer to flush at least once.
let large_buffer = vec![0u8; wrt.capacity() + 1];
+ assert_eq!(io_controls.num_bytes(), 0u64);
wrt.write_all(&large_buffer)?;
+ assert_eq!(io_controls.num_bytes(), 8_193u64);
assert!(progress.registered_activity_since_last_call());
wrt.write_all(b"small payload")?;
+ // The buffering makes it so that this last write does not
+ // get actually written right away.
+ assert_eq!(io_controls.num_bytes(), 8_193u64);
+ // Here we check that progress is only recorded
+ // when the BufWriter flushes.
assert!(!progress.registered_activity_since_last_call());
wrt.write_all(&large_buffer)?;
+ assert_eq!(io_controls.num_bytes(), 16_399);
assert!(progress.registered_activity_since_last_call());
assert!(!progress.registered_activity_since_last_call());
+ wrt.write_all(&b"aa"[..])?;
+ assert_eq!(io_controls.num_bytes(), 16_399u64);
wrt.terminate()?;
+ // Flush works as expected and makes sure all data buffered goes through
+ assert_eq!(io_controls.num_bytes(), 16_401u64);
assert!(progress.registered_activity_since_last_call());
Ok(())
}
@@ -259,17 +228,14 @@ mod tests {
#[test]
fn test_records_kill_switch_triggers_io_error() -> anyhow::Result<()> {
let directory = RamDirectory::default();
- let kill_switch = KillSwitch::default();
- let controlled_directory = ControlledDirectory::new(
- Box::new(directory),
- Progress::default(),
- kill_switch.clone(),
- );
+ let io_controls = IoControls::default();
+ let controlled_directory =
+ ControlledDirectory::new(Box::new(directory), io_controls.clone());
let mut wrt = controlled_directory.open_write(Path::new("test"))?;
// We use a large buffer to force the buf writer to flush at least once.
let large_buffer = vec![0u8; wrt.capacity() + 1];
wrt.write_all(&large_buffer)?;
- kill_switch.kill();
+ io_controls.kill();
let err = wrt.write_all(&large_buffer).err().unwrap();
assert_eq!(err.kind(), io::ErrorKind::Other);
wrt.terminate()?;
diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs
index 00698835752..ee9e4567b15 100644
--- a/quickwit/quickwit-indexing/src/models/indexed_split.rs
+++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs
@@ -20,7 +20,7 @@
use std::fmt;
use std::path::Path;
-use quickwit_actors::{KillSwitch, Progress};
+use quickwit_common::io::IoControls;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use tantivy::directory::MmapDirectory;
use tantivy::IndexBuilder;
@@ -79,8 +79,7 @@ impl IndexedSplitBuilder {
last_delete_opstamp: u64,
scratch_directory: ScratchDirectory,
index_builder: IndexBuilder,
- progress: Progress,
- kill_switch: KillSwitch,
+ io_controls: IoControls,
) -> anyhow::Result {
// We avoid intermediary merge, and instead merge all segments in the packager.
// The benefit is that we don't have to wait for potentially existing merges,
@@ -91,8 +90,9 @@ impl IndexedSplitBuilder {
scratch_directory.named_temp_child(split_scratch_directory_prefix)?;
let mmap_directory = MmapDirectory::open(split_scratch_directory.path())?;
let box_mmap_directory = Box::new(mmap_directory);
- let controlled_directory =
- ControlledDirectory::new(box_mmap_directory, progress, kill_switch);
+
+ let controlled_directory = ControlledDirectory::new(box_mmap_directory, io_controls);
+
let index_writer =
index_builder.single_segment_index_writer(controlled_directory.clone(), 10_000_000)?;
Ok(Self {
diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs
index e325055c66d..4a3814686f1 100644
--- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs
+++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs
@@ -26,6 +26,7 @@ use std::time::Instant;
use anyhow::Context;
#[cfg(any(test, feature = "testsuite"))]
use byte_unit::Byte;
+use quickwit_common::io::{IoControls, IoControlsAccess};
use quickwit_metastore::SplitMetadata;
use quickwit_storage::{PutPayload, Storage, StorageResult};
use tantivy::directory::MmapDirectory;
@@ -195,11 +196,12 @@ impl IndexingSplitStore {
///
/// As we fetch the split, we optimistically assume that this is for a merge
/// operation that will be successful and we remove the split from the cache.
- #[instrument(skip(self, output_dir_path), fields(cache_hit))]
+ #[instrument(skip(self, output_dir_path, io_controls), fields(cache_hit))]
pub async fn fetch_and_open_split(
&self,
split_id: &str,
output_dir_path: &Path,
+ io_controls: &IoControls,
) -> StorageResult> {
let path = PathBuf::from(quickwit_common::split_file(split_id));
if let Some(split_path) = self
@@ -215,10 +217,11 @@ impl IndexingSplitStore {
tracing::Span::current().record("cache_hit", false);
}
let dest_filepath = output_dir_path.join(&path);
- let mut dest_file = tokio::fs::File::create(&dest_filepath).await?;
+ let dest_file = tokio::fs::File::create(&dest_filepath).await?;
+ let mut dest_file_with_write_limit = io_controls.clone().wrap_write(dest_file);
self.inner
.remote_storage
- .copy_to(&path, &mut dest_file)
+ .copy_to(&path, &mut dest_file_with_write_limit)
.instrument(info_span!("fetch_split_from_remote_storage", path=?path))
.await?;
get_tantivy_directory_from_split_bundle(&dest_filepath)
@@ -242,6 +245,7 @@ mod tests {
use std::sync::Arc;
use byte_unit::Byte;
+ use quickwit_common::io::IoControls;
use quickwit_metastore::SplitMetadata;
use quickwit_storage::{RamStorage, SplitPayloadBuilder};
use tempfile::tempdir;
@@ -400,13 +404,14 @@ mod tests {
}
{
let output = tempfile::tempdir()?;
+ let io_controls = IoControls::default();
// get from cache
let _split1 = split_store
- .fetch_and_open_split(&split_id1, output.path())
+ .fetch_and_open_split(&split_id1, output.path(), &io_controls)
.await?;
// get from remote storage
let _split2 = split_store
- .fetch_and_open_split(&split_id2, output.path())
+ .fetch_and_open_split(&split_id2, output.path(), &io_controls)
.await?;
}
Ok(())
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
index 9ed831d5eb0..261a0c6e22c 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -23,9 +23,10 @@ use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{
- Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, KillSwitch, Supervisor,
- SupervisorState,
+ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Supervisor, SupervisorState,
};
+use quickwit_common::io::IoControls;
+use quickwit_common::KillSwitch;
use quickwit_config::{build_doc_mapper, IndexingSettings};
use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
@@ -164,10 +165,24 @@ impl DeleteTaskPipeline {
pipeline_ord: 0,
source_id: "unknown".to_string(),
};
+ let throughput_limit: f64 = index_metadata
+ .indexing_settings
+ .resources
+ .max_janitor_write_throughput
+ .as_ref()
+ .map(|bytes_per_sec| bytes_per_sec.get_bytes() as f64)
+ .unwrap_or(f64::INFINITY);
+ let delete_executor_io_controls = IoControls::default()
+ .set_throughput_limit(throughput_limit)
+ .set_index_and_component(self.index_id.as_str(), "deleter");
+ let split_download_io_controls = delete_executor_io_controls
+ .clone()
+ .set_index_and_component(self.index_id.as_str(), "split_downloader_delete");
let delete_executor = MergeExecutor::new(
index_pipeline_id,
self.metastore.clone(),
doc_mapper.clone(),
+ delete_executor_io_controls,
packager_mailbox,
);
let (delete_executor_mailbox, task_executor_supervisor_handler) = ctx
@@ -180,6 +195,7 @@ impl DeleteTaskPipeline {
scratch_directory: indexing_directory.scratch_directory().clone(),
split_store: split_store.clone(),
executor_mailbox: delete_executor_mailbox,
+ io_controls: split_download_io_controls,
};
let (downloader_mailbox, downloader_supervisor_handler) = ctx
.spawn_actor()
diff --git a/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs b/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
index e39ca060779..c0085ca58a2 100644
--- a/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
+++ b/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
@@ -159,8 +159,8 @@ pub(crate) fn sample_index_metadata_for_regression() -> IndexMetadata {
};
let merge_policy = MergePolicyConfig::StableLog(stable_log_config);
let indexing_resources = IndexingResources {
- __num_threads_deprecated: serde::de::IgnoredAny,
heap_size: Byte::from_bytes(3),
+ ..Default::default()
};
let indexing_settings = IndexingSettings {
timestamp_field: Some("timestamp".to_string()),