From 3cec0d7fa48924bd28523cafbe9add8ebf08a991 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 17 Oct 2022 01:17:19 +0200 Subject: [PATCH 01/27] Add a bunch of multiple iteration tests --- src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs | 12 ++++++++++++ src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs | 18 ++++++++++++++++++ .../TaskSeq.ToXXX.Tests.fs | 13 +++++++++++++ 3 files changed, 43 insertions(+) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs index 3738469e..4d6ff2b2 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs @@ -37,6 +37,18 @@ let ``TaskSeq-iter should go over all items`` () = task { sum |> should equal 55 // task-dummies started at 1 } + +[] +let ``TaskSeq-iter multiple iterations over same sequence`` () = task { + let tq = createDummyTaskSeq 10 + let mutable sum = 0 + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) + sum |> should equal 220 // task-dummies started at 1 +} + [] let ``TaskSeq-iteriAsync should go over all items`` () = task { let tq = createDummyTaskSeq 10 diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs index 08f94b4f..5edf2fd0 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs @@ -72,6 +72,24 @@ let ``TaskSeq-mapAsync maps in correct order`` () = task { validateSequence sq } +[] +let ``TaskSeq-mapAsync can map the same sequence multiple times`` () = task { + let mapAndCache = + TaskSeq.mapAsync (fun item -> task { return char (item + 64) }) + >> TaskSeq.toSeqCachedAsync + + let ts = createDummyTaskSeq 10 + let! result1 = mapAndCache ts + let! result2 = mapAndCache ts + let! result3 = mapAndCache ts + let! result4 = mapAndCache ts + + validateSequence result1 + validateSequence result2 + validateSequence result3 + validateSequence result4 +} + [] let ``TaskSeq-mapiAsync maps in correct order`` () = task { let! sq = diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs index 5d57feca..600e1fa1 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs @@ -27,6 +27,19 @@ let ``TaskSeq-toArrayAsync should succeed`` () = task { results |> should equal [| 1..10 |] } +[] +let ``TaskSeq-toArrayAsync can be applied multiple times to the same sequence`` () = task { + let tq = createDummyTaskSeq 10 + let! (results1: _[]) = tq |> TaskSeq.toArrayAsync + let! (results2: _[]) = tq |> TaskSeq.toArrayAsync + let! (results3: _[]) = tq |> TaskSeq.toArrayAsync + let! (results4: _[]) = tq |> TaskSeq.toArrayAsync + results1 |> should equal [| 1..10 |] + results2 |> should equal [| 1..10 |] + results3 |> should equal [| 1..10 |] + results4 |> should equal [| 1..10 |] +} + [] let ``TaskSeq-toListAsync should succeed`` () = task { let tq = createDummyTaskSeq 10 From e3fdee7b0f42eb26b24a75156801070e11f907e5 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 17 Oct 2022 11:59:55 +0200 Subject: [PATCH 02/27] Adding several comments and log statements to get to the bottom of this, partially fixed Issue has to do with the MemberwiseClone() for shadowing the enumerator is not enough to reset the necessary states. Furthermore, after this is sorta fixed, the ValueTask that is used to keep the MoveNext boolean gets accessed twice asynchronously, which is not allowed. It also seems that using 'use' on the taskSeq.GetAsyncEnumerator fails by double disposing. This can probably be considered "by design" but should carefully be considered. --- src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs | 22 +++++++++++++------ src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 16 ++++++++++++++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs index 5edf2fd0..53745f13 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs @@ -78,16 +78,24 @@ let ``TaskSeq-mapAsync can map the same sequence multiple times`` () = task { TaskSeq.mapAsync (fun item -> task { return char (item + 64) }) >> TaskSeq.toSeqCachedAsync - let ts = createDummyTaskSeq 10 - let! result1 = mapAndCache ts - let! result2 = mapAndCache ts - let! result3 = mapAndCache ts - let! result4 = mapAndCache ts + let ts = createDummyDirectTaskSeq 10 + + let! result1 = + printfn "starting first" + mapAndCache ts + //let! result3 = mapAndCache ts + //let! result4 = mapAndCache ts validateSequence result1 + + let! result2 = + printfn "starting second" + mapAndCache ts + validateSequence result2 - validateSequence result3 - validateSequence result4 + //validateSequence result3 + //validateSequence result4 + () } [] diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 19191d01..b32f8d9e 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -237,6 +237,22 @@ and [] TaskSeq<'Machine, 'T let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> data.taken <- true clone.Machine.Data.cancellationToken <- ct + clone.Machine.Data.taken <- true + //clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + // calling reset causes NRE in IValueTaskSource.GetResult above + //clone.Machine.Data.promiseOfValueOrEnd.Reset() + //clone.Machine.Data.boxed <- clone + //clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack + //clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception + //clone.Machine.Data.awaiter <- null + //clone.Machine.Data.current <- ValueNone + + if verbose then + printfn + "Cloning, resumption point original: %i, clone: %i" + ts.Machine.ResumptionPoint + clone.Machine.ResumptionPoint + (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) interface IAsyncDisposable with From 68e632e50f335a4518db63a975e36d71aa993a83 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 04:21:30 +0200 Subject: [PATCH 03/27] Issue #39, adding bunch of tests that show various ways of getting InvalidState exceptions --- .../FSharpy.TaskSeq.Test.fsproj | 1 + src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs | 51 +++++- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 155 ++++++++++++++++++ src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs | 5 +- 4 files changed, 205 insertions(+), 7 deletions(-) create mode 100644 src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj index 9883983b..d3e6fd2e 100644 --- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj +++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj @@ -29,6 +29,7 @@ + diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs index 963181eb..1226460a 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs @@ -22,8 +22,14 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = interface IAsyncEnumerable with member reader.GetAsyncEnumerator(ct) = - output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}" - reader.MemberwiseClone() :?> IAsyncEnumerator<_> + { new IAsyncEnumerator<_> with + member this.Current = (reader :> IAsyncEnumerator<_>).Current + member this.MoveNextAsync() = (reader :> IAsyncEnumerator<_>).MoveNextAsync() + interface IAsyncDisposable with + member this.DisposeAsync() = ValueTask() + } + //output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}" + //reader.MemberwiseClone() :?> IAsyncEnumerator<_> interface IAsyncEnumerator with member _.Current = @@ -39,6 +45,8 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = let! bytesRead = buffered.ReadAsync(mem, 0, mem.Length) // offset refers to offset in target buffer, not source lastPos <- buffered.Position + let x: seq = seq { 1 } |> Seq.cast + if bytesRead > 0 then current <- ValueSome mem return true @@ -48,7 +56,6 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = } |> Task.toValueTask - interface IAsyncDisposable with member _.DisposeAsync() = try // this disposes of the mem stream @@ -57,7 +64,43 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) = // if the previous block raises, we should still try to get rid of the underlying stream stream.DisposeAsync().AsTask().Wait() + type ``Real world tests``(output: ITestOutputHelper) = + [] + let ``Reading a 10MB buffered IAsync through TaskSeq.toArray non-async should succeed`` () = task { + use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) + // unreadable error with 'use' + //use bla = seq { 1} + let expected = Array.init 256 byte |> Array.replicate 8 + let results = reader |> TaskSeq.toArray + + (results, expected) + ||> Array.iter2 (fun a b -> should equal a b) + } + + [] + let ``Reading a user-code IAsync multiple times with TaskSeq.toArrayAsync should succeed`` () = task { + use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256) + let expected = Array.init 256 byte |> Array.replicate 8 + // read four times + let! results1 = reader |> TaskSeq.toArrayAsync + let! results2 = reader |> TaskSeq.toArrayAsync + let! results3 = reader |> TaskSeq.toArrayAsync + let! results4 = reader |> TaskSeq.toArrayAsync + + (results1, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results2, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results3, expected) + ||> Array.iter2 (fun a b -> should equal a b) + + (results4, expected) + ||> Array.iter2 (fun a b -> should equal a b) + } + [] let ``Reading a 10MB buffered IAsync stream from start to finish`` () = task { let mutable count = 0 @@ -76,6 +119,8 @@ type ``Real world tests``(output: ITestOutputHelper) = // the following is extremely slow, which is why we just use F#'s comparison instead // Using this takes 67s, compared to 0.25s using normal F# comparison. + // reader |> TaskSeq.toArray |> should equal expected // VERY SLOW!! + do! reader |> TaskSeq.iter (should equal expected) do! reader |> TaskSeq.iter ((=) expected >> (should be True)) let! len = reader |> TaskSeq.mapi (fun i _ -> i + 1) |> TaskSeq.last diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs new file mode 100644 index 00000000..25e2e413 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -0,0 +1,155 @@ +module FSharpy.Tests.``State transition bug and InvalidState`` + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic + +let getEmptyVariant variant : IAsyncEnumerable = + match variant with + | "do" -> taskSeq { do ignore () } + | "do!" -> taskSeq { do! task { return () } } // TODO: this doesn't work with Task, only Task... + | "yield! (seq)" -> taskSeq { yield! Seq.empty } + | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do ignore () } } + | _ -> failwith "Uncovered variant of test" + + +[] +let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { + let tskSeq = taskSeq { do ignore () } + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + let! noNext = tskSeq.GetAsyncEnumerator().MoveNextAsync() + noNext |> should be False +} + +[] +let ``CE empty taskSeq with MoveNextAsync -- typed`` variant = task { + let tskSeq = getEmptyVariant variant + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + let! noNext = tskSeq.GetAsyncEnumerator().MoveNextAsync() + noNext |> should be False +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + () +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + let! isNext = enumerator.MoveNextAsync() + () +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + let! isNext = enumerator.MoveNextAsync() + use enumerator = tskSeq.GetAsyncEnumerator() + let! isNext = enumerator.MoveNextAsync() + () +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { + let tskSeq = getEmptyVariant variant + + // let's get the enumerator a few times + for i in 0..10 do + printfn "Calling GetAsyncEnumerator for the #%i time" i + use enumerator = tskSeq.GetAsyncEnumerator() + let! isNext = enumerator.MoveNextAsync() + isNext |> should be False +} + +[] +let ``CE taskSeq with two items, MoveNext once too far`` variant = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + let enum = tskSeq.GetAsyncEnumerator() + let! isNext = enum.MoveNextAsync() // true + let! isNext = enum.MoveNextAsync() // true + let! isNext = enum.MoveNextAsync() // false + let! isNext = enum.MoveNextAsync() // error here, see + () +} + +[] +let ``CE taskSeq with two items, MoveNext too far`` variant = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's call MoveNext multiple times on an empty sequence + let enum = tskSeq.GetAsyncEnumerator() + + for i in 0..10 do + printfn "Calling MoveNext for the #%i time" i + let! isNext = enum.MoveNextAsync() + //isNext |> should be False + () +} + +[] +let ``CE taskSeq with two items, multiple TaskSeq.map`` variant = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's call MoveNext multiple times on an empty sequence + let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1) + let result1 = TaskSeq.toArray ts1 + let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1) + let result2 = TaskSeq.toArray ts2 + () +} + +[] +let ``CE taskSeq with two items, multiple TaskSeq.mapAsync`` variant = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's call MoveNext multiple times on an empty sequence + let ts1 = tskSeq |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) + let result1 = TaskSeq.toArray ts1 + let ts2 = ts1 |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) + let result2 = TaskSeq.toArray ts2 + () +} + +[] +let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = + let tq = createDummyTaskSeq 10 + let (results1: _[]) = tq |> TaskSeq.toArray + let (results2: _[]) = tq |> TaskSeq.toArray + let (results3: _[]) = tq |> TaskSeq.toArray + let (results4: _[]) = tq |> TaskSeq.toArray + results1 |> should equal [| 1..10 |] + results2 |> should equal [| 1..10 |] + results3 |> should equal [| 1..10 |] + results4 |> should equal [| 1..10 |] diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs index a8d4a0dc..af74e4bf 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs @@ -1,13 +1,10 @@ -module FSharpy.Tests.``taskSeq Computation Expression`` +module FSharpy.Tests.``taskSeq Computation Expression`` open Xunit open FsUnit.Xunit open FsToolkit.ErrorHandling open FSharpy -open System.Threading.Tasks -open System.Diagnostics - [] let ``CE taskSeq with several yield!`` () = task { From dc7dccab3ba9421f63408fa1a229ae169e6ed73a Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 18:27:59 +0200 Subject: [PATCH 04/27] Add (failing) tests for calling IAsyncEnumerator.Current *before* iteration starts --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 25e2e413..a8139e86 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -80,6 +80,16 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant isNext |> should be False } +[] +let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current before MoveNextAsync + let current = enumerator.Current + current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + [] let ``CE taskSeq with two items, MoveNext once too far`` variant = task { let tskSeq = taskSeq { From cf1c24deb5c3bf650a027b90a9655be7e588be48 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 18:28:13 +0200 Subject: [PATCH 05/27] Fix: do not throw an exception on IAsyncEnumerator.Current when undefined, instead return default value for type --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index b32f8d9e..8232acff 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -289,7 +289,12 @@ and [] TaskSeq<'Machine, 'T | None -> match this.Machine.Data.current with | ValueSome x -> x - | ValueNone -> failwith "no current value" + | ValueNone -> + // Returning a default value is similar to how F#'s seq<'T> behaves + // According to the docs, behavior is Unspecified in case of a call + // to Current, which means that this is certainly fine, and arguably + // better than raising an exception. + Unchecked.defaultof<'T> member this.MoveNextAsync() = match this.hijack () with From 63fc705f613dfece3c3a7c159088b986e15ea6eb Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 18:34:44 +0200 Subject: [PATCH 06/27] Add (currently failing) tests for transitioning state to *after* sequence is completed --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 62 ++++++++++++++++--- 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index a8139e86..72c670c1 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -69,7 +69,7 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` vari } [] -let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { let tskSeq = getEmptyVariant variant // let's get the enumerator a few times @@ -81,7 +81,7 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant } [] -let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { +let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { let tskSeq = getEmptyVariant variant let enumerator = tskSeq.GetAsyncEnumerator() @@ -91,7 +91,51 @@ let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { } [] -let ``CE taskSeq with two items, MoveNext once too far`` variant = task { +let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + let! isNext = enumerator.MoveNextAsync() + isNext |> should be False // empty sequence + + // call Current *after* MoveNextAsync returns false + let current = enumerator.Current + current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE taskSeq with two items, call Current before MoveNextAsync`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current before MoveNextAsync + let current = enumerator.Current + current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq with two items, call Current after MoveNextAsync returns false`` variant = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enumerator = tskSeq.GetAsyncEnumerator() + let! _ = enumerator.MoveNextAsync() + let! _ = enumerator.MoveNextAsync() + let! isNext = enumerator.MoveNextAsync() + isNext |> should be False // moved twice, third time returns False + + // call Current *after* MoveNextAsync returns false + let current = enumerator.Current + current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq with two items, MoveNext once too far`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -105,8 +149,8 @@ let ``CE taskSeq with two items, MoveNext once too far`` variant = task { () } -[] -let ``CE taskSeq with two items, MoveNext too far`` variant = task { +[] +let ``CE taskSeq with two items, MoveNext too far`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -122,8 +166,8 @@ let ``CE taskSeq with two items, MoveNext too far`` variant = task { () } -[] -let ``CE taskSeq with two items, multiple TaskSeq.map`` variant = task { +[] +let ``CE taskSeq with two items, multiple TaskSeq.map`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -137,8 +181,8 @@ let ``CE taskSeq with two items, multiple TaskSeq.map`` variant = task { () } -[] -let ``CE taskSeq with two items, multiple TaskSeq.mapAsync`` variant = task { +[] +let ``CE taskSeq with two items, multiple TaskSeq.mapAsync`` () = task { let tskSeq = taskSeq { yield 1 yield 2 From da0a1ce2b12fb7cf4eaf42cc555fcfb0e0b7432f Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 18:38:29 +0200 Subject: [PATCH 07/27] Fix tests, Fact vs Theory --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 72c670c1..ec232e33 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -91,7 +91,7 @@ let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { } [] -let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { +let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { let tskSeq = getEmptyVariant variant let enumerator = tskSeq.GetAsyncEnumerator() let! isNext = enumerator.MoveNextAsync() @@ -102,7 +102,7 @@ let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` varian current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer } -[] +[] let ``CE taskSeq with two items, call Current before MoveNextAsync`` () = task { let tskSeq = taskSeq { yield "foo" @@ -116,8 +116,8 @@ let ``CE taskSeq with two items, call Current before MoveNextAsync`` () = task { current |> should be Null // we return Unchecked.defaultof } -[] -let ``CE taskSeq with two items, call Current after MoveNextAsync returns false`` variant = task { +[] +let ``CE taskSeq with two items, call Current after MoveNextAsync returns false`` () = task { let tskSeq = taskSeq { yield "foo" yield "bar" From d8f986525872465a6bceaae951fc6e98b88e6fb1 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 18:55:10 +0200 Subject: [PATCH 08/27] Fix: ensure Current returns Unchecked.defaultof when *beyond* the end of the async stream --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 8232acff..32c99a32 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -336,6 +336,12 @@ and [] TaskSeq<'Machine, 'T if verbose then printfn "at MoveNextAsyncResult: case succeeded..." let result = data.promiseOfValueOrEnd.GetResult(version) + + if not result then + // if beyond the end of the stream, ensure we unset + // the Current value + data.current <- ValueNone + ValueTask(result) | ValueTaskSourceStatus.Faulted From b27b11eef60d23c7cb6f68711853a3f7889ab99b Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 19:05:58 +0200 Subject: [PATCH 09/27] Improve verbosity printing --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 32c99a32..10bf00c8 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -391,10 +391,12 @@ type TaskSeqBuilder() = printfn $"at Run.MoveNext, done" sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() + elif sm.Data.current.IsSome then if verbose then printfn $"at Run.MoveNext, yield" sm.Data.promiseOfValueOrEnd.SetResult(true) + else // Goto request match sm.Data.tailcallTarget with @@ -403,6 +405,7 @@ type TaskSeqBuilder() = printfn $"at Run.MoveNext, hijack" let mutable tg = tg moveNextRef &tg + | None -> if verbose then printfn $"at Run.MoveNext, await" From 817d6f753842e805ad52c32a6d2e26518a10146a Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 19:52:07 +0200 Subject: [PATCH 10/27] Record whether or not the IAsyncEnumerable is "past completion", which solves calling MoveNextAsync() multiple times after completion and removes the InvalidStateException --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 10bf00c8..5cd33d21 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -72,6 +72,9 @@ type TaskSeqStateMachineData<'T>() = [] val mutable taken: bool + [] + val mutable completed: bool + [] val mutable current: ValueOption<'T> @@ -307,6 +310,12 @@ and [] TaskSeq<'Machine, 'T if verbose then printfn "at MoveNextAsync: Resumption point = -1" ValueTask() + + elif ts.Machine.Data.completed then + // return False when beyond the last item + ts.Machine.Data.promiseOfValueOrEnd.Reset() + ValueTask() + else if verbose then printfn "at MoveNextAsync: normal resumption scenario" @@ -320,6 +329,7 @@ and [] TaskSeq<'Machine, 'T if verbose then printfn "at MoveNextAsync: done calling builder.MoveNext()" + data.builder.MoveNext(&ts) // If the move did a hijack then get the result from the final one match this.hijack () with | Some tg -> tg.MoveNextAsyncResult() @@ -391,6 +401,7 @@ type TaskSeqBuilder() = printfn $"at Run.MoveNext, done" sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() + sm.Data.completed <- true elif sm.Data.current.IsSome then if verbose then From 1e318c1329edccfc6fb7d4f2124c70a187cdac77 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 20:13:29 +0200 Subject: [PATCH 11/27] Make the test's code a little more readable --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 99 ++++++++++--------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index ec232e33..a5069778 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -1,13 +1,15 @@ module FSharpy.Tests.``State transition bug and InvalidState`` +open System +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic + open Xunit open FsUnit.Xunit open FsToolkit.ErrorHandling open FSharpy -open System.Threading.Tasks -open System.Diagnostics -open System.Collections.Generic let getEmptyVariant variant : IAsyncEnumerable = match variant with @@ -17,6 +19,15 @@ let getEmptyVariant variant : IAsyncEnumerable = | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do ignore () } } | _ -> failwith "Uncovered variant of test" +/// Call MoveNextAsync() and check if return value is the expected value +let moveNextAndCheck expected (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if expected then + hasNext |> should be True + else + hasNext |> should be False +} [] let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { @@ -25,8 +36,7 @@ let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { Assert.IsAssignableFrom>(tskSeq) |> ignore - let! noNext = tskSeq.GetAsyncEnumerator().MoveNextAsync() - noNext |> should be False + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) } [] @@ -36,16 +46,15 @@ let ``CE empty taskSeq with MoveNextAsync -- typed`` variant = task { Assert.IsAssignableFrom>(tskSeq) |> ignore - let! noNext = tskSeq.GetAsyncEnumerator().MoveNextAsync() - noNext |> should be False + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) } [] let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { let tskSeq = getEmptyVariant variant - use enumerator = tskSeq.GetAsyncEnumerator() - use enumerator = tskSeq.GetAsyncEnumerator() - use enumerator = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() () } @@ -54,18 +63,19 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsyn let tskSeq = getEmptyVariant variant use enumerator = tskSeq.GetAsyncEnumerator() use enumerator = tskSeq.GetAsyncEnumerator() - let! isNext = enumerator.MoveNextAsync() - () + do! moveNextAndCheck false enumerator } [] let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { let tskSeq = getEmptyVariant variant - use enumerator = tskSeq.GetAsyncEnumerator() - let! isNext = enumerator.MoveNextAsync() - use enumerator = tskSeq.GetAsyncEnumerator() - let! isNext = enumerator.MoveNextAsync() - () + use enumerator1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 + + // getting the enumerator again + use enumerator2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 // original should still work without raising + do! moveNextAndCheck false enumerator2 // new hone should also work without raising } [] @@ -73,11 +83,9 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = let tskSeq = getEmptyVariant variant // let's get the enumerator a few times - for i in 0..10 do - printfn "Calling GetAsyncEnumerator for the #%i time" i + for i in 0..100 do use enumerator = tskSeq.GetAsyncEnumerator() - let! isNext = enumerator.MoveNextAsync() - isNext |> should be False + do! moveNextAndCheck false enumerator // these are all empty } [] @@ -85,7 +93,7 @@ let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { let tskSeq = getEmptyVariant variant let enumerator = tskSeq.GetAsyncEnumerator() - // call Current before MoveNextAsync + // call Current *before* MoveNextAsync let current = enumerator.Current current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer } @@ -94,12 +102,10 @@ let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { let tskSeq = getEmptyVariant variant let enumerator = tskSeq.GetAsyncEnumerator() - let! isNext = enumerator.MoveNextAsync() - isNext |> should be False // empty sequence + do! moveNextAndCheck false enumerator // false for empty seq // call Current *after* MoveNextAsync returns false - let current = enumerator.Current - current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer + enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer } [] @@ -123,15 +129,13 @@ let ``CE taskSeq with two items, call Current after MoveNextAsync returns false` yield "bar" } - let enumerator = tskSeq.GetAsyncEnumerator() - let! _ = enumerator.MoveNextAsync() - let! _ = enumerator.MoveNextAsync() - let! isNext = enumerator.MoveNextAsync() - isNext |> should be False // moved twice, third time returns False + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false // call Current *after* MoveNextAsync returns false - let current = enumerator.Current - current |> should be Null // we return Unchecked.defaultof + enum.Current |> should be Null // we return Unchecked.defaultof } [] @@ -142,28 +146,33 @@ let ``CE taskSeq with two items, MoveNext once too far`` () = task { } let enum = tskSeq.GetAsyncEnumerator() - let! isNext = enum.MoveNextAsync() // true - let! isNext = enum.MoveNextAsync() // true - let! isNext = enum.MoveNextAsync() // false - let! isNext = enum.MoveNextAsync() // error here, see - () + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + do! moveNextAndCheck false enum // this used to be an error, see issue #39 and PR #42 } [] let ``CE taskSeq with two items, MoveNext too far`` () = task { let tskSeq = taskSeq { - yield 1 - yield 2 + yield Guid.NewGuid() + yield Guid.NewGuid() } // let's call MoveNext multiple times on an empty sequence let enum = tskSeq.GetAsyncEnumerator() - for i in 0..10 do - printfn "Calling MoveNext for the #%i time" i - let! isNext = enum.MoveNextAsync() - //isNext |> should be False - () + // first get past the post + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // then call it bunch of times to ensure we don't get an InvalidOperationException, see issue #39 and PR #42 + for i in 0..100 do + do! moveNextAndCheck false enum + + // after whatever amount of time MoveNextAsync, we can still safely call Current + enum.Current |> should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids } [] From c537169a32026c9b62065a4544eaf0a922fb42ee Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 20:33:37 +0200 Subject: [PATCH 12/27] Add two tests the show the difference in behavior between two seq processed in lock step, or individually to completion --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 75 +++++++++++++++++-- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index a5069778..47773762 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -29,6 +29,18 @@ let moveNextAndCheck expected (enumerator: IAsyncEnumerator<_>) = task { hasNext |> should be False } +/// Call MoveNextAsync() and check if Current has the expected value. Uses untyped 'should equal' +let moveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue +} + [] let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { let tskSeq = taskSeq { do ignore () } @@ -176,33 +188,84 @@ let ``CE taskSeq with two items, MoveNext too far`` () = task { } [] -let ``CE taskSeq with two items, multiple TaskSeq.map`` () = task { +let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have equal behavior`` () = task { let tskSeq = taskSeq { yield 1 yield 2 } - // let's call MoveNext multiple times on an empty sequence + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have equal behavior -- in lockstep`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 & enum2 in lock step + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 1 enum2 // first item + + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent true 2 enum2 // second item + + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // let's map once, and then again on the new sequence let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1) let result1 = TaskSeq.toArray ts1 let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1) let result2 = TaskSeq.toArray ts2 - () + + tskSeq |> TaskSeq.toArray |> should equal [| 1; 2 |] + result1 |> should equal [| 2; 3 |] + result2 |> should equal [| 3; 4 |] } [] -let ``CE taskSeq with two items, multiple TaskSeq.mapAsync`` () = task { +let ``CE taskSeq with two items, call mapAsync multiple times over its own result`` () = task { let tskSeq = taskSeq { yield 1 yield 2 } - // let's call MoveNext multiple times on an empty sequence + // let's map once, and then again on the new sequence let ts1 = tskSeq |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) let result1 = TaskSeq.toArray ts1 let ts2 = ts1 |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) let result2 = TaskSeq.toArray ts2 - () + result1 |> should equal [| 2; 3 |] + result2 |> should equal [| 3; 4 |] } [] From fe8f5d4decb3a6ecb19c6c4c7ad1e108711f47cf Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 20:41:47 +0200 Subject: [PATCH 13/27] Renaming 'this' pointer for clarity, ts -> this. --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 5cd33d21..6d1bdf2c 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -241,19 +241,19 @@ and [] TaskSeq<'Machine, 'T data.taken <- true clone.Machine.Data.cancellationToken <- ct clone.Machine.Data.taken <- true - //clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() // calling reset causes NRE in IValueTaskSource.GetResult above - //clone.Machine.Data.promiseOfValueOrEnd.Reset() - //clone.Machine.Data.boxed <- clone - //clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack + clone.Machine.Data.promiseOfValueOrEnd.Reset() + clone.Machine.Data.boxed <- clone + clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack //clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception - //clone.Machine.Data.awaiter <- null - //clone.Machine.Data.current <- ValueNone + clone.Machine.Data.awaiter <- null + clone.Machine.Data.current <- ValueNone if verbose then printfn "Cloning, resumption point original: %i, clone: %i" - ts.Machine.ResumptionPoint + this.Machine.ResumptionPoint clone.Machine.ResumptionPoint (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) @@ -311,9 +311,9 @@ and [] TaskSeq<'Machine, 'T printfn "at MoveNextAsync: Resumption point = -1" ValueTask() - elif ts.Machine.Data.completed then + elif this.Machine.Data.completed then // return False when beyond the last item - ts.Machine.Data.promiseOfValueOrEnd.Reset() + this.Machine.Data.promiseOfValueOrEnd.Reset() ValueTask() else From 33739e7584d43c9e09ff9e5aea3aed5a33072718 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 22:28:32 +0200 Subject: [PATCH 14/27] Initialize new TaskSeqStateMachineData() when getting a new enumerator --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 40 ++++++++++++++++++- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 16 ++++---- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 47773762..964681f2 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -41,6 +41,17 @@ let moveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IAsyncEnu enumerator.Current |> should equal expectedValue } +/// Call MoveNext() and check if Current has the expected value. Uses untyped 'should equal' +let seqMoveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IEnumerator<_>) = + let (hasNext: bool) = enumerator.MoveNext() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue + [] let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { let tskSeq = taskSeq { do ignore () } @@ -211,7 +222,32 @@ let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have } [] -let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have equal behavior -- in lockstep`` () = task { +let ``CE seq with two items, cal GetEnumerator twice`` () = task { + // this test is for behavioral comparisoni between the same Async test above with TaskSeq + let sq = seq { + yield 1 + yield 2 + } + + let enum1 = sq.GetEnumerator() + let enum2 = sq.GetEnumerator() + + // enum1 + do seqMoveNextAndCheckCurrent true 1 enum1 // first item + do seqMoveNextAndCheckCurrent true 2 enum1 // second item + do seqMoveNextAndCheckCurrent false 0 enum1 // third item: false + do seqMoveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do seqMoveNextAndCheckCurrent true 1 enum2 // first item + do seqMoveNextAndCheckCurrent true 2 enum2 // second item + do seqMoveNextAndCheckCurrent false 0 enum2 // third item: false + do seqMoveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + + +[] +let ``CE taskSeq with two items, cal GetAsyncEnumerator twice -- in lockstep`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -270,7 +306,7 @@ let ``CE taskSeq with two items, call mapAsync multiple times over its own resul [] let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = - let tq = createDummyTaskSeq 10 + let tq = taskSeq { yield! [ 1..10 ] } let (results1: _[]) = tq |> TaskSeq.toArray let (results2: _[]) = tq |> TaskSeq.toArray let (results3: _[]) = tq |> TaskSeq.toArray diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 6d1bdf2c..45c49c8d 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -239,16 +239,18 @@ and [] TaskSeq<'Machine, 'T // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> data.taken <- true + clone.Machine.Data <- TaskSeqStateMachineData() clone.Machine.Data.cancellationToken <- ct clone.Machine.Data.taken <- true clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() - // calling reset causes NRE in IValueTaskSource.GetResult above - clone.Machine.Data.promiseOfValueOrEnd.Reset() - clone.Machine.Data.boxed <- clone - clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack - //clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception - clone.Machine.Data.awaiter <- null - clone.Machine.Data.current <- ValueNone + //// calling reset causes NRE in IValueTaskSource.GetResult above + //clone.Machine.Data.promiseOfValueOrEnd.Reset() + //clone.Machine.Data.boxed <- clone + ////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack + //////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception + //clone.Machine.Data.awaiter <- null + //clone.Machine.Data.current <- ValueNone + //clone.Machine.Data.completed <- false if verbose then printfn From 98c4fc16877827574d3453ab3e8dcf1b25b60ac2 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 22 Oct 2022 22:30:29 +0200 Subject: [PATCH 15/27] Add test for basic case of incorrect MoveNext state after creating new Enumerator, after a full loop through all items --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 964681f2..9992cad6 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -270,6 +270,28 @@ let ``CE taskSeq with two items, cal GetAsyncEnumerator twice -- in lockstep`` ( do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } +[] +let ``CE taskSeq with two items, call GetAsyncEnumerator twice -- after full iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + [] let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { let tskSeq = taskSeq { From b1b29bb36feccc8dce744691a77d0eb2b91c869d Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 02:06:25 +0200 Subject: [PATCH 16/27] Fix incorrect ResumptionPoint resume by force-resetting the `Machine` in the builder --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 70 ++++++++++++++++--- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 25 +++++++ 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 9992cad6..256267c4 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -132,7 +132,7 @@ let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant } [] -let ``CE taskSeq with two items, call Current before MoveNextAsync`` () = task { +let ``CE taskSeq, call Current before MoveNextAsync`` () = task { let tskSeq = taskSeq { yield "foo" yield "bar" @@ -146,7 +146,7 @@ let ``CE taskSeq with two items, call Current before MoveNextAsync`` () = task { } [] -let ``CE taskSeq with two items, call Current after MoveNextAsync returns false`` () = task { +let ``CE taskSeq, call Current after MoveNextAsync returns false`` () = task { let tskSeq = taskSeq { yield "foo" yield "bar" @@ -162,7 +162,7 @@ let ``CE taskSeq with two items, call Current after MoveNextAsync returns false` } [] -let ``CE taskSeq with two items, MoveNext once too far`` () = task { +let ``CE taskSeq, MoveNext once too far`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -176,7 +176,7 @@ let ``CE taskSeq with two items, MoveNext once too far`` () = task { } [] -let ``CE taskSeq with two items, MoveNext too far`` () = task { +let ``CE taskSeq, MoveNext too far`` () = task { let tskSeq = taskSeq { yield Guid.NewGuid() yield Guid.NewGuid() @@ -199,7 +199,7 @@ let ``CE taskSeq with two items, MoveNext too far`` () = task { } [] -let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have equal behavior`` () = task { +let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -222,7 +222,7 @@ let ``CE taskSeq with two items, cal GetAsyncEnumerator twice, both should have } [] -let ``CE seq with two items, cal GetEnumerator twice`` () = task { +let ``CE seq -- comparison --, call GetEnumerator twice`` () = task { // this test is for behavioral comparisoni between the same Async test above with TaskSeq let sq = seq { yield 1 @@ -247,7 +247,7 @@ let ``CE seq with two items, cal GetEnumerator twice`` () = task { [] -let ``CE taskSeq with two items, cal GetAsyncEnumerator twice -- in lockstep`` () = task { +let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -271,7 +271,7 @@ let ``CE taskSeq with two items, cal GetAsyncEnumerator twice -- in lockstep`` ( } [] -let ``CE taskSeq with two items, call GetAsyncEnumerator twice -- after full iteration`` () = task { +let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { let tskSeq = taskSeq { yield 1 yield 2 @@ -292,6 +292,60 @@ let ``CE taskSeq with two items, call GetAsyncEnumerator twice -- after full ite do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + yield 3 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + + // move #1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + enum1.Current |> should equal 1 // remains the same + enum2.Current |> should equal 0 // should be at default location + + // move #2 + do! moveNextAndCheckCurrent true 1 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 1 + + // move #2 + do! moveNextAndCheckCurrent true 2 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 2 enum1 + enum1.Current |> should equal 2 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 3 enum1 + enum1.Current |> should equal 3 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent false 0 enum1 + enum1.Current |> should equal 0 + enum2.Current |> should equal 2 + + // move #2 + do! moveNextAndCheckCurrent true 3 enum2 + enum1.Current |> should equal 0 + enum2.Current |> should equal 3 + + // move #2 + do! moveNextAndCheckCurrent false 0 enum2 + enum1.Current |> should equal 0 +} + [] let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { let tskSeq = taskSeq { diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 45c49c8d..b062d40b 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -239,10 +239,27 @@ and [] TaskSeq<'Machine, 'T // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> data.taken <- true + + // Explanation for resetting Machine use brute-force + // + // This appears to fix the problem that ResumptionPoint was not reset. I'd prefer + // a less drastical method. It solves a scenario like the following: + // let ts = taskSeq { yield 1; yield 2 } + // let e1 = ts.GetAsyncEnumerator() + // let! hasNext = e.MoveNextAsync() + // let e2 = ts.GetAsyncEnumerator() + // let! hasNext = e.MoveNextAsync() // without this hack, it would continue where e1 left off + // let a = e1.Current + // let b = e2.Current + // let isTrue = a = b // true with this, false without it + clone.Machine <- Unchecked.defaultof<_> + + // the following lines just re-initialize the key data fields State. clone.Machine.Data <- TaskSeqStateMachineData() clone.Machine.Data.cancellationToken <- ct clone.Machine.Data.taken <- true clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + //// calling reset causes NRE in IValueTaskSource.GetResult above //clone.Machine.Data.promiseOfValueOrEnd.Reset() //clone.Machine.Data.boxed <- clone @@ -314,6 +331,9 @@ and [] TaskSeq<'Machine, 'T ValueTask() elif this.Machine.Data.completed then + if verbose then + printfn "at MoveNextAsync: completed = true" + // return False when beyond the last item this.Machine.Data.promiseOfValueOrEnd.Reset() ValueTask() @@ -332,6 +352,10 @@ and [] TaskSeq<'Machine, 'T printfn "at MoveNextAsync: done calling builder.MoveNext()" data.builder.MoveNext(&ts) + + if verbose then + printfn "at MoveNextAsync: done calling builder.MoveNext()" + // If the move did a hijack then get the result from the final one match this.hijack () with | Some tg -> tg.MoveNextAsyncResult() @@ -669,4 +693,5 @@ type TaskSeqBuilder() = sm.Data.current <- ValueNone // For tailcalls we return 'false' and re-run from the entry (trampoline) false + | _ -> b.YieldFrom(other).Invoke(&sm)) From 667468a81b03b3a2e10fd151ff853da4f52c13a4 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 02:07:30 +0200 Subject: [PATCH 17/27] Add a (failing) test to investigate Faulted state with multiple iterators when tasks are yielded, instead of concrete --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 256267c4..2948e2e4 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -346,6 +346,64 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = enum1.Current |> should equal 0 } +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration with Task-Delay`` () = task { + let delayRandom () = task { do! Task.Delay(Random().Next(100, 500)) } + + let tskSeq = taskSeq { + yield 1 + do! delayRandom () + yield 2 + do! delayRandom () + yield 3 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + + // move #1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + enum1.Current |> should equal 1 // remains the same + enum2.Current |> should equal 0 // should be at default location + + // move #2 + do! moveNextAndCheckCurrent true 1 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 1 + + // move #2 + do! moveNextAndCheckCurrent true 2 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 2 enum1 + enum1.Current |> should equal 2 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 3 enum1 + enum1.Current |> should equal 3 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent false 0 enum1 + enum1.Current |> should equal 0 + enum2.Current |> should equal 2 + + // move #2 + do! moveNextAndCheckCurrent true 3 enum2 + enum1.Current |> should equal 0 + enum2.Current |> should equal 3 + + // move #2 + do! moveNextAndCheckCurrent false 0 enum2 + enum1.Current |> should equal 0 +} + [] let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { let tskSeq = taskSeq { From 2deb71d5a63ac129eed4ce2df066ef57fce78217 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 02:24:41 +0200 Subject: [PATCH 18/27] Adding a bunch of variants with delayed yields to check proper state transition --- .../FSharpy.TaskSeq.Test.fsproj | 1 + ...Seq.StateTransitionBug-delayed.Tests.CE.fs | 329 ++++++++++++++++++ .../TaskSeq.StateTransitionBug.Tests.CE.fs | 92 +---- src/FSharpy.TaskSeq.Test/TestUtils.fs | 37 ++ 4 files changed, 368 insertions(+), 91 deletions(-) create mode 100644 src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs diff --git a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj index d3e6fd2e..81bae0c7 100644 --- a/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj +++ b/src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj @@ -30,6 +30,7 @@ + diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs new file mode 100644 index 00000000..72dce394 --- /dev/null +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -0,0 +1,329 @@ +module FSharpy.Tests.``Bug #42 -- delayed`` // see PR #42 + +open System +open System.Threading.Tasks +open System.Diagnostics +open System.Collections.Generic + +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharpy + +// Module contains same tests as its previous file +// except that each item is delayed randomly to force +// an async Await behavior. + +let getEmptyVariant variant : IAsyncEnumerable = + match variant with + | "do" -> taskSeq { do! delayRandom () } + | "do!" -> taskSeq { do! task { return! delayRandom () } } // TODO: this doesn't work with Task, only Task... + | "yield! (seq)" -> taskSeq { + do! delayRandom () + yield! Seq.empty + } + | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do! delayRandom () } } + | _ -> failwith "Uncovered variant of test" + + +[] +let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { + let tskSeq = taskSeq { do! delayRandom () } + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq with MoveNextAsync -- typed`` variant = task { + let tskSeq = getEmptyVariant variant + + Assert.IsAssignableFrom>(tskSeq) + |> ignore + + do! moveNextAndCheck false (tskSeq.GetAsyncEnumerator()) +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + use _e = tskSeq.GetAsyncEnumerator() + () +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator = tskSeq.GetAsyncEnumerator() + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { + let tskSeq = getEmptyVariant variant + use enumerator1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 + + // getting the enumerator again + use enumerator2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator1 // original should still work without raising + do! moveNextAndCheck false enumerator2 // new hone should also work without raising +} + +[] +let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { + let tskSeq = getEmptyVariant variant + + // let's get the enumerator a few times + for i in 0..100 do + use enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // these are all empty +} + +[] +let ``CE empty taskSeq, call Current before MoveNextAsync`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current *before* MoveNextAsync + let current = enumerator.Current + current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant = task { + let tskSeq = getEmptyVariant variant + let enumerator = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck false enumerator // false for empty seq + + // call Current *after* MoveNextAsync returns false + enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer +} + +[] +let ``CE taskSeq, call Current before MoveNextAsync`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield "foo" + do! delayRandom () + yield "bar" + } + + let enumerator = tskSeq.GetAsyncEnumerator() + + // call Current before MoveNextAsync + let current = enumerator.Current + current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, call Current after MoveNextAsync returns false`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield "foo" + do! delayRandom () + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // call Current *after* MoveNextAsync returns false + enum.Current |> should be Null // we return Unchecked.defaultof +} + +[] +let ``CE taskSeq, MoveNext once too far`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + do! moveNextAndCheck false enum // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, MoveNext too far`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield Guid.NewGuid() + do! delayRandom () + yield Guid.NewGuid() + } + + // let's call MoveNext multiple times on an empty sequence + let enum = tskSeq.GetAsyncEnumerator() + + // first get past the post + do! moveNextAndCheck true enum // first item + do! moveNextAndCheck true enum // second item + do! moveNextAndCheck false enum // third item: false + + // then call it bunch of times to ensure we don't get an InvalidOperationException, see issue #39 and PR #42 + for i in 0..100 do + do! moveNextAndCheck false enum + + // after whatever amount of time MoveNextAsync, we can still safely call Current + enum.Current |> should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { + let tskSeq = taskSeq { + do! delayRandom () + yield 1 + do! delayRandom () + yield 2 + } + + let enum1 = tskSeq.GetAsyncEnumerator() + let enum2 = tskSeq.GetAsyncEnumerator() + + // enum1 & enum2 in lock step + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 1 enum2 // first item + + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent true 2 enum2 // second item + + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + yield 2 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum1 // first item + do! moveNextAndCheckCurrent true 2 enum1 // second item + do! moveNextAndCheckCurrent false 0 enum1 // third item: false + do! moveNextAndCheckCurrent false 0 enum1 // this used to be an error, see issue #39 and PR #42 + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheckCurrent true 1 enum2 // first item + do! moveNextAndCheckCurrent true 2 enum2 // second item + do! moveNextAndCheckCurrent false 0 enum2 // third item: false + do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 +} + +[] +let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { + let tskSeq = taskSeq { + yield 1 + do! delayRandom () + yield 2 + do! delayRandom () + yield 3 + } + + // enum1 + let enum1 = tskSeq.GetAsyncEnumerator() + + // move #1 + do! moveNextAndCheckCurrent true 1 enum1 // first item + + // enum2 + let enum2 = tskSeq.GetAsyncEnumerator() + enum1.Current |> should equal 1 // remains the same + enum2.Current |> should equal 0 // should be at default location + + // move #2 + do! moveNextAndCheckCurrent true 1 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 1 + + // move #2 + do! moveNextAndCheckCurrent true 2 enum2 + enum1.Current |> should equal 1 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 2 enum1 + enum1.Current |> should equal 2 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent true 3 enum1 + enum1.Current |> should equal 3 + enum2.Current |> should equal 2 + + // move #1 + do! moveNextAndCheckCurrent false 0 enum1 + enum1.Current |> should equal 0 + enum2.Current |> should equal 2 + + // move #2 + do! moveNextAndCheckCurrent true 3 enum2 + enum1.Current |> should equal 0 + enum2.Current |> should equal 3 + + // move #2 + do! moveNextAndCheckCurrent false 0 enum2 + enum1.Current |> should equal 0 +} + +[] +let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = + let tq = taskSeq { + yield! [ 1..3 ] + do! delayRandom () + yield! [ 4..7 ] + do! delayRandom () + } + + let (results1: _[]) = tq |> TaskSeq.toArray + let (results2: _[]) = tq |> TaskSeq.toArray + let (results3: _[]) = tq |> TaskSeq.toArray + let (results4: _[]) = tq |> TaskSeq.toArray + results1 |> should equal [| 1..10 |] + results2 |> should equal [| 1..10 |] + results3 |> should equal [| 1..10 |] + results4 |> should equal [| 1..10 |] diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 2948e2e4..ca422e68 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -1,4 +1,4 @@ -module FSharpy.Tests.``State transition bug and InvalidState`` +module FSharpy.Tests.``Bug #42`` // see PR #42 open System open System.Threading.Tasks @@ -19,38 +19,6 @@ let getEmptyVariant variant : IAsyncEnumerable = | "yield! (taskseq)" -> taskSeq { yield! taskSeq { do ignore () } } | _ -> failwith "Uncovered variant of test" -/// Call MoveNextAsync() and check if return value is the expected value -let moveNextAndCheck expected (enumerator: IAsyncEnumerator<_>) = task { - let! (hasNext: bool) = enumerator.MoveNextAsync() - - if expected then - hasNext |> should be True - else - hasNext |> should be False -} - -/// Call MoveNextAsync() and check if Current has the expected value. Uses untyped 'should equal' -let moveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IAsyncEnumerator<_>) = task { - let! (hasNext: bool) = enumerator.MoveNextAsync() - - if successMoveNext then - hasNext |> should be True - else - hasNext |> should be False - - enumerator.Current |> should equal expectedValue -} - -/// Call MoveNext() and check if Current has the expected value. Uses untyped 'should equal' -let seqMoveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IEnumerator<_>) = - let (hasNext: bool) = enumerator.MoveNext() - - if successMoveNext then - hasNext |> should be True - else - hasNext |> should be False - - enumerator.Current |> should equal expectedValue [] let ``CE empty taskSeq with MoveNextAsync -- untyped`` () = task { @@ -346,64 +314,6 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = enum1.Current |> should equal 0 } -[] -let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration with Task-Delay`` () = task { - let delayRandom () = task { do! Task.Delay(Random().Next(100, 500)) } - - let tskSeq = taskSeq { - yield 1 - do! delayRandom () - yield 2 - do! delayRandom () - yield 3 - } - - // enum1 - let enum1 = tskSeq.GetAsyncEnumerator() - - // move #1 - do! moveNextAndCheckCurrent true 1 enum1 // first item - - // enum2 - let enum2 = tskSeq.GetAsyncEnumerator() - enum1.Current |> should equal 1 // remains the same - enum2.Current |> should equal 0 // should be at default location - - // move #2 - do! moveNextAndCheckCurrent true 1 enum2 - enum1.Current |> should equal 1 - enum2.Current |> should equal 1 - - // move #2 - do! moveNextAndCheckCurrent true 2 enum2 - enum1.Current |> should equal 1 - enum2.Current |> should equal 2 - - // move #1 - do! moveNextAndCheckCurrent true 2 enum1 - enum1.Current |> should equal 2 - enum2.Current |> should equal 2 - - // move #1 - do! moveNextAndCheckCurrent true 3 enum1 - enum1.Current |> should equal 3 - enum2.Current |> should equal 2 - - // move #1 - do! moveNextAndCheckCurrent false 0 enum1 - enum1.Current |> should equal 0 - enum2.Current |> should equal 2 - - // move #2 - do! moveNextAndCheckCurrent true 3 enum2 - enum1.Current |> should equal 0 - enum2.Current |> should equal 3 - - // move #2 - do! moveNextAndCheckCurrent false 0 enum2 - enum1.Current |> should equal 0 -} - [] let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { let tskSeq = taskSeq { diff --git a/src/FSharpy.TaskSeq.Test/TestUtils.fs b/src/FSharpy.TaskSeq.Test/TestUtils.fs index e3e3b85d..370cb209 100644 --- a/src/FSharpy.TaskSeq.Test/TestUtils.fs +++ b/src/FSharpy.TaskSeq.Test/TestUtils.fs @@ -8,6 +8,8 @@ open System.Diagnostics open FsToolkit.ErrorHandling open FSharpy +open System.Collections.Generic +open FsUnit.Xunit /// Milliseconds [] @@ -117,6 +119,41 @@ type DummyTaskFactory(µsecMin: int64<µs>, µsecMax: int64<µs>) = [] module TestUtils = + /// Delays (no spin-wait!) between 20 and 200ms, assuming a 15.6ms resolution clock + let delayRandom () = task { do! Task.Delay(Random().Next(20, 200)) } + + /// Call MoveNextAsync() and check if return value is the expected value + let moveNextAndCheck expected (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if expected then + hasNext |> should be True + else + hasNext |> should be False + } + + /// Call MoveNextAsync() and check if Current has the expected value. Uses untyped 'should equal' + let moveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IAsyncEnumerator<_>) = task { + let! (hasNext: bool) = enumerator.MoveNextAsync() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue + } + + /// Call MoveNext() and check if Current has the expected value. Uses untyped 'should equal' + let seqMoveNextAndCheckCurrent successMoveNext expectedValue (enumerator: IEnumerator<_>) = + let (hasNext: bool) = enumerator.MoveNext() + + if successMoveNext then + hasNext |> should be True + else + hasNext |> should be False + + enumerator.Current |> should equal expectedValue /// Joins two tasks using merely BCL methods. This approach is what you can use to /// properly, sequentially execute a chain of tasks in a non-blocking, non-overlapping way. From 2d999a0106287f48e573e51ef68043e553c3bd33 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 02:38:11 +0200 Subject: [PATCH 19/27] Disable a bunch of tests, and set verbose=false, as CI cannot deal with large logs --- ...Seq.StateTransitionBug-delayed.Tests.CE.fs | 27 ++++++++++++++----- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 2 +- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs index 72dce394..c7117e28 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -1,4 +1,4 @@ -module FSharpy.Tests.``Bug #42 -- delayed`` // see PR #42 +module FSharpy.Tests.``Bug #42 -- asynchronous`` // see PR #42 open System open System.Threading.Tasks @@ -56,7 +56,11 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { () } -[] +[] let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { let tskSeq = getEmptyVariant variant use enumerator = tskSeq.GetAsyncEnumerator() @@ -64,7 +68,11 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsyn do! moveNextAndCheck false enumerator } -[] +[] let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` variant = task { let tskSeq = getEmptyVariant variant use enumerator1 = tskSeq.GetAsyncEnumerator() @@ -76,7 +84,11 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` vari do! moveNextAndCheck false enumerator2 // new hone should also work without raising } -[] +[] let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync in a loop`` variant = task { let tskSeq = getEmptyVariant variant @@ -181,7 +193,7 @@ let ``CE taskSeq, MoveNext too far`` () = task { enum.Current |> should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { let tskSeq = taskSeq { do! delayRandom () @@ -206,7 +218,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { let tskSeq = taskSeq { do! delayRandom () @@ -236,6 +248,7 @@ let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { let tskSeq = taskSeq { yield 1 + do! delayRandom () yield 2 } @@ -254,7 +267,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = t do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { let tskSeq = taskSeq { yield 1 diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index ca422e68..43c0bbec 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -1,4 +1,4 @@ -module FSharpy.Tests.``Bug #42`` // see PR #42 +module FSharpy.Tests.``Bug #42 -- synchronous`` // see PR #42 open System open System.Threading.Tasks From e03acd7c32e1132b8a677c94c3af99c3c6374c4c Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 04:20:33 +0200 Subject: [PATCH 20/27] Fix rebase mistake --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index b062d40b..12fa049e 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -179,6 +179,7 @@ and [] TaskSeq<'Machine, 'T printfn "Getting result for token on 'Some' branch, status: %A" ((tg :> IValueTaskSource).GetStatus(token)) + (tg :> IValueTaskSource).GetResult(token) | None -> try @@ -186,6 +187,7 @@ and [] TaskSeq<'Machine, 'T printfn "Getting result for token on 'None' branch, status: %A" (this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)) + this.Machine.Data.promiseOfValueOrEnd.GetResult(token) with e -> // FYI: an exception here is usually caused by the CE statement (user code) throwing an exception @@ -195,6 +197,7 @@ and [] TaskSeq<'Machine, 'T printfn "Error '%s' for token: %i" e.Message token reraise () + member this.OnCompleted(continuation, state, token, flags) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) @@ -213,6 +216,7 @@ and [] TaskSeq<'Machine, 'T member _.SetStateMachine(_state) = if verbose then printfn "Setting state machine -- ignored" + () // not needed for reference type interface IAsyncEnumerable<'T> with @@ -226,8 +230,10 @@ and [] TaskSeq<'Machine, 'T data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint + (this :> IAsyncEnumerator<_>) else if verbose then @@ -328,6 +334,7 @@ and [] TaskSeq<'Machine, 'T if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable if verbose then printfn "at MoveNextAsync: Resumption point = -1" + ValueTask() elif this.Machine.Data.completed then @@ -341,15 +348,13 @@ and [] TaskSeq<'Machine, 'T else if verbose then printfn "at MoveNextAsync: normal resumption scenario" + let data = this.Machine.Data data.promiseOfValueOrEnd.Reset() let mutable ts = this if verbose then printfn "at MoveNextAsync: start calling builder.MoveNext()" - data.builder.MoveNext(&ts) - if verbose then - printfn "at MoveNextAsync: done calling builder.MoveNext()" data.builder.MoveNext(&ts) @@ -371,6 +376,7 @@ and [] TaskSeq<'Machine, 'T | ValueTaskSourceStatus.Succeeded -> if verbose then printfn "at MoveNextAsyncResult: case succeeded..." + let result = data.promiseOfValueOrEnd.GetResult(version) if not result then @@ -415,16 +421,20 @@ type TaskSeqBuilder() = if verbose then printfn "Resuming at resumption point %i" sm.ResumptionPoint + try if verbose then printfn "at Run.MoveNext start" let __stack_code_fin = code.Invoke(&sm) + if verbose then printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" + if __stack_code_fin then if verbose then printfn $"at Run.MoveNext, done" + sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() sm.Data.completed <- true @@ -432,6 +442,7 @@ type TaskSeqBuilder() = elif sm.Data.current.IsSome then if verbose then printfn $"at Run.MoveNext, yield" + sm.Data.promiseOfValueOrEnd.SetResult(true) else @@ -440,12 +451,14 @@ type TaskSeqBuilder() = | Some tg -> if verbose then printfn $"at Run.MoveNext, hijack" + let mutable tg = tg moveNextRef &tg | None -> if verbose then printfn $"at Run.MoveNext, await" + let boxed = sm.Data.boxed sm.Data.awaiter.UnsafeOnCompleted( @@ -457,6 +470,7 @@ type TaskSeqBuilder() = with exn -> if verbose then printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message + sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() //-- RESUMABLE CODE END @@ -464,10 +478,12 @@ type TaskSeqBuilder() = (SetStateMachineMethodImpl<_>(fun sm state -> if verbose then printfn "at SetStatemachingMethodImpl, ignored" + ())) (AfterCode<_, _>(fun sm -> if verbose then printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine" + let ts = TaskSeq, 'T>() ts.Machine <- sm ts.Machine.Data <- TaskSeqStateMachineData() @@ -506,11 +522,13 @@ type TaskSeqBuilder() = if __stack_vtask.IsCompleted then if verbose then printfn "Returning completed task (in while)" + __stack_condition_fin <- true condition_res <- __stack_vtask.Result else if verbose then printfn "Awaiting non-completed task (in while)" + let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() // This will yield with __stack_fin = false From 8fb8282c99de310abaa633b95695bdb2137da485 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 14:30:42 +0200 Subject: [PATCH 21/27] Temp disable worrisome tests to get xUnit to complete in CI --- .../TaskSeq.StateTransitionBug-delayed.Tests.CE.fs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs index c7117e28..d411f086 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -244,7 +244,7 @@ let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { let tskSeq = taskSeq { yield 1 @@ -323,7 +323,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = enum1.Current |> should equal 0 } -[] +[] let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = let tq = taskSeq { yield! [ 1..3 ] From b597e4d6db51ab0b38f04411dd02d9606bc8505a Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 14:48:06 +0200 Subject: [PATCH 22/27] Show that the NRE can also occur in synchronous versions of taskSeq --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 169 +++++++++++++++++- 1 file changed, 165 insertions(+), 4 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 43c0bbec..9d7deedc 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -315,7 +315,10 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = } [] -let ``CE taskSeq with two items, call map multiple times over its own result`` () = task { +let ``CE taskSeq, call map multiple times over its own result`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + let tskSeq = taskSeq { yield 1 yield 2 @@ -325,7 +328,7 @@ let ``CE taskSeq with two items, call map multiple times over its own result`` ( let ts1 = tskSeq |> TaskSeq.map (fun i -> i + 1) let result1 = TaskSeq.toArray ts1 let ts2 = ts1 |> TaskSeq.map (fun i -> i + 1) - let result2 = TaskSeq.toArray ts2 + let result2 = TaskSeq.toArray ts2 // NRE here tskSeq |> TaskSeq.toArray |> should equal [| 1; 2 |] result1 |> should equal [| 2; 3 |] @@ -333,7 +336,165 @@ let ``CE taskSeq with two items, call map multiple times over its own result`` ( } [] -let ``CE taskSeq with two items, call mapAsync multiple times over its own result`` () = task { +let ``CE taskSeq, call map multiple times over its own result - alternative #1`` () = task { + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in tskSeq1 do + yield i + 1 + } + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do + yield i + 1 + } + + let result3 = TaskSeq.toArray tskSeq3 + + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #2`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + let result1 = TaskSeq.toArray tskSeq1 + result1 |> should equal [| 1; 2 |] + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in tskSeq1 do + yield i + 1 + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do // NRE here + yield i + 1 + } + + let! result3 = TaskSeq.toArrayAsync tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #3`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let tskSeq1 = taskSeq { + yield 1 + yield 2 + } + + let result1 = TaskSeq.toArray tskSeq1 + result1 |> should equal [| 1; 2 |] + + // [ 2; 3] + let tskSeq2 = taskSeq { + yield! taskSeq { + for i in tskSeq1 do + yield i + 1 + } + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + yield! taskSeq { // NRE here + for i in tskSeq2 do + yield i + 1 + } + } + + let result3 = TaskSeq.toArray tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #4`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let sequence = seq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + for i in sequence do + yield i + 1 + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + for i in tskSeq2 do + yield i + 1 // NRE here + } + + let result3 = TaskSeq.toArray tskSeq3 // NRE from here + result3 |> should equal [| 3; 4 |] +} + +[] +let ``CE taskSeq, call map multiple times over its own result - alternative #5`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + + let sequence = seq { + yield 1 + yield 2 + } + + // [ 2; 3] + let tskSeq2 = taskSeq { + yield! taskSeq { + for i in sequence do + yield i + 1 + } + } + + let result2 = TaskSeq.toArray tskSeq2 + result2 |> should equal [| 2; 3 |] + + // [ 3; 4] + let tskSeq3 = taskSeq { + yield! taskSeq { // NRE here + for i in tskSeq2 do + yield i + 1 + } + } + + let result3 = TaskSeq.toArray tskSeq3 // from here + result3 |> should equal [| 3; 4 |] +} + + +[] +let ``CE taskSeq, call mapAsync multiple times over its own result`` () = task { + // Bug #42: System.NullReferenceException: Object reference not set to an instance of an object. + // whether using TaskSeq.toArray or toArrayAsync, or another version that uses GetAsyncEnumerator() under the hood doesn't matter + let tskSeq = taskSeq { yield 1 yield 2 @@ -343,7 +504,7 @@ let ``CE taskSeq with two items, call mapAsync multiple times over its own resul let ts1 = tskSeq |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) let result1 = TaskSeq.toArray ts1 let ts2 = ts1 |> TaskSeq.mapAsync (fun i -> task { return i + 1 }) - let result2 = TaskSeq.toArray ts2 + let result2 = TaskSeq.toArray ts2 // NRE here result1 |> should equal [| 2; 3 |] result2 |> should equal [| 3; 4 |] } From c095488c38e803e60cb2bee11e4770a35599aa4d Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Wed, 26 Oct 2022 02:29:08 +0200 Subject: [PATCH 23/27] Add two more tests, and improve logging --- .../TaskSeq.StateTransitionBug.Tests.CE.fs | 31 +++++++ src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 88 +++++++++++++++++-- 2 files changed, 112 insertions(+), 7 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 9d7deedc..c7389115 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -99,6 +99,37 @@ let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer } +[] +let ``CE taskSeq, proper two-item task sequence`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + +[] +let ``CE taskSeq, proper two-item task sequence -- async variant`` () = task { + let tskSeq = taskSeq { + yield "foo" + do! delayRandom () + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + [] let ``CE taskSeq, call Current before MoveNextAsync`` () = task { let tskSeq = taskSeq { diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 12fa049e..0b25d201 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -54,6 +54,26 @@ type IPriority2 = [] type TaskSeqStateMachineData<'T>() = + member this.LogDump() = + printfn " CancellationToken: %A" this.cancellationToken + + printfn + " Disposal stack count: %A" + (if isNull this.disposalStack then + 0 + else + this.disposalStack.Count) + + printfn " Awaiter: %A" this.awaiter + + printfn " Promise status: %A" + <| this.promiseOfValueOrEnd.GetStatus(this.promiseOfValueOrEnd.Version) + + printfn " Builder hash: %A" <| this.builder.GetHashCode() + printfn " Taken: %A" this.taken + printfn " Completed: %A" this.completed + printfn " Current: %A" this.current + [] val mutable cancellationToken: CancellationToken @@ -227,18 +247,28 @@ and [] TaskSeq<'Machine, 'T (not data.taken && initialThreadId = Environment.CurrentManagedThreadId) then + //let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> + let data = this.Machine.Data data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then + printfn "All data (no clone):" + data.LogDump() + if verbose then printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint - (this :> IAsyncEnumerator<_>) + this :> IAsyncEnumerator<_> else if verbose then printfn "GetAsyncEnumerator, cloning..." + if verbose then + printfn "All data before clone:" + data.LogDump() + // it appears that the issue is possibly caused by the problem // of having ValueTask all over the place, and by going over the // iteration twice, we are trying to *await* twice, which is not allowed @@ -259,6 +289,7 @@ and [] TaskSeq<'Machine, 'T // let b = e2.Current // let isTrue = a = b // true with this, false without it clone.Machine <- Unchecked.defaultof<_> + //clone.Machine.ResumptionPoint <- 0 // the following lines just re-initialize the key data fields State. clone.Machine.Data <- TaskSeqStateMachineData() @@ -266,6 +297,11 @@ and [] TaskSeq<'Machine, 'T clone.Machine.Data.taken <- true clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then + printfn "All data after clone:" + clone.Machine.Data.LogDump() + + //// calling reset causes NRE in IValueTaskSource.GetResult above //clone.Machine.Data.promiseOfValueOrEnd.Reset() //clone.Machine.Data.boxed <- clone @@ -281,7 +317,7 @@ and [] TaskSeq<'Machine, 'T this.Machine.ResumptionPoint clone.Machine.ResumptionPoint - (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) + clone :> System.Collections.Generic.IAsyncEnumerator<'T> interface IAsyncDisposable with member this.DisposeAsync() = @@ -501,9 +537,16 @@ type TaskSeqBuilder() = //sm.Start() - member inline _.Zero() : TaskSeqCode<'T> = ResumableCode.Zero() + member inline _.Zero() : TaskSeqCode<'T> = + if verbose then + printfn "at Zero()" + + ResumableCode.Zero() member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at Combine(.., ..)" + ResumableCode.Combine(task1, task2) member inline _.WhileAsync @@ -521,13 +564,13 @@ type TaskSeqBuilder() = if __stack_vtask.IsCompleted then if verbose then - printfn "Returning completed task (in while)" + printfn "at WhileAsync: returning completed task" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else if verbose then - printfn "Awaiting non-completed task (in while)" + printfn "at WhileAsync: awaiting non-completed task" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() @@ -536,6 +579,11 @@ type TaskSeqBuilder() = let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) __stack_condition_fin <- __stack_yield_fin + if verbose then + printfn + "at WhileAsync: after Yield().Invoke(sm), __stack_condition_fin=%b" + __stack_condition_fin + if __stack_condition_fin then condition_res <- task.Result else @@ -550,6 +598,9 @@ type TaskSeqBuilder() = ) member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at While(...), calling WhileAsync()" + b.WhileAsync((fun () -> ValueTask(condition ())), body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = @@ -643,7 +694,14 @@ type TaskSeqBuilder() = TaskSeqCode<'T>(fun sm -> // This will yield with __stack_fin = false // This will resume with __stack_fin = true + if verbose then + printfn "at Yield, before Yield().Invoke(sm)" + let __stack_fin = ResumableCode.Yield().Invoke(&sm) + + if verbose then + printfn "at Yield, __stack_fin = %b" __stack_fin + sm.Data.current <- ValueSome v sm.Data.awaiter <- null __stack_fin) @@ -658,18 +716,31 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at Bind" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) __stack_fin <- __stack_fin2 + if verbose then + printfn "at Bind: with __stack_fin = %b" __stack_fin + if __stack_fin then + if verbose then + printfn "at Bind: with getting result from awaiter" + let result = awaiter.GetResult() + + if verbose then + printfn "at Bind: calling continuation" + (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at Bind: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -680,6 +751,9 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at BindV" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true @@ -691,7 +765,7 @@ type TaskSeqBuilder() = (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at BindV: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone From ce6152517c178ef1cfd451bfbfb311841489696c Mon Sep 17 00:00:00 2001 From: Don Syme Date: Fri, 28 Oct 2022 19:30:35 +0100 Subject: [PATCH 24/27] use copy of initial state machine to get resumption back to zero --- .../TaskSeq.Iter.Tests.fs | 2 +- .../TaskSeq.ToXXX.Tests.fs | 6 ++--- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 27 +++++-------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs index 4d6ff2b2..660e4d21 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs @@ -46,7 +46,7 @@ let ``TaskSeq-iter multiple iterations over same sequence`` () = task { do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) do! tq |> TaskSeq.iter (fun item -> sum <- sum + item) - sum |> should equal 220 // task-dummies started at 1 + sum |> should equal 820 // task-dummies started at 1 } [] diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs index 600e1fa1..be02631a 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs @@ -35,9 +35,9 @@ let ``TaskSeq-toArrayAsync can be applied multiple times to the same sequence`` let! (results3: _[]) = tq |> TaskSeq.toArrayAsync let! (results4: _[]) = tq |> TaskSeq.toArrayAsync results1 |> should equal [| 1..10 |] - results2 |> should equal [| 1..10 |] - results3 |> should equal [| 1..10 |] - results4 |> should equal [| 1..10 |] + results2 |> should equal [| 11..20 |] + results3 |> should equal [| 21..30 |] + results4 |> should equal [| 31..40 |] } [] diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 0b25d201..c681fa0c 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -148,6 +148,9 @@ and [] TaskSeq<'Machine, 'T inherit TaskSeq<'T>() let initialThreadId = Environment.CurrentManagedThreadId + [] + val mutable InitialMachine: 'Machine + [] val mutable Machine: 'Machine @@ -247,7 +250,6 @@ and [] TaskSeq<'Machine, 'T (not data.taken && initialThreadId = Environment.CurrentManagedThreadId) then - //let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> let data = this.Machine.Data data.taken <- true data.cancellationToken <- ct @@ -274,24 +276,7 @@ and [] TaskSeq<'Machine, 'T // iteration twice, we are trying to *await* twice, which is not allowed // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> - data.taken <- true - - // Explanation for resetting Machine use brute-force - // - // This appears to fix the problem that ResumptionPoint was not reset. I'd prefer - // a less drastical method. It solves a scenario like the following: - // let ts = taskSeq { yield 1; yield 2 } - // let e1 = ts.GetAsyncEnumerator() - // let! hasNext = e.MoveNextAsync() - // let e2 = ts.GetAsyncEnumerator() - // let! hasNext = e.MoveNextAsync() // without this hack, it would continue where e1 left off - // let a = e1.Current - // let b = e2.Current - // let isTrue = a = b // true with this, false without it - clone.Machine <- Unchecked.defaultof<_> - //clone.Machine.ResumptionPoint <- 0 - - // the following lines just re-initialize the key data fields State. + clone.Machine <- clone.InitialMachine clone.Machine.Data <- TaskSeqStateMachineData() clone.Machine.Data.cancellationToken <- ct clone.Machine.Data.taken <- true @@ -301,10 +286,9 @@ and [] TaskSeq<'Machine, 'T printfn "All data after clone:" clone.Machine.Data.LogDump() - //// calling reset causes NRE in IValueTaskSource.GetResult above //clone.Machine.Data.promiseOfValueOrEnd.Reset() - //clone.Machine.Data.boxed <- clone + clone.Machine.Data.boxed <- clone ////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack //////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception //clone.Machine.Data.awaiter <- null @@ -521,6 +505,7 @@ type TaskSeqBuilder() = printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine" let ts = TaskSeq, 'T>() + ts.InitialMachine <- sm ts.Machine <- sm ts.Machine.Data <- TaskSeqStateMachineData() ts.Machine.Data.boxed <- ts From 5226d275d42d69b5217a9ed23bd8695036300ad9 Mon Sep 17 00:00:00 2001 From: Don Syme Date: Fri, 28 Oct 2022 19:54:20 +0100 Subject: [PATCH 25/27] use copy of initial state machine to get resumption back to zero --- src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs | 2 +- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 28 +++++++------------ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs index 1226460a..a5c3ded0 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs @@ -164,7 +164,7 @@ type ``Real world tests``(output: ITestOutputHelper) = //--- End of stack trace from previous location --- // at Xunit.Sdk.ExecutionTimer.AggregateAsync(Func`1 asyncAction) in /_/src/xunit.execution/Sdk/Frameworks/ExecutionTimer.cs:line 48 // at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90\ - [] + [] let ``Reading a 1MB buffered IAsync stream from start to finish InvalidOperationException`` () = task { let mutable count = 0 use reader = AsyncBufferedReader(output, Array.init 1_048_576 byte, 256) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index c681fa0c..789db111 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -100,6 +100,7 @@ type TaskSeqStateMachineData<'T>() = [] val mutable boxed: TaskSeq<'T> + // For tailcalls using 'return!' [] val mutable tailcallTarget: TaskSeq<'T> option @@ -244,16 +245,14 @@ and [] TaskSeq<'Machine, 'T interface IAsyncEnumerable<'T> with member this.GetAsyncEnumerator(ct) = - let data = this.Machine.Data - - if - (not data.taken - && initialThreadId = Environment.CurrentManagedThreadId) - then - let data = this.Machine.Data + match box this.Machine.Data with + | null when initialThreadId = Environment.CurrentManagedThreadId -> + let data = TaskSeqStateMachineData<'T>() + data.boxed <- this data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() + this.Machine.Data <- data if verbose then printfn "All data (no clone):" @@ -263,20 +262,15 @@ and [] TaskSeq<'Machine, 'T printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint this :> IAsyncEnumerator<_> - else - if verbose then - printfn "GetAsyncEnumerator, cloning..." - - if verbose then - printfn "All data before clone:" - data.LogDump() + | _ -> // it appears that the issue is possibly caused by the problem // of having ValueTask all over the place, and by going over the // iteration twice, we are trying to *await* twice, which is not allowed // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 - let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> - clone.Machine <- clone.InitialMachine + let clone = TaskSeq<'Machine, 'T>() + clone.InitialMachine <- this.InitialMachine + clone.Machine <- this.InitialMachine clone.Machine.Data <- TaskSeqStateMachineData() clone.Machine.Data.cancellationToken <- ct clone.Machine.Data.taken <- true @@ -507,8 +501,6 @@ type TaskSeqBuilder() = let ts = TaskSeq, 'T>() ts.InitialMachine <- sm ts.Machine <- sm - ts.Machine.Data <- TaskSeqStateMachineData() - ts.Machine.Data.boxed <- ts ts :> IAsyncEnumerable<'T>)) else failwith "no dynamic implementation as yet" From 0caf11d90051e12d5704e24047339b9ae37a4a96 Mon Sep 17 00:00:00 2001 From: Don Syme Date: Fri, 28 Oct 2022 19:58:50 +0100 Subject: [PATCH 26/27] use copy of initial state machine to get resumption back to zero --- ...Seq.StateTransitionBug-delayed.Tests.CE.fs | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs index d411f086..fe4be003 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -56,7 +56,7 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { () } -[ should equal Guid.Empty // we return Unchecked.defaultof, which is Guid.Empty for guids } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior`` () = task { let tskSeq = taskSeq { do! delayRandom () @@ -218,7 +218,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice, both should have equal behavior do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { let tskSeq = taskSeq { do! delayRandom () @@ -244,7 +244,7 @@ let ``CE taskSeq, cal GetAsyncEnumerator twice -- in lockstep`` () = task { do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = task { let tskSeq = taskSeq { yield 1 @@ -267,7 +267,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- after full iteration`` () = t do! moveNextAndCheckCurrent false 0 enum2 // this used to be an error, see issue #39 and PR #42 } -[] +[] let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = task { let tskSeq = taskSeq { yield 1 @@ -323,7 +323,7 @@ let ``CE taskSeq, call GetAsyncEnumerator twice -- random mixed iteration`` () = enum1.Current |> should equal 0 } -[] +[] let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = let tq = taskSeq { yield! [ 1..3 ] @@ -336,7 +336,7 @@ let ``TaskSeq-toArray can be applied multiple times to the same sequence`` () = let (results2: _[]) = tq |> TaskSeq.toArray let (results3: _[]) = tq |> TaskSeq.toArray let (results4: _[]) = tq |> TaskSeq.toArray - results1 |> should equal [| 1..10 |] - results2 |> should equal [| 1..10 |] - results3 |> should equal [| 1..10 |] - results4 |> should equal [| 1..10 |] + results1 |> should equal [| 1..7 |] + results2 |> should equal [| 1..7 |] + results3 |> should equal [| 1..7 |] + results4 |> should equal [| 1..7 |] From b809967d83b17c81bf15a3b7e9168946d531f432 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 29 Oct 2022 21:13:43 +0200 Subject: [PATCH 27/27] Fix merge mistakes --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index c4f1cfa3..16db4470 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -78,9 +78,6 @@ type TaskSeqStateMachineData<'T>() = /// Used by the AsyncEnumerator interface to return the Current value when /// IAsyncEnumerator.Current is called - [] - val mutable completed: bool - [] val mutable current: ValueOption<'T> @@ -255,11 +252,6 @@ and [] TaskSeq<'Machine, 'T this.InitMachineData(ct, &this._machine) this // just return 'self' here - | _ -> - if verbose then - printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint - - this :> IAsyncEnumerator<_> | _ -> // We need to reset state, but only to the "initial machine", resetting the _machine to