From 239e1f476f89eb165af8a2224b693a58e6db1081 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 3 Sep 2024 14:53:30 +0200 Subject: [PATCH 1/4] fix(spooler): Reduce number of disk reads --- .../services/buffer/envelope_buffer/mod.rs | 19 ++++++++++++++----- .../services/buffer/envelope_stack/sqlite.rs | 8 +++++++- .../services/buffer/stack_provider/memory.rs | 6 ++++-- .../src/services/buffer/stack_provider/mod.rs | 15 ++++++++++++++- .../services/buffer/stack_provider/sqlite.rs | 14 ++++++++++++-- 5 files changed, 51 insertions(+), 11 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 605d6e1bd4f..1c971330110 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -19,7 +19,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::services::buffer::stack_provider::StackProvider; +use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::MemoryChecker; @@ -230,8 +230,14 @@ where { stack.push(envelope).await?; } else { - self.push_stack(ProjectKeyPair::from_envelope(&envelope), Some(envelope)) - .await?; + // Since we have initialization code that creates all the necessary stacks, we assume + // that any new stack that is added during the envelope buffer's lifecycle, is recreated. + self.push_stack( + StackCreationType::Recreate, + ProjectKeyPair::from_envelope(&envelope), + Some(envelope), + ) + .await?; } self.priority_queue .change_priority_by(&project_key_pair, |prio| { @@ -355,6 +361,7 @@ where /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. async fn push_stack( &mut self, + stack_creation_type: StackCreationType, project_key_pair: ProjectKeyPair, envelope: Option>, ) -> Result<(), EnvelopeBufferError> { @@ -362,7 +369,9 @@ where .as_ref() .map_or(Instant::now(), |e| e.meta().start_time()); - let mut stack = self.stack_provider.create_stack(project_key_pair); + let mut stack = self + .stack_provider + .create_stack(stack_creation_type, project_key_pair); if let Some(envelope) = envelope { stack.push(envelope).await?; } @@ -411,7 +420,7 @@ where /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. async fn load_stacks(&mut self, project_key_pairs: HashSet) { for project_key_pair in project_key_pairs { - self.push_stack(project_key_pair, None) + self.push_stack(StackCreationType::Create, project_key_pair, None) .await .expect("Pushing an empty stack raised an error"); } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 1268fd5ea4e..0cbb028f4ba 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -53,6 +53,7 @@ impl SqliteEnvelopeStack { max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, + check_disk: bool, ) -> Self { Self { envelope_store, @@ -64,7 +65,7 @@ impl SqliteEnvelopeStack { sampling_key, batches_buffer: VecDeque::with_capacity(max_batches), batches_buffer_size: 0, - check_disk: true, + check_disk, } } @@ -263,6 +264,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(4); @@ -315,6 +317,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); // We pop with an invalid db. @@ -334,6 +337,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); // We pop with no elements. @@ -351,6 +355,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(5); @@ -388,6 +393,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(15); diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs index 0414f4a55fa..2b0b7822cf1 100644 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -1,6 +1,8 @@ use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; use crate::utils::MemoryChecker; #[derive(Debug)] @@ -23,7 +25,7 @@ impl StackProvider for MemoryStackProvider { InitializationState::empty() } - fn create_stack(&self, _: ProjectKeyPair) -> Self::Stack { + fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack { MemoryEnvelopeStack::new() } diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index fa852d47d1c..d60f9d1fcb2 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -28,6 +28,15 @@ impl InitializationState { } } +/// The creation type for the [`EnvelopeStack`]. +pub enum StackCreationType { + /// An [`EnvelopeStack`] that is created for the first time. + Create, + /// An [`EnvelopeStack`] that was recreated because it was dropped for any arbitrary reason + /// before. + Recreate, +} + /// A provider of [`EnvelopeStack`] instances that is responsible for creating them. pub trait StackProvider: std::fmt::Debug { /// The implementation of [`EnvelopeStack`] that this manager creates. @@ -37,7 +46,11 @@ pub trait StackProvider: std::fmt::Debug { fn initialize(&self) -> impl Future; /// Creates an [`EnvelopeStack`]. - fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack; + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack; /// Returns `true` if the store used by this [`StackProvider`] has space to add new /// stacks or items to the stacks. diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 4b732bdb3b6..29466e8ff7d 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -5,7 +5,9 @@ use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; use crate::SqliteEnvelopeStack; #[derive(Debug)] @@ -46,13 +48,21 @@ impl StackProvider for SqliteStackProvider { } } - fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack { + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack { SqliteEnvelopeStack::new( self.envelope_store.clone(), self.disk_batch_size, self.max_batches, project_key_pair.own_key, project_key_pair.sampling_key, + // We want to check the disk by default if we are creating the stack for the first time, + // this is done because if we recreate the stack, it means that we popped it + // before and popping happens only when the stack is empty (both in memory and in disk). + matches!(stack_creation_type, StackCreationType::Create), ) } From db2060661b8ccfd5ce2a7d38dd8fd67a67bde3ab Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 3 Sep 2024 14:57:38 +0200 Subject: [PATCH 2/4] Fix --- relay-server/benches/benches.rs | 3 +++ relay-server/src/services/buffer/envelope_stack/sqlite.rs | 1 + relay-server/src/services/buffer/stack_provider/sqlite.rs | 6 ++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index a028eca60f7..045ad1bd937 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -103,6 +103,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); let mut envelopes = Vec::with_capacity(size); @@ -139,6 +140,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-fill the stack @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-generate envelopes diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 0cbb028f4ba..13e1fb5a131 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -248,6 +248,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelope = mock_envelope(Instant::now()); diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 29466e8ff7d..7b12d24a57f 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -60,8 +60,10 @@ impl StackProvider for SqliteStackProvider { project_key_pair.own_key, project_key_pair.sampling_key, // We want to check the disk by default if we are creating the stack for the first time, - // this is done because if we recreate the stack, it means that we popped it - // before and popping happens only when the stack is empty (both in memory and in disk). + // since we might have some data on disk. + // On the other hand, if we are recreating a stack, it means that we popped it because + // it was empty, or we never had data on disk for that stack, so we assume by default + // that there is no need to check disk until some data is spooled. matches!(stack_creation_type, StackCreationType::Create), ) } From 0a82a68efc4334ca2f1bb5ee7fd90dc0028842eb Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 3 Sep 2024 15:16:35 +0200 Subject: [PATCH 3/4] Rename --- relay-server/src/services/buffer/envelope_buffer/mod.rs | 4 ++-- relay-server/src/services/buffer/stack_provider/mod.rs | 9 ++++----- .../src/services/buffer/stack_provider/sqlite.rs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 1c971330110..433e46bf72c 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -233,7 +233,7 @@ where // Since we have initialization code that creates all the necessary stacks, we assume // that any new stack that is added during the envelope buffer's lifecycle, is recreated. self.push_stack( - StackCreationType::Recreate, + StackCreationType::New, ProjectKeyPair::from_envelope(&envelope), Some(envelope), ) @@ -420,7 +420,7 @@ where /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. async fn load_stacks(&mut self, project_key_pairs: HashSet) { for project_key_pair in project_key_pairs { - self.push_stack(StackCreationType::Create, project_key_pair, None) + self.push_stack(StackCreationType::Initialization, project_key_pair, None) .await .expect("Pushing an empty stack raised an error"); } diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index d60f9d1fcb2..1b1e3195092 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -30,11 +30,10 @@ impl InitializationState { /// The creation type for the [`EnvelopeStack`]. pub enum StackCreationType { - /// An [`EnvelopeStack`] that is created for the first time. - Create, - /// An [`EnvelopeStack`] that was recreated because it was dropped for any arbitrary reason - /// before. - Recreate, + /// An [`EnvelopeStack`] that is created during initialization. + Initialization, + /// An [`EnvelopeStack`] that is created when an envelope is received. + New, } /// A provider of [`EnvelopeStack`] instances that is responsible for creating them. diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 7b12d24a57f..9320c66b4f5 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -64,7 +64,7 @@ impl StackProvider for SqliteStackProvider { // On the other hand, if we are recreating a stack, it means that we popped it because // it was empty, or we never had data on disk for that stack, so we assume by default // that there is no need to check disk until some data is spooled. - matches!(stack_creation_type, StackCreationType::Create), + matches!(stack_creation_type, StackCreationType::Initialization), ) } From ef350f02a252ed901cd5bdfc315549e2f9201b96 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 4 Sep 2024 10:32:54 +0200 Subject: [PATCH 4/4] Improve --- relay-server/src/services/buffer/stack_provider/sqlite.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 9320c66b4f5..5399e8e6db2 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -30,6 +30,11 @@ impl SqliteStackProvider { max_disk_size: config.spool_envelopes_max_disk_size(), }) } + + /// Returns `true` when there might be data residing on disk, `false` otherwise. + fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool { + matches!(stack_creation_type, StackCreationType::Initialization) + } } impl StackProvider for SqliteStackProvider { @@ -64,7 +69,7 @@ impl StackProvider for SqliteStackProvider { // On the other hand, if we are recreating a stack, it means that we popped it because // it was empty, or we never had data on disk for that stack, so we assume by default // that there is no need to check disk until some data is spooled. - matches!(stack_creation_type, StackCreationType::Initialization), + Self::assume_data_on_disk(stack_creation_type), ) }