From af123992af28c4b2e12672c7095f401ed522a8df Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 24 Oct 2022 15:15:24 +0900 Subject: [PATCH 1/2] Added generic IO controls. The `IoControls` object makes it possible to - limit the throughput of Write and AsyncWrite objects, - abort a write if the killswitch is activated, - record progress when performing IO - record the number of bytes written in a Prometheus counter. In this PR, the IoControls object is plugged into the indexer, the merger and the delete pipeline. The merge and the delete pipeline can optionally get a separate write throughput limit, as defined in the index configuration. In the merge and the delete pipeline, split downloads and actual index writing share the same throughput limit, and attempt to consume it in a concurrent manner. We rely on async-speed-limit for the throttling. We could not use async_speed_limit::Resource directly as it is not compatible with tokio. This PR also does some minor refactoring on the merge pipeline parameters. 
--- quickwit/Cargo.lock | 20 + quickwit/Cargo.toml | 2 + quickwit/quickwit-actors/src/actor.rs | 5 +- quickwit/quickwit-actors/src/lib.rs | 5 +- quickwit/quickwit-common/Cargo.toml | 2 + quickwit/quickwit-common/src/io.rs | 400 ++++++++++++++++++ .../src/kill_switch.rs | 0 quickwit/quickwit-common/src/lib.rs | 5 + .../src/progress.rs | 0 quickwit/quickwit-config/src/index_config.rs | 22 +- .../src/actors/index_serializer.rs | 28 +- .../quickwit-indexing/src/actors/indexer.rs | 10 +- .../src/actors/indexing_pipeline.rs | 5 +- .../src/actors/indexing_service.rs | 53 +-- .../src/actors/merge_executor.rs | 192 +++++---- .../src/actors/merge_pipeline.rs | 55 ++- .../src/actors/merge_split_downloader.rs | 10 +- .../quickwit-indexing/src/actors/packager.rs | 8 - .../src/controlled_directory.rs | 194 ++++----- .../src/models/indexed_split.rs | 10 +- .../src/split_store/indexing_split_store.rs | 15 +- .../src/actors/delete_task_pipeline.rs | 20 +- .../index_metadata.rs | 2 +- 23 files changed, 771 insertions(+), 292 deletions(-) create mode 100644 quickwit/quickwit-common/src/io.rs rename quickwit/{quickwit-actors => quickwit-common}/src/kill_switch.rs (100%) rename quickwit/{quickwit-actors => quickwit-common}/src/progress.rs (100%) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d84df20e702..a63df88ee43 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -151,6 +151,18 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-speed-limit" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "481ce9cb6a828f4679495f7376cb6779978d925dd9790b99b48d1bbde6d0f00b" +dependencies = [ + "futures-core", + "futures-io", + "futures-timer", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -1422,6 +1434,12 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" +[[package]] 
+name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.24" @@ -3244,12 +3262,14 @@ name = "quickwit-common" version = "0.3.1" dependencies = [ "anyhow", + "async-speed-limit", "colored", "env_logger", "home", "itertools", "num_cpus", "once_cell", + "pin-project-lite", "pnet", "prometheus", "rand 0.8.5", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index ae7739031ea..7b20c387ee7 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -31,6 +31,7 @@ anyhow = "1" arc-swap = "1.4" assert-json-diff = "2" assert_cmd = "2" +async-speed-limit = "0.4" async-trait = "0.1" atty = "0.2" azure_core = "0.5.0" @@ -90,6 +91,7 @@ openssl-probe = "0.1.4" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-jaeger = { version = "0.17", features = ["rt-tokio"] } opentelemetry-otlp = "0.11.0" +pin-project-lite = "0.2.9" pnet = { version = "0.31.0", features = ["std"] } predicates = "2" prometheus = { version = "0.13", features = ["process"] } diff --git a/quickwit/quickwit-actors/src/actor.rs b/quickwit/quickwit-actors/src/actor.rs index 89208333b92..44f4b41ac9e 100644 --- a/quickwit/quickwit-actors/src/actor.rs +++ b/quickwit/quickwit-actors/src/actor.rs @@ -32,13 +32,14 @@ use tokio::sync::{oneshot, watch}; use tracing::{debug, error}; use crate::actor_state::{ActorState, AtomicState}; -use crate::progress::{Progress, ProtectedZoneGuard}; use crate::registry::ActorRegistry; use crate::scheduler::{Callback, ScheduleEvent, Scheduler}; use crate::spawn_builder::SpawnBuilder; #[cfg(any(test, feature = "testsuite"))] use crate::Universe; -use crate::{AskError, Command, KillSwitch, Mailbox, QueueCapacity, SendError}; +use crate::{ + AskError, Command, KillSwitch, Mailbox, Progress, ProtectedZoneGuard, QueueCapacity, SendError, +}; /// The actor exit status represents the 
outcome of the execution of an actor, /// after the end of the execution. diff --git a/quickwit/quickwit-actors/src/lib.rs b/quickwit/quickwit-actors/src/lib.rs index 9f42fd3b454..2bba844c4f6 100644 --- a/quickwit/quickwit-actors/src/lib.rs +++ b/quickwit/quickwit-actors/src/lib.rs @@ -36,10 +36,8 @@ mod actor_state; pub mod channel_with_priority; mod command; mod envelope; -mod kill_switch; mod mailbox; mod observation; -mod progress; mod registry; mod scheduler; mod spawn_builder; @@ -52,9 +50,8 @@ mod universe; pub use actor::{Actor, ActorExitStatus, Handler}; pub use actor_handle::{ActorHandle, Health, Supervisable}; pub use command::Command; -pub use kill_switch::KillSwitch; pub use observation::{Observation, ObservationType}; -pub use progress::{Progress, ProtectedZoneGuard}; +use quickwit_common::{KillSwitch, Progress, ProtectedZoneGuard}; pub(crate) use scheduler::Scheduler; use thiserror::Error; pub use universe::Universe; diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 9d0e6c5ff9b..6e26e52f7fb 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -11,12 +11,14 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } +async-speed-limit = {workspace=true} colored = { workspace = true } env_logger = { workspace = true } home = { workspace = true } itertools = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } +pin-project-lite = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs new file mode 100644 index 00000000000..df568f1ecbd --- /dev/null +++ b/quickwit/quickwit-common/src/io.rs @@ -0,0 +1,400 @@ +// Copyright (C) 2022 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. 
+// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// This file contains code copied from the Resource trait +// in async-speed-limit from the TiKV project. +// https://github.com/tikv/async-speed-limit/blob/master/src/io.rs +// +// Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0. + +// We are simply porting the logic to tokio here and adding the functionality to +// plug some metrics. + +use std::future::Future; +use std::io; +use std::io::IoSlice; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use async_speed_limit::clock::StandardClock; +use async_speed_limit::limiter::Consume; +use async_speed_limit::Limiter; +use once_cell::sync::Lazy; +use pin_project_lite::pin_project; +use prometheus::{IntCounter, IntCounterVec}; +use tokio::io::AsyncWrite; + +use crate::metrics::new_counter_vec; +use crate::{KillSwitch, Progress, ProtectedZoneGuard}; + +// Max 1MB at a time. 
+const MAX_NUM_BYTES_WRITTEN_AT_ONCE: usize = 1 << 20; + +fn truncate_bytes(bytes: &[u8]) -> &[u8] { + let num_bytes = bytes.len().min(MAX_NUM_BYTES_WRITTEN_AT_ONCE); + &bytes[..num_bytes] +} + +struct IoMetrics { + write_num_bytes_total: IntCounterVec, +} + +impl Default for IoMetrics { + fn default() -> Self { + let write_num_bytes_total = new_counter_vec( + "write_num_bytes_total", + "Number of bytes written by a given component.", + "quickwit", + &["index", "component"], + ); + Self { + write_num_bytes_total, + } + } +} + +static IO_METRICS: Lazy = Lazy::new(IoMetrics::default); + +/// Parameter used in `async_speed_limit`. +/// +/// The default value is good and does not need to be tweaked. +/// We use a smaller value in unit test to get reasonably accurate throttling one very +/// short period of times. +/// +/// For more details, please refer to `async_speed_limit` documentation. +const REFILL_DURATION: Duration = if cfg!(test) { + Duration::from_millis(10) +} else { + // Default value in async_speed_limit + Duration::from_millis(100) +}; + +#[derive(Clone)] +pub struct IoControls { + throughput_limiter: Limiter, + bytes_counter: IntCounter, + progress: Progress, + kill_switch: KillSwitch, +} + +impl Default for IoControls { + fn default() -> Self { + let default_bytes_counter = + IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap(); + IoControls { + throughput_limiter: Limiter::new(f64::INFINITY), + progress: Progress::default(), + kill_switch: KillSwitch::default(), + bytes_counter: default_bytes_counter, + } + } +} + +impl IoControls { + #[must_use] + pub fn progress(&self) -> &Progress { + &self.progress + } + + pub fn kill(&self) { + self.kill_switch.kill(); + } + + pub fn num_bytes(&self) -> u64 { + self.bytes_counter.get() + } + + pub fn check_if_alive(&self) -> io::Result { + if self.kill_switch.is_dead() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Directory kill switch was activated.", + )); + } + let guard = 
self.progress.protect_zone(); + Ok(guard) + } + + pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self { + self.bytes_counter = IO_METRICS + .write_num_bytes_total + .with_label_values(&[index, component]); + self + } + + pub fn set_throughput_limit(mut self, throughput: f64) -> Self { + self.throughput_limiter = Limiter::builder(throughput).refill(REFILL_DURATION).build(); + self + } + + pub fn set_bytes_counter(mut self, bytes_counter: IntCounter) -> Self { + self.bytes_counter = bytes_counter; + self + } + + pub fn set_progress(mut self, progress: Progress) -> Self { + self.progress = progress; + self + } + + pub fn set_kill_switch(mut self, kill_switch: KillSwitch) -> Self { + self.kill_switch = kill_switch; + self + } + fn consume_blocking(&self, num_bytes: usize) -> io::Result<()> { + let _guard = self.check_if_alive()?; + self.throughput_limiter.blocking_consume(num_bytes); + self.bytes_counter.inc_by(num_bytes as u64); + Ok(()) + } +} + +pin_project! { + pub struct ControlledWrite { + #[pin] + underlying_wrt: W, + waiter: Option>, + io_controls_access: A, + } +} + +impl ControlledWrite { + // This function was copied from TiKV's `async-speed-limit`. + // Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0. + /// Wraps a poll function with a delay after it. + /// + /// This method calls the given `poll` function until it is fulfilled. After + /// that, the result is saved into this `Resource` instance (therefore + /// different `poll_***` calls should not be interleaving), while returning + /// `Pending` until the limiter has completely consumed the result. 
+ #[allow(dead_code)] + pub(crate) fn poll_limited( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + poll: impl FnOnce(Pin<&mut W>, &mut Context<'_>) -> Poll>, + ) -> Poll> { + let this = self.project(); + + let _protect_guard = match this + .io_controls_access + .apply(|io_controls| io_controls.check_if_alive()) + { + Ok(protect_guard) => protect_guard, + Err(io_err) => { + return Poll::Ready(Err(io_err)); + } + }; + + if let Some(waiter) = this.waiter { + let res = Pin::new(waiter).poll(cx); + if res.is_pending() { + return Poll::Pending; + } + *this.waiter = None; + } + + let res: Poll> = poll(this.underlying_wrt, cx); + if let Poll::Ready(obj) = &res { + let len = *obj.as_ref().unwrap_or(&0); + if len > 0 { + let waiter = this.io_controls_access.apply(|io_controls| { + io_controls.bytes_counter.inc_by(len as u64); + io_controls.throughput_limiter.consume(len) + }); + *this.waiter = Some(waiter) + } + } + res + } +} + +fn truncate_slices<'a, 'b>(bufs: &'b [IoSlice<'a>], max_len: usize) -> &'b [IoSlice<'a>] { + if bufs.is_empty() { + return bufs; + } + let mut cumulated_len = bufs[0].len(); + for (i, buf) in bufs.iter().enumerate() { + cumulated_len += buf.len(); + if cumulated_len > max_len { + return &bufs[..i]; + } + } + bufs +} + +impl AsyncWrite for ControlledWrite { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let buf = truncate_bytes(buf); + // The shadowing is on purpose. + self.poll_limited(cx, |r, cx| r.poll_write(cx, buf)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + // The shadowing is on purpose. 
+ let bufs = truncate_slices(bufs, MAX_NUM_BYTES_WRITTEN_AT_ONCE); + self.poll_limited(cx, |r, cx| r.poll_write_vectored(cx, bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().underlying_wrt.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().underlying_wrt.poll_shutdown(cx) + } +} + +pub trait IoControlsAccess: Sized { + fn wrap_write(self, wrt: W) -> ControlledWrite { + ControlledWrite { + underlying_wrt: wrt, + waiter: None, + io_controls_access: self, + } + } + + fn apply(&self, f: F) -> R + where F: Fn(&IoControls) -> R; +} + +impl IoControlsAccess for IoControls { + fn apply(&self, f: F) -> R + where F: Fn(&IoControls) -> R { + f(self) + } +} + +impl ControlledWrite +where A: IoControlsAccess +{ + pub fn underlying_wrt(&mut self) -> &mut W { + &mut self.underlying_wrt + } + + fn check_if_alive(&self) -> io::Result { + self.io_controls_access + .apply(|io_controls| io_controls.check_if_alive()) + } +} + +impl io::Write for ControlledWrite +where A: IoControlsAccess +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + let buf = truncate_bytes(buf); + let written_num_bytes = self.underlying_wrt.write(buf)?; + self.io_controls_access + .apply(|io_controls| io_controls.consume_blocking(written_num_bytes))?; + Ok(written_num_bytes) + } + + fn flush(&mut self) -> io::Result<()> { + // We voluntarily avoid to check the kill switch on flush. + // This is because the `RAMDirectory` currently panics if flush + // is not called before `Drop`. 
+ let _guard = self.check_if_alive(); + self.underlying_wrt.flush() + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + use std::time::Duration; + + use tokio::io::{sink, AsyncWriteExt}; + use tokio::time::Instant; + + use crate::io::{IoControls, IoControlsAccess}; + + #[tokio::test] + async fn test_controlled_writer_limited_async() { + let io_controls = IoControls::default().set_throughput_limit(2_000_000f64); + let mut controlled_write = io_controls.clone().wrap_write(sink()); + let buf = vec![44u8; 1_000]; + let start = Instant::now(); + // We write 200 KB + for _ in 0..200 { + controlled_write.write_all(&buf).await.unwrap(); + } + controlled_write.flush().await.unwrap(); + let elapsed = start.elapsed(); + assert!(elapsed >= Duration::from_millis(50)); + assert!(elapsed <= Duration::from_millis(150)); + assert_eq!(io_controls.num_bytes(), 200_000u64); + } + + #[tokio::test] + async fn test_controlled_writer_no_limit_async() { + let io_controls = IoControls::default(); + let mut controlled_write = io_controls.clone().wrap_write(sink()); + let buf = vec![44u8; 1_000]; + let start = Instant::now(); + // We write 2MB + for _ in 0..2_000 { + controlled_write.write_all(&buf).await.unwrap(); + } + controlled_write.flush().await.unwrap(); + let elapsed = start.elapsed(); + assert!(elapsed <= Duration::from_millis(5)); + assert_eq!(io_controls.num_bytes(), 2_000_000u64); + } + + #[test] + fn test_controlled_writer_limited_sync() { + let io_controls = IoControls::default().set_throughput_limit(2_000_000f64); + let mut controlled_write = io_controls.clone().wrap_write(std::io::sink()); + let buf = vec![44u8; 1_000]; + let start = Instant::now(); + // We write 200 KB + for _ in 0..200 { + controlled_write.write_all(&buf).unwrap(); + } + controlled_write.flush().unwrap(); + let elapsed = start.elapsed(); + assert!(elapsed >= Duration::from_millis(50)); + assert!(elapsed <= Duration::from_millis(150)); + assert_eq!(io_controls.num_bytes(), 200_000u64); + } + + #[test] 
+ fn test_controlled_writer_no_limit_sync() { + let io_controls = IoControls::default(); + let mut controlled_write = io_controls.clone().wrap_write(std::io::sink()); + let buf = vec![44u8; 1_000]; + let start = Instant::now(); + // We write 2MB + for _ in 0..2_000 { + controlled_write.write_all(&buf).unwrap(); + } + controlled_write.flush().unwrap(); + let elapsed = start.elapsed(); + assert!(elapsed <= Duration::from_millis(5)); + assert_eq!(io_controls.num_bytes(), 2_000_000u64); + } +} diff --git a/quickwit/quickwit-actors/src/kill_switch.rs b/quickwit/quickwit-common/src/kill_switch.rs similarity index 100% rename from quickwit/quickwit-actors/src/kill_switch.rs rename to quickwit/quickwit-common/src/kill_switch.rs diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 55dd6c1cfad..b13f732da09 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -21,8 +21,11 @@ mod checklist; mod coolid; pub mod fs; +pub mod io; +mod kill_switch; pub mod metrics; pub mod net; +mod progress; pub mod rand; pub mod runtimes; pub mod uri; @@ -35,6 +38,8 @@ pub use checklist::{ print_checklist, run_checklist, ChecklistError, BLUE_COLOR, GREEN_COLOR, RED_COLOR, }; pub use coolid::new_coolid; +pub use kill_switch::KillSwitch; +pub use progress::{Progress, ProtectedZoneGuard}; use tracing::{error, info}; pub fn chunk_range(range: Range, chunk_size: usize) -> impl Iterator> { diff --git a/quickwit/quickwit-actors/src/progress.rs b/quickwit/quickwit-common/src/progress.rs similarity index 100% rename from quickwit/quickwit-actors/src/progress.rs rename to quickwit/quickwit-common/src/progress.rs diff --git a/quickwit/quickwit-config/src/index_config.rs b/quickwit/quickwit-config/src/index_config.rs index d2e4ae4d40a..b9e0ed9b0c9 100644 --- a/quickwit/quickwit-config/src/index_config.rs +++ b/quickwit/quickwit-config/src/index_config.rs @@ -72,6 +72,18 @@ pub struct IndexingResources { pub 
__num_threads_deprecated: IgnoredAny, // DEPRECATED #[serde(default = "IndexingResources::default_heap_size")] pub heap_size: Byte, + /// Sets the maximum write IO throughput for the merge pipeline, + /// in bytes per secs. On hardware where IO is limited, this parameter can help limiting + /// the impact of merges on indexing. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub max_merge_write_throughput: Option, + /// Sets the maximum write IO throughput for the janitor, in bytes per secs. + /// On hardware where IO is limited, this parameter can help limiting + /// the impact on indexing. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub max_janitor_write_throughput: Option, } impl PartialEq for IndexingResources { @@ -88,8 +100,8 @@ impl IndexingResources { #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { Self { - __num_threads_deprecated: IgnoredAny, heap_size: Byte::from_bytes(20_000_000), // 20MB + ..Default::default() } } } @@ -97,8 +109,10 @@ impl IndexingResources { impl Default for IndexingResources { fn default() -> Self { Self { - __num_threads_deprecated: IgnoredAny, heap_size: Self::default_heap_size(), + max_merge_write_throughput: None, + max_janitor_write_throughput: None, + __num_threads_deprecated: IgnoredAny, } } } @@ -646,8 +660,8 @@ mod tests { assert_eq!( index_config.indexing_settings.resources, IndexingResources { - __num_threads_deprecated: serde::de::IgnoredAny, - heap_size: Byte::from_bytes(3_000_000_000) + heap_size: Byte::from_bytes(3_000_000_000), + ..Default::default() } ); assert_eq!( diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs index 8c7a70765fd..3bf7373cedc 100644 --- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs +++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use quickwit_actors::{Actor, 
ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::io::IoControls; use quickwit_common::runtimes::RuntimeType; use tokio::runtime::Handle; use tracing::instrument; @@ -73,14 +74,25 @@ impl Handler for IndexSerializer { batch_builder: IndexedSplitBatchBuilder, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - let splits: Vec = { - let _protect = ctx.protect_zone(); - batch_builder - .splits - .into_iter() - .map(|split_builder| split_builder.finalize()) - .collect::>()? - }; + let mut splits: Vec = Vec::with_capacity(batch_builder.splits.len()); + for split_builder in batch_builder.splits { + // TODO Consider & test removing this protect guard. + // + // In theory the controlled directory should be sufficient. + let _protect_guard = ctx.protect_zone(); + if let Some(controlled_directory) = &split_builder.controlled_directory_opt { + let io_controls = IoControls::default() + .set_progress(ctx.progress().clone()) + .set_kill_switch(ctx.kill_switch().clone()) + .set_index_and_component( + &split_builder.split_attrs.pipeline_id.index_id, + "index_serializer", + ); + controlled_directory.set_io_controls(io_controls); + } + let split = split_builder.finalize()?; + splits.push(split); + } let indexed_split_batch = IndexedSplitBatch { batch_parent_span: batch_builder.batch_parent_span, splits, diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index fbf5b67a324..33c6f8466e6 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -28,6 +28,7 @@ use fail::fail_point; use fnv::FnvHashMap; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::io::IoControls; use quickwit_common::runtimes::RuntimeType; use quickwit_config::IndexingSettings; use quickwit_doc_mapper::{DocMapper, SortBy, QUICKWIT_TOKENIZER_MANAGER}; @@ -86,14 
+87,19 @@ impl IndexerState { .settings(self.index_settings.clone()) .schema(self.schema.clone()) .tokenizers(QUICKWIT_TOKENIZER_MANAGER.clone()); + + let io_controls = IoControls::default() + .set_progress(ctx.progress().clone()) + .set_kill_switch(ctx.kill_switch().clone()) + .set_index_and_component(&self.pipeline_id.index_id, "indexer"); + let indexed_split = IndexedSplitBuilder::new_in_dir( self.pipeline_id.clone(), partition_id, last_delete_opstamp, self.indexing_directory.scratch_directory().clone(), index_builder, - ctx.progress().clone(), - ctx.kill_switch().clone(), + io_controls, )?; info!( split_id = indexed_split.split_id(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 1abaee9d2bf..21aa4a4bcb2 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -23,9 +23,10 @@ use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ - create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, KillSwitch, - Mailbox, QueueCapacity, Supervisable, + create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, + QueueCapacity, Supervisable, }; +use quickwit_common::KillSwitch; use quickwit_config::{IndexingSettings, SourceConfig}; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{Metastore, MetastoreError}; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index cea7d8028f7..4c70882212e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -23,14 +23,13 @@ use std::sync::Arc; use async_trait::async_trait; use quickwit_actors::{ - create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, - Observation, 
QueueCapacity, Supervisable, + Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, Observation, + Supervisable, }; use quickwit_common::fs::get_cache_directory_path; use quickwit_config::{ build_doc_mapper, IndexerConfig, SourceConfig, SourceParams, VecSourceParams, }; -use quickwit_doc_mapper::DocMapper; use quickwit_ingest_api::QUEUES_DIR_NAME; use quickwit_metastore::{IndexMetadata, Metastore, MetastoreError}; use quickwit_proto::{ServiceError, ServiceErrorCode}; @@ -41,7 +40,6 @@ use tracing::{error, info}; use super::merge_pipeline::{MergePipeline, MergePipelineParams}; use super::MergePlanner; -use crate::merge_policy::MergePolicy; use crate::models::{ DetachPipeline, IndexingDirectory, IndexingPipelineId, Observe, ObservePipeline, ShutdownPipeline, ShutdownPipelines, SpawnMergePipeline, SpawnPipeline, SpawnPipelines, @@ -287,15 +285,23 @@ impl IndexingService { &index_metadata.indexing_settings, ) .map_err(IndexingServiceError::InvalidParams)?; + + let merge_pipeline_params = MergePipelineParams { + pipeline_id: pipeline_id.clone(), + doc_mapper: doc_mapper.clone(), + indexing_directory: indexing_directory.clone(), + metastore: self.metastore.clone(), + split_store: split_store.clone(), + merge_policy, + merge_max_io_num_bytes_per_sec: index_metadata + .indexing_settings + .resources + .max_merge_write_throughput, + max_concurrent_split_uploads: self.max_concurrent_split_uploads, + }; + let merge_planner_mailbox = self - .get_or_create_merge_pipeline( - ctx, - &pipeline_id, - doc_mapper.clone(), - indexing_directory.clone(), - split_store.clone(), - merge_policy, - ) + .get_or_create_merge_pipeline(merge_pipeline_params, ctx) .await?; let max_concurrent_split_uploads_index = (self.max_concurrent_split_uploads / 2).max(1); @@ -422,34 +428,17 @@ impl IndexingService { async fn get_or_create_merge_pipeline( &mut self, + merge_pipeline_params: MergePipelineParams, ctx: &ActorContext, - pipeline_id: &IndexingPipelineId, - doc_mapper: Arc, 
- indexing_directory: IndexingDirectory, - split_store: IndexingSplitStore, - merge_policy: Arc, ) -> Result, IndexingServiceError> { - let merge_pipeline_id = MergePipelineId::from(pipeline_id); + let merge_pipeline_id = MergePipelineId::from(&merge_pipeline_params.pipeline_id); if let Some(merge_pipeline_mailbox_handle) = self.merge_pipeline_handles.get(&merge_pipeline_id) { return Ok(merge_pipeline_mailbox_handle.mailbox.clone()); } - - let (merge_planner_mailbox, merge_planner_inbox) = - create_mailbox::("MergePlanner".to_string(), QueueCapacity::Unbounded); - let merge_pipeline_params = MergePipelineParams { - pipeline_id: pipeline_id.clone(), - doc_mapper, - indexing_directory, - metastore: self.metastore.clone(), - split_store, - merge_policy, - max_concurrent_split_uploads: self.max_concurrent_split_uploads, - merge_planner_mailbox: merge_planner_mailbox.clone(), - merge_planner_inbox, - }; let merge_pipeline = MergePipeline::new(merge_pipeline_params); + let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline); let merge_pipeline_mailbox_handle = MergePipelineHandle { mailbox: merge_planner_mailbox.clone(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index eb1b5d9a936..4c859106af4 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -28,6 +28,7 @@ use async_trait::async_trait; use fail::fail_point; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::io::IoControls; use quickwit_common::runtimes::RuntimeType; use quickwit_directories::UnionDirectory; use quickwit_doc_mapper::fast_field_reader::timestamp_field_reader; @@ -53,6 +54,7 @@ pub struct MergeExecutor { pipeline_id: IndexingPipelineId, metastore: 
Arc, doc_mapper: Arc, + io_controls: IoControls, merge_packager_mailbox: Mailbox, } @@ -219,78 +221,6 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 { combine_partition_ids_aux(splits.iter().map(|split| split.partition_id)) } -async fn merge_split_directories( - union_index_meta: IndexMeta, - split_directories: Vec>, - delete_tasks: Vec, - doc_mapper_opt: Option>, - output_path: &Path, - ctx: &ActorContext, -) -> anyhow::Result { - let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?; - // This directory is here to receive the merged split, as well as the final meta.json file. - let output_directory = ControlledDirectory::new( - Box::new(MmapDirectory::open(output_path)?), - ctx.progress().clone(), - ctx.kill_switch().clone(), - ); - let mut directory_stack: Vec> = vec![ - output_directory.box_clone(), - Box::new(shadowing_meta_json_directory), - ]; - directory_stack.extend(split_directories.into_iter()); - let union_directory = UnionDirectory::union_of(directory_stack); - let union_index = open_index(union_directory)?; - - ctx.record_progress(); - let _protect_guard = ctx.protect_zone(); - - let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?; - let num_delete_tasks = delete_tasks.len(); - if num_delete_tasks > 0 { - let doc_mapper = doc_mapper_opt - .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?; - for delete_task in delete_tasks { - let delete_query = delete_task - .delete_query - .expect("A delete task must have a delete query."); - let search_request = SearchRequest { - index_id: delete_query.index_id, - query: delete_query.query, - start_timestamp: delete_query.start_timestamp, - end_timestamp: delete_query.end_timestamp, - search_fields: delete_query.search_fields, - ..Default::default() - }; - debug!( - "Delete all documents matched by query `{:?}`", - search_request - ); - let query = doc_mapper.query(union_index.schema(), &search_request)?; - 
index_writer.delete_query(query)?; - } - debug!("commit-delete-operations"); - index_writer.commit()?; - } - - let segment_ids: Vec = union_index - .searchable_segment_metas()? - .into_iter() - .map(|segment_meta| segment_meta.id()) - .collect(); - - // A merge is useless if there is no delete and only one segment. - if num_delete_tasks == 0 && segment_ids.len() <= 1 { - return Ok(output_directory); - } - - debug!(segment_ids=?segment_ids,"merging-segments"); - // TODO it would be nice if tantivy could let us run the merge in the current thread. - index_writer.merge(&segment_ids).wait()?; - - Ok(output_directory) -} - pub fn merge_split_attrs( merge_split_id: String, pipeline_id: &IndexingPipelineId, @@ -335,12 +265,14 @@ impl MergeExecutor { pipeline_id: IndexingPipelineId, metastore: Arc, doc_mapper: Arc, + io_controls: IoControls, merge_packager_mailbox: Mailbox, ) -> Self { MergeExecutor { pipeline_id, metastore, doc_mapper, + io_controls, merge_packager_mailbox, } } @@ -356,15 +288,16 @@ impl MergeExecutor { let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?; // TODO it would be nice if tantivy could let us run the merge in the current thread. 
fail_point!("before-merge-split"); - let controlled_directory = merge_split_directories( - union_index_meta, - split_directories, - Vec::new(), - None, - merge_scratch_directory.path(), - ctx, - ) - .await?; + let controlled_directory = self + .merge_split_directories( + union_index_meta, + split_directories, + Vec::new(), + None, + merge_scratch_directory.path(), + ctx, + ) + .await?; fail_point!("after-merge-split"); // This will have the side effect of deleting the directory containing the downloaded @@ -413,15 +346,16 @@ impl MergeExecutor { ); let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?; - let controlled_directory = merge_split_directories( - union_index_meta, - split_directories, - delete_tasks, - Some(self.doc_mapper.clone()), - merge_scratch_directory.path(), - ctx, - ) - .await?; + let controlled_directory = self + .merge_split_directories( + union_index_meta, + split_directories, + delete_tasks, + Some(self.doc_mapper.clone()), + merge_scratch_directory.path(), + ctx, + ) + .await?; // This will have the side effect of deleting the directory containing the downloaded split. let mut merged_index = Index::open(controlled_directory.clone())?; @@ -482,6 +416,82 @@ impl MergeExecutor { }; Ok(Some(indexed_split)) } + + async fn merge_split_directories( + &self, + union_index_meta: IndexMeta, + split_directories: Vec>, + delete_tasks: Vec, + doc_mapper_opt: Option>, + output_path: &Path, + ctx: &ActorContext, + ) -> anyhow::Result { + let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?; + + // This directory is here to receive the merged split, as well as the final meta.json file. 
+ let output_directory = ControlledDirectory::new( + Box::new(MmapDirectory::open(output_path)?), + self.io_controls + .clone() + .set_kill_switch(ctx.kill_switch().clone()) + .set_progress(ctx.progress().clone()), + ); + let mut directory_stack: Vec> = vec![ + output_directory.box_clone(), + Box::new(shadowing_meta_json_directory), + ]; + directory_stack.extend(split_directories.into_iter()); + let union_directory = UnionDirectory::union_of(directory_stack); + let union_index = open_index(union_directory)?; + + ctx.record_progress(); + let _protect_guard = ctx.protect_zone(); + + let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?; + let num_delete_tasks = delete_tasks.len(); + if num_delete_tasks > 0 { + let doc_mapper = doc_mapper_opt + .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?; + for delete_task in delete_tasks { + let delete_query = delete_task + .delete_query + .expect("A delete task must have a delete query."); + let search_request = SearchRequest { + index_id: delete_query.index_id, + query: delete_query.query, + start_timestamp: delete_query.start_timestamp, + end_timestamp: delete_query.end_timestamp, + search_fields: delete_query.search_fields, + ..Default::default() + }; + debug!( + "Delete all documents matched by query `{:?}`", + search_request + ); + let query = doc_mapper.query(union_index.schema(), &search_request)?; + index_writer.delete_query(query)?; + } + debug!("commit-delete-operations"); + index_writer.commit()?; + } + + let segment_ids: Vec = union_index + .searchable_segment_metas()? + .into_iter() + .map(|segment_meta| segment_meta.id()) + .collect(); + + // A merge is useless if there is no delete and only one segment. + if num_delete_tasks == 0 && segment_ids.len() <= 1 { + return Ok(output_directory); + } + + debug!(segment_ids=?segment_ids,"merging-segments"); + // TODO it would be nice if tantivy could let us run the merge in the current thread. 
+ index_writer.merge(&segment_ids).wait()?; + + Ok(output_directory) + } } fn open_index>>(directory: T) -> tantivy::Result { @@ -567,6 +577,7 @@ mod tests { pipeline_id, metastore, test_sandbox.doc_mapper(), + IoControls::default(), merge_packager_mailbox, ); let universe = Universe::new(); @@ -707,6 +718,7 @@ mod tests { pipeline_id, metastore, test_sandbox.doc_mapper(), + IoControls::default(), merge_packager_mailbox, ); let universe = Universe::new(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 7187de3a31b..a04c8f97b79 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -21,10 +21,13 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use byte_unit::Byte; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, KillSwitch, Mailbox, - Supervisable, + create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, + Mailbox, QueueCapacity, Supervisable, }; +use quickwit_common::io::IoControls; +use quickwit_common::KillSwitch; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{Metastore, MetastoreError, SplitState}; use tokio::join; @@ -60,6 +63,8 @@ struct Spawn { pub struct MergePipeline { params: MergePipelineParams, + merge_planner_mailbox: Mailbox, + merge_planner_inbox: Inbox, previous_generations_statistics: MergeStatistics, statistics: MergeStatistics, handles: Option, @@ -88,15 +93,23 @@ impl Actor for MergePipeline { impl MergePipeline { pub fn new(params: MergePipelineParams) -> Self { + let (merge_planner_mailbox, merge_planner_inbox) = + create_mailbox::("MergePlanner".to_string(), QueueCapacity::Unbounded); Self { params, previous_generations_statistics: Default::default(), handles: None, kill_switch: KillSwitch::default(), statistics: MergeStatistics::default(), + 
merge_planner_inbox, + merge_planner_mailbox, } } + pub fn merge_planner_mailbox(&self) -> &Mailbox { + &self.merge_planner_mailbox + } + fn supervisables(&self) -> Vec<&dyn Supervisable> { if let Some(handles) = &self.handles { let supervisables: Vec<&dyn Supervisable> = vec![ @@ -203,7 +216,7 @@ impl MergePipeline { let merge_publisher = Publisher::new( PublisherType::MergePublisher, self.params.metastore.clone(), - Some(self.params.merge_planner_mailbox.clone()), + Some(self.merge_planner_mailbox.clone()), None, ); let (merge_publisher_mailbox, merge_publisher_handler) = ctx @@ -238,10 +251,31 @@ impl MergePipeline { .set_kill_switch(self.kill_switch.clone()) .spawn(merge_packager); + let max_merge_write_throughput: f64 = self + .params + .merge_max_io_num_bytes_per_sec + .as_ref() + .map(|bytes_per_sec| bytes_per_sec.get_bytes() as f64) + .unwrap_or(f64::INFINITY); + + let split_downloader_io_controls = IoControls::default() + .set_throughput_limit(max_merge_write_throughput) + .set_index_and_component( + self.params.pipeline_id.index_id.as_str(), + "split_downloader_merge", + ); + + // The merge and split download share the same throughput limiter. + // This is how cloning the `IoControls` works. 
+ let merge_executor_io_controls = split_downloader_io_controls + .clone() + .set_index_and_component(self.params.pipeline_id.index_id.as_str(), "merger"); + let merge_executor = MergeExecutor::new( self.params.pipeline_id.clone(), self.params.metastore.clone(), self.params.doc_mapper.clone(), + merge_executor_io_controls, merge_packager_mailbox, ); let (merge_executor_mailbox, merge_executor_handler) = ctx @@ -253,6 +287,7 @@ impl MergePipeline { scratch_directory: self.params.indexing_directory.scratch_directory().clone(), split_store: self.params.split_store.clone(), executor_mailbox: merge_executor_mailbox, + io_controls: split_downloader_io_controls, }; let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx .spawn_actor() @@ -270,8 +305,8 @@ impl MergePipeline { .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .set_mailboxes( - self.params.merge_planner_mailbox.clone(), - self.params.merge_planner_inbox.clone(), + self.merge_planner_mailbox.clone(), + self.merge_planner_inbox.clone(), ) .spawn(merge_planner); @@ -400,8 +435,7 @@ pub struct MergePipelineParams { pub split_store: IndexingSplitStore, pub merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. 
- pub merge_planner_mailbox: Mailbox, - pub merge_planner_inbox: Inbox, + pub merge_max_io_num_bytes_per_sec: Option, } #[cfg(test)] @@ -409,7 +443,7 @@ mod tests { use std::sync::Arc; use std::time::Duration; - use quickwit_actors::{create_mailbox, ActorExitStatus, QueueCapacity, Universe}; + use quickwit_actors::{ActorExitStatus, Universe}; use quickwit_doc_mapper::default_doc_mapper_for_test; use quickwit_metastore::MockMetastore; use quickwit_storage::RamStorage; @@ -435,8 +469,6 @@ mod tests { }; let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); - let (merge_planner_mailbox, merge_planner_inbox) = - create_mailbox("MergePlanner".to_string(), QueueCapacity::Unbounded); let pipeline_params = MergePipelineParams { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), @@ -445,8 +477,7 @@ mod tests { split_store, merge_policy: default_merge_policy(), max_concurrent_split_uploads: 2, - merge_planner_mailbox, - merge_planner_inbox, + merge_max_io_num_bytes_per_sec: None, }; let pipeline = MergePipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 516214a3216..c2e1b23eb40 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -21,6 +21,7 @@ use std::path::Path; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::io::IoControls; use quickwit_metastore::SplitMetadata; use tantivy::Directory; use tracing::{debug, info, instrument}; @@ -35,6 +36,7 @@ pub struct MergeSplitDownloader { pub scratch_directory: ScratchDirectory, pub split_store: IndexingSplitStore, pub executor_mailbox: 
Mailbox, + pub io_controls: IoControls, } impl Actor for MergeSplitDownloader { @@ -107,10 +109,15 @@ impl MergeSplitDownloader { ); return Err(ActorExitStatus::Killed); } + let io_controls = self + .io_controls + .clone() + .set_progress(ctx.progress().clone()) + .set_kill_switch(ctx.kill_switch().clone()); let _protect_guard = ctx.protect_zone(); let tantivy_dir = self .split_store - .fetch_and_open_split(split.split_id(), download_directory) + .fetch_and_open_split(split.split_id(), download_directory, &io_controls) .await .map_err(|error| { let split_id = split.split_id(); @@ -165,6 +172,7 @@ mod tests { scratch_directory, split_store, executor_mailbox: merge_executor_mailbox, + io_controls: IoControls::default(), }; let (merge_split_downloader_mailbox, merge_split_downloader_handler) = universe.spawn_builder().spawn(merge_split_downloader); diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 8402c615bc8..b1d40f1b2d4 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -134,14 +134,6 @@ impl Handler for Packager { split_ids=?split_ids, "start-packaging-splits" ); - for split in &batch.splits { - if let Some(controlled_directory) = &split.controlled_directory_opt { - controlled_directory.set_progress_and_kill_switch( - ctx.progress().clone(), - ctx.kill_switch().clone(), - ); - } - } fail_point!("packager:before"); let mut packaged_splits = Vec::new(); for split in batch.splits { diff --git a/quickwit/quickwit-indexing/src/controlled_directory.rs b/quickwit/quickwit-indexing/src/controlled_directory.rs index 828f5379b0e..27e3ded6d35 100644 --- a/quickwit/quickwit-indexing/src/controlled_directory.rs +++ b/quickwit/quickwit-indexing/src/controlled_directory.rs @@ -18,12 +18,14 @@ // along with this program. If not, see . 
use std::io::{BufWriter, IntoInnerError}; +use std::ops::Deref; use std::path::Path; use std::sync::Arc; use std::{fmt, io}; use arc_swap::ArcSwap; -use quickwit_actors::{KillSwitch, Progress, ProtectedZoneGuard}; +use quickwit_common::io::{ControlledWrite, IoControls, IoControlsAccess}; +use quickwit_common::ProtectedZoneGuard; use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use tantivy::directory::{ AntiCallToken, FileHandle, TerminatingWrite, WatchCallback, WatchHandle, WritePtr, @@ -43,36 +45,24 @@ const BUFFER_NUM_BYTES: usize = 8_192; /// - in the future, record a writing speed, possibly introduce some throttling, etc. #[derive(Clone)] pub struct ControlledDirectory { - inner: Inner, + underlying: Arc, + io_controls: HotswappableIoControls, } impl ControlledDirectory { - pub fn new( - directory: Box, - progress: Progress, - kill_switch: KillSwitch, - ) -> ControlledDirectory { + pub fn new(directory: Box, io_controls: IoControls) -> ControlledDirectory { ControlledDirectory { - inner: Inner { - controls: Arc::new(ArcSwap::new(Arc::new(Controls { - progress, - kill_switch, - }))), - underlying: directory.into(), - }, + underlying: directory.into(), + io_controls: HotswappableIoControls::new(io_controls), } } - fn check_if_alive(&self) -> io::Result { - self.inner.controls.load().check_if_alive() + pub fn check_if_alive(&self) -> io::Result { + self.io_controls.load().check_if_alive() } - pub fn set_progress_and_kill_switch(&self, progress: Progress, kill_switch: KillSwitch) { - progress.record_progress(); - self.inner.controls.store(Arc::new(Controls { - progress, - kill_switch, - })); + pub fn set_io_controls(&self, io_controls: IoControls) { + self.io_controls.store(Arc::new(io_controls)); } } @@ -82,77 +72,11 @@ impl fmt::Debug for ControlledDirectory { } } -#[derive(Clone)] -struct Controls { - progress: Progress, - kill_switch: KillSwitch, -} - -impl Controls { - fn check_if_alive(&self) -> io::Result { - if 
self.kill_switch.is_dead() { - return Err(io::Error::new( - io::ErrorKind::Other, - "Directory kill switch was activated.", - )); - } - let guard = self.progress.protect_zone(); - Ok(guard) - } -} - -#[derive(Clone)] -struct Inner { - controls: Arc>, - underlying: Arc, -} - -struct ControlledWrite { - controls: Arc>, - underlying_wrt: Box, -} - -impl ControlledWrite { - fn check_if_alive(&self) -> io::Result { - self.controls.load().check_if_alive() - } -} - -impl io::Write for ControlledWrite { - fn write(&mut self, buf: &[u8]) -> io::Result { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - // We voluntarily avoid to check the kill switch on flush. - // This is because the RAMDirectory currently panics if flush - // is not called before Drop. - let _guard = self.check_if_alive(); - self.underlying_wrt.flush() - } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_vectored(bufs) - } - - fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_all(buf) - } - - fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_fmt(fmt) - } -} - impl Directory for ControlledDirectory { fn get_file_handle(&self, path: &Path) -> Result, OpenReadError> { self.check_if_alive() .map_err(|io_err| OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?; - self.inner.underlying.get_file_handle(path) + self.underlying.get_file_handle(path) } fn delete(&self, path: &Path) -> Result<(), DeleteError> { @@ -161,13 +85,13 @@ impl Directory for ControlledDirectory { io_error: Arc::new(io_error), filepath: path.to_path_buf(), })?; - self.inner.underlying.delete(path) + self.underlying.delete(path) } fn exists(&self, path: &Path) -> Result { self.check_if_alive() .map_err(|io_err| 
OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?; - self.inner.underlying.exists(path) + self.underlying.exists(path) } fn open_write(&self, path: &Path) -> Result { @@ -175,50 +99,84 @@ impl Directory for ControlledDirectory { .map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?; let underlying_wrt: Box = self - .inner .underlying .open_write(path)? .into_inner() .map_err(IntoInnerError::into_error) .map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?; - let controls = self.inner.controls.clone(); - let controlled_wrt = ControlledWrite { - controls, - underlying_wrt, - }; + let controlled_wrt = self.io_controls.clone().wrap_write(underlying_wrt); Ok(BufWriter::with_capacity( BUFFER_NUM_BYTES, - Box::new(controlled_wrt), + Box::new(AdoptedControlledWrite(controlled_wrt)), )) } fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { self.check_if_alive() .map_err(|io_err| OpenReadError::wrap_io_error(io_err, path.to_path_buf()))?; - self.inner.underlying.atomic_read(path) + self.underlying.atomic_read(path) } fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> { self.check_if_alive()?; - self.inner.underlying.atomic_write(path, data) + self.underlying.atomic_write(path, data) } fn watch(&self, watch_callback: WatchCallback) -> tantivy::Result { self.check_if_alive()?; - self.inner.underlying.watch(watch_callback) + self.underlying.watch(watch_callback) } fn sync_directory(&self) -> io::Result<()> { self.check_if_alive()?; - self.inner.underlying.sync_directory() + self.underlying.sync_directory() + } +} + +#[derive(Clone)] +struct HotswappableIoControls(Arc>); + +impl Deref for HotswappableIoControls { + type Target = ArcSwap; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl HotswappableIoControls { + pub fn new(io_controls: IoControls) -> Self { + Self(Arc::new(ArcSwap::new(Arc::new(io_controls)))) } } -impl TerminatingWrite for ControlledWrite { +impl IoControlsAccess 
for HotswappableIoControls { + fn apply(&self, f: F) -> R + where F: Fn(&IoControls) -> R { + let guard = self.0.load(); + f(&**guard) + } +} + +// Wrapper to work around the orphan rule. (hence the word "Adopted"). +struct AdoptedControlledWrite(ControlledWrite>); + +impl io::Write for AdoptedControlledWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl TerminatingWrite for AdoptedControlledWrite { #[inline] fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { - self.underlying_wrt.flush()?; - self.underlying_wrt.terminate_ref(token) + let underlying_wrt = self.0.underlying_wrt(); + underlying_wrt.flush()?; + underlying_wrt.terminate_ref(token) } } @@ -233,25 +191,36 @@ mod tests { #[test] fn test_records_progress_on_write() -> anyhow::Result<()> { let directory = RamDirectory::default(); - let progress = Progress::default(); + let io_controls = IoControls::default(); let controlled_directory = - ControlledDirectory::new(Box::new(directory), progress.clone(), KillSwitch::default()); + ControlledDirectory::new(Box::new(directory), io_controls.clone()); + let progress = io_controls.progress().clone(); assert!(progress.registered_activity_since_last_call()); assert!(!progress.registered_activity_since_last_call()); let mut wrt = controlled_directory.open_write(Path::new("test"))?; assert!(progress.registered_activity_since_last_call()); // We use a large buffer to force the buf writer to flush at least once. let large_buffer = vec![0u8; wrt.capacity() + 1]; + assert_eq!(io_controls.num_bytes(), 0u64); wrt.write_all(&large_buffer)?; + assert_eq!(io_controls.num_bytes(), 8_193u64); assert!(progress.registered_activity_since_last_call()); wrt.write_all(b"small payload")?; + // The buffering makes it so that this last write does not + // get actually written right away. 
+ assert_eq!(io_controls.num_bytes(), 8_193u64); // Here we check that the progress is only // recorded when the BufWriter flushes. assert!(!progress.registered_activity_since_last_call()); wrt.write_all(&large_buffer)?; + assert_eq!(io_controls.num_bytes(), 16_399); assert!(progress.registered_activity_since_last_call()); assert!(!progress.registered_activity_since_last_call()); + wrt.write_all(&b"aa"[..])?; + assert_eq!(io_controls.num_bytes(), 16_399u64); wrt.terminate()?; + // Flush works as expected and makes sure all data buffered goes through + assert_eq!(io_controls.num_bytes(), 16_401u64); assert!(progress.registered_activity_since_last_call()); Ok(()) } @@ -259,17 +228,14 @@ mod tests { #[test] fn test_records_kill_switch_triggers_io_error() -> anyhow::Result<()> { let directory = RamDirectory::default(); - let kill_switch = KillSwitch::default(); - let controlled_directory = ControlledDirectory::new( - Box::new(directory), - Progress::default(), - kill_switch.clone(), - ); + let io_controls = IoControls::default(); + let controlled_directory = + ControlledDirectory::new(Box::new(directory), io_controls.clone()); + let mut wrt = controlled_directory.open_write(Path::new("test"))?; // We use a large buffer to force the buf writer to flush at least once. 
let large_buffer = vec![0u8; wrt.capacity() + 1]; wrt.write_all(&large_buffer)?; - kill_switch.kill(); + io_controls.kill(); let err = wrt.write_all(&large_buffer).err().unwrap(); assert_eq!(err.kind(), io::ErrorKind::Other); wrt.terminate()?; diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 00698835752..ee9e4567b15 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -20,7 +20,7 @@ use std::fmt; use std::path::Path; -use quickwit_actors::{KillSwitch, Progress}; +use quickwit_common::io::IoControls; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use tantivy::directory::MmapDirectory; use tantivy::IndexBuilder; @@ -79,8 +79,7 @@ impl IndexedSplitBuilder { last_delete_opstamp: u64, scratch_directory: ScratchDirectory, index_builder: IndexBuilder, - progress: Progress, - kill_switch: KillSwitch, + io_controls: IoControls, ) -> anyhow::Result { // We avoid intermediary merge, and instead merge all segments in the packager. 
// The benefit is that we don't have to wait for potentially existing merges, @@ -91,8 +90,9 @@ impl IndexedSplitBuilder { scratch_directory.named_temp_child(split_scratch_directory_prefix)?; let mmap_directory = MmapDirectory::open(split_scratch_directory.path())?; let box_mmap_directory = Box::new(mmap_directory); - let controlled_directory = - ControlledDirectory::new(box_mmap_directory, progress, kill_switch); + + let controlled_directory = ControlledDirectory::new(box_mmap_directory, io_controls); + let index_writer = index_builder.single_segment_index_writer(controlled_directory.clone(), 10_000_000)?; Ok(Self { diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index e325055c66d..4a3814686f1 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -26,6 +26,7 @@ use std::time::Instant; use anyhow::Context; #[cfg(any(test, feature = "testsuite"))] use byte_unit::Byte; +use quickwit_common::io::{IoControls, IoControlsAccess}; use quickwit_metastore::SplitMetadata; use quickwit_storage::{PutPayload, Storage, StorageResult}; use tantivy::directory::MmapDirectory; @@ -195,11 +196,12 @@ impl IndexingSplitStore { /// /// As we fetch the split, we optimistically assume that this is for a merge /// operation that will be successful and we remove the split from the cache. 
- #[instrument(skip(self, output_dir_path), fields(cache_hit))] + #[instrument(skip(self, output_dir_path, io_controls), fields(cache_hit))] pub async fn fetch_and_open_split( &self, split_id: &str, output_dir_path: &Path, + io_controls: &IoControls, ) -> StorageResult> { let path = PathBuf::from(quickwit_common::split_file(split_id)); if let Some(split_path) = self @@ -215,10 +217,11 @@ impl IndexingSplitStore { tracing::Span::current().record("cache_hit", false); } let dest_filepath = output_dir_path.join(&path); - let mut dest_file = tokio::fs::File::create(&dest_filepath).await?; + let dest_file = tokio::fs::File::create(&dest_filepath).await?; + let mut dest_file_with_write_limit = io_controls.clone().wrap_write(dest_file); self.inner .remote_storage - .copy_to(&path, &mut dest_file) + .copy_to(&path, &mut dest_file_with_write_limit) .instrument(info_span!("fetch_split_from_remote_storage", path=?path)) .await?; get_tantivy_directory_from_split_bundle(&dest_filepath) @@ -242,6 +245,7 @@ mod tests { use std::sync::Arc; use byte_unit::Byte; + use quickwit_common::io::IoControls; use quickwit_metastore::SplitMetadata; use quickwit_storage::{RamStorage, SplitPayloadBuilder}; use tempfile::tempdir; @@ -400,13 +404,14 @@ mod tests { } { let output = tempfile::tempdir()?; + let io_controls = IoControls::default(); // get from cache let _split1 = split_store - .fetch_and_open_split(&split_id1, output.path()) + .fetch_and_open_split(&split_id1, output.path(), &io_controls) .await?; // get from remote storage let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path()) + .fetch_and_open_split(&split_id2, output.path(), &io_controls) .await?; } Ok(()) diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 9ed831d5eb0..261a0c6e22c 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs 
@@ -23,9 +23,10 @@ use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, KillSwitch, Supervisor, - SupervisorState, + Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Supervisor, SupervisorState, }; +use quickwit_common::io::IoControls; +use quickwit_common::KillSwitch; use quickwit_config::{build_doc_mapper, IndexingSettings}; use quickwit_indexing::actors::{ MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType, @@ -164,10 +165,24 @@ impl DeleteTaskPipeline { pipeline_ord: 0, source_id: "unknown".to_string(), }; + let throughput_limit: f64 = index_metadata + .indexing_settings + .resources + .max_janitor_write_throughput + .as_ref() + .map(|bytes_per_sec| bytes_per_sec.get_bytes() as f64) + .unwrap_or(f64::INFINITY); + let delete_executor_io_controls = IoControls::default() + .set_throughput_limit(throughput_limit) + .set_index_and_component(self.index_id.as_str(), "deleter"); + let split_download_io_controls = delete_executor_io_controls + .clone() + .set_index_and_component(self.index_id.as_str(), "split_downloader_delete"); let delete_executor = MergeExecutor::new( index_pipeline_id, self.metastore.clone(), doc_mapper.clone(), + delete_executor_io_controls, packager_mailbox, ); let (delete_executor_mailbox, task_executor_supervisor_handler) = ctx @@ -180,6 +195,7 @@ impl DeleteTaskPipeline { scratch_directory: indexing_directory.scratch_directory().clone(), split_store: split_store.clone(), executor_mailbox: delete_executor_mailbox, + io_controls: split_download_io_controls, }; let (downloader_mailbox, downloader_supervisor_handler) = ctx .spawn_actor() diff --git a/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs b/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs index e39ca060779..c0085ca58a2 100644 --- 
a/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs +++ b/quickwit/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs @@ -159,8 +159,8 @@ pub(crate) fn sample_index_metadata_for_regression() -> IndexMetadata { }; let merge_policy = MergePolicyConfig::StableLog(stable_log_config); let indexing_resources = IndexingResources { - __num_threads_deprecated: serde::de::IgnoredAny, heap_size: Byte::from_bytes(3), + ..Default::default() }; let indexing_settings = IndexingSettings { timestamp_field: Some("timestamp".to_string()), From b8fac1b77acadf2c95a2cfcd02b2f376851559cb Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 3 Nov 2022 09:33:21 +0900 Subject: [PATCH 2/2] Added unit test --- quickwit/quickwit-common/src/io.rs | 52 +++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index df568f1ecbd..46a5a2ddc84 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -53,20 +53,18 @@ fn truncate_bytes(bytes: &[u8]) -> &[u8] { } struct IoMetrics { - write_num_bytes_total: IntCounterVec, + write_bytes: IntCounterVec, } impl Default for IoMetrics { fn default() -> Self { - let write_num_bytes_total = new_counter_vec( - "write_num_bytes_total", + let write_bytes = new_counter_vec( + "write_bytes", "Number of bytes written by a given component.", "quickwit", &["index", "component"], ); - Self { - write_num_bytes_total, - } + Self { write_bytes } } } @@ -134,7 +132,7 @@ impl IoControls { pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self { self.bytes_counter = IO_METRICS - .write_num_bytes_total + .write_bytes .with_label_values(&[index, component]); self } @@ -225,12 +223,16 @@ impl ControlledWrite { } } -fn truncate_slices<'a, 'b>(bufs: &'b [IoSlice<'a>], max_len: usize) -> &'b [IoSlice<'a>] { +/// Quirky spec: truncates the list of bufs, and keep as 
many leftmost elements +/// as possible, within the constraint of not exceeding `max_len` bytes. +/// +/// Please keep this function private +fn quirky_truncate_slices<'a, 'b>(bufs: &'b [IoSlice<'a>], max_len: usize) -> &'b [IoSlice<'a>] { if bufs.is_empty() { return bufs; } let mut cumulated_len = bufs[0].len(); - for (i, buf) in bufs.iter().enumerate() { + for (i, buf) in bufs.iter().enumerate().skip(1) { cumulated_len += buf.len(); if cumulated_len > max_len { return &bufs[..i]; @@ -255,8 +257,11 @@ impl AsyncWrite for ControlledWrite { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { + if bufs.is_empty() { + return Poll::Ready(Ok(0)); + } // The shadowing is on purpose. - let bufs = truncate_slices(bufs, MAX_NUM_BYTES_WRITTEN_AT_ONCE); + let bufs = quirky_truncate_slices(bufs, MAX_NUM_BYTES_WRITTEN_AT_ONCE); self.poll_limited(cx, |r, cx| r.poll_write_vectored(cx, bufs)) } @@ -324,7 +329,7 @@ where A: IoControlsAccess #[cfg(test)] mod tests { - use std::io::Write; + use std::io::{IoSlice, Write}; use std::time::Duration; use tokio::io::{sink, AsyncWriteExt}; @@ -397,4 +402,29 @@ mod tests { assert!(elapsed <= Duration::from_millis(5)); assert_eq!(io_controls.num_bytes(), 2_000_000u64); } + + #[test] + fn test_truncate_io_slices_one_slice_too_long_corner_case() { + let one_slice = IoSlice::new(&b"abcdef"[..]); + assert_eq!(super::quirky_truncate_slices(&[one_slice], 2).len(), 1); + } + + #[test] + fn test_truncate_io_empty() { + assert_eq!(super::quirky_truncate_slices(&[], 2).len(), 0); + } + + #[test] + fn test_truncate_io_slices() { + let slices = &[ + IoSlice::new(&b"abc"[..]), + IoSlice::new(&b"defg"[..]), + IoSlice::new(&b"hi"[..]), + ]; + assert_eq!(super::quirky_truncate_slices(slices, 0).len(), 1); + assert_eq!(super::quirky_truncate_slices(slices, 6).len(), 1); + assert_eq!(super::quirky_truncate_slices(slices, 7).len(), 2); + assert_eq!(super::quirky_truncate_slices(slices, 9).len(), 3); + assert_eq!(super::quirky_truncate_slices(slices, 
10).len(), 3); + } }