Skip to content

feat: cross-platform reactive extensions gap analysis — 40+ new operators#4

Open
michaelstonis wants to merge 8 commits intomainfrom
michaelstonis/reactive-gap-analysis
Open

feat: cross-platform reactive extensions gap analysis — 40+ new operators#4
michaelstonis wants to merge 8 commits intomainfrom
michaelstonis/reactive-gap-analysis

Conversation

@michaelstonis
Copy link
Copy Markdown
Owner

Motivation

R3 and R3Ext already cover the core reactive primitives, but a gap analysis against RxJS, RxJava/Kotlin Flow, and RxDart revealed ~40 operators that developers commonly expect when coming from those ecosystems. This PR fills those gaps with efficient, allocation-conscious implementations and full unit test coverage.

What's included

Transformation & FlatMap (TransformationExtensions, FlatMapExtensions)

  • MapTo — constant projection (Pattern A, zero allocation)
  • CompactMap — filter-and-map with ref/value type overloads
  • WithIndex — pairs each element with its zero-based sequence index
  • RunningFold / RunningReduce — compositional aliases for Scan
  • ConcatMap — queue-based sequential inner subscriptions
  • SwitchMap / FlatMapLatest — cancel-on-new with generation guard
  • ExhaustMap — ignores source while an inner observable is active
  • Expand — breadth-first recursive expansion
  • MergeScan — scan that merges inner observables
  • SwitchScan — scan that switches to the latest inner observable

Advanced Filtering (FilteringExtensions.Advanced)

  • IgnoreElements, IsEmpty, Every / All, Find, FindIndex
  • DefaultIfEmpty, ThrowIfEmpty
  • Audit, AuditTime — emit latest after duration observable fires
  • Sample — emit latest when sampler observable fires

Combination & Creation (CombinationExtensions)

  • ForkJoin — wait for all to complete, emit last values (params + typed tuple overloads)
  • OnErrorResumeNext — advance on both non-terminal error and completion
  • Iif / Condition — deferred conditional source selection
  • SequenceEqual — per-element comparison with custom comparer support
  • RepeatWhen — handler-driven repeat via notifier observable
  • Generate — synchronous state-machine source

Advanced Timing (TimingExtensions.Advanced, .Window, .BufferAdvanced)

  • TimeInterval — wraps each emission with elapsed time since the previous item
  • DelayWhen — per-element delay driven by a duration observable
  • RateLimit — token-bucket style N items per window, queues excess
  • BufferWithOverflow — bounded buffer with DropOldest, DropLatest, or Error strategy
  • Chunked — sliding/non-overlapping windows with configurable step
  • WindowCount, WindowTime — observable-of-observables windowing
  • BufferToggle, BufferWhen — observable-gated buffering

Error Handling & Side Effects (ErrorHandlingExtensions.Advanced, SideEffectExtensions)

  • RetryWhen — user-controlled retry via notifier observable
  • ReplaceError — emit fallback value and complete on any error
  • ReplaceEmpty — emit fallback if source completes empty
  • SelectSafe / WhereSafe — route selector/predicate exceptions to OnErrorResume instead of crashing
  • DoOnError, DoOnComplete, DoOnTerminate, DoAfterTerminate — lifecycle side-effects

Aggregate Streams & Subject Types

  • RunningCount, RunningSum, RunningAverage, RunningMin, RunningMax — live running aggregates using INumber<T> / IComparable<T> generic math
  • AsyncSubject<T> — buffers the last value and emits only on successful completion; late subscribers receive the buffered value immediately
  • ReadOnlySubject<T> / AsReadOnly() — wraps any Observable<T> to hide subject mutation methods

Design principles

All operators follow the established R3Ext patterns:

  • Pattern A (compositional) for stateless operators — zero extra allocation
  • Pattern B (Observable.Create + Lock gate) for stateful/timer-based operators — thread-safe with no closures in hot paths
  • No IScheduler; uses R3's TimeProvider / ITimer for all time-based work
  • OnErrorResume semantics preserved throughout (non-terminal errors)
  • OnCompleted(Result) used for both success and failure terminals

Tests

703 total tests pass (all pre-existing + 200+ new). Each new operator has 3–6 focused xUnit [Fact] tests using FakeTimeProvider, Subject<T>, and LiveList<T> — the same patterns used across the existing test suite.

michaelstonis and others added 8 commits April 1, 2026 19:01
Add TransformationExtensions with:
- MapTo<T, TResult>: project all values to a constant
- CompactMap<T, TResult>: select+filter nulls (ref and value type overloads)
- WithIndex<T>: pair each element with its zero-based index (Pattern B)
- RunningFold<T, TAccumulate>: alias for Scan with seed
- RunningReduce<T>: alias for Scan without seed

