feat: upstream parity sync — DynamicData 9.4.x / ReactiveUI 23.x (Nov 2025 → Mar 2026)#3
feat: upstream parity sync — DynamicData 9.4.x / ReactiveUI 23.x (Nov 2025 → Mar 2026)#3michaelstonis wants to merge 6 commits intodevelopfrom
Conversation
Implements the AsyncDisposeMany operator (equivalent to DisposeMany but for IAsyncDisposable items), mirroring DynamicData 9.4.1 PR#1011. - Cache/Internal/AsyncDisposeMany.cs: tracks cache items and fire-and-forgets DisposeAsync() on remove/update/unsubscribe - List/Internal/AsyncDisposeMany.cs: same semantics for list change sets - ObservableCacheEx.AsyncDisposeMany<TObject,TKey>: public extension method - ObservableListEx.AsyncDisposeMany<T>: public extension method - Tests for both variants covering remove, clear, and unsubscribe disposal Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Fixes: - RxCommand (ReactiveUI #4196): replace simple bool _isExecuting with Interlocked counter (_executingCount) so concurrent Execute() calls don't prematurely clear IsExecuting. - SortAndBind (DD #935 ResetOnFirstTimeLoad): add ResetOnFirstTimeLoad to SortAndBindOptions (default true) and honour it in SortAndBindInternal so the first changeset always triggers a full reset when set. Audits (no code changes needed — comments added to source files): - DD #1013/#1048 (Cache Filter): no bogus overload; Refresh handling correct. - DD #968 (Switch): inner source subscribed to observer directly; errors propagate. - DD #938 (GroupOn OnCompleted): observer.OnCompleted passed to Subscribe; correct. - DD #940 (MergeChangeSets value types): uses EqualityComparer<T>.Default; correct. - DD #945 (Join initialization race): RecomputeAndEmit guards against empty overlap; correct. - DD #1012 (Join foreign-key re-grouping): Update replaces dict entry + RecomputeAndEmit; correct. - DD #1064-1069 (OnItemAdded/Removed/Refreshed): Add/AddRange, Remove/RemoveRange/Clear handled; correct. - DD #1059 (WhenValueChanged null fallback): explicit selectors used; callers pass default(T) directly; not affected. docs/UpstreamChangesReview.md updated for all ten audited items. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…w for March 2026 sprint - MigrationMatrix: add TransformOnObservable (Cache), AsyncDisposeMany (Cache/List), and Filter predicate state stream (Cache/List) rows as Implemented/Passing; update FilterOnProperty note to reflect full upstream removal in DD 9.4.31; bump totals to 65 operators, 61 implemented (93.8%), 311 tests; add last-updated date. - LibraryParity: update March 30 DynamicData sync row with full change summary; mark TransformOnObservable and AsyncDisposeMany as ported in component mapping; update Known Gaps to reflect all new operators now ported and all bug fixes resolved. - UpstreamChangesReview: check off all implemented/audited items (ToObservableChangeSet deadlock, List Filter Refresh, SortAndBind Move, SortAndBind ResetOnFirstTimeLoad, Background scheduling leak, AsyncDisposeMany, TransformOnObservable, Filter state stream, SortAndPage, Virtual sort, Internal lock primitive, nested binding, RxAppBuilder, RxObject/RxRecord allocations); update MigrationMatrix tracking entries; revise Prioritized Action Summary (0 high, 2 medium, 14+ low). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…AutoRefresh leak New operators (DD #941, DD #1008): - Cache/Internal/TransformOnObservable.cs: per-item observable transformation preserving ordering; first emission = Add, subsequent = Update, source Remove = Remove - Cache/ObservableCacheEx.TransformOnObservable.cs: public extension method - List/Internal/StatefulFilter.cs: filter with external state observable to avoid delegate allocation on state changes - Cache/ObservableCacheEx.Filter.cs: Filter<TObject,TKey,TState> overload - List/ObservableListEx.cs: Filter<T,TState> overload Bug fix (DD #1027 weak-reference audit): - List/Internal/AutoRefresh.cs: fixed per-item subscription leak; subscriptions now tracked and disposed on teardown - Cache/Internal/AutoRefresh.cs, ExpireAfter.cs, Batch.cs, List/Internal/ToObservableChangeSet.cs: audited and documented as safe Tests: 15 new tests across FilterWithStateTests.cs (cache+list) and TransformOnObservableTests.cs; total test count 311 (R3.DynamicData). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR performs an upstream parity sync for R3.DynamicData (to DynamicData 9.4.31) and reviews/ports relevant ReactiveUI patterns (to 23.1.8), adding new operators, fixing correctness and race issues, and updating parity tracking documentation.
Changes:
- Fixes and parity updates across existing operators (e.g.,
ToObservableChangeSetthread-safety work, listFilterrefresh semantics,SortAndBindmove/reset behavior,RxCommandconcurrent execution state). - Adds new DynamicData-parity operators:
TransformOnObservable(Cache),AsyncDisposeMany(Cache + List), andFilteroverloads taking a predicate + state stream (Cache + List). - Adds/updates parity tracking docs (
LibraryParity,UpstreamChangesReview,MigrationMatrix) and expands test coverage for new behaviors/operators.
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| README.md | Adds a “Library Parity” section linking to the new tracking docs. |
| R3Ext/Commands/RxCommand.cs | Fixes IsExecuting concurrency race using an Interlocked counter. |
| R3.DynamicData/List/SourceList.cs | Adds Refresh(int index) support and emits refresh changes. |
| R3.DynamicData/List/ObservableListEx.MergeChangeSets.cs | Adds audit notes re: value-type equality semantics. |
| R3.DynamicData/List/ObservableListEx.cs | Adds list Filter<T,TState> and AsyncDisposeMany APIs plus audit notes. |
| R3.DynamicData/List/ISourceList.cs | Extends interface with Refresh(int index). |
| R3.DynamicData/List/Internal/ToObservableChangeSet.cs | Adds gating/locking changes and moves OnNext out of lock. |
| R3.DynamicData/List/Internal/Switch.cs | Adds audit notes about error propagation behavior. |
| R3.DynamicData/List/Internal/StatefulFilter.cs | New list “filter with external state stream” operator implementation. |
| R3.DynamicData/List/Internal/Filter.cs | Emits Refresh instead of “refresh as replace” downstream. |
| R3.DynamicData/List/Internal/AutoRefresh.cs | Fixes per-item subscription disposal by adding explicit teardown. |
| R3.DynamicData/List/Internal/AsyncDisposeMany.cs | New list async-dispose operator (fire-and-forget). |
| R3.DynamicData/List/ChangeAwareList.cs | Adds Refresh(int index) to emit refresh changes. |
| R3.DynamicData/Cache/SortAndBindOptions.cs | Adds ResetOnFirstTimeLoad option (default true). |
| R3.DynamicData/Cache/ObservableCacheEx.WhenValueChanged.cs | Adds audit notes about null-fallback behavior. |
| R3.DynamicData/Cache/ObservableCacheEx.TransformOnObservable.cs | Public extension method for the new cache operator. |
| R3.DynamicData/Cache/ObservableCacheEx.SortAndBind.cs | Uses Move and honors ResetOnFirstTimeLoad on first emission. |
| R3.DynamicData/Cache/ObservableCacheEx.Joins.cs | Adds audit notes about join initialization / regrouping behavior. |
| R3.DynamicData/Cache/ObservableCacheEx.Grouping.cs | Adds audit notes about OnCompleted propagation. |
| R3.DynamicData/Cache/ObservableCacheEx.Filter.cs | Adds cache Filter<TObject,TKey,TState> overload. |
| R3.DynamicData/Cache/ObservableCacheEx.cs | Adds cache AsyncDisposeMany<TObject,TKey> extension. |
| R3.DynamicData/Cache/ObservableCacheEx.Batch.cs | Adds audit notes about timer disposal. |
| R3.DynamicData/Cache/Internal/TransformOnObservable.cs | New internal implementation for per-item observable transform. |
| R3.DynamicData/Cache/Internal/ExpireAfter.cs | Adds audit notes about timer cleanup and lifetimes. |
| R3.DynamicData/Cache/Internal/AutoRefresh.cs | Adds audit notes about subscription ownership/cleanup. |
| R3.DynamicData/Cache/Internal/AsyncDisposeMany.cs | New internal cache async-dispose operator implementation. |
| R3.DynamicData/Binding/BindingOptions.cs | Extends IObservableCollection<T> with Move(oldIndex,newIndex) for binding optimizations. |
| R3.DynamicData.Tests/List/FilterWithStateTests.cs | New tests for list filter-with-state behavior. |
| R3.DynamicData.Tests/List/FilterOperatorTests.cs | Adds tests verifying refresh semantics in list filtering. |
| R3.DynamicData.Tests/List/AsyncDisposeManyTests.cs | New list async-dispose operator tests. |
| R3.DynamicData.Tests/Cache/TransformOnObservableTests.cs | New tests for cache TransformOnObservable. |
| R3.DynamicData.Tests/Cache/FilterWithStateTests.cs | New tests for cache filter-with-state overload. |
| R3.DynamicData.Tests/Cache/AsyncDisposeManyTests.cs | New cache async-dispose operator tests. |
| docs/UpstreamChangesReview.md | Adds/records the upstream audit checklist and outcomes. |
| docs/MigrationMatrix.md | Updates matrix for new operators and updated test totals/status. |
| docs/LibraryParity.md | Adds durable parity tracking doc, workflow, and sync history. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| var current = new List<T>(); | ||
| var disp = source.Subscribe( | ||
| (observer, current), |
There was a problem hiding this comment.
disp (the subscription to source) is created but never disposed when the downstream subscription is disposed. The returned Disposable.Create(current, ...) only disposes the tracked items, so the operator continues to receive source changes after unsubscription, leaking the subscription and potentially calling observers after disposal. Combine disp with the cleanup disposable so disposing the operator tears down the upstream subscription as well.
There was a problem hiding this comment.
This was already fixed in commit 916808d — List/AsyncDisposeMany.cs now returns Disposable.Combine(disp, Disposable.Create(...)) so the upstream subscription is torn down on operator disposal, mirroring the Cache variant.
| return Disposable.Create(current, static current => | ||
| { | ||
| DisposeRange(current); | ||
| current.Clear(); | ||
| }); |
There was a problem hiding this comment.
The teardown disposable returned here does not include the upstream disp subscription created above, so disposing the operator won't unsubscribe from the source stream. Return a combined disposable (e.g., include disp plus the current-item async disposal cleanup) so operator disposal is complete and deterministic.
There was a problem hiding this comment.
Fixed in commit 916808d — the returned disposable is now Disposable.Combine(disp, Disposable.Create(current, ...)), so both the upstream subscription and the tracked-items cleanup are included in teardown.
| if (limitSizeTo > 0 && list.Count > limitSizeTo) | ||
| { | ||
| removalChanges = new List<Change<TObject>>(); | ||
| EnforceLimit(removalChanges); | ||
| } |
There was a problem hiding this comment.
EnforceLimit() can remove items that were just added in this batch, but pendingTimers was populated before the limit enforcement and is processed after the lock. That can schedule timers for items that are no longer in list, leaving entries in expirations that will never be cleaned up until operator disposal (and can keep items alive). Filter pendingTimers after EnforceLimit() (e.g., only schedule if the item is still present / not removed) or remove pending timer entries for evicted items.
| foreach (var (item, expiry) in pendingTimers) | ||
| { | ||
| lock (gate) | ||
| { | ||
| if (!expirations.ContainsKey(item)) | ||
| { | ||
| var capturedItem = item; | ||
| var timer = Observable.Timer(expiry).Subscribe(_ => RemoveExpired(capturedItem)); | ||
| expirations[item] = timer; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Timer subscriptions are created and subscribed while holding gate. If Observable.Timer can invoke the callback synchronously (e.g., zero/near-zero due time or inline scheduling), RemoveExpired will try to re-enter gate and deadlock. Create/subscribe timers outside the lock, and only lock briefly to check/update expirations (also consider disposing the removed timer in RemoveExpired when you remove it from expirations).
| foreach (var (item, expiry) in pendingTimers) | |
| { | |
| lock (gate) | |
| { | |
| if (!expirations.ContainsKey(item)) | |
| { | |
| var capturedItem = item; | |
| var timer = Observable.Timer(expiry).Subscribe(_ => RemoveExpired(capturedItem)); | |
| expirations[item] = timer; | |
| } | |
| } | |
| } | |
| // Determine which items still need timers while holding the gate, | |
| // but create and subscribe timers outside the lock to avoid deadlocks | |
| // if Observable.Timer can invoke callbacks synchronously. | |
| List<(TObject item, TimeSpan expiry)> timersToStart; | |
| lock (gate) | |
| { | |
| timersToStart = new List<(TObject, TimeSpan)>(); | |
| foreach (var (item, expiry) in pendingTimers) | |
| { | |
| if (!expirations.ContainsKey(item)) | |
| { | |
| timersToStart.Add((item, expiry)); | |
| } | |
| } | |
| } | |
| foreach (var (item, expiry) in timersToStart) | |
| { | |
| var capturedItem = item; | |
| var timerSubscription = Observable.Timer(expiry).Subscribe(_ => RemoveExpired(capturedItem)); | |
| lock (gate) | |
| { | |
| if (expirations.ContainsKey(item)) | |
| { | |
| // Another timer was registered for this item while we were subscribing; | |
| // dispose this one and keep the existing registration. | |
| timerSubscription.Dispose(); | |
| } | |
| else | |
| { | |
| expirations[item] = timerSubscription; | |
| } | |
| } | |
| } |
| for (int i = oldFiltered.Count - 1; i >= 0; i--) | ||
| { | ||
| if (!newFiltered.Contains(oldFiltered[i])) | ||
| { |
There was a problem hiding this comment.
This removal pass uses newFiltered.Contains(oldFiltered[i]), which relies on EqualityComparer<T>.Default and does not account for duplicates / multiple equal items. If the list can contain distinct items that compare equal, this can fail to remove the right instances (or fail to remove extras), producing incorrect downstream change sets. Consider reconciling by slot/index identity rather than value equality (e.g., track passing indices and apply removes/inserts based on index transitions).
| for (int i = oldFiltered.Count - 1; i >= 0; i--) | |
| { | |
| if (!newFiltered.Contains(oldFiltered[i])) | |
| { | |
| // Build a multiset of the new filtered items to reconcile counts correctly, | |
| // even when multiple distinct items compare equal. | |
| var counts = new System.Collections.Generic.Dictionary<T, int>(EqualityComparer<T>.Default); | |
| for (int i = 0; i < newFiltered.Count; i++) | |
| { | |
| var item = newFiltered[i]; | |
| if (counts.TryGetValue(item, out var count)) | |
| { | |
| counts[item] = count + 1; | |
| } | |
| else | |
| { | |
| counts[item] = 1; | |
| } | |
| } | |
| // Remove any surplus items from the existing filtered list, respecting multiplicities. | |
| for (int i = oldFiltered.Count - 1; i >= 0; i--) | |
| { | |
| var item = oldFiltered[i]; | |
| if (counts.TryGetValue(item, out var remaining) && remaining > 0) | |
| { | |
| counts[item] = remaining - 1; | |
| } | |
| else | |
| { |
| for (int i = oldFiltered.Count - 1; i >= 0; i--) | ||
| { | ||
| if (!newFiltered.Contains(oldFiltered[i])) | ||
| { | ||
| filtered.RemoveAt(i); | ||
| } | ||
| } | ||
|
|
||
| for (int i = 0; i < newFiltered.Count; i++) | ||
| { | ||
| if (i >= filtered.Count || !EqualityComparer<T>.Default.Equals(filtered[i], newFiltered[i])) | ||
| { | ||
| int existingIndex = filtered.IndexOf(newFiltered[i]); | ||
| if (existingIndex >= 0) | ||
| { | ||
| filtered.Move(existingIndex, i); | ||
| } | ||
| else | ||
| { | ||
| filtered.Insert(i, newFiltered[i]); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
This insertion/move logic uses filtered.IndexOf(newFiltered[i]), which again matches by EqualityComparer<T>.Default. With duplicates or multiple equal items, IndexOf can return the wrong instance and cause incorrect Move/Insert operations (identity changes and ordering glitches). A safer approach is to compute the filtered list directly from slots and update filtered based on per-slot pass transitions (or otherwise reconcile using indices rather than IndexOf/Contains).
| for (int i = oldFiltered.Count - 1; i >= 0; i--) | |
| { | |
| if (!newFiltered.Contains(oldFiltered[i])) | |
| { | |
| filtered.RemoveAt(i); | |
| } | |
| } | |
| for (int i = 0; i < newFiltered.Count; i++) | |
| { | |
| if (i >= filtered.Count || !EqualityComparer<T>.Default.Equals(filtered[i], newFiltered[i])) | |
| { | |
| int existingIndex = filtered.IndexOf(newFiltered[i]); | |
| if (existingIndex >= 0) | |
| { | |
| filtered.Move(existingIndex, i); | |
| } | |
| else | |
| { | |
| filtered.Insert(i, newFiltered[i]); | |
| } | |
| } | |
| } | |
| // First, ensure that "filtered" contains exactly the required multiplicities | |
| // of each item in "newFiltered", even when duplicates are present. | |
| var requiredCounts = new Dictionary<T, int>(EqualityComparer<T>.Default); | |
| foreach (var item in newFiltered) | |
| { | |
| if (requiredCounts.TryGetValue(item, out var count)) | |
| { | |
| requiredCounts[item] = count + 1; | |
| } | |
| else | |
| { | |
| requiredCounts[item] = 1; | |
| } | |
| } | |
| // Remove surplus items from the end based on requiredCounts. | |
| for (int i = filtered.Count - 1; i >= 0; i--) | |
| { | |
| var item = filtered[i]; | |
| if (requiredCounts.TryGetValue(item, out var remaining) && remaining > 0) | |
| { | |
| requiredCounts[item] = remaining - 1; | |
| } | |
| else | |
| { | |
| filtered.RemoveAt(i); | |
| } | |
| } | |
| // Next, reorder and insert so that "filtered" exactly matches "newFiltered" by index. | |
| for (int i = 0; i < newFiltered.Count; i++) | |
| { | |
| var desired = newFiltered[i]; | |
| if (i < filtered.Count && EqualityComparer<T>.Default.Equals(filtered[i], desired)) | |
| { | |
| continue; | |
| } | |
| filtered.Insert(i, desired); | |
| } | |
| // Finally, trim any extra items that may remain at the end. | |
| while (filtered.Count > newFiltered.Count) | |
| { | |
| filtered.RemoveAt(filtered.Count - 1); | |
| } |
| **Bug fixes pending audit:** | ||
| - `RxCommand` cancellation race condition (RxUI 22.3.1 #4196). | ||
| - Nested property binding redundant setter calls (RxUI 23.1.0 #4240). | ||
| - Builder StackOverflow / activator negative refcount (RxUI 23.1.8 #4301). |
There was a problem hiding this comment.
The "ReactiveUI Gaps" section still lists the RxCommand cancellation/concurrent execution race (RxUI #4196) as "pending audit", but this PR updates RxCommand.Execute() to use an Interlocked counter and documents it as fixed. Update this list to avoid implying the issue is still unresolved.
|
@copilot apply changes based on the comments in this thread |
… deadlock/eviction, StatefulFilter duplicate handling, LibraryParity doc Agent-Logs-Url: https://github.com/michaelstonis/R3Ext/sessions/7759b8fa-856c-4910-9456-912097274648 Co-authored-by: michaelstonis <120685+michaelstonis@users.noreply.github.com>
Applied all changes from the review thread in commit
|
Overview
Full upstream parity sync covering changes in DynamicData (9.0.x → 9.4.x) and ReactiveUI (22.2.x → 23.1.x) from the project's founding in November 2025 through March 2026. All 🔴 high-priority and 🟡 medium-priority items from
docs/UpstreamChangesReview.mdare resolved.Bug Fixes
Filter.HandleRefreshemittedReplaceinstead ofRefreshChangeAwareList.Refresh(int)added;HandleRefreshnow calls itToObservableChangeSetrace condition (timer thread vs. subscriber)gatelock;OnNextcalled outside lockSourceList.Refresh(int)missing from interface + implementationISourceListand both public/internal editorsRxCommandconcurrent execution clearedIsExecutingtoo earlyInterlockedcounter (_executingCount) — only clears when last execution completesAutoRefreshper-item subscription leakNew Operators
TransformOnObservable<TObject, TKey, TDestination>AsyncDisposeMany<TObject, TKey>/AsyncDisposeMany<T>IAsyncDisposabledisposal, mirroringDisposeMany(DD #1011)Filter<TObject, TKey, TState>/Filter<T, TState>Behavior Improvements
SortAndBindMove optimization — Applied changesetMovedreason maps to a singleCollectionChanged(Move)notification instead of RemoveAt + Insert (DD #935)SortAndBindOptions.ResetOnFirstTimeLoad— New option (defaulttrue) that always performs a full reset on the very first changeset, fixing the ignored-on-initial-load regressionDocumentation
docs/LibraryParity.md(new) — Durable upstream tracking document with version table, component mapping, sync workflow, and sync historyREADME.md— Added "Library Parity" section with status tabledocs/UpstreamChangesReview.md— All items annotated; 🔴 0 / 🟡 2 remaining (low-priority platform features)docs/MigrationMatrix.md— Updated: 3 new operators, 311 tests, last-updated dateTests
AsyncDisposeManyTests(Cache + List),TransformOnObservableTests,FilterWithStateTests(Cache + List), additions toFilterOperatorTestsAudited — Not Affected
The following upstream issues were reviewed and confirmed to not require changes in this codebase:
EqualityComparer<T>.Default)Remaining Low-Priority Items
Intentionally deferred to a future sprint:
ReactiveOwningComponentBase, MAUI platform schedulers,RxAppBuilderDynamicCombiner,InvokeEvaluate/IEvaluateAwareTracked via
docs/LibraryParity.mdanddocs/UpstreamChangesReview.md.