diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index a028eca60f7..045ad1bd937 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -103,6 +103,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); let mut envelopes = Vec::with_capacity(size); @@ -139,6 +140,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-fill the stack @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + true, ); // Pre-generate envelopes diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 605d6e1bd4f..433e46bf72c 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -19,7 +19,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::services::buffer::stack_provider::StackProvider; +use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::MemoryChecker; @@ -230,8 +230,14 @@ where { stack.push(envelope).await?; } else { - self.push_stack(ProjectKeyPair::from_envelope(&envelope), Some(envelope)) - .await?; + // Since we have initialization code that creates all the necessary stacks, we assume + // that any new stack that is added during the envelope buffer's lifecycle, is recreated. + self.push_stack( + StackCreationType::New, + ProjectKeyPair::from_envelope(&envelope), + Some(envelope), + ) + .await?; } self.priority_queue .change_priority_by(&project_key_pair, |prio| { @@ -355,6 +361,7 @@ where /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. async fn push_stack( &mut self, + stack_creation_type: StackCreationType, project_key_pair: ProjectKeyPair, envelope: Option>, ) -> Result<(), EnvelopeBufferError> { @@ -362,7 +369,9 @@ where .as_ref() .map_or(Instant::now(), |e| e.meta().start_time()); - let mut stack = self.stack_provider.create_stack(project_key_pair); + let mut stack = self + .stack_provider + .create_stack(stack_creation_type, project_key_pair); if let Some(envelope) = envelope { stack.push(envelope).await?; } @@ -411,7 +420,7 @@ where /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. async fn load_stacks(&mut self, project_key_pairs: HashSet) { for project_key_pair in project_key_pairs { - self.push_stack(project_key_pair, None) + self.push_stack(StackCreationType::Initialization, project_key_pair, None) .await .expect("Pushing an empty stack raised an error"); } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 1268fd5ea4e..13e1fb5a131 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -53,6 +53,7 @@ impl SqliteEnvelopeStack { max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, + check_disk: bool, ) -> Self { Self { envelope_store, @@ -64,7 +65,7 @@ impl SqliteEnvelopeStack { sampling_key, batches_buffer: VecDeque::with_capacity(max_batches), batches_buffer_size: 0, - check_disk: true, + check_disk, } } @@ -247,6 +248,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelope = mock_envelope(Instant::now()); @@ -263,6 +265,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(4); @@ -315,6 +318,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); // We pop with an invalid db. @@ -334,6 +338,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); // We pop with no elements. @@ -351,6 +356,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(5); @@ -388,6 +394,7 @@ mod tests { 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + true, ); let envelopes = mock_envelopes(15); diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs index 0414f4a55fa..2b0b7822cf1 100644 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -1,6 +1,8 @@ use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; use crate::utils::MemoryChecker; #[derive(Debug)] @@ -23,7 +25,7 @@ impl StackProvider for MemoryStackProvider { InitializationState::empty() } - fn create_stack(&self, _: ProjectKeyPair) -> Self::Stack { + fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack { MemoryEnvelopeStack::new() } diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index fa852d47d1c..1b1e3195092 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -28,6 +28,14 @@ impl InitializationState { } } +/// The creation type for the [`EnvelopeStack`]. +pub enum StackCreationType { + /// An [`EnvelopeStack`] that is created during initialization. + Initialization, + /// An [`EnvelopeStack`] that is created when an envelope is received. + New, +} + /// A provider of [`EnvelopeStack`] instances that is responsible for creating them. pub trait StackProvider: std::fmt::Debug { /// The implementation of [`EnvelopeStack`] that this manager creates. @@ -37,7 +45,11 @@ pub trait StackProvider: std::fmt::Debug { fn initialize(&self) -> impl Future; /// Creates an [`EnvelopeStack`]. - fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack; + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack; /// Returns `true` if the store used by this [`StackProvider`] has space to add new /// stacks or items to the stacks. diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 4b732bdb3b6..5399e8e6db2 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -5,7 +5,9 @@ use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; +use crate::services::buffer::stack_provider::{ + InitializationState, StackCreationType, StackProvider, +}; use crate::SqliteEnvelopeStack; #[derive(Debug)] @@ -28,6 +30,11 @@ impl SqliteStackProvider { max_disk_size: config.spool_envelopes_max_disk_size(), }) } + + /// Returns `true` when there might be data residing on disk, `false` otherwise. + fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool { + matches!(stack_creation_type, StackCreationType::Initialization) + } } impl StackProvider for SqliteStackProvider { @@ -46,13 +53,23 @@ impl StackProvider for SqliteStackProvider { } } - fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack { + fn create_stack( + &self, + stack_creation_type: StackCreationType, + project_key_pair: ProjectKeyPair, + ) -> Self::Stack { SqliteEnvelopeStack::new( self.envelope_store.clone(), self.disk_batch_size, self.max_batches, project_key_pair.own_key, project_key_pair.sampling_key, + // We want to check the disk by default if we are creating the stack for the first time, + // since we might have some data on disk. + // On the other hand, if we are recreating a stack, it means that we popped it because + // it was empty, or we never had data on disk for that stack, so we assume by default + // that there is no need to check disk until some data is spooled. + Self::assume_data_on_disk(stack_creation_type), ) }