diff --git a/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj b/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj
index b807b3a0be5..29030516ebd 100644
--- a/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj
+++ b/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj
@@ -121,12 +121,6 @@
Reflection/reflect.fs
-
- Event/event.fsi
-
-
- Event/event.fs
-
Numerics/n.fsi
@@ -163,11 +157,35 @@
NativeInterop/nativeptr.fs
-
- Async/control.fsi
+
+ Control/event.fsi
+
+
+ Control/event.fs
+
+
+ Control/async.fsi
+
+
+ Control/async.fs
+
+
+ Control/eventmodule.fsi
+
+
+ Control/eventmodule.fs
+
+
+ Control/observable.fsi
+
+
+ Control/observable.fs
+
+
+ MailboxProcessor/mailbox.fsi
-
- Async/control.fs
+
+ MailboxProcessor/mailbox.fs
Queries/Linq.fsi
diff --git a/src/fsharp/FSharp.Core/FSharp.Core.fsproj b/src/fsharp/FSharp.Core/FSharp.Core.fsproj
index 1da4c6da6d6..0326640bf6e 100644
--- a/src/fsharp/FSharp.Core/FSharp.Core.fsproj
+++ b/src/fsharp/FSharp.Core/FSharp.Core.fsproj
@@ -149,12 +149,6 @@
Reflection/reflect.fs
-
- Event/event.fsi
-
-
- Event/event.fs
-
Numerics/n.fsi
@@ -191,11 +185,35 @@
NativeInterop/nativeptr.fs
-
- Async/control.fsi
+
+ Control/event.fsi
+
+
+ Control/event.fs
+
+
+ Control/async.fsi
+
+
+ Control/async.fs
+
+
+ Control/eventmodule.fsi
+
+
+ Control/eventmodule.fs
+
+
+ Control/observable.fsi
+
+
+ Control/observable.fs
+
+
+ MailboxProcessor/mailbox.fsi
-
- Async/control.fs
+
+ MailboxProcessor/mailbox.fs
Queries/Linq.fsi
diff --git a/src/fsharp/FSharp.Core/control.fs b/src/fsharp/FSharp.Core/async.fs
similarity index 72%
rename from src/fsharp/FSharp.Core/control.fs
rename to src/fsharp/FSharp.Core/async.fs
index 4058d38b927..e2f87857f41 100644
--- a/src/fsharp/FSharp.Core/control.fs
+++ b/src/fsharp/FSharp.Core/async.fs
@@ -3,16 +3,10 @@
namespace Microsoft.FSharp.Control
#nowarn "40"
- #nowarn "21"
- #nowarn "47"
#nowarn "52" // The value has been copied to ensure the original is not mutated by this operation
- #nowarn "67" // This type test or downcast will always hold
- #nowarn "864" // IObservable.Subscribe
open System
open System.Diagnostics
- open System.Diagnostics.CodeAnalysis
- open System.IO
open System.Reflection
open System.Runtime.CompilerServices
open System.Runtime.ExceptionServices
@@ -20,7 +14,6 @@ namespace Microsoft.FSharp.Control
open System.Threading.Tasks
open Microsoft.FSharp.Core
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
- open Microsoft.FSharp.Core.Operators
open Microsoft.FSharp.Control
open Microsoft.FSharp.Collections
@@ -28,48 +21,6 @@ namespace Microsoft.FSharp.Control
open ReflectionAdapters
#endif
-
- /// We use our own internal implementation of queues to avoid a dependency on System.dll
- type Queue<'T>() = //: IEnumerable, ICollection, IEnumerable
-
- let mutable array = [| |]
- let mutable head = 0
- let mutable size = 0
- let mutable tail = 0
-
- let SetCapacity(capacity) =
- let destinationArray = Array.zeroCreate capacity
- if (size > 0) then
- if (head < tail) then
- Array.Copy(array, head, destinationArray, 0, size)
-
- else
- Array.Copy(array, head, destinationArray, 0, array.Length - head)
- Array.Copy(array, 0, destinationArray, array.Length - head, tail)
- array <- destinationArray
- head <- 0
- tail <- if (size = capacity) then 0 else size
-
- member x.Dequeue() =
- if (size = 0) then
- failwith "Dequeue"
- let local = array.[head]
- array.[head] <- Unchecked.defaultof<'T>
- head <- (head + 1) % array.Length
- size <- size - 1
- local
-
- member this.Enqueue(item) =
- if (size = array.Length) then
- let capacity = int ((int64 array.Length * 200L) / 100L)
- let capacity = max capacity (array.Length + 4)
- SetCapacity(capacity)
- array.[tail] <- item
- tail <- (tail + 1) % array.Length
- size <- size + 1
-
- member x.Count = size
-
type LinkedSubSource(cancellationToken : CancellationToken) =
let failureCTS = new CancellationTokenSource()
@@ -286,6 +237,12 @@ namespace Microsoft.FSharp.Control
member ctxt.CallExceptionContinuation edi =
ctxt.aux.econt edi
+ member ctxt.QueueContinuationWithTrampoline (result: 'T) =
+ ctxt.aux.trampolineHolder.QueueWorkItemWithTrampoline(fun () -> ctxt.cont result)
+
+ member ctxt.CallContinuation(result: 'T) =
+ ctxt.cont result
+
[]
[]
type Async<'T> =
@@ -1714,646 +1671,3 @@ namespace Microsoft.FSharp.Control
)
#endif
-
- open CommonExtensions
-
- module AsyncHelpers =
-
- let awaitEither a1 a2 =
- async {
- let resultCell = new ResultCell<_>()
- let! cancellationToken = Async.CancellationToken
- let start a f =
- Async.StartWithContinuationsUsingDispatchInfo(a,
- (fun res -> resultCell.RegisterResult(f res |> AsyncResult.Ok, reuseThread=false) |> unfake),
- (fun edi -> resultCell.RegisterResult(edi |> AsyncResult.Error, reuseThread=false) |> unfake),
- (fun oce -> resultCell.RegisterResult(oce |> AsyncResult.Canceled, reuseThread=false) |> unfake),
- cancellationToken = cancellationToken
- )
- start a1 Choice1Of2
- start a2 Choice2Of2
- // Note: It is ok to use "NoDirectCancel" here because the started computations use the same
- // cancellation token and will register a cancelled result if cancellation occurs.
- // Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine.
- let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
- return! CreateAsyncResultAsync result
- }
-
- let timeout msec cancellationToken =
- if msec < 0 then
- MakeAsync (fun _ -> FakeUnit) // "block" forever
- else
- let resultCell = new ResultCell<_>()
- Async.StartWithContinuations(
- computation=Async.Sleep(msec),
- continuation=(fun () -> resultCell.RegisterResult((), reuseThread = false) |> unfake),
- exceptionContinuation=ignore,
- cancellationContinuation=ignore,
- cancellationToken = cancellationToken)
- // Note: It is ok to use "NoDirectCancel" here because the started computations use the same
- // cancellation token and will register a cancelled result if cancellation occurs.
- // Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout.
- resultCell.AwaitResult_NoDirectCancelOrTimeout
-
- []
- []
- type Mailbox<'Msg>(cancellationSupported: bool) =
- let mutable inboxStore = null
- let mutable arrivals = new Queue<'Msg>()
- let syncRoot = arrivals
-
- // Control elements indicating the state of the reader. When the reader is "blocked" at an
- // asynchronous receive, either
- // -- "cont" is non-null and the reader is "activated" by re-scheduling cont in the thread pool; or
- // -- "pulse" is non-null and the reader is "activated" by setting this event
- let mutable savedCont : ((bool -> AsyncReturn) * TrampolineHolder) option = None
-
- // Readers who have a timeout use this event
- let mutable pulse : AutoResetEvent = null
-
- // Make sure that the "pulse" value is created
- let ensurePulse() =
- match pulse with
- | null ->
- pulse <- new AutoResetEvent(false);
- | _ ->
- ()
- pulse
-
- let waitOneNoTimeoutOrCancellation =
- MakeAsync (fun ctxt ->
- match savedCont with
- | None ->
- let descheduled =
- // An arrival may have happened while we're preparing to deschedule
- lock syncRoot (fun () ->
- if arrivals.Count = 0 then
- // OK, no arrival so deschedule
- savedCont <- Some(ctxt.cont, ctxt.aux.trampolineHolder);
- true
- else
- false)
- if descheduled then
- FakeUnit
- else
- // If we didn't deschedule then run the continuation immediately
- ctxt.cont true
- | Some _ ->
- failwith "multiple waiting reader continuations for mailbox")
-
- let waitOneWithCancellation timeout =
- Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout)
-
- let waitOne timeout =
- if timeout < 0 && not cancellationSupported then
- waitOneNoTimeoutOrCancellation
- else
- waitOneWithCancellation(timeout)
-
- member __.inbox =
- match inboxStore with
- | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1)
- | _ -> ()
- inboxStore
-
- member x.CurrentQueueLength =
- lock syncRoot (fun () -> x.inbox.Count + arrivals.Count)
-
- member x.ScanArrivalsUnsafe(f) =
- if arrivals.Count = 0 then
- None
- else
- let msg = arrivals.Dequeue()
- match f msg with
- | None ->
- x.inbox.Add(msg)
- x.ScanArrivalsUnsafe(f)
- | res -> res
-
- // Lock the arrivals queue while we scan that
- member x.ScanArrivals(f) =
- lock syncRoot (fun () -> x.ScanArrivalsUnsafe(f))
-
- member x.ScanInbox(f,n) =
- match inboxStore with
- | null -> None
- | inbox ->
- if n >= inbox.Count
- then None
- else
- let msg = inbox.[n]
- match f msg with
- | None -> x.ScanInbox (f,n+1)
- | res -> inbox.RemoveAt(n); res
-
- member x.ReceiveFromArrivalsUnsafe() =
- if arrivals.Count = 0 then
- None
- else
- Some(arrivals.Dequeue())
-
- member x.ReceiveFromArrivals() =
- lock syncRoot (fun () -> x.ReceiveFromArrivalsUnsafe())
-
- member x.ReceiveFromInbox() =
- match inboxStore with
- | null -> None
- | inbox ->
- if inbox.Count = 0 then
- None
- else
- let x = inbox.[0]
- inbox.RemoveAt(0)
- Some(x)
-
- member x.Post(msg) =
- lock syncRoot (fun () ->
-
- // Add the message to the arrivals queue
- arrivals.Enqueue(msg)
-
- // Cooperatively unblock any waiting reader. If there is no waiting
- // reader we just leave the message in the incoming queue
- match savedCont with
- | None ->
- match pulse with
- | null ->
- () // no one waiting, leaving the message in the queue is sufficient
- | ev ->
- // someone is waiting on the wait handle
- ev.Set() |> ignore
-
- | Some (action, trampolineHolder) ->
- savedCont <- None
- trampolineHolder.QueueWorkItemWithTrampoline(fun () -> action true) |> unfake)
-
- member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> =
- let rec scan timeoutAsync (timeoutCts:CancellationTokenSource) =
- async {
- match x.ScanArrivals(f) with
- | None ->
- // Deschedule and wait for a message. When it comes, rescan the arrivals
- let! ok = AsyncHelpers.awaitEither waitOneNoTimeoutOrCancellation timeoutAsync
- match ok with
- | Choice1Of2 true ->
- return! scan timeoutAsync timeoutCts
- | Choice1Of2 false ->
- return failwith "should not happen - waitOneNoTimeoutOrCancellation always returns true"
- | Choice2Of2 () ->
- lock syncRoot (fun () ->
- // Cancel the outstanding wait for messages installed by waitOneWithCancellation
- //
- // HERE BE DRAGONS. This is bestowed on us because we only support
- // a single mailbox reader at any one time.
- // If awaitEither returned control because timeoutAsync has terminated, waitOneNoTimeoutOrCancellation
- // might still be in-flight. In practical terms, it means that the push-to-async-result-cell
- // continuation that awaitEither registered on it is still pending, i.e. it is still in savedCont.
- // That continuation is a no-op now, but it is still a registered reader for arriving messages.
- // Therefore we just abandon it - a brutal way of canceling.
- // This ugly non-compositionality is only needed because we only support a single mailbox reader
- // (i.e. the user is not allowed to run several Receive/TryReceive/Scan/TryScan in parallel) - otherwise
- // we would just have an extra no-op reader in the queue.
- savedCont <- None)
-
- return None
- | Some resP ->
- timeoutCts.Cancel() // cancel the timeout watcher
- let! res = resP
- return Some res
- }
- let rec scanNoTimeout () =
- async {
- match x.ScanArrivals(f) with
- | None ->
- let! ok = waitOne(Timeout.Infinite)
- if ok then
- return! scanNoTimeout()
- else
- return (failwith "Timed out with infinite timeout??")
- | Some resP ->
- let! res = resP
- return Some res
- }
-
- // Look in the inbox first
- async {
- match x.ScanInbox(f,0) with
- | None when timeout < 0 ->
- return! scanNoTimeout()
- | None ->
- let! cancellationToken = Async.CancellationToken
- let timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None)
- let timeoutAsync = AsyncHelpers.timeout timeout timeoutCts.Token
- return! scan timeoutAsync timeoutCts
- | Some resP ->
- let! res = resP
- return Some res
- }
-
- member x.Scan((f: 'Msg -> (Async<'T>) option), timeout) =
- async {
- let! resOpt = x.TryScan(f,timeout)
- match resOpt with
- | None -> return raise(TimeoutException(SR.GetString(SR.mailboxScanTimedOut)))
- | Some res -> return res
- }
-
- member x.TryReceive(timeout) =
- let rec processFirstArrival() =
- async {
- match x.ReceiveFromArrivals() with
- | None ->
- // Make sure the pulse is created if it is going to be needed.
- // If it isn't, then create it, and go back to the start to
- // check arrivals again.
- match pulse with
- | null when timeout >= 0 || cancellationSupported ->
- ensurePulse() |> ignore
- return! processFirstArrival()
- | _ ->
- // Wait until we have been notified about a message. When that happens, rescan the arrivals
- let! ok = waitOne(timeout)
- if ok then
- return! processFirstArrival()
- else
- return None
- | res -> return res
- }
-
- // look in the inbox first
- async {
- match x.ReceiveFromInbox() with
- | None -> return! processFirstArrival()
- | res -> return res
- }
-
- member x.Receive(timeout) =
-
- let rec processFirstArrival() =
- async {
- match x.ReceiveFromArrivals() with
- | None ->
- // Make sure the pulse is created if it is going to be needed.
- // If it isn't, then create it, and go back to the start to
- // check arrivals again.
- match pulse with
- | null when timeout >= 0 || cancellationSupported ->
- ensurePulse() |> ignore
- return! processFirstArrival()
- | _ ->
- // Wait until we have been notified about a message. When that happens, rescan the arrivals
- let! ok = waitOne(timeout)
- if ok then
- return! processFirstArrival()
- else
- return raise(TimeoutException(SR.GetString(SR.mailboxReceiveTimedOut)))
- | Some res -> return res
- }
-
- // look in the inbox first
- async {
- match x.ReceiveFromInbox() with
- | None -> return! processFirstArrival()
- | Some res -> return res
- }
-
- interface System.IDisposable with
- member __.Dispose() =
- if isNotNull pulse then (pulse :> IDisposable).Dispose()
-
-#if DEBUG
- member x.UnsafeContents =
- (x.inbox,arrivals,pulse,savedCont) |> box
-#endif
-
-
- []
- []
- type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
- member x.Reply(value) = replyf(value)
-
- []
- []
- []
- type MailboxProcessor<'Msg>(body, ?cancellationToken) =
-
- let cancellationSupported = cancellationToken.IsSome
- let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken
- let mailbox = new Mailbox<'Msg>(cancellationSupported)
- let mutable defaultTimeout = Threading.Timeout.Infinite
- let mutable started = false
- let errorEvent = new Event()
-
- member __.CurrentQueueLength = mailbox.CurrentQueueLength // nb. unprotected access gives an approximation of the queue length
-
- member __.DefaultTimeout
- with get() = defaultTimeout
- and set(v) = defaultTimeout <- v
-
- []
- member __.Error = errorEvent.Publish
-
-#if DEBUG
- member __.UnsafeMessageQueueContents = mailbox.UnsafeContents
-#endif
-
- member x.Start() =
- if started then
- raise (new InvalidOperationException(SR.GetString(SR.mailboxProcessorAlreadyStarted)))
- else
- started <- true
-
- // Protect the execution and send errors to the event.
- // Note that exception stack traces are lost in this design - in an extended design
- // the event could propagate an ExceptionDispatchInfo instead of an Exception.
- let p =
- async { try
- do! body x
- with exn ->
- errorEvent.Trigger exn }
-
- Async.Start(computation=p, cancellationToken=cancellationToken)
-
- member __.Post(message) = mailbox.Post(message)
-
- member __.TryPostAndReply(buildMessage : (_ -> 'Msg), ?timeout) : 'Reply option =
- let timeout = defaultArg timeout defaultTimeout
- use resultCell = new ResultCell<_>()
- let msg = buildMessage (new AsyncReplyChannel<_>(fun reply ->
- // Note the ResultCell may have been disposed if the operation
- // timed out. In this case RegisterResult drops the result on the floor.
- resultCell.RegisterResult(reply,reuseThread=false) |> unfake))
- mailbox.Post(msg)
- resultCell.TryWaitForResultSynchronously(timeout=timeout)
-
- member x.PostAndReply(buildMessage, ?timeout) : 'Reply =
- match x.TryPostAndReply(buildMessage,?timeout=timeout) with
- | None -> raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndReplyTimedOut)))
- | Some res -> res
-
- member __.PostAndTryAsyncReply(buildMessage, ?timeout) : Async<'Reply option> =
- let timeout = defaultArg timeout defaultTimeout
- let resultCell = new ResultCell<_>()
- let msg = buildMessage (new AsyncReplyChannel<_>(fun reply ->
- // Note the ResultCell may have been disposed if the operation
- // timed out. In this case RegisterResult drops the result on the floor.
- resultCell.RegisterResult(reply, reuseThread=false) |> unfake))
- mailbox.Post(msg)
- match timeout with
- | Threading.Timeout.Infinite when not cancellationSupported ->
- async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
- return Some result }
-
- | _ ->
- async { use _disposeCell = resultCell
- let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout)
- let res = (if ok then Some(resultCell.GrabResult()) else None)
- return res }
-
- member x.PostAndAsyncReply(buildMessage, ?timeout:int) =
- let timeout = defaultArg timeout defaultTimeout
- match timeout with
- | Threading.Timeout.Infinite when not cancellationSupported ->
- // Nothing to dispose, no wait handles used
- let resultCell = new ResultCell<_>()
- let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake))
- mailbox.Post(msg)
- resultCell.AwaitResult_NoDirectCancelOrTimeout
- | _ ->
- let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
- async { let! res = asyncReply
- match res with
- | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
- | Some res -> return res }
-
- member __.Receive(?timeout) =
- mailbox.Receive(timeout=defaultArg timeout defaultTimeout)
-
- member __.TryReceive(?timeout) =
- mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout)
-
- member __.Scan(scanner: 'Msg -> (Async<'T>) option,?timeout) =
- mailbox.Scan(scanner,timeout=defaultArg timeout defaultTimeout)
-
- member __.TryScan(scanner: 'Msg -> (Async<'T>) option,?timeout) =
- mailbox.TryScan(scanner,timeout=defaultArg timeout defaultTimeout)
-
- interface System.IDisposable with
- member __.Dispose() = (mailbox :> IDisposable).Dispose()
-
- static member Start(body,?cancellationToken) =
- let mailboxProcessor = new MailboxProcessor<'Msg>(body,?cancellationToken=cancellationToken)
- mailboxProcessor.Start()
- mailboxProcessor
-
- []
- []
- module Event =
- []
- let create<'T>() =
- let ev = new Event<'T>()
- ev.Trigger, ev.Publish
-
- []
- let map mapping (sourceEvent: IEvent<'Delegate,'T>) =
- let ev = new Event<_>()
- sourceEvent.Add(fun x -> ev.Trigger(mapping x));
- ev.Publish
-
- []
- let filter predicate (sourceEvent: IEvent<'Delegate,'T>) =
- let ev = new Event<_>()
- sourceEvent.Add(fun x -> if predicate x then ev.Trigger x);
- ev.Publish
-
- []
- let partition predicate (sourceEvent: IEvent<'Delegate,'T>) =
- let ev1 = new Event<_>()
- let ev2 = new Event<_>()
- sourceEvent.Add(fun x -> if predicate x then ev1.Trigger x else ev2.Trigger x);
- ev1.Publish,ev2.Publish
-
- []
- let choose chooser (sourceEvent: IEvent<'Delegate,'T>) =
- let ev = new Event<_>()
- sourceEvent.Add(fun x -> match chooser x with None -> () | Some r -> ev.Trigger r);
- ev.Publish
-
- []
- let scan collector state (sourceEvent: IEvent<'Delegate,'T>) =
- let state = ref state
- let ev = new Event<_>()
- sourceEvent.Add(fun msg ->
- let z = !state
- let z = collector z msg
- state := z;
- ev.Trigger(z));
- ev.Publish
-
- []
- let add callback (sourceEvent: IEvent<'Delegate,'T>) = sourceEvent.Add(callback)
-
- []
- let pairwise (sourceEvent : IEvent<'Delegate,'T>) : IEvent<'T * 'T> =
- let ev = new Event<'T * 'T>()
- let lastArgs = ref None
- sourceEvent.Add(fun args2 ->
- (match !lastArgs with
- | None -> ()
- | Some args1 -> ev.Trigger(args1,args2))
- lastArgs := Some args2)
-
- ev.Publish
-
- []
- let merge (event1: IEvent<'Del1,'T>) (event2: IEvent<'Del2,'T>) =
- let ev = new Event<_>()
- event1.Add(fun x -> ev.Trigger(x))
- event2.Add(fun x -> ev.Trigger(x))
- ev.Publish
-
- []
- let split (splitter : 'T -> Choice<'U1,'U2>) (sourceEvent: IEvent<'Delegate,'T>) =
- let ev1 = new Event<_>()
- let ev2 = new Event<_>()
- sourceEvent.Add(fun x -> match splitter x with Choice1Of2 y -> ev1.Trigger(y) | Choice2Of2 z -> ev2.Trigger(z));
- ev1.Publish,ev2.Publish
-
-
- []
- []
- module Observable =
- let obs x = (x :> IObservable<_>)
-
-
- let inline protect f succeed fail =
- match (try Choice1Of2 (f ()) with e -> Choice2Of2 e) with
- | Choice1Of2 x -> (succeed x)
- | Choice2Of2 e -> (fail e)
-
- []
- type BasicObserver<'T>() =
- let mutable stopped = false
- abstract Next : value : 'T -> unit
- abstract Error : error : exn -> unit
- abstract Completed : unit -> unit
- interface IObserver<'T> with
- member x.OnNext value = if not stopped then x.Next value
- member x.OnError e = if not stopped then stopped <- true
- x.Error e
- member x.OnCompleted () = if not stopped then stopped <- true
- x.Completed ()
-
- []
- let map mapping (source: IObservable<'T>) =
- { new IObservable<'U> with
- member x.Subscribe(observer) =
- source.Subscribe { new BasicObserver<'T>() with
- member x.Next(v) =
- protect (fun () -> mapping v) observer.OnNext observer.OnError
- member x.Error(e) = observer.OnError(e)
- member x.Completed() = observer.OnCompleted() } }
-
- []
- let choose chooser (source: IObservable<'T>) =
- { new IObservable<'U> with
- member x.Subscribe(observer) =
- source.Subscribe { new BasicObserver<'T>() with
- member x.Next(v) =
- protect (fun () -> chooser v) (function None -> () | Some v2 -> observer.OnNext v2) observer.OnError
- member x.Error(e) = observer.OnError(e)
- member x.Completed() = observer.OnCompleted() } }
-
- []
- let filter predicate (source: IObservable<'T>) =
- choose (fun x -> if predicate x then Some x else None) source
-
- []
- let partition predicate (source: IObservable<'T>) =
- filter predicate source, filter (predicate >> not) source
-
-
- []
- let scan collector state (source: IObservable<'T>) =
- { new IObservable<'U> with
- member x.Subscribe(observer) =
- let state = ref state
- source.Subscribe { new BasicObserver<'T>() with
- member x.Next(v) =
- let z = !state
- protect (fun () -> collector z v) (fun z ->
- state := z
- observer.OnNext z) observer.OnError
-
- member x.Error(e) = observer.OnError(e)
- member x.Completed() = observer.OnCompleted() } }
-
- []
- let add callback (source: IObservable<'T>) = source.Add(callback)
-
- []
- let subscribe (callback: 'T -> unit) (source: IObservable<'T>) = source.Subscribe(callback)
-
- []
- let pairwise (source : IObservable<'T>) : IObservable<'T * 'T> =
- { new IObservable<_> with
- member x.Subscribe(observer) =
- let lastArgs = ref None
- source.Subscribe { new BasicObserver<'T>() with
- member x.Next(args2) =
- match !lastArgs with
- | None -> ()
- | Some args1 -> observer.OnNext (args1,args2)
- lastArgs := Some args2
- member x.Error(e) = observer.OnError(e)
- member x.Completed() = observer.OnCompleted() } }
-
-
- []
- let merge (source1: IObservable<'T>) (source2: IObservable<'T>) =
- { new IObservable<_> with
- member x.Subscribe(observer) =
- let stopped = ref false
- let completed1 = ref false
- let completed2 = ref false
- let h1 =
- source1.Subscribe { new IObserver<'T> with
- member x.OnNext(v) =
- if not !stopped then
- observer.OnNext v
- member x.OnError(e) =
- if not !stopped then
- stopped := true;
- observer.OnError(e)
- member x.OnCompleted() =
- if not !stopped then
- completed1 := true;
- if !completed1 && !completed2 then
- stopped := true
- observer.OnCompleted() }
- let h2 =
- source2.Subscribe { new IObserver<'T> with
- member x.OnNext(v) =
- if not !stopped then
- observer.OnNext v
- member x.OnError(e) =
- if not !stopped then
- stopped := true;
- observer.OnError(e)
- member x.OnCompleted() =
- if not !stopped then
- completed2 := true;
- if !completed1 && !completed2 then
- stopped := true
- observer.OnCompleted() }
-
- { new IDisposable with
- member x.Dispose() =
- h1.Dispose();
- h2.Dispose() } }
-
- []
- let split (splitter : 'T -> Choice<'U1,'U2>) (source: IObservable<'T>) =
- choose (fun v -> match splitter v with Choice1Of2 x -> Some x | _ -> None) source,
- choose (fun v -> match splitter v with Choice2Of2 x -> Some x | _ -> None) source
-
diff --git a/src/fsharp/FSharp.Core/control.fsi b/src/fsharp/FSharp.Core/async.fsi
similarity index 64%
rename from src/fsharp/FSharp.Core/control.fsi
rename to src/fsharp/FSharp.Core/async.fsi
index f69a663d823..9ce73b10cc2 100644
--- a/src/fsharp/FSharp.Core/control.fsi
+++ b/src/fsharp/FSharp.Core/async.fsi
@@ -5,14 +5,12 @@ namespace Microsoft.FSharp.Control
open System
open System.Threading
open System.Threading.Tasks
- open System.Runtime.CompilerServices
+ open System.Runtime.ExceptionServices
open Microsoft.FSharp.Core
- open Microsoft.FSharp.Core.Operators
open Microsoft.FSharp.Control
open Microsoft.FSharp.Collections
-
/// A compositional asynchronous computation, which, when run, will eventually produce a value
/// of type T, or else raises an exception.
///
@@ -405,6 +403,11 @@ namespace Microsoft.FSharp.Control
continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) *
?cancellationToken:CancellationToken-> unit
+ static member internal StartWithContinuationsUsingDispatchInfo:
+ computation:Async<'T> *
+ continuation:('T -> unit) * exceptionContinuation:(ExceptionDispatchInfo -> unit) * cancellationContinuation:(OperationCanceledException -> unit) *
+ ?cancellationToken:CancellationToken-> unit
+
/// Runs an asynchronous computation, starting immediately on the current operating system
/// thread.
/// If no cancellation token is provided then the default cancellation token is used.
@@ -574,7 +577,6 @@ namespace Microsoft.FSharp.Control
[]
/// A module of extension members providing asynchronous operations for some basic CLI types related to concurrency and I/O.
module CommonExtensions =
- open System.IO
type System.IO.Stream with
@@ -626,7 +628,6 @@ namespace Microsoft.FSharp.Control
/// A module of extension members providing asynchronous operations for some basic Web operations.
[]
module WebExtensions =
- begin
type System.Net.WebRequest with
/// Returns an asynchronous computation that, when run, will wait for a response to the given WebRequest.
@@ -636,16 +637,19 @@ namespace Microsoft.FSharp.Control
#if !FX_NO_WEB_CLIENT
type System.Net.WebClient with
+
/// Returns an asynchronous computation that, when run, will wait for the download of the given URI.
/// The URI to retrieve.
/// An asynchronous computation that will wait for the download of the URI.
[] // give the extension member a nice, unmangled compiled name, unique within this module
member AsyncDownloadString : address:System.Uri -> Async
+
/// Returns an asynchronous computation that, when run, will wait for the download of the given URI.
/// The URI to retrieve.
/// An asynchronous computation that will wait for the download of the URI.
[] // give the extension member a nice, unmangled compiled name, unique within this module
member AsyncDownloadData : address:System.Uri -> Async
+
/// Returns an asynchronous computation that, when run, will wait for the download of the given URI to specified file.
/// The URI to retrieve.
/// The filename to save download to.
@@ -654,373 +658,42 @@ namespace Microsoft.FSharp.Control
member AsyncDownloadFile : address:System.Uri * fileName: string -> Async
#endif
- end
-
-
- []
- []
- /// A handle to a capability to reply to a PostAndReply message.
- type AsyncReplyChannel<'Reply> =
- /// Sends a reply to a PostAndReply message.
- /// The value to send.
- member Reply : value:'Reply -> unit
-
-
- /// A message-processing agent which executes an asynchronous computation.
- ///
- /// The agent encapsulates a message queue that supports multiple-writers and
- /// a single reader agent. Writers send messages to the agent by using the Post
- /// method and its variations.
- ///
- /// The agent may wait for messages using the Receive or TryReceive methods or
- /// scan through all available messages using the Scan or TryScan method.
-
- []
- []
- []
- type MailboxProcessor<'Msg> =
-
- /// Creates an agent. The body function is used to generate the asynchronous
- /// computation executed by the agent. This function is not executed until
- /// Start is called.
- /// The function to produce an asynchronous computation that will be executed
- /// as the read loop for the MailboxProcessor when Start is called.
- /// An optional cancellation token for the body.
- /// Defaults to Async.DefaultCancellationToken.
- /// The created MailboxProcessor.
- new : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>
-
- /// Creates and starts an agent. The body function is used to generate the asynchronous
- /// computation executed by the agent.
- /// The function to produce an asynchronous computation that will be executed
- /// as the read loop for the MailboxProcessor when Start is called.
- /// An optional cancellation token for the body.
- /// Defaults to Async.DefaultCancellationToken.
- /// The created MailboxProcessor.
- static member Start : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>
-
- /// Posts a message to the message queue of the MailboxProcessor, asynchronously.
- /// The message to post.
- member Post : message:'Msg -> unit
-
- /// Posts a message to an agent and await a reply on the channel, synchronously.
- ///
- /// The message is generated by applying buildMessage to a new reply channel
- /// to be incorporated into the message. The receiving agent must process this
- /// message and invoke the Reply method on this reply channel precisely once.
- /// The function to incorporate the AsyncReplyChannel into
- /// the message to be sent.
- /// An optional timeout parameter (in milliseconds) to wait for a reply message.
- /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
- /// The reply from the agent.
- member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply
-
- /// Posts a message to an agent and await a reply on the channel, asynchronously.
- ///
- /// The message is generated by applying buildMessage to a new reply channel
- /// to be incorporated into the message. The receiving agent must process this
- /// message and invoke the Reply method on this reply channel precisely once.
- /// The function to incorporate the AsyncReplyChannel into
- /// the message to be sent.
- /// An optional timeout parameter (in milliseconds) to wait for a reply message.
- /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that will wait for the reply from the agent.
- member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply>
-
- /// Like PostAndReply, but returns None if no reply within the timeout period.
- /// The function to incorporate the AsyncReplyChannel into
- /// the message to be sent.
- /// An optional timeout parameter (in milliseconds) to wait for a reply message.
- /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
- /// The reply from the agent or None if the timeout expires.
- member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply option
-
- /// Like AsyncPostAndReply, but returns None if no reply within the timeout period.
- /// The function to incorporate the AsyncReplyChannel into
- /// the message to be sent.
- /// An optional timeout parameter (in milliseconds) to wait for a reply message.
- /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that will return the reply or None if the timeout expires.
- member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply option>
-
- /// Waits for a message. This will consume the first message in arrival order.
- ///
- /// This method is for use within the body of the agent.
- ///
- /// This method is for use within the body of the agent. For each agent, at most
- /// one concurrent reader may be active, so no more than one concurrent call to
- /// Receive, TryReceive, Scan and/or TryScan may be active.
- /// An optional timeout in milliseconds. Defaults to -1 which corresponds
- /// to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that returns the received message.
- /// Thrown when the timeout is exceeded.
- member Receive : ?timeout:int -> Async<'Msg>
-
- /// Waits for a message. This will consume the first message in arrival order.
- ///
- /// This method is for use within the body of the agent.
- ///
- /// Returns None if a timeout is given and the timeout is exceeded.
- ///
- /// This method is for use within the body of the agent. For each agent, at most
- /// one concurrent reader may be active, so no more than one concurrent call to
- /// Receive, TryReceive, Scan and/or TryScan may be active.
- /// An optional timeout in milliseconds. Defaults to -1 which
- /// corresponds to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that returns the received message or
- /// None if the timeout is exceeded.
- member TryReceive : ?timeout:int -> Async<'Msg option>
-
- /// Scans for a message by looking through messages in arrival order until scanner
- /// returns a Some value. Other messages remain in the queue.
- ///
- /// Returns None if a timeout is given and the timeout is exceeded.
- ///
- /// This method is for use within the body of the agent. For each agent, at most
- /// one concurrent reader may be active, so no more than one concurrent call to
- /// Receive, TryReceive, Scan and/or TryScan may be active.
- /// The function to return None if the message is to be skipped
- /// or Some if the message is to be processed and removed from the queue.
- /// An optional timeout in milliseconds. Defaults to -1 which corresponds
- /// to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that scanner built off the read message.
- /// Thrown when the timeout is exceeded.
- member Scan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T>
-
- /// Scans for a message by looking through messages in arrival order until scanner
- /// returns a Some value. Other messages remain in the queue.
- ///
- /// This method is for use within the body of the agent. For each agent, at most
- /// one concurrent reader may be active, so no more than one concurrent call to
- /// Receive, TryReceive, Scan and/or TryScan may be active.
- /// The function to return None if the message is to be skipped
- /// or Some if the message is to be processed and removed from the queue.
- /// An optional timeout in milliseconds. Defaults to -1 which corresponds
- /// to System.Threading.Timeout.Infinite.
- /// An asynchronous computation that scanner built off the read message.
- member TryScan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T option>
-
- /// Starts the agent.
- member Start : unit -> unit
-
- /// Raises a timeout exception if a message not received in this amount of time. By default
- /// no timeout is used.
- member DefaultTimeout : int with get, set
-
- /// Occurs when the execution of the agent results in an exception.
- []
- member Error : IEvent
-
- interface System.IDisposable
-
- /// Returns the number of unprocessed messages in the message queue of the agent.
- member CurrentQueueLength : int
+ // Internals used by MailboxProcessor
+ module internal AsyncImpl =
+ val async : AsyncBuilder
+ []
+ // Internals used by MailboxProcessor
+ type internal AsyncReturn
- []
- []
- /// Basic operations on first class event and other observable objects.
- module Observable =
-
- /// Returns an observable for the merged observations from the sources.
- /// The returned object propagates success and error values arising
- /// from either source and completes when both the sources have completed.
- ///
- /// For each observer, the registered intermediate observing object is not
- /// thread safe. That is, observations arising from the sources must not
- /// be triggered concurrently on different threads.
- /// The first Observable.
- /// The second Observable.
- /// An Observable that propagates information from both sources.
- []
- val merge: source1:IObservable<'T> -> source2:IObservable<'T> -> IObservable<'T>
-
- /// Returns an observable which transforms the observations of the source by the
- /// given function. The transformation function is executed once for each
- /// subscribed observer. The returned object also propagates error observations
- /// arising from the source and completes when the source completes.
- /// The function applied to observations from the source.
- /// The input Observable.
- /// An Observable of the type specified by mapping.
- []
- val map: mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>
-
- /// Returns an observable which filters the observations of the source
- /// by the given function. The observable will see only those observations
- /// for which the predicate returns true. The predicate is executed once for
- /// each subscribed observer. The returned object also propagates error
- /// observations arising from the source and completes when the source completes.
- /// The function to apply to observations to determine if it should
- /// be kept.
- /// The input Observable.
- /// An Observable that filters observations based on filter.
- []
- val filter: predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T>
-
- /// Returns two observables which partition the observations of the source by
- /// the given function. The first will trigger observations for those values
- /// for which the predicate returns true. The second will trigger observations
- /// for those values where the predicate returns false. The predicate is
- /// executed once for each subscribed observer. Both also propagate all error
- /// observations arising from the source and each completes when the source
- /// completes.
- /// The function to determine which output Observable will trigger
- /// a particular observation.
- /// The input Observable.
- /// A tuple of Observables. The first triggers when the predicate returns true, and
- /// the second triggers when the predicate returns false.
- []
- val partition: predicate:('T -> bool) -> source:IObservable<'T> -> (IObservable<'T> * IObservable<'T>)
-
- /// Returns two observables which split the observations of the source by the
- /// given function. The first will trigger observations x for which the
- /// splitter returns Choice1Of2 x. The second will trigger observations
- /// y for which the splitter returns Choice2Of2 y The splitter is
- /// executed once for each subscribed observer. Both also propagate error
- /// observations arising from the source and each completes when the source
- /// completes.
- /// The function that takes an observation an transforms
- /// it into one of the two output Choice types.
- /// The input Observable.
- /// A tuple of Observables. The first triggers when splitter returns Choice1of2
- /// and the second triggers when splitter returns Choice2of2.
- []
- val split: splitter:('T -> Choice<'U1,'U2>) -> source:IObservable<'T> -> (IObservable<'U1> * IObservable<'U2>)
-
- /// Returns an observable which chooses a projection of observations from the source
- /// using the given function. The returned object will trigger observations x
- /// for which the splitter returns Some x. The returned object also propagates
- /// all errors arising from the source and completes when the source completes.
- /// The function that returns Some for observations to be propagated
- /// and None for observations to ignore.
- /// The input Observable.
- /// An Observable that only propagates some of the observations from the source.
- []
- val choose: chooser:('T -> 'U option) -> source:IObservable<'T> -> IObservable<'U>
-
- /// Returns an observable which, for each observer, allocates an item of state
- /// and applies the given accumulating function to successive values arising from
- /// the input. The returned object will trigger observations for each computed
- /// state value, excluding the initial value. The returned object propagates
- /// all errors arising from the source and completes when the source completes.
- ///
- /// For each observer, the registered intermediate observing object is not thread safe.
- /// That is, observations arising from the source must not be triggered concurrently
- /// on different threads.
- /// The function to update the state with each observation.
- /// The initial state.
- /// The input Observable.
- /// An Observable that triggers on the updated state values.
- []
- val scan: collector:('U -> 'T -> 'U) -> state:'U -> source:IObservable<'T> -> IObservable<'U>
-
- /// Creates an observer which permanently subscribes to the given observable and which calls
- /// the given function for each observation.
- /// The function to be called on each observation.
- /// The input Observable.
- []
- val add : callback:('T -> unit) -> source:IObservable<'T> -> unit
-
- /// Creates an observer which subscribes to the given observable and which calls
- /// the given function for each observation.
- /// The function to be called on each observation.
- /// The input Observable.
- /// An object that will remove the callback if disposed.
- []
- val subscribe : callback:('T -> unit) -> source:IObservable<'T> -> System.IDisposable
-
- /// Returns a new observable that triggers on the second and subsequent triggerings of the input observable.
- /// The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as
- /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the
- /// Nth triggering occurs.
- ///
- /// For each observer, the registered intermediate observing object is not thread safe.
- /// That is, observations arising from the source must not be triggered concurrently
- /// on different threads.
- /// The input Observable.
- /// An Observable that triggers on successive pairs of observations from the input Observable.
- []
- val pairwise: source:IObservable<'T> -> IObservable<'T * 'T>
-
- []
- []
- module Event =
-
- /// Fires the output event when either of the input events fire.
- /// The first input event.
- /// The second input event.
- /// An event that fires when either of the input events fire.
- []
- val merge: event1:IEvent<'Del1,'T> -> event2:IEvent<'Del2,'T> -> IEvent<'T>
-
- /// Returns a new event that passes values transformed by the given function.
- /// The function to transform event values.
- /// The input event.
- /// An event that passes the transformed values.
- []
- val map: mapping:('T -> 'U) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
-
- /// Returns a new event that listens to the original event and triggers the resulting
- /// event only when the argument to the event passes the given function.
- /// The function to determine which triggers from the event to propagate.
- /// The input event.
- /// An event that only passes values that pass the predicate.
- []
- val filter: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'T>
-
- /// Returns a new event that listens to the original event and triggers the
- /// first resulting event if the application of the predicate to the event arguments
- /// returned true, and the second event if it returned false.
- /// The function to determine which output event to trigger.
- /// The input event.
- /// A tuple of events. The first is triggered when the predicate evaluates to true
- /// and the second when the predicate evaluates to false.
- []
- val partition: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'T> * IEvent<'T>)
-
- /// Returns a new event that listens to the original event and triggers the
- /// first resulting event if the application of the function to the event arguments
- /// returned a Choice1Of2, and the second event if it returns a Choice2Of2.
- /// The function to transform event values into one of two types.
- /// The input event.
- /// A tuple of events. The first fires whenever splitter evaluates to Choice1of1 and
- /// the second fires whenever splitter evaluates to Choice2of2.
- []
- val split: splitter:('T -> Choice<'U1,'U2>) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'U1> * IEvent<'U2>)
-
- /// Returns a new event which fires on a selection of messages from the original event.
- /// The selection function takes an original message to an optional new message.
- /// The function to select and transform event values to pass on.
- /// The input event.
- /// An event that fires only when the chooser returns Some.
- []
- val choose: chooser:('T -> 'U option) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
-
- []
- /// Returns a new event consisting of the results of applying the given accumulating function
- /// to successive values triggered on the input event. An item of internal state
- /// records the current value of the state parameter. The internal state is not locked during the
- /// execution of the accumulation function, so care should be taken that the
- /// input IEvent not triggered by multiple threads simultaneously.
- /// The function to update the state with each event value.
- /// The initial state.
- /// The input event.
- /// An event that fires on the updated state values.
- val scan: collector:('U -> 'T -> 'U) -> state:'U -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
-
- /// Runs the given function each time the given event is triggered.
- /// The function to call when the event is triggered.
- /// The input event.
- []
- val add : callback:('T -> unit) -> sourceEvent:IEvent<'Del,'T> -> unit
-
- /// Returns a new event that triggers on the second and subsequent triggerings of the input event.
- /// The Nth triggering of the input event passes the arguments from the N-1th and Nth triggering as
- /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the
- /// Nth triggering occurs.
- /// The input event.
- /// An event that triggers on pairs of consecutive values passed from the source event.
- []
- val pairwise: sourceEvent:IEvent<'Del,'T> -> IEvent<'T * 'T>
-
+ []
+ // Internals used by MailboxProcessor
+ type internal AsyncActivation<'T> =
+ member QueueContinuationWithTrampoline: 'T -> AsyncReturn
+ member CallContinuation: 'T -> AsyncReturn
+ []
+ // Internals used by MailboxProcessor
+ type internal AsyncResult<'T> =
+ | Ok of 'T
+ | Error of ExceptionDispatchInfo
+ | Canceled of OperationCanceledException
+
+ // Internals used by MailboxProcessor
+ module internal AsyncPrimitives =
+
+ []
+ type internal ResultCell<'T> =
+ new : unit -> ResultCell<'T>
+ member GetWaitHandle: unit -> WaitHandle
+ member Close: unit -> unit
+ interface IDisposable
+ member RegisterResult: 'T * reuseThread: bool -> AsyncReturn
+ member GrabResult: unit -> 'T
+ member ResultAvailable : bool
+ member AwaitResult_NoDirectCancelOrTimeout : Async<'T>
+ member TryWaitForResultSynchronously: ?timeout: int -> 'T option
+
+ val CreateAsyncResultAsync : AsyncResult<'T> -> Async<'T>
+
+ val MakeAsync : (AsyncActivation<'T> -> AsyncReturn) -> Async<'T>
diff --git a/src/fsharp/FSharp.Core/event.fs b/src/fsharp/FSharp.Core/event.fs
index 19b94a36697..4489c325d5a 100644
--- a/src/fsharp/FSharp.Core/event.fs
+++ b/src/fsharp/FSharp.Core/event.fs
@@ -147,5 +147,3 @@ namespace Microsoft.FSharp.Control
(e :?> IEvent<_,_>).AddHandler(h)
{ new System.IDisposable with
member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } }
-
-
diff --git a/src/fsharp/FSharp.Core/event.fsi b/src/fsharp/FSharp.Core/event.fsi
index 2ade4ac205c..23bc85ceea6 100644
--- a/src/fsharp/FSharp.Core/event.fsi
+++ b/src/fsharp/FSharp.Core/event.fsi
@@ -8,7 +8,6 @@ namespace Microsoft.FSharp.Control
open Microsoft.FSharp.Control
open Microsoft.FSharp.Collections
-
/// Event implementations for an arbitrary type of delegate.
[]
type DelegateEvent<'Delegate when 'Delegate :> System.Delegate> =
@@ -21,7 +20,6 @@ namespace Microsoft.FSharp.Control
/// Publishes the event as a first class event value.
member Publish : IDelegateEvent<'Delegate>
-
/// Event implementations for a delegate types following the standard .NET Framework convention of a first 'sender' argument.
[]
type Event<'Delegate,'Args when 'Delegate : delegate<'Args,unit> and 'Delegate :> System.Delegate > =
diff --git a/src/fsharp/FSharp.Core/eventmodule.fs b/src/fsharp/FSharp.Core/eventmodule.fs
new file mode 100644
index 00000000000..da359843d5c
--- /dev/null
+++ b/src/fsharp/FSharp.Core/eventmodule.fs
@@ -0,0 +1,81 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open Microsoft.FSharp.Core
+ open Microsoft.FSharp.Control
+
+ []
+ []
+ module Event =
+ []
+ let create<'T>() =
+ let ev = new Event<'T>()
+ ev.Trigger, ev.Publish
+
+ []
+ let map mapping (sourceEvent: IEvent<'Delegate,'T>) =
+ let ev = new Event<_>()
+ sourceEvent.Add(fun x -> ev.Trigger(mapping x));
+ ev.Publish
+
+ []
+ let filter predicate (sourceEvent: IEvent<'Delegate,'T>) =
+ let ev = new Event<_>()
+ sourceEvent.Add(fun x -> if predicate x then ev.Trigger x);
+ ev.Publish
+
+ []
+ let partition predicate (sourceEvent: IEvent<'Delegate,'T>) =
+ let ev1 = new Event<_>()
+ let ev2 = new Event<_>()
+ sourceEvent.Add(fun x -> if predicate x then ev1.Trigger x else ev2.Trigger x);
+ ev1.Publish,ev2.Publish
+
+ []
+ let choose chooser (sourceEvent: IEvent<'Delegate,'T>) =
+ let ev = new Event<_>()
+ sourceEvent.Add(fun x -> match chooser x with None -> () | Some r -> ev.Trigger r);
+ ev.Publish
+
+ []
+ let scan collector state (sourceEvent: IEvent<'Delegate,'T>) =
+ let state = ref state
+ let ev = new Event<_>()
+ sourceEvent.Add(fun msg ->
+ let z = !state
+ let z = collector z msg
+ state := z;
+ ev.Trigger(z));
+ ev.Publish
+
+ []
+ let add callback (sourceEvent: IEvent<'Delegate,'T>) = sourceEvent.Add(callback)
+
+ []
+ let pairwise (sourceEvent : IEvent<'Delegate,'T>) : IEvent<'T * 'T> =
+ let ev = new Event<'T * 'T>()
+ let lastArgs = ref None
+ sourceEvent.Add(fun args2 ->
+ (match !lastArgs with
+ | None -> ()
+ | Some args1 -> ev.Trigger(args1,args2))
+ lastArgs := Some args2)
+
+ ev.Publish
+
+ []
+ let merge (event1: IEvent<'Del1,'T>) (event2: IEvent<'Del2,'T>) =
+ let ev = new Event<_>()
+ event1.Add(fun x -> ev.Trigger(x))
+ event2.Add(fun x -> ev.Trigger(x))
+ ev.Publish
+
+ []
+ let split (splitter : 'T -> Choice<'U1,'U2>) (sourceEvent: IEvent<'Delegate,'T>) =
+ let ev1 = new Event<_>()
+ let ev2 = new Event<_>()
+ sourceEvent.Add(fun x -> match splitter x with Choice1Of2 y -> ev1.Trigger(y) | Choice2Of2 z -> ev2.Trigger(z));
+ ev1.Publish,ev2.Publish
+
+
diff --git a/src/fsharp/FSharp.Core/eventmodule.fsi b/src/fsharp/FSharp.Core/eventmodule.fsi
new file mode 100644
index 00000000000..c3f5f3cc9fb
--- /dev/null
+++ b/src/fsharp/FSharp.Core/eventmodule.fsi
@@ -0,0 +1,89 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open Microsoft.FSharp.Core
+ open Microsoft.FSharp.Control
+
+ []
+ []
+ module Event =
+
+ /// Fires the output event when either of the input events fire.
+ /// The first input event.
+ /// The second input event.
+ /// An event that fires when either of the input events fire.
+ []
+ val merge: event1:IEvent<'Del1,'T> -> event2:IEvent<'Del2,'T> -> IEvent<'T>
+
+ /// Returns a new event that passes values transformed by the given function.
+ /// The function to transform event values.
+ /// The input event.
+ /// An event that passes the transformed values.
+ []
+ val map: mapping:('T -> 'U) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
+
+ /// Returns a new event that listens to the original event and triggers the resulting
+ /// event only when the argument to the event passes the given function.
+ /// The function to determine which triggers from the event to propagate.
+ /// The input event.
+ /// An event that only passes values that pass the predicate.
+ []
+ val filter: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'T>
+
+ /// Returns a new event that listens to the original event and triggers the
+ /// first resulting event if the application of the predicate to the event arguments
+ /// returned true, and the second event if it returned false.
+ /// The function to determine which output event to trigger.
+ /// The input event.
+ /// A tuple of events. The first is triggered when the predicate evaluates to true
+ /// and the second when the predicate evaluates to false.
+ []
+ val partition: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'T> * IEvent<'T>)
+
+ /// Returns a new event that listens to the original event and triggers the
+ /// first resulting event if the application of the function to the event arguments
+ /// returned a Choice1Of2, and the second event if it returns a Choice2Of2.
+ /// The function to transform event values into one of two types.
+ /// The input event.
+ /// A tuple of events. The first fires whenever splitter evaluates to Choice1of1 and
+ /// the second fires whenever splitter evaluates to Choice2of2.
+ []
+ val split: splitter:('T -> Choice<'U1,'U2>) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'U1> * IEvent<'U2>)
+
+ /// Returns a new event which fires on a selection of messages from the original event.
+ /// The selection function takes an original message to an optional new message.
+ /// The function to select and transform event values to pass on.
+ /// The input event.
+ /// An event that fires only when the chooser returns Some.
+ []
+ val choose: chooser:('T -> 'U option) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
+
+ []
+ /// Returns a new event consisting of the results of applying the given accumulating function
+ /// to successive values triggered on the input event. An item of internal state
+ /// records the current value of the state parameter. The internal state is not locked during the
+ /// execution of the accumulation function, so care should be taken that the
+ /// input IEvent not triggered by multiple threads simultaneously.
+ /// The function to update the state with each event value.
+ /// The initial state.
+ /// The input event.
+ /// An event that fires on the updated state values.
+ val scan: collector:('U -> 'T -> 'U) -> state:'U -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U>
+
+ /// Runs the given function each time the given event is triggered.
+ /// The function to call when the event is triggered.
+ /// The input event.
+ []
+ val add : callback:('T -> unit) -> sourceEvent:IEvent<'Del,'T> -> unit
+
+ /// Returns a new event that triggers on the second and subsequent triggerings of the input event.
+ /// The Nth triggering of the input event passes the arguments from the N-1th and Nth triggering as
+ /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the
+ /// Nth triggering occurs.
+ /// The input event.
+ /// An event that triggers on pairs of consecutive values passed from the source event.
+ []
+ val pairwise: sourceEvent:IEvent<'Del,'T> -> IEvent<'T * 'T>
+
+
diff --git a/src/fsharp/FSharp.Core/mailbox.fs b/src/fsharp/FSharp.Core/mailbox.fs
new file mode 100644
index 00000000000..c83bc49e5fb
--- /dev/null
+++ b/src/fsharp/FSharp.Core/mailbox.fs
@@ -0,0 +1,481 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open System
+ open System.Threading
+ open Microsoft.FSharp.Core
+ open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
+ open Microsoft.FSharp.Control
+ open Microsoft.FSharp.Control.AsyncImpl
+ open Microsoft.FSharp.Control.AsyncPrimitives
+ open Microsoft.FSharp.Collections
+
+ /// We use our own internal implementation of queues to avoid a dependency on System.dll
+ type Queue<'T>() =
+
+ let mutable array = [| |]
+ let mutable head = 0
+ let mutable size = 0
+ let mutable tail = 0
+
+ let SetCapacity(capacity) =
+ let destinationArray = Array.zeroCreate capacity
+ if (size > 0) then
+ if (head < tail) then
+ Array.Copy(array, head, destinationArray, 0, size)
+
+ else
+ Array.Copy(array, head, destinationArray, 0, array.Length - head)
+ Array.Copy(array, 0, destinationArray, array.Length - head, tail)
+ array <- destinationArray
+ head <- 0
+ tail <- if (size = capacity) then 0 else size
+
+ member x.Dequeue() =
+ if (size = 0) then
+ failwith "Dequeue"
+ let local = array.[head]
+ array.[head] <- Unchecked.defaultof<'T>
+ head <- (head + 1) % array.Length
+ size <- size - 1
+ local
+
+ member this.Enqueue(item) =
+ if (size = array.Length) then
+ let capacity = int ((int64 array.Length * 200L) / 100L)
+ let capacity = max capacity (array.Length + 4)
+ SetCapacity(capacity)
+ array.[tail] <- item
+ tail <- (tail + 1) % array.Length
+ size <- size + 1
+
+ member x.Count = size
+
+
+ module AsyncHelpers =
+
+ let awaitEither a1 a2 =
+ async {
+ let resultCell = new ResultCell<_>()
+ let! cancellationToken = Async.CancellationToken
+ let start a f =
+ Async.StartWithContinuationsUsingDispatchInfo(a,
+ (fun res -> resultCell.RegisterResult(f res |> AsyncResult.Ok, reuseThread=false) |> ignore),
+ (fun edi -> resultCell.RegisterResult(edi |> AsyncResult.Error, reuseThread=false) |> ignore),
+ (fun oce -> resultCell.RegisterResult(oce |> AsyncResult.Canceled, reuseThread=false) |> ignore),
+ cancellationToken = cancellationToken
+ )
+ start a1 Choice1Of2
+ start a2 Choice2Of2
+ // Note: It is ok to use "NoDirectCancel" here because the started computations use the same
+ // cancellation token and will register a cancelled result if cancellation occurs.
+ // Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine.
+ let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
+ return! CreateAsyncResultAsync result
+ }
+
+ let timeout msec cancellationToken =
+ assert (msec >= 0)
+ let resultCell = new ResultCell<_>()
+ Async.StartWithContinuations(
+ computation=Async.Sleep(msec),
+ continuation=(fun () -> resultCell.RegisterResult((), reuseThread = false) |> ignore),
+ exceptionContinuation=ignore,
+ cancellationContinuation=ignore,
+ cancellationToken = cancellationToken)
+ // Note: It is ok to use "NoDirectCancel" here because the started computations use the same
+ // cancellation token and will register a cancelled result if cancellation occurs.
+ // Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout.
+ resultCell.AwaitResult_NoDirectCancelOrTimeout
+
+ []
+ []
+ type Mailbox<'Msg>(cancellationSupported: bool) =
+ let mutable inboxStore = null
+ let mutable arrivals = new Queue<'Msg>()
+ let syncRoot = arrivals
+
+ // Control elements indicating the state of the reader. When the reader is "blocked" at an
+ // asynchronous receive, either
+ // -- "cont" is non-null and the reader is "activated" by re-scheduling cont in the thread pool; or
+ // -- "pulse" is non-null and the reader is "activated" by setting this event
+ let mutable savedCont : (bool -> AsyncReturn) option = None
+
+ // Readers who have a timeout use this event
+ let mutable pulse : AutoResetEvent = null
+
+ // Make sure that the "pulse" value is created
+ let ensurePulse() =
+ match pulse with
+ | null ->
+ pulse <- new AutoResetEvent(false);
+ | _ ->
+ ()
+ pulse
+
+ let waitOneNoTimeoutOrCancellation =
+ MakeAsync (fun ctxt ->
+ match savedCont with
+ | None ->
+ let descheduled =
+ // An arrival may have happened while we're preparing to deschedule
+ lock syncRoot (fun () ->
+ if arrivals.Count = 0 then
+ // OK, no arrival so deschedule
+ savedCont <- Some(fun res -> ctxt.QueueContinuationWithTrampoline(res))
+ true
+ else
+ false)
+ if descheduled then
+ Unchecked.defaultof<_>
+ else
+ // If we didn't deschedule then run the continuation immediately
+ ctxt.CallContinuation true
+ | Some _ ->
+ failwith "multiple waiting reader continuations for mailbox")
+
+ let waitOneWithCancellation timeout =
+ Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout)
+
+ let waitOne timeout =
+ if timeout < 0 && not cancellationSupported then
+ waitOneNoTimeoutOrCancellation
+ else
+ waitOneWithCancellation(timeout)
+
+ member __.inbox =
+ match inboxStore with
+ | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1)
+ | _ -> ()
+ inboxStore
+
+ member x.CurrentQueueLength =
+ lock syncRoot (fun () -> x.inbox.Count + arrivals.Count)
+
+ member x.ScanArrivalsUnsafe(f) =
+ if arrivals.Count = 0 then
+ None
+ else
+ let msg = arrivals.Dequeue()
+ match f msg with
+ | None ->
+ x.inbox.Add(msg)
+ x.ScanArrivalsUnsafe(f)
+ | res -> res
+
+ // Lock the arrivals queue while we scan that
+ member x.ScanArrivals(f) =
+ lock syncRoot (fun () -> x.ScanArrivalsUnsafe(f))
+
+ member x.ScanInbox(f,n) =
+ match inboxStore with
+ | null -> None
+ | inbox ->
+ if n >= inbox.Count
+ then None
+ else
+ let msg = inbox.[n]
+ match f msg with
+ | None -> x.ScanInbox (f,n+1)
+ | res -> inbox.RemoveAt(n); res
+
+ member x.ReceiveFromArrivalsUnsafe() =
+ if arrivals.Count = 0 then
+ None
+ else
+ Some(arrivals.Dequeue())
+
+ member x.ReceiveFromArrivals() =
+ lock syncRoot (fun () -> x.ReceiveFromArrivalsUnsafe())
+
+ member x.ReceiveFromInbox() =
+ match inboxStore with
+ | null -> None
+ | inbox ->
+ if inbox.Count = 0 then
+ None
+ else
+ let x = inbox.[0]
+ inbox.RemoveAt(0)
+ Some(x)
+
+ member x.Post(msg) =
+ lock syncRoot (fun () ->
+
+ // Add the message to the arrivals queue
+ arrivals.Enqueue(msg)
+
+ // Cooperatively unblock any waiting reader. If there is no waiting
+ // reader we just leave the message in the incoming queue
+ match savedCont with
+ | None ->
+ match pulse with
+ | null ->
+ () // no one waiting, leaving the message in the queue is sufficient
+ | ev ->
+ // someone is waiting on the wait handle
+ ev.Set() |> ignore
+
+ | Some action ->
+ savedCont <- None
+ action true |> ignore)
+
+ member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> =
+ let rec scan timeoutAsync (timeoutCts:CancellationTokenSource) =
+ async {
+ match x.ScanArrivals(f) with
+ | None ->
+ // Deschedule and wait for a message. When it comes, rescan the arrivals
+ let! ok = AsyncHelpers.awaitEither waitOneNoTimeoutOrCancellation timeoutAsync
+ match ok with
+ | Choice1Of2 true ->
+ return! scan timeoutAsync timeoutCts
+ | Choice1Of2 false ->
+ return failwith "should not happen - waitOneNoTimeoutOrCancellation always returns true"
+ | Choice2Of2 () ->
+ lock syncRoot (fun () ->
+ // Cancel the outstanding wait for messages installed by waitOneWithCancellation
+ //
+ // HERE BE DRAGONS. This is bestowed on us because we only support
+ // a single mailbox reader at any one time.
+ // If awaitEither returned control because timeoutAsync has terminated, waitOneNoTimeoutOrCancellation
+ // might still be in-flight. In practical terms, it means that the push-to-async-result-cell
+ // continuation that awaitEither registered on it is still pending, i.e. it is still in savedCont.
+ // That continuation is a no-op now, but it is still a registered reader for arriving messages.
+ // Therefore we just abandon it - a brutal way of canceling.
+ // This ugly non-compositionality is only needed because we only support a single mailbox reader
+ // (i.e. the user is not allowed to run several Receive/TryReceive/Scan/TryScan in parallel) - otherwise
+ // we would just have an extra no-op reader in the queue.
+ savedCont <- None)
+
+ return None
+ | Some resP ->
+ timeoutCts.Cancel() // cancel the timeout watcher
+ let! res = resP
+ return Some res
+ }
+ let rec scanNoTimeout () =
+ async {
+ match x.ScanArrivals(f) with
+ | None ->
+ let! ok = waitOne(Timeout.Infinite)
+ if ok then
+ return! scanNoTimeout()
+ else
+ return (failwith "Timed out with infinite timeout??")
+ | Some resP ->
+ let! res = resP
+ return Some res
+ }
+
+ // Look in the inbox first
+ async {
+ match x.ScanInbox(f,0) with
+ | None when timeout < 0 ->
+ return! scanNoTimeout()
+ | None ->
+ let! cancellationToken = Async.CancellationToken
+ let timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None)
+ let timeoutAsync = AsyncHelpers.timeout timeout timeoutCts.Token
+ return! scan timeoutAsync timeoutCts
+ | Some resP ->
+ let! res = resP
+ return Some res
+ }
+
+ member x.Scan((f: 'Msg -> (Async<'T>) option), timeout) =
+ async {
+ let! resOpt = x.TryScan(f,timeout)
+ match resOpt with
+ | None -> return raise(TimeoutException(SR.GetString(SR.mailboxScanTimedOut)))
+ | Some res -> return res
+ }
+
+ member x.TryReceive(timeout) =
+ let rec processFirstArrival() =
+ async {
+ match x.ReceiveFromArrivals() with
+ | None ->
+ // Make sure the pulse is created if it is going to be needed.
+ // If it isn't, then create it, and go back to the start to
+ // check arrivals again.
+ match pulse with
+ | null when timeout >= 0 || cancellationSupported ->
+ ensurePulse() |> ignore
+ return! processFirstArrival()
+ | _ ->
+ // Wait until we have been notified about a message. When that happens, rescan the arrivals
+ let! ok = waitOne(timeout)
+ if ok then
+ return! processFirstArrival()
+ else
+ return None
+ | res -> return res
+ }
+
+ // look in the inbox first
+ async {
+ match x.ReceiveFromInbox() with
+ | None -> return! processFirstArrival()
+ | res -> return res
+ }
+
+ member x.Receive(timeout) =
+
+ let rec processFirstArrival() =
+ async {
+ match x.ReceiveFromArrivals() with
+ | None ->
+ // Make sure the pulse is created if it is going to be needed.
+ // If it isn't, then create it, and go back to the start to
+ // check arrivals again.
+ match pulse with
+ | null when timeout >= 0 || cancellationSupported ->
+ ensurePulse() |> ignore
+ return! processFirstArrival()
+ | _ ->
+ // Wait until we have been notified about a message. When that happens, rescan the arrivals
+ let! ok = waitOne(timeout)
+ if ok then
+ return! processFirstArrival()
+ else
+ return raise(TimeoutException(SR.GetString(SR.mailboxReceiveTimedOut)))
+ | Some res -> return res
+ }
+
+ // look in the inbox first
+ async {
+ match x.ReceiveFromInbox() with
+ | None -> return! processFirstArrival()
+ | Some res -> return res
+ }
+
+ interface System.IDisposable with
+ member __.Dispose() =
+ if isNotNull pulse then (pulse :> IDisposable).Dispose()
+
+#if DEBUG
+ member x.UnsafeContents =
+ (x.inbox,arrivals,pulse,savedCont) |> box
+#endif
+
+
+ []
+ []
+ type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
+ member x.Reply(value) = replyf(value)
+
+ []
+ []
+ []
+ type MailboxProcessor<'Msg>(body, ?cancellationToken) =
+
+ let cancellationSupported = cancellationToken.IsSome
+ let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken
+ let mailbox = new Mailbox<'Msg>(cancellationSupported)
+ let mutable defaultTimeout = Threading.Timeout.Infinite
+ let mutable started = false
+ let errorEvent = new Event()
+
+ member __.CurrentQueueLength = mailbox.CurrentQueueLength // nb. unprotected access gives an approximation of the queue length
+
+ member __.DefaultTimeout
+ with get() = defaultTimeout
+ and set(v) = defaultTimeout <- v
+
+ []
+ member __.Error = errorEvent.Publish
+
+#if DEBUG
+ member __.UnsafeMessageQueueContents = mailbox.UnsafeContents
+#endif
+
+ member x.Start() =
+ if started then
+ raise (new InvalidOperationException(SR.GetString(SR.mailboxProcessorAlreadyStarted)))
+ else
+ started <- true
+
+ // Protect the execution and send errors to the event.
+ // Note that exception stack traces are lost in this design - in an extended design
+ // the event could propagate an ExceptionDispatchInfo instead of an Exception.
+ let p =
+ async { try
+ do! body x
+ with exn ->
+ errorEvent.Trigger exn }
+
+ Async.Start(computation=p, cancellationToken=cancellationToken)
+
+ member __.Post(message) = mailbox.Post(message)
+
+ member __.TryPostAndReply(buildMessage : (_ -> 'Msg), ?timeout) : 'Reply option =
+ let timeout = defaultArg timeout defaultTimeout
+ use resultCell = new ResultCell<_>()
+ let msg = buildMessage (new AsyncReplyChannel<_>(fun reply ->
+ // Note the ResultCell may have been disposed if the operation
+ // timed out. In this case RegisterResult drops the result on the floor.
+ resultCell.RegisterResult(reply,reuseThread=false) |> ignore))
+ mailbox.Post(msg)
+ resultCell.TryWaitForResultSynchronously(timeout=timeout)
+
+ member x.PostAndReply(buildMessage, ?timeout) : 'Reply =
+ match x.TryPostAndReply(buildMessage,?timeout=timeout) with
+ | None -> raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndReplyTimedOut)))
+ | Some res -> res
+
+ member __.PostAndTryAsyncReply(buildMessage, ?timeout) : Async<'Reply option> =
+ let timeout = defaultArg timeout defaultTimeout
+ let resultCell = new ResultCell<_>()
+ let msg = buildMessage (new AsyncReplyChannel<_>(fun reply ->
+ // Note the ResultCell may have been disposed if the operation
+ // timed out. In this case RegisterResult drops the result on the floor.
+ resultCell.RegisterResult(reply, reuseThread=false) |> ignore))
+ mailbox.Post(msg)
+ match timeout with
+ | Threading.Timeout.Infinite when not cancellationSupported ->
+ async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
+ return Some result }
+
+ | _ ->
+ async { use _disposeCell = resultCell
+ let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout)
+ let res = (if ok then Some(resultCell.GrabResult()) else None)
+ return res }
+
+ member x.PostAndAsyncReply(buildMessage, ?timeout:int) =
+ let timeout = defaultArg timeout defaultTimeout
+ match timeout with
+ | Threading.Timeout.Infinite when not cancellationSupported ->
+ // Nothing to dispose, no wait handles used
+ let resultCell = new ResultCell<_>()
+ let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> ignore))
+ mailbox.Post(msg)
+ resultCell.AwaitResult_NoDirectCancelOrTimeout
+ | _ ->
+ let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
+ async { let! res = asyncReply
+ match res with
+ | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
+ | Some res -> return res }
+
+ member __.Receive(?timeout) =
+ mailbox.Receive(timeout=defaultArg timeout defaultTimeout)
+
+ member __.TryReceive(?timeout) =
+ mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout)
+
+ member __.Scan(scanner: 'Msg -> (Async<'T>) option,?timeout) =
+ mailbox.Scan(scanner,timeout=defaultArg timeout defaultTimeout)
+
+ member __.TryScan(scanner: 'Msg -> (Async<'T>) option,?timeout) =
+ mailbox.TryScan(scanner,timeout=defaultArg timeout defaultTimeout)
+
+ interface System.IDisposable with
+ member __.Dispose() = (mailbox :> IDisposable).Dispose()
+
+ static member Start(body,?cancellationToken) =
+ let mailboxProcessor = new MailboxProcessor<'Msg>(body,?cancellationToken=cancellationToken)
+ mailboxProcessor.Start()
+ mailboxProcessor
diff --git a/src/fsharp/FSharp.Core/mailbox.fsi b/src/fsharp/FSharp.Core/mailbox.fsi
new file mode 100644
index 00000000000..6d818299e06
--- /dev/null
+++ b/src/fsharp/FSharp.Core/mailbox.fsi
@@ -0,0 +1,162 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open System.Threading
+
+ open Microsoft.FSharp.Core
+ open Microsoft.FSharp.Control
+
+ []
+ /// A handle to a capability to reply to a PostAndReply message.
+ type AsyncReplyChannel<'Reply> =
+ /// Sends a reply to a PostAndReply message.
+ /// The value to send.
+ member Reply : value:'Reply -> unit
+
+ /// A message-processing agent which executes an asynchronous computation.
+ ///
+ /// The agent encapsulates a message queue that supports multiple-writers and
+ /// a single reader agent. Writers send messages to the agent by using the Post
+ /// method and its variations.
+ ///
+ /// The agent may wait for messages using the Receive or TryReceive methods or
+ /// scan through all available messages using the Scan or TryScan method.
+ []
+ type MailboxProcessor<'Msg> =
+
+ /// Creates an agent. The body function is used to generate the asynchronous
+ /// computation executed by the agent. This function is not executed until
+ /// Start is called.
+ /// The function to produce an asynchronous computation that will be executed
+ /// as the read loop for the MailboxProcessor when Start is called.
+ /// An optional cancellation token for the body.
+ /// Defaults to Async.DefaultCancellationToken.
+ /// The created MailboxProcessor.
+ new : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>
+
+ /// Creates and starts an agent. The body function is used to generate the asynchronous
+ /// computation executed by the agent.
+ /// The function to produce an asynchronous computation that will be executed
+ /// as the read loop for the MailboxProcessor when Start is called.
+ /// An optional cancellation token for the body.
+ /// Defaults to Async.DefaultCancellationToken.
+ /// The created MailboxProcessor.
+ static member Start : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>
+
+ /// Posts a message to the message queue of the MailboxProcessor, asynchronously.
+ /// The message to post.
+ member Post : message:'Msg -> unit
+
+ /// Posts a message to an agent and await a reply on the channel, synchronously.
+ ///
+ /// The message is generated by applying buildMessage to a new reply channel
+ /// to be incorporated into the message. The receiving agent must process this
+ /// message and invoke the Reply method on this reply channel precisely once.
+ /// The function to incorporate the AsyncReplyChannel into
+ /// the message to be sent.
+ /// An optional timeout parameter (in milliseconds) to wait for a reply message.
+ /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
+ /// The reply from the agent.
+ member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply
+
+ /// Posts a message to an agent and await a reply on the channel, asynchronously.
+ ///
+ /// The message is generated by applying buildMessage to a new reply channel
+ /// to be incorporated into the message. The receiving agent must process this
+ /// message and invoke the Reply method on this reply channel precisely once.
+ /// The function to incorporate the AsyncReplyChannel into
+ /// the message to be sent.
+ /// An optional timeout parameter (in milliseconds) to wait for a reply message.
+ /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that will wait for the reply from the agent.
+ member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply>
+
+ /// Like PostAndReply, but returns None if no reply within the timeout period.
+ /// The function to incorporate the AsyncReplyChannel into
+ /// the message to be sent.
+ /// An optional timeout parameter (in milliseconds) to wait for a reply message.
+ /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
+ /// The reply from the agent or None if the timeout expires.
+ member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply option
+
+ /// Like AsyncPostAndReply, but returns None if no reply within the timeout period.
+ /// The function to incorporate the AsyncReplyChannel into
+ /// the message to be sent.
+ /// An optional timeout parameter (in milliseconds) to wait for a reply message.
+ /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that will return the reply or None if the timeout expires.
+ member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply option>
+
+ /// Waits for a message. This will consume the first message in arrival order.
+ ///
+ /// This method is for use within the body of the agent.
+ ///
+ /// This method is for use within the body of the agent. For each agent, at most
+ /// one concurrent reader may be active, so no more than one concurrent call to
+ /// Receive, TryReceive, Scan and/or TryScan may be active.
+ /// An optional timeout in milliseconds. Defaults to -1 which corresponds
+ /// to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that returns the received message.
+ /// Thrown when the timeout is exceeded.
+ member Receive : ?timeout:int -> Async<'Msg>
+
+ /// Waits for a message. This will consume the first message in arrival order.
+ ///
+ /// This method is for use within the body of the agent.
+ ///
+ /// Returns None if a timeout is given and the timeout is exceeded.
+ ///
+ /// This method is for use within the body of the agent. For each agent, at most
+ /// one concurrent reader may be active, so no more than one concurrent call to
+ /// Receive, TryReceive, Scan and/or TryScan may be active.
+ /// An optional timeout in milliseconds. Defaults to -1 which
+ /// corresponds to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that returns the received message or
+ /// None if the timeout is exceeded.
+ member TryReceive : ?timeout:int -> Async<'Msg option>
+
+ /// Scans for a message by looking through messages in arrival order until scanner
+ /// returns a Some value. Other messages remain in the queue.
+ ///
+ /// Returns None if a timeout is given and the timeout is exceeded.
+ ///
+ /// This method is for use within the body of the agent. For each agent, at most
+ /// one concurrent reader may be active, so no more than one concurrent call to
+ /// Receive, TryReceive, Scan and/or TryScan may be active.
+ /// The function to return None if the message is to be skipped
+ /// or Some if the message is to be processed and removed from the queue.
+ /// An optional timeout in milliseconds. Defaults to -1 which corresponds
+ /// to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that scanner built off the read message.
+ /// Thrown when the timeout is exceeded.
+ member Scan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T>
+
+ /// Scans for a message by looking through messages in arrival order until scanner
+ /// returns a Some value. Other messages remain in the queue.
+ ///
+ /// This method is for use within the body of the agent. For each agent, at most
+ /// one concurrent reader may be active, so no more than one concurrent call to
+ /// Receive, TryReceive, Scan and/or TryScan may be active.
+ /// The function to return None if the message is to be skipped
+ /// or Some if the message is to be processed and removed from the queue.
+ /// An optional timeout in milliseconds. Defaults to -1 which corresponds
+ /// to System.Threading.Timeout.Infinite.
+ /// An asynchronous computation that scanner built off the read message.
+ member TryScan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T option>
+
+ /// Starts the agent.
+ member Start : unit -> unit
+
+ /// Raises a timeout exception if a message not received in this amount of time. By default
+ /// no timeout is used.
+ member DefaultTimeout : int with get, set
+
+ /// Occurs when the execution of the agent results in an exception.
+ []
+ member Error : IEvent
+
+ interface System.IDisposable
+
+ /// Returns the number of unprocessed messages in the message queue of the agent.
+ member CurrentQueueLength : int
diff --git a/src/fsharp/FSharp.Core/observable.fs b/src/fsharp/FSharp.Core/observable.fs
new file mode 100644
index 00000000000..75ef1d081b1
--- /dev/null
+++ b/src/fsharp/FSharp.Core/observable.fs
@@ -0,0 +1,177 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open System
+ open Microsoft.FSharp.Core
+ open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
+ open Microsoft.FSharp.Control
+
+ []
+ []
+ module Observable =
+
+ let inline protect f succeed fail =
+ match (try Choice1Of2 (f ()) with e -> Choice2Of2 e) with
+ | Choice1Of2 x -> (succeed x)
+ | Choice2Of2 e -> (fail e)
+
+ []
+ type BasicObserver<'T>() =
+
+ let mutable stopped = false
+
+ abstract Next : value : 'T -> unit
+
+ abstract Error : error : exn -> unit
+
+ abstract Completed : unit -> unit
+
+ interface IObserver<'T> with
+
+ member x.OnNext value =
+ if not stopped then
+ x.Next value
+
+ member x.OnError e =
+ if not stopped then
+ stopped <- true
+ x.Error e
+
+ member x.OnCompleted () =
+ if not stopped then
+ stopped <- true
+ x.Completed ()
+
+ []
+ let map mapping (source: IObservable<'T>) =
+ { new IObservable<'U> with
+ member x.Subscribe(observer) =
+ source.Subscribe
+ { new BasicObserver<'T>() with
+
+ member x.Next(v) =
+ protect (fun () -> mapping v) observer.OnNext observer.OnError
+
+ member x.Error(e) = observer.OnError(e)
+
+ member x.Completed() = observer.OnCompleted() } }
+
+ []
+ let choose chooser (source: IObservable<'T>) =
+ { new IObservable<'U> with
+ member x.Subscribe(observer) =
+ source.Subscribe
+ { new BasicObserver<'T>() with
+
+ member x.Next(v) =
+ protect (fun () -> chooser v) (function None -> () | Some v2 -> observer.OnNext v2) observer.OnError
+
+ member x.Error(e) = observer.OnError(e)
+
+ member x.Completed() = observer.OnCompleted() } }
+
+ []
+ let filter predicate (source: IObservable<'T>) =
+ choose (fun x -> if predicate x then Some x else None) source
+
+ []
+ let partition predicate (source: IObservable<'T>) =
+ filter predicate source, filter (predicate >> not) source
+
+ []
+ let scan collector state (source: IObservable<'T>) =
+ { new IObservable<'U> with
+ member x.Subscribe(observer) =
+ let mutable state = state
+ source.Subscribe
+ { new BasicObserver<'T>() with
+
+ member x.Next(v) =
+ let z = state
+ protect (fun () -> collector z v) (fun z ->
+ state <- z
+ observer.OnNext z) observer.OnError
+
+ member x.Error(e) = observer.OnError(e)
+
+ member x.Completed() = observer.OnCompleted() } }
+
+ []
+ let add callback (source: IObservable<'T>) = source.Add(callback)
+
+ []
+ let subscribe (callback: 'T -> unit) (source: IObservable<'T>) = source.Subscribe(callback)
+
+ []
+ let pairwise (source : IObservable<'T>) : IObservable<'T * 'T> =
+ { new IObservable<_> with
+ member x.Subscribe(observer) =
+ let mutable lastArgs = None
+ source.Subscribe
+ { new BasicObserver<'T>() with
+
+ member x.Next(args2) =
+ match lastArgs with
+ | None -> ()
+ | Some args1 -> observer.OnNext (args1,args2)
+ lastArgs <- Some args2
+
+ member x.Error(e) = observer.OnError(e)
+
+ member x.Completed() = observer.OnCompleted() } }
+
+ []
+ let merge (source1: IObservable<'T>) (source2: IObservable<'T>) =
+ { new IObservable<_> with
+ member x.Subscribe(observer) =
+ let mutable stopped = false
+ let mutable completed1 = false
+ let mutable completed2 = false
+ let h1 =
+ source1.Subscribe
+ { new IObserver<'T> with
+ member x.OnNext(v) =
+ if not stopped then
+ observer.OnNext v
+
+ member x.OnError(e) =
+ if not stopped then
+ stopped <- true
+ observer.OnError(e)
+
+ member x.OnCompleted() =
+ if not stopped then
+ completed1 <- true
+ if completed1 && completed2 then
+ stopped <- true
+ observer.OnCompleted() }
+ let h2 =
+ source2.Subscribe
+ { new IObserver<'T> with
+ member x.OnNext(v) =
+ if not stopped then
+ observer.OnNext v
+
+ member x.OnError(e) =
+ if not stopped then
+ stopped <- true
+ observer.OnError(e)
+
+ member x.OnCompleted() =
+ if not stopped then
+ completed2 <- true
+ if completed1 && completed2 then
+ stopped <- true
+ observer.OnCompleted() }
+
+ { new IDisposable with
+ member x.Dispose() =
+ h1.Dispose()
+ h2.Dispose() } }
+
+ []
+ let split (splitter : 'T -> Choice<'U1,'U2>) (source: IObservable<'T>) =
+ choose (fun v -> match splitter v with Choice1Of2 x -> Some x | _ -> None) source,
+ choose (fun v -> match splitter v with Choice2Of2 x -> Some x | _ -> None) source
+
diff --git a/src/fsharp/FSharp.Core/observable.fsi b/src/fsharp/FSharp.Core/observable.fsi
new file mode 100644
index 00000000000..36cbee5700d
--- /dev/null
+++ b/src/fsharp/FSharp.Core/observable.fsi
@@ -0,0 +1,131 @@
+// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
+
+namespace Microsoft.FSharp.Control
+
+ open System
+ open Microsoft.FSharp.Core
+
+ []
+ []
+ /// Basic operations on first class event and other observable objects.
+ module Observable =
+
+ /// Returns an observable for the merged observations from the sources.
+ /// The returned object propagates success and error values arising
+ /// from either source and completes when both the sources have completed.
+ ///
+ /// For each observer, the registered intermediate observing object is not
+ /// thread safe. That is, observations arising from the sources must not
+ /// be triggered concurrently on different threads.
+ /// The first Observable.
+ /// The second Observable.
+ /// An Observable that propagates information from both sources.
+ []
+ val merge: source1:IObservable<'T> -> source2:IObservable<'T> -> IObservable<'T>
+
+ /// Returns an observable which transforms the observations of the source by the
+ /// given function. The transformation function is executed once for each
+ /// subscribed observer. The returned object also propagates error observations
+ /// arising from the source and completes when the source completes.
+ /// The function applied to observations from the source.
+ /// The input Observable.
+ /// An Observable of the type specified by mapping.
+ []
+ val map: mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>
+
+ /// Returns an observable which filters the observations of the source
+ /// by the given function. The observable will see only those observations
+ /// for which the predicate returns true. The predicate is executed once for
+ /// each subscribed observer. The returned object also propagates error
+ /// observations arising from the source and completes when the source completes.
+ /// The function to apply to observations to determine if it should
+ /// be kept.
+ /// The input Observable.
+ /// An Observable that filters observations based on filter.
+ []
+ val filter: predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T>
+
+ /// Returns two observables which partition the observations of the source by
+ /// the given function. The first will trigger observations for those values
+ /// for which the predicate returns true. The second will trigger observations
+ /// for those values where the predicate returns false. The predicate is
+ /// executed once for each subscribed observer. Both also propagate all error
+ /// observations arising from the source and each completes when the source
+ /// completes.
+ /// The function to determine which output Observable will trigger
+ /// a particular observation.
+ /// The input Observable.
+ /// A tuple of Observables. The first triggers when the predicate returns true, and
+ /// the second triggers when the predicate returns false.
+ []
+ val partition: predicate:('T -> bool) -> source:IObservable<'T> -> (IObservable<'T> * IObservable<'T>)
+
+ /// Returns two observables which split the observations of the source by the
+ /// given function. The first will trigger observations x for which the
+ /// splitter returns Choice1Of2 x. The second will trigger observations
+ /// y for which the splitter returns Choice2Of2 y The splitter is
+ /// executed once for each subscribed observer. Both also propagate error
+ /// observations arising from the source and each completes when the source
+ /// completes.
+ /// The function that takes an observation an transforms
+ /// it into one of the two output Choice types.
+ /// The input Observable.
+ /// A tuple of Observables. The first triggers when splitter returns Choice1of2
+ /// and the second triggers when splitter returns Choice2of2.
+ []
+ val split: splitter:('T -> Choice<'U1,'U2>) -> source:IObservable<'T> -> (IObservable<'U1> * IObservable<'U2>)
+
+ /// Returns an observable which chooses a projection of observations from the source
+ /// using the given function. The returned object will trigger observations x
+ /// for which the splitter returns Some x. The returned object also propagates
+ /// all errors arising from the source and completes when the source completes.
+ /// The function that returns Some for observations to be propagated
+ /// and None for observations to ignore.
+ /// The input Observable.
+ /// An Observable that only propagates some of the observations from the source.
+ []
+ val choose: chooser:('T -> 'U option) -> source:IObservable<'T> -> IObservable<'U>
+
+ /// Returns an observable which, for each observer, allocates an item of state
+ /// and applies the given accumulating function to successive values arising from
+ /// the input. The returned object will trigger observations for each computed
+ /// state value, excluding the initial value. The returned object propagates
+ /// all errors arising from the source and completes when the source completes.
+ ///
+ /// For each observer, the registered intermediate observing object is not thread safe.
+ /// That is, observations arising from the source must not be triggered concurrently
+ /// on different threads.
+ /// The function to update the state with each observation.
+ /// The initial state.
+ /// The input Observable.
+ /// An Observable that triggers on the updated state values.
+ []
+ val scan: collector:('U -> 'T -> 'U) -> state:'U -> source:IObservable<'T> -> IObservable<'U>
+
+ /// Creates an observer which permanently subscribes to the given observable and which calls
+ /// the given function for each observation.
+ /// The function to be called on each observation.
+ /// The input Observable.
+ []
+ val add : callback:('T -> unit) -> source:IObservable<'T> -> unit
+
+ /// Creates an observer which subscribes to the given observable and which calls
+ /// the given function for each observation.
+ /// The function to be called on each observation.
+ /// The input Observable.
+ /// An object that will remove the callback if disposed.
+ []
+ val subscribe : callback:('T -> unit) -> source:IObservable<'T> -> System.IDisposable
+
+ /// Returns a new observable that triggers on the second and subsequent triggerings of the input observable.
+ /// The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as
+ /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the
+ /// Nth triggering occurs.
+ ///
+ /// For each observer, the registered intermediate observing object is not thread safe.
+ /// That is, observations arising from the source must not be triggered concurrently
+ /// on different threads.
+ /// The input Observable.
+ /// An Observable that triggers on successive pairs of observations from the input Observable.
+ []
+ val pairwise: source:IObservable<'T> -> IObservable<'T * 'T>