From 0aeda55f49a9f34705eb25ce785b9511fd193f8d Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Tue, 15 Oct 2024 09:50:35 +1100 Subject: [PATCH 1/3] Adopt PR#14431 & Resolve Merge Conflicts Co-Authored-By: Joseph <21144246+JoJoJet@users.noreply.github.com> --- crates/bevy_asset/src/server/info.rs | 8 +- crates/bevy_asset/src/server/mod.rs | 127 ++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 4 deletions(-) diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 42fda6dd2db89..bab21a2a639f3 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -8,7 +8,7 @@ use alloc::sync::{Arc, Weak}; use bevy_ecs::world::World; use bevy_tasks::Task; use bevy_utils::{tracing::warn, Entry, HashMap, HashSet, TypeIdMap}; -use core::any::TypeId; +use core::{task::Waker, any::TypeId}; use crossbeam_channel::Sender; use derive_more::derive::{Display, Error, From}; use either::Either; @@ -36,6 +36,8 @@ pub(crate) struct AssetInfo { /// The number of handle drops to skip for this asset. /// See usage (and comments) in `get_or_create_path_handle` for context. handle_drops_to_skip: usize, + /// List of tasks waiting for this asset to complete loading + pub(crate) waiting_tasks: Vec, } impl AssetInfo { @@ -54,6 +56,7 @@ impl AssetInfo { dependants_waiting_on_load: HashSet::default(), dependants_waiting_on_recursive_dep_load: HashSet::default(), handle_drops_to_skip: 0, + waiting_tasks: Vec::new(), } } } @@ -616,6 +619,9 @@ impl AssetInfos { info.load_state = LoadState::Failed(error.clone()); info.dep_load_state = DependencyLoadState::Failed(error.clone()); info.rec_dep_load_state = RecursiveDependencyLoadState::Failed(error.clone()); + for waker in info.waiting_tasks.drain(..) { + waker.wake(); + } ( core::mem::take(&mut info.dependants_waiting_on_load), core::mem::take(&mut info.dependants_waiting_on_recursive_dep_load), diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index e03f10b37e23e..5d074629d81f0 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -25,7 +25,7 @@ use bevy_utils::{ tracing::{error, info}, HashSet, }; -use core::{any::TypeId, future::Future, panic::AssertUnwindSafe}; +use core::{any::TypeId, future::Future, panic::AssertUnwindSafe, task::Poll}; use crossbeam_channel::{Receiver, Sender}; use derive_more::derive::{Display, Error, From}; use either::Either; @@ -413,7 +413,7 @@ impl AssetServer { &self, handle: UntypedHandle, path: AssetPath<'static>, - mut infos: RwLockWriteGuard, + infos: RwLockWriteGuard, guard: G, ) { // drop the lock on `AssetInfos` before spawning a task that may block on it in single-threaded @@ -433,7 +433,10 @@ impl AssetServer { }); #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] - infos.pending_tasks.insert(handle.id(), task); + { + let mut infos = infos; + infos.pending_tasks.insert(handle.id(), task); + } #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] task.detach(); @@ -1336,6 +1339,110 @@ impl AssetServer { }) }) } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset( + &self, + // NOTE: We take a reference to a handle so we know it will outlive the future, + // which ensures the handle won't be dropped while waiting for the asset. + handle: &Handle, + ) -> Result<(), WaitForAssetError> { + self.wait_for_asset_id(handle.id()).await + } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset_untyped( + &self, + // NOTE: We take a reference to a handle so we know it will outlive the future, + // which ensures the handle won't be dropped while waiting for the asset. + handle: &UntypedHandle, + ) -> Result<(), WaitForAssetError> { + self.wait_for_asset_id(handle.id()).await + } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// Note that since an asset ID does not count as a reference to the asset, + /// the future returned from this method will *not* keep the asset alive. + /// This may lead to the asset unexpectedly being dropped while you are waiting for it to + /// finish loading. + /// + /// When calling this method, make sure a strong handle is stored elsewhere to prevent the + /// asset from being dropped. + /// If you have access to an asset's strong [`Handle`], you should prefer to call + /// [`AssetServer::wait_for_asset`] + /// or [`wait_for_assest_untyped`](Self::wait_for_asset_untyped) to ensure the asset finishes + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset_id( + &self, + id: impl Into, + ) -> Result<(), WaitForAssetError> { + let id = id.into(); + core::future::poll_fn(move |cx| { + let infos = self.data.infos.read(); + let info = infos.get(id).ok_or(WaitForAssetError::NotLoaded)?; + match (&info.load_state, &info.rec_dep_load_state) { + (LoadState::Loaded, RecursiveDependencyLoadState::Loaded) => Poll::Ready(Ok(())), + // Return an error immediately if the asset is not in the process of loading + (LoadState::NotLoaded, _) => Poll::Ready(Err(WaitForAssetError::NotLoaded)), + // If the asset is loading, leave our waker behind + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) => { + // Check if our waker is already there + let has_waker = info + .waiting_tasks + .iter() + .any(|waker| waker.will_wake(cx.waker())); + if !has_waker { + drop(infos); + let mut infos = self.data.infos.write(); + let info = infos.get_mut(id).ok_or(WaitForAssetError::NotLoaded)?; + // If the load state changed while reacquiring the lock, immediately + // reawaken the task + let is_loading = matches!( + (&info.load_state, &info.rec_dep_load_state), + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) + ); + if !is_loading { + cx.waker().wake_by_ref(); + } else { + // Leave our waker behind + info.waiting_tasks.push(cx.waker().clone()); + } + } + Poll::Pending + } + (LoadState::Failed(error), _) => { + Poll::Ready(Err(WaitForAssetError::Failed(error.clone()))) + } + (_, RecursiveDependencyLoadState::Failed(error)) => { + Poll::Ready(Err(WaitForAssetError::DependencyFailed(error.clone()))) + } + } + }) + .await + } } /// A system that manages internal [`AssetServer`] events, such as finalizing asset loads. @@ -1359,6 +1466,11 @@ pub fn handle_internal_asset_events(world: &mut World) { .get(&id.type_id()) .expect("Asset event sender should exist"); sender(world, id); + if let Some(info) = infos.get_mut(id) { + for waker in info.waiting_tasks.drain(..) { + waker.wake(); + } + } } InternalAssetEvent::Failed { id, path, error } => { infos.process_asset_fail(id, error.clone()); @@ -1710,3 +1822,12 @@ impl core::fmt::Debug for AssetServer { /// This is appended to asset sources when loading a [`LoadedUntypedAsset`]. This provides a unique /// source for a given [`AssetPath`]. const UNTYPED_SOURCE_SUFFIX: &str = "--untyped"; + +/// An error when attempting to wait asynchronously for an [`Asset`] to load. +#[derive(Error, Debug, Clone, Display)] +pub enum WaitForAssetError { + #[display("tried to wait for an asset that is not being loaded")] + NotLoaded, + Failed(Arc), + DependencyFailed(Arc), +} \ No newline at end of file From f7b38a661647eef4e87ea8e1cb138e4270247d70 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Tue, 15 Oct 2024 09:54:33 +1100 Subject: [PATCH 2/3] Formatting --- crates/bevy_asset/src/server/info.rs | 2 +- crates/bevy_asset/src/server/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index bab21a2a639f3..c33e887a8e8b2 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -8,7 +8,7 @@ use alloc::sync::{Arc, Weak}; use bevy_ecs::world::World; use bevy_tasks::Task; use bevy_utils::{tracing::warn, Entry, HashMap, HashSet, TypeIdMap}; -use core::{task::Waker, any::TypeId}; +use core::{any::TypeId, task::Waker}; use crossbeam_channel::Sender; use derive_more::derive::{Display, Error, From}; use either::Either; diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 5d074629d81f0..d9abb5274e2c6 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -1830,4 +1830,4 @@ pub enum WaitForAssetError { NotLoaded, Failed(Arc), DependencyFailed(Arc), -} \ No newline at end of file +} From 6049b5ecaaf755c398ce9ffb72f9bd5b06c4b09c Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Tue, 15 Oct 2024 11:33:55 +1100 Subject: [PATCH 3/3] Response to feedback Flattened `pole_fn` code to reduce width, and explicitly convert `AssetId` to an `UntypedAssetId` in `wait_for_asset` to avoid code-bloat. Co-Authored-By: andriyDev --- crates/bevy_asset/src/server/mod.rs | 110 +++++++++++++++++----------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index d9abb5274e2c6..0a48f84e651f6 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -1353,7 +1353,7 @@ impl AssetServer { // which ensures the handle won't be dropped while waiting for the asset. handle: &Handle, ) -> Result<(), WaitForAssetError> { - self.wait_for_asset_id(handle.id()).await + self.wait_for_asset_id(handle.id().untyped()).await } /// Returns a future that will suspend until the specified asset and its dependencies finish @@ -1396,52 +1396,74 @@ impl AssetServer { id: impl Into, ) -> Result<(), WaitForAssetError> { let id = id.into(); - core::future::poll_fn(move |cx| { - let infos = self.data.infos.read(); - let info = infos.get(id).ok_or(WaitForAssetError::NotLoaded)?; - match (&info.load_state, &info.rec_dep_load_state) { - (LoadState::Loaded, RecursiveDependencyLoadState::Loaded) => Poll::Ready(Ok(())), - // Return an error immediately if the asset is not in the process of loading - (LoadState::NotLoaded, _) => Poll::Ready(Err(WaitForAssetError::NotLoaded)), - // If the asset is loading, leave our waker behind - (LoadState::Loading, _) - | (_, RecursiveDependencyLoadState::Loading) - | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) => { - // Check if our waker is already there - let has_waker = info - .waiting_tasks - .iter() - .any(|waker| waker.will_wake(cx.waker())); - if !has_waker { - drop(infos); - let mut infos = self.data.infos.write(); - let info = infos.get_mut(id).ok_or(WaitForAssetError::NotLoaded)?; - // If the load state changed while reacquiring the lock, immediately - // reawaken the task - let is_loading = matches!( - (&info.load_state, &info.rec_dep_load_state), - (LoadState::Loading, _) - | (_, RecursiveDependencyLoadState::Loading) - | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) - ); - if !is_loading { - cx.waker().wake_by_ref(); - } else { - // Leave our waker behind - info.waiting_tasks.push(cx.waker().clone()); - } - } - Poll::Pending - } - (LoadState::Failed(error), _) => { - Poll::Ready(Err(WaitForAssetError::Failed(error.clone()))) + core::future::poll_fn(move |cx| self.wait_for_asset_id_poll_fn(cx, id)).await + } + + /// Used by [`wait_for_asset_id`](AssetServer::wait_for_asset_id) in [`poll_fn`](core::future::poll_fn). + fn wait_for_asset_id_poll_fn( + &self, + cx: &mut core::task::Context<'_>, + id: UntypedAssetId, + ) -> Poll> { + let infos = self.data.infos.read(); + + let Some(info) = infos.get(id) else { + return Poll::Ready(Err(WaitForAssetError::NotLoaded)); + }; + + match (&info.load_state, &info.rec_dep_load_state) { + (LoadState::Loaded, RecursiveDependencyLoadState::Loaded) => Poll::Ready(Ok(())), + // Return an error immediately if the asset is not in the process of loading + (LoadState::NotLoaded, _) => Poll::Ready(Err(WaitForAssetError::NotLoaded)), + // If the asset is loading, leave our waker behind + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) => { + // Check if our waker is already there + let has_waker = info + .waiting_tasks + .iter() + .any(|waker| waker.will_wake(cx.waker())); + + if has_waker { + return Poll::Pending; } - (_, RecursiveDependencyLoadState::Failed(error)) => { - Poll::Ready(Err(WaitForAssetError::DependencyFailed(error.clone()))) + + let mut infos = { + // Must drop read-only guard to acquire write guard + drop(infos); + self.data.infos.write() + }; + + let Some(info) = infos.get_mut(id) else { + return Poll::Ready(Err(WaitForAssetError::NotLoaded)); + }; + + // If the load state changed while reacquiring the lock, immediately + // reawaken the task + let is_loading = matches!( + (&info.load_state, &info.rec_dep_load_state), + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) + ); + + if !is_loading { + cx.waker().wake_by_ref(); + } else { + // Leave our waker behind + info.waiting_tasks.push(cx.waker().clone()); } + + Poll::Pending } - }) - .await + (LoadState::Failed(error), _) => { + Poll::Ready(Err(WaitForAssetError::Failed(error.clone()))) + } + (_, RecursiveDependencyLoadState::Failed(error)) => { + Poll::Ready(Err(WaitForAssetError::DependencyFailed(error.clone()))) + } + } } }