diff --git a/src/fsharp/FSharp.Core/async.fs b/src/fsharp/FSharp.Core/async.fs index 2d87a60078..4fe2d48d6f 100644 --- a/src/fsharp/FSharp.Core/async.fs +++ b/src/fsharp/FSharp.Core/async.fs @@ -69,7 +69,7 @@ namespace Microsoft.FSharp.Control // Hence we don't use the 'unit' return type here, and instead invent our own type. [] type AsyncReturn = - | FakeUnit + | AsyncReturn type cont<'T> = ('T -> AsyncReturn) type econt = (ExceptionDispatchInfo -> AsyncReturn) @@ -78,6 +78,7 @@ namespace Microsoft.FSharp.Control [] type Trampoline() = + let fake () = Unchecked.defaultof let unfake (_ : AsyncReturn) = () [] @@ -129,7 +130,7 @@ namespace Microsoft.FSharp.Control finally if thisIsTopTrampoline then Trampoline.thisThreadHasTrampoline <- false - Unchecked.defaultof + fake() /// Increment the counter estimating the size of the synchronous stack and /// return true if time to jump on trampoline. @@ -139,20 +140,20 @@ namespace Microsoft.FSharp.Control /// Prepare to abandon the synchronous stack of the current execution and save the continuation in the trampoline. member __.Set action = - match storedCont with - | None -> - bindCount <- 0 - storedCont <- Some action - | _ -> failwith "Internal error: attempting to install continuation twice" - Unchecked.defaultof + assert storedCont.IsNone + bindCount <- 0 + storedCont <- Some action + fake() /// Save the exception continuation during propagation of an exception, or prior to raising an exception member __.OnExceptionRaised (action: econt) = + assert storedExnCont.IsNone storedExnCont <- Some action type TrampolineHolder() as this = let mutable trampoline = null + let fake () = Unchecked.defaultof static let unfake (_: AsyncReturn) = () // Preallocate this delegate and keep it in the trampoline holder. @@ -183,12 +184,12 @@ namespace Microsoft.FSharp.Control member this.PostWithTrampoline (syncCtxt: SynchronizationContext) (f : unit -> AsyncReturn) = syncCtxt.Post (sendOrPostCallbackWithTrampoline, state=(f |> box)) - Unchecked.defaultof + fake() member this.QueueWorkItemWithTrampoline (f: unit -> AsyncReturn) = if not (ThreadPool.QueueUserWorkItem(waitCallbackForQueueWorkItemWithTrampoline, f |> box)) then failwith "failed to queue user work item" - Unchecked.defaultof + fake() member this.PostOrQueueWithTrampoline (syncCtxt : SynchronizationContext) f = match syncCtxt with @@ -202,14 +203,14 @@ namespace Microsoft.FSharp.Control this.QueueWorkItemWithTrampoline(f) #else (new Thread((fun _ -> this.Execute f |> unfake), IsBackground=true)).Start() - Unchecked.defaultof + fake() #endif #else // This should be the only call to Thread.Start in this library. We must always install a trampoline. member __.StartThreadWithTrampoline (f : unit -> AsyncReturn) = (new Thread(threadStartCallbackForStartThreadWithTrampoline,IsBackground=true)).Start(f|>box) - Unchecked.defaultof + fake() #endif /// Save the exception continuation during propagation of an exception, or prior to raising an exception @@ -230,10 +231,13 @@ namespace Microsoft.FSharp.Control type AsyncActivationAux = { /// The active cancellation token token : CancellationToken + /// The exception continuation econt : econt + /// The cancellation continuation ccont : ccont + /// Holds some commonly-allocated callbacks and a mutable location to use for a trampoline trampolineHolder : TrampolineHolder } @@ -243,6 +247,7 @@ namespace Microsoft.FSharp.Control type AsyncActivationContents<'T> = { /// The success continuation cont : cont<'T> + /// The rarely changing components aux : AsyncActivationAux } @@ -251,37 +256,52 @@ namespace Microsoft.FSharp.Control [] type AsyncActivation<'T>(contents: AsyncActivationContents<'T>) = - member ctxt.WithCancellationContinuation ccont = AsyncActivation<_> { contents with aux = { ctxt.aux with ccont = ccont } } + /// Produce a new execution context for a composite async + member ctxt.WithCancellationContinuation ccont = AsyncActivation<'T> { contents with aux = { ctxt.aux with ccont = ccont } } - member ctxt.WithExceptionContinuation econt = AsyncActivation<_> { contents with aux = { ctxt.aux with econt = econt } } + /// Produce a new execution context for a composite async + member ctxt.WithExceptionContinuation econt = AsyncActivation<'T> { contents with aux = { ctxt.aux with econt = econt } } - member ctxt.WithContinuation(cont) = AsyncActivation<_> { cont = cont; aux = contents.aux } + /// Produce a new execution context for a composite async + member ctxt.WithContinuation(cont) = AsyncActivation<'U> { cont = cont; aux = contents.aux } - member ctxt.WithContinuations(cont, econt) = AsyncActivation<_> { cont = cont; aux = { contents.aux with econt = econt } } + /// Produce a new execution context for a composite async + member ctxt.WithContinuations(cont, econt) = AsyncActivation<'U> { cont = cont; aux = { contents.aux with econt = econt } } - member ctxt.WithContinuations(cont, econt, ccont) = AsyncActivation<_> { contents with cont = cont; aux = { ctxt.aux with econt = econt; ccont = ccont } } + /// Produce a new execution context for a composite async + member ctxt.WithContinuations(cont, econt, ccont) = AsyncActivation<'T> { contents with cont = cont; aux = { ctxt.aux with econt = econt; ccont = ccont } } + /// The extra information relevant to the execution of the async member ctxt.aux = contents.aux + /// The success continuation relevant to the execution of the async member ctxt.cont = contents.cont + /// The exception continuation relevant to the execution of the async member ctxt.econt = contents.aux.econt + /// The cancellation continuation relevant to the execution of the async member ctxt.ccont = contents.aux.ccont + /// The cancellation token relevant to the execution of the async member ctxt.token = contents.aux.token + /// The trampoline holder being used to protect execution of the async member ctxt.trampolineHolder = contents.aux.trampolineHolder + /// Check if cancellation has been requested member ctxt.IsCancellationRequested = contents.aux.token.IsCancellationRequested /// Call the cancellation continuation of the active computation member ctxt.OnCancellation () = contents.aux.ccont (new OperationCanceledException (contents.aux.token)) + /// Check for trampoline hijacking. member inline ctxt.HijackCheckThenCall cont arg = contents.aux.trampolineHolder.HijackCheckThenCall cont arg + /// Call the success continuation of the asynchronous execution context after checking for + /// cancellation and trampoline hijacking. member ctxt.OnSuccess result = if ctxt.IsCancellationRequested then ctxt.OnCancellation () @@ -300,42 +320,47 @@ namespace Microsoft.FSharp.Control static member Create cancellationToken trampolineHolder cont econt ccont : AsyncActivation<'T> = AsyncActivation { cont = cont; aux = { token = cancellationToken; econt = econt; ccont = ccont; trampolineHolder = trampolineHolder } } - + /// Queue the success continuation of the asynchronous execution context as a work item in the thread pool + /// after installing a trampoline member ctxt.QueueContinuationWithTrampoline (result: 'T) = let ctxt = ctxt ctxt.aux.trampolineHolder.QueueWorkItemWithTrampoline(fun () -> ctxt.cont result) + /// Call the success continuation of the asynchronous execution context member ctxt.CallContinuation(result: 'T) = ctxt.cont result + /// Represents an asynchronous computation [] type Async<'T> = { Invoke : (AsyncActivation<'T> -> AsyncReturn) } - type VolatileBarrier() = - [] - let mutable isStopped = false - member __.Proceed = not isStopped - member __.Stop() = isStopped <- true - - [] + /// Mutable register to help ensure that code is only executed once + [] type Latch() = let mutable i = 0 + + /// Execute the latch member this.Enter() = Interlocked.CompareExchange(&i, 1, 0) = 0 - [] + /// Ensures that a function is only called once + [] type Once() = let latch = Latch() + + /// Execute the function at most once member this.Do f = if latch.Enter() then f() + /// Represents the result of an asynchronous computation [] type AsyncResult<'T> = | Ok of 'T | Error of ExceptionDispatchInfo | Canceled of OperationCanceledException + /// Get the result of an asynchronous computation [] member res.Commit () = match res with @@ -343,12 +368,14 @@ namespace Microsoft.FSharp.Control | AsyncResult.Error edi -> edi.ThrowAny() | AsyncResult.Canceled exn -> raise exn + /// Primitives to execute asynchronous computations module AsyncPrimitives = - let fake () = Unchecked.defaultof + let inline fake () = Unchecked.defaultof let unfake (_: AsyncReturn) = () + /// The mutable global CancellationTokenSource, see Async.DefaultCancellationToken let mutable defaultCancellationTokenSource = new CancellationTokenSource() /// Primitive to invoke an async computation. @@ -375,7 +402,7 @@ namespace Microsoft.FSharp.Control if ok then ctxt.HijackCheckThenCall ctxt.cont result else - Unchecked.defaultof + fake() /// Apply 'part2' to 'result1' and invoke the resulting computation. // @@ -395,7 +422,7 @@ namespace Microsoft.FSharp.Control if ok then Invoke result ctxt else - Unchecked.defaultof + fake() /// Like `CallThenInvoke` but does not do a hijack check for historical reasons (exact code compat) [] @@ -413,7 +440,7 @@ namespace Microsoft.FSharp.Control if ok then res.Invoke ctxt else - Unchecked.defaultof + fake() /// Apply 'catchFilter' to 'arg'. If the result is 'Some' invoke the resulting computation. If the result is 'None' /// then send 'result1' to the exception continuation. @@ -436,7 +463,7 @@ namespace Microsoft.FSharp.Control | Some res -> Invoke res ctxt else - Unchecked.defaultof + fake() /// Internal way of making an async from code, for exact code compat. /// Perform a cancellation check and ensure that any exceptions raised by @@ -746,7 +773,7 @@ namespace Microsoft.FSharp.Control // Run the action outside the lock match grabbedConts with - | [] -> Unchecked.defaultof + | [] -> fake() | [cont] -> if reuseThread then cont.ContinueImmediate(res) @@ -778,7 +805,7 @@ namespace Microsoft.FSharp.Control ) match resOpt with | Some res -> ctxt.cont res - | None -> Unchecked.defaultof + | None -> fake() ) member x.TryWaitForResultSynchronously (?timeout) : 'T option = @@ -907,9 +934,9 @@ namespace Microsoft.FSharp.Control let Start cancellationToken (computation:Async) = QueueAsync cancellationToken - (fun () -> Unchecked.defaultof) // nothing to do on success + (fun () -> fake()) // nothing to do on success (fun edi -> edi.ThrowAny()) // raise exception in child - (fun _ -> Unchecked.defaultof) // ignore cancellation in child + (fun _ -> fake()) // ignore cancellation in child computation |> unfake @@ -940,6 +967,7 @@ namespace Microsoft.FSharp.Control task // Helper to attach continuation to the given task. + [] let taskContinueWith (task : Task<'T>) (ctxt: AsyncActivation<'T>) useCcontForTaskCancellation = let continuation (completedTask: Task<_>) : unit = @@ -977,8 +1005,7 @@ namespace Microsoft.FSharp.Control task.ContinueWith(Action(continuation)) |> ignore |> fake - [] - [] + [] type AsyncIAsyncResult<'T>(callback: System.AsyncCallback,state:obj) = // This gets set to false if the result is not available by the // time the IAsyncResult is returned to the caller of Begin @@ -1061,8 +1088,7 @@ namespace Microsoft.FSharp.Control open AsyncPrimitives - [] - [] + [] type AsyncBuilder() = member __.Zero () = unitAsync @@ -1092,8 +1118,7 @@ namespace Microsoft.FSharp.Control module AsyncBuilderImpl = let async = AsyncBuilder() - [] - [] + [] type Async = static member CancellationToken = cancellationTokenAsync @@ -1129,7 +1154,7 @@ namespace Microsoft.FSharp.Control match contToTailCall with | Some k -> k() - | _ -> Unchecked.defaultof) + | _ -> fake()) static member DefaultCancellationToken = defaultCancellationTokenSource.Token @@ -1203,7 +1228,7 @@ namespace Microsoft.FSharp.Control | Some (Choice1Of2 exn) -> ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.econt exn) | Some (Choice2Of2 cexn) -> ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.ccont cexn) else - Unchecked.defaultof + fake() // recordSuccess and recordFailure between them decrement count to 0 and // as soon as 0 is reached dispose innerCancellationSource @@ -1237,7 +1262,7 @@ namespace Microsoft.FSharp.Control (fun cexn -> recordFailure (Choice2Of2 cexn)) p |> unfake) - Unchecked.defaultof)) + fake())) static member Choice(computations : Async<'T option> seq) : Async<'T option> = MakeAsync (fun ctxt -> @@ -1261,32 +1286,30 @@ namespace Microsoft.FSharp.Control if Interlocked.Increment exnCount = 1 then innerCts.Cancel(); ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.cont result) else - Unchecked.defaultof + fake() | None -> if Interlocked.Increment noneCount = computations.Length then innerCts.Cancel(); ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.cont None) else - Unchecked.defaultof + fake() let econt (exn : ExceptionDispatchInfo) = if Interlocked.Increment exnCount = 1 then innerCts.Cancel(); ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.econt exn) else - Unchecked.defaultof + fake() let ccont (exn : OperationCanceledException) = if Interlocked.Increment exnCount = 1 then innerCts.Cancel(); ctxtWithSync.trampolineHolder.ExecuteWithTrampoline (fun () -> ctxtWithSync.ccont exn) else - Unchecked.defaultof + fake() for c in computations do QueueAsync innerCts.Token scont econt ccont c |> unfake - Unchecked.defaultof)) - - type Async with + fake())) /// StartWithContinuations, except the exception continuation is given an ExceptionDispatchInfo static member StartWithContinuationsUsingDispatchInfo(computation:Async<'T>, continuation, exceptionContinuation, cancellationContinuation, ?cancellationToken) : unit = @@ -1353,7 +1376,7 @@ namespace Microsoft.FSharp.Control match edi with | null -> - Unchecked.defaultof + fake() | _ -> ctxt.econt edi) @@ -1404,13 +1427,13 @@ namespace Microsoft.FSharp.Control state=null, millisecondsTimeOutInterval=millisecondsTimeout, executeOnlyOnce=true)); - Unchecked.defaultof) + fake()) with _ -> if latch.Enter() then registration.Dispose() reraise() // reraise exception only if we successfully enter the latch (no other continuations were called) else - Unchecked.defaultof + fake() ) static member AwaitIAsyncResult(iar: IAsyncResult, ?millisecondsTimeout): Async =