Add FlatMapExtensions with:
- ConcatMap: sequential inner subscription with pending queue
- SwitchMap / FlatMapLatest: cancel-and-replace with generation guard
- ExhaustMap: ignore source while inner is active
- Expand: breadth-first recursive expansion with active counter
- MergeScan: scan with merged inner observables
- SwitchScan: scan with switched inner observables

Add TransformationExtensionsTests (25 tests) and
FlatMapExtensionsTests (27 tests) covering normal behavior,
completion propagation, null/arg validation, and edge cases.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…it, BufferWithOverflow, Chunked)

Add five new timing extension operators to the R3Ext library:

- TimeInterval<T>: wraps each emission with elapsed time since previous
- DelayWhen<T>: delays each element by a per-element duration observable,
  with optional subscription-delay overload
- RateLimit<T>: allows at most N items per period, queuing excess
- BufferWithOverflow<T>: bounded pass-through buffer with DropOldest,
  DropLatest, or Error overflow strategies
- Chunked<T>: sliding or non-overlapping window by count with step

Also adds TimeInterval<T> struct, OverflowStrategy enum, and
TimingAdvancedTests.cs covering 3+ tests per operator.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…y/All, Find, FindIndex, DefaultIfEmpty, ThrowIfEmpty, Audit, AuditTime, Sample)

- Add FilteringExtensions.Advanced.cs with 10 new operators using Pattern A (compositional) and Pattern B (Observable.Create with Lock gate)
- Make FilteringExtensions partial to support the new file
- Add FilteringAdvancedTests.cs with 45 tests covering all operators (3+ per operator)
- Fix pre-existing CS compilation errors in CombinationExtensionsTests.cs and WindowingOperatorsTests.cs that blocked test execution

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- ErrorHandlingExtensions.Advanced.cs: RetryWhen, ReplaceError, ReplaceEmpty,
  SelectSafe, WhereSafe
- SideEffectExtensions.cs: DoOnError, DoOnComplete, DoOnTerminate, DoAfterTerminate
- ErrorHandlingExtraTests.cs, SideEffectExtensionsTests.cs: full test coverage
- Add pragma suppressions to resolve StyleCop errors in parallel-agent files
- Fix WindowingOperatorsTests.cs: replace invalid Observable.ToArray().Subscribe()
  with correct collect-on-complete subscription pattern

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ensions

Add named elements to tuple return types and Observable.Create generic
arguments in ForkJoin<T1,T2> and ForkJoin<T1,T2,T3> overloads.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- WindowCount<T>(count, skip): overlapping/non-overlapping count-based windows
- WindowTime<T>(timeSpan): periodic time-based windows
- WindowTime<T>(timeSpan, maxCount): windows bounded by time or item count
- BufferToggle<T,TOpen,TClose>: open/close buffers via observable signals
- BufferWhen<T,TClose>: single rolling buffer closed by a selector observable

All operators follow the existing Lock/ITimer/Observable.Create pattern from
TimingExtensions.Buffer.cs. Full test coverage: 30 tests across all 5 operators
covering argument validation, normal operation, completion, and edge cases.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add CombinationExtensions with:
- ForkJoin<T> (params/IEnumerable/typed 2- and 3-source overloads)
- OnErrorResumeNext<T> (sequential, advances on completion OR non-terminal error)
- Iif<T>/Condition<T> (deferred conditional subscription)
- SequenceEqual<T> (element-by-element comparison with optional comparer)
- RepeatWhen<T> (handler-driven repetition with notifier Subject)
- Generate<TState,TResult> and Generate<TState> (synchronous state machine)

Add CombinationExtensionsTests with 31 tests (3+ per operator).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- AggregateStreamExtensions: RunningCount, RunningSum (INumber<T>),
  RunningAverage (double/float/decimal/int overloads), RunningMin/Max
  with IComparable<T> and IComparer<T> overloads
- AsyncSubject<T>: buffers last value, emits only on successful
  completion; supports late subscribers and failure propagation
- ReadOnlySubject<T>: wraps any Observable<T> to hide subject methods;
  AsReadOnly() extensions for Observable<T>, Subject<T>, BehaviorSubject<T>
- Tests: 19 tests covering all new operators and subject types

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 2, 2026 00:42
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a broad set of Rx-style operators and subject types to R3Ext to close feature gaps versus other reactive ecosystems, with accompanying unit tests.

Changes:

  • Introduces new transformation, filtering, flat-mapping, combination/creation, timing/windowing/buffering, and error-handling operators.
  • Adds new subject types/wrappers (AsyncSubject<T>, ReadOnlySubject<T> + AsReadOnly).
  • Adds extensive xUnit coverage for the new operators and subjects.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
R3Ext/Timing/TimingExtensions.Window.cs Adds WindowCount and WindowTime windowing operators.
R3Ext/Timing/TimingExtensions.BufferAdvanced.cs Adds BufferToggle and BufferWhen advanced buffering operators.
R3Ext/Timing/TimingExtensions.Advanced.cs Adds timing operators (TimeInterval, DelayWhen, RateLimit, BufferWithOverflow, Chunked) plus TimeInterval<T> and OverflowStrategy.
R3Ext/Subjects/ReadOnlySubjectWrapper.cs Adds read-only observable wrapper and AsReadOnly extensions.
R3Ext/Subjects/AsyncSubject.cs Adds AsyncSubject<T> implementation.
R3Ext/Extensions/TransformationExtensions.cs Adds MapTo, CompactMap, WithIndex, RunningFold, RunningReduce.
R3Ext/Extensions/SideEffectExtensions.cs Adds focused lifecycle side-effect operators (DoOnError, DoOnComplete, DoOnTerminate, DoAfterTerminate).
R3Ext/Extensions/FlatMapExtensions.cs Adds higher-order operators (ConcatMap, SwitchMap, FlatMapLatest, ExhaustMap, Expand, MergeScan, SwitchScan).
R3Ext/Extensions/FilteringExtensions.cs Makes FilteringExtensions partial to support advanced additions.
R3Ext/Extensions/FilteringExtensions.Advanced.cs Adds advanced filtering operators (IgnoreElements, IsEmpty, Every/All, Find, FindIndex, DefaultIfEmpty, ThrowIfEmpty, Audit, AuditTime, Sample).
R3Ext/Extensions/CombinationExtensions.cs Adds combination/creation operators (ForkJoin, OnErrorResumeNext, Iif/Condition, SequenceEqual, RepeatWhen, Generate).
R3Ext/Extensions/AggregateStreamExtensions.cs Adds running aggregate stream operators (RunningCount/Sum/Average/Min/Max).
R3Ext/ErrorHandling/ErrorHandlingExtensions.Advanced.cs Adds advanced error-handling operators (RetryWhen, ReplaceError, ReplaceEmpty, SelectSafe, WhereSafe).
R3Ext.Tests/WindowingOperatorsTests.cs Tests for windowing/buffering operators.
R3Ext.Tests/TransformationExtensionsTests.cs Tests for transformation operators.
R3Ext.Tests/TimingAdvancedTests.cs Tests for advanced timing/buffering operators.
R3Ext.Tests/SubjectTypesTests.cs Tests for AsyncSubject<T> and ReadOnlySubject<T>.
R3Ext.Tests/SideEffectExtensionsTests.cs Tests for side-effect operators.
R3Ext.Tests/FlatMapExtensionsTests.cs Tests for flat-mapping operators.
R3Ext.Tests/FilteringAdvancedTests.cs Tests for advanced filtering operators.
R3Ext.Tests/ErrorHandlingExtraTests.cs Tests for new error-handling operators.
R3Ext.Tests/CombinationExtensionsTests.cs Tests for combination/creation operators.
R3Ext.Tests/AggregateStreamTests.cs Tests for running aggregate stream operators.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +117 to +137
ex =>
{
Subject<T>[]? windows;
using (gate.EnterScope())
{
if (disposed)
{
return;
}

windows = openWindows.Select(w => w.Subject).ToArray();
openWindows.Clear();
}

foreach (Subject<T> w in windows)
{
w.OnCompleted();
}

observer.OnErrorResume(ex);
},
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WindowCount treats OnErrorResume as terminal for open windows by clearing openWindows and completing them. In R3, OnErrorResume is non-terminal, so this can cause subsequent source values to be dropped until the next window boundary (and also hides the error from inner window subscribers). Consider forwarding OnErrorResume to each open window (and keeping them open), or if the intended behavior is to close windows on error, immediately open/emit a fresh window so the next OnNext is not lost.

Copilot uses AI. Check for mistakes.
Comment on lines +250 to +267
ex =>
{
Subject<T>? window;
using (gate.EnterScope())
{
if (disposed)
{
return;
}

timer?.Dispose();
window = currentWindow;
currentWindow = null;
}

window?.OnCompleted();
observer.OnErrorResume(ex);
},
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In WindowTime (time-only overload), the OnErrorResume handler disposes the timer and sets currentWindow to null. Because OnErrorResume is non-terminal in R3, subsequent OnNext calls after an error will be silently dropped and no further windows will be produced. Instead, keep the timer/windowing active (e.g., forward OnErrorResume without tearing down state), or roll to a new window and restart the timer after reporting the error.

Copilot uses AI. Check for mistakes.
Comment on lines +407 to +412
timer?.Dispose();
window = currentWindow;
currentWindow = null;
}

window?.OnCompleted();
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In WindowTime (time + maxCount overload), the OnErrorResume handler disposes the timer and nulls out currentWindow. Since OnErrorResume is non-terminal in R3, this breaks the operator after the first error (future items are dropped). Align with other timing operators by not tearing down window/timer state on OnErrorResume, or by rolling to a new window and restarting timing after forwarding the error.

Suggested change
timer?.Dispose();
window = currentWindow;
currentWindow = null;
}
window?.OnCompleted();
window = currentWindow;
}
window?.OnErrorResume(ex);

Copilot uses AI. Check for mistakes.
Comment on lines +175 to +213
ex =>
{
bool wasActive;
using (gate.EnterScope())
{
wasActive = !innerFired;
if (!innerFired)
{
innerFired = true;
activeInner--;
if (innerSub is not null)
{
innerSubs.Remove(innerSub);
}
}
}

if (wasActive && !disposed)
{
observer.OnErrorResume(ex);
}
},
r =>
{
using (gate.EnterScope())
{
if (!innerFired)
{
innerFired = true;
activeInner--;
if (innerSub is not null)
{
innerSubs.Remove(innerSub);
}

CheckComplete();
}
}
});
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelayWhen: when a duration observable faults (ex => path), activeInner is decremented but CheckComplete() is never called. If the source has already completed and this was the last in-flight duration, the outer sequence can hang forever (never calling OnCompleted). Call CheckComplete() after decrementing/removing the inner subscription in the error path (under the gate) so completion can still propagate once all in-flight durations have finished.

Copilot uses AI. Check for mistakes.
Comment on lines +584 to +585
OverflowStrategy strategy = OverflowStrategy.DropOldest,
TimeProvider? timeProvider = null)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferWithOverflow has a TimeProvider? timeProvider = null parameter that is never used. This is confusing for callers (and may trigger analyzer warnings). Either remove the parameter from the public API, or use it as part of the implementation (e.g., if you intended time-based draining/metrics).

Suggested change
OverflowStrategy strategy = OverflowStrategy.DropOldest,
TimeProvider? timeProvider = null)
OverflowStrategy strategy = OverflowStrategy.DropOldest)

Copilot uses AI. Check for mistakes.
Comment on lines +255 to +279
void SubscribeToCloser()
{
IDisposable? sub = null;
sub = closingSelector().Take(1).Subscribe(
_ =>
{
T[] toEmit;
using (gate.EnterScope())
{
if (disposed)
{
return;
}

toEmit = buffer.ToArray();
buffer.Clear();
}

observer.OnNext(toEmit);

// Subscribe to the next closer outside the lock
SubscribeToCloser();
},
ex => observer.OnErrorResume(ex),
_ => { });
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferWhen: the close callback calls SubscribeToCloser() directly, which can recurse synchronously if closingSelector() returns an observable that emits immediately (e.g., Observable.Return(Unit.Default)). That can lead to deep recursion / stack overflow and an unbounded tight loop emitting empty buffers. Consider re-subscribing in an iterative loop (or scheduling the re-subscription outside the current call stack) and ensure the operator can be disposed to stop the loop.

Copilot uses AI. Check for mistakes.
Comment on lines +80 to +100
r =>
{
bool emitTrue = false;
using (gate.EnterScope())
{
if (disposed)
{
return;
}

disposed = true;
emitTrue = !hadValue;
}

if (emitTrue)
{
observer.OnNext(true);
}

observer.OnCompleted(r);
});
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsEmpty: in the completion path, emitTrue is set to !hadValue even when the sequence completes with a failure Result. This can emit true immediately before propagating a failure completion, which is surprising and inconsistent with other operators here (e.g., DefaultIfEmpty gates on r.IsSuccess). Consider only emitting true when r.IsSuccess && !hadValue, and otherwise just forward the failure completion.

Copilot uses AI. Check for mistakes.
Comment on lines +104 to +116
r =>
{
using (gate.EnterScope())
{
if (disposed)
{
return;
}

observer.OnCompleted(r);
}
});

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RetryWhen: when the handler observable (retrySignal) completes (the r => callback), the code completes the downstream observer but does not dispose the current upstream subscription or prevent further source emissions. If a handler completes independently of the source (e.g., returns Observable.Empty<Unit>()), the source may keep running and can continue calling observer.OnNext after completion. Consider setting disposed = true, disposing upstream, and completing/disposing the notifier when the handler completes to stop the pipeline cleanly.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants