From d792a0723c85f158a72d2459610548e06051cf95 Mon Sep 17 00:00:00 2001 From: Don Syme Date: Sat, 12 May 2018 14:53:03 +0100 Subject: [PATCH 1/2] separate control.fs into multiple files --- .../FSharp.Core/FSharp.Core.fsproj | 38 +- src/fsharp/FSharp.Core/FSharp.Core.fsproj | 38 +- .../FSharp.Core/{control.fs => async.fs} | 702 +----------------- .../FSharp.Core/{control.fsi => async.fsi} | 417 ++--------- src/fsharp/FSharp.Core/event.fs | 2 - src/fsharp/FSharp.Core/event.fsi | 2 - src/fsharp/FSharp.Core/eventmodule.fs | 81 ++ src/fsharp/FSharp.Core/eventmodule.fsi | 89 +++ 8 files changed, 277 insertions(+), 1092 deletions(-) rename src/fsharp/FSharp.Core/{control.fs => async.fs} (72%) rename src/fsharp/FSharp.Core/{control.fsi => async.fsi} (64%) create mode 100644 src/fsharp/FSharp.Core/eventmodule.fs create mode 100644 src/fsharp/FSharp.Core/eventmodule.fsi diff --git a/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj b/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj index b807b3a0be5..29030516ebd 100644 --- a/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj +++ b/src/buildfromsource/FSharp.Core/FSharp.Core.fsproj @@ -121,12 +121,6 @@ Reflection/reflect.fs - - Event/event.fsi - - - Event/event.fs - Numerics/n.fsi @@ -163,11 +157,35 @@ NativeInterop/nativeptr.fs - - Async/control.fsi + + Control/event.fsi + + + Control/event.fs + + + Control/async.fsi + + + Control/async.fs + + + Control/eventmodule.fsi + + + Control/eventmodule.fs + + + Control/observable.fsi + + + Control/observable.fs + + + MailboxProcessor/mailbox.fsi - - Async/control.fs + + MailboxProcessor/mailbox.fs Queries/Linq.fsi diff --git a/src/fsharp/FSharp.Core/FSharp.Core.fsproj b/src/fsharp/FSharp.Core/FSharp.Core.fsproj index 1da4c6da6d6..0326640bf6e 100644 --- a/src/fsharp/FSharp.Core/FSharp.Core.fsproj +++ b/src/fsharp/FSharp.Core/FSharp.Core.fsproj @@ -149,12 +149,6 @@ Reflection/reflect.fs - - Event/event.fsi - - - Event/event.fs - Numerics/n.fsi @@ -191,11 +185,35 @@ NativeInterop/nativeptr.fs - - Async/control.fsi + + Control/event.fsi + + + Control/event.fs + + + Control/async.fsi + + + Control/async.fs + + + Control/eventmodule.fsi + + + Control/eventmodule.fs + + + Control/observable.fsi + + + Control/observable.fs + + + MailboxProcessor/mailbox.fsi - - Async/control.fs + + MailboxProcessor/mailbox.fs Queries/Linq.fsi diff --git a/src/fsharp/FSharp.Core/control.fs b/src/fsharp/FSharp.Core/async.fs similarity index 72% rename from src/fsharp/FSharp.Core/control.fs rename to src/fsharp/FSharp.Core/async.fs index 4058d38b927..25c2e3937a5 100644 --- a/src/fsharp/FSharp.Core/control.fs +++ b/src/fsharp/FSharp.Core/async.fs @@ -3,16 +3,10 @@ namespace Microsoft.FSharp.Control #nowarn "40" - #nowarn "21" - #nowarn "47" #nowarn "52" // The value has been copied to ensure the original is not mutated by this operation - #nowarn "67" // This type test or downcast will always hold - #nowarn "864" // IObservable.Subscribe open System open System.Diagnostics - open System.Diagnostics.CodeAnalysis - open System.IO open System.Reflection open System.Runtime.CompilerServices open System.Runtime.ExceptionServices @@ -20,56 +14,9 @@ namespace Microsoft.FSharp.Control open System.Threading.Tasks open Microsoft.FSharp.Core open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators - open Microsoft.FSharp.Core.Operators open Microsoft.FSharp.Control open Microsoft.FSharp.Collections -#if FX_RESHAPED_REFLECTION - open ReflectionAdapters -#endif - - - /// We use our own internal implementation of queues to avoid a dependency on System.dll - type Queue<'T>() = //: IEnumerable, ICollection, IEnumerable - - let mutable array = [| |] - let mutable head = 0 - let mutable size = 0 - let mutable tail = 0 - - let SetCapacity(capacity) = - let destinationArray = Array.zeroCreate capacity - if (size > 0) then - if (head < tail) then - Array.Copy(array, head, destinationArray, 0, size) - - else - Array.Copy(array, head, destinationArray, 0, array.Length - head) - Array.Copy(array, 0, destinationArray, array.Length - head, tail) - array <- destinationArray - head <- 0 - tail <- if (size = capacity) then 0 else size - - member x.Dequeue() = - if (size = 0) then - failwith "Dequeue" - let local = array.[head] - array.[head] <- Unchecked.defaultof<'T> - head <- (head + 1) % array.Length - size <- size - 1 - local - - member this.Enqueue(item) = - if (size = array.Length) then - let capacity = int ((int64 array.Length * 200L) / 100L) - let capacity = max capacity (array.Length + 4) - SetCapacity(capacity) - array.[tail] <- item - tail <- (tail + 1) % array.Length - size <- size + 1 - - member x.Count = size - type LinkedSubSource(cancellationToken : CancellationToken) = let failureCTS = new CancellationTokenSource() @@ -286,6 +233,12 @@ namespace Microsoft.FSharp.Control member ctxt.CallExceptionContinuation edi = ctxt.aux.econt edi + member ctxt.QueueContinuationWithTrampoline (result: 'T) = + ctxt.aux.trampolineHolder.QueueWorkItemWithTrampoline(fun () -> ctxt.cont result) + + member ctxt.CallContinuation(result: 'T) = + ctxt.cont result + [] [] type Async<'T> = @@ -1714,646 +1667,3 @@ namespace Microsoft.FSharp.Control ) #endif - - open CommonExtensions - - module AsyncHelpers = - - let awaitEither a1 a2 = - async { - let resultCell = new ResultCell<_>() - let! cancellationToken = Async.CancellationToken - let start a f = - Async.StartWithContinuationsUsingDispatchInfo(a, - (fun res -> resultCell.RegisterResult(f res |> AsyncResult.Ok, reuseThread=false) |> unfake), - (fun edi -> resultCell.RegisterResult(edi |> AsyncResult.Error, reuseThread=false) |> unfake), - (fun oce -> resultCell.RegisterResult(oce |> AsyncResult.Canceled, reuseThread=false) |> unfake), - cancellationToken = cancellationToken - ) - start a1 Choice1Of2 - start a2 Choice2Of2 - // Note: It is ok to use "NoDirectCancel" here because the started computations use the same - // cancellation token and will register a cancelled result if cancellation occurs. - // Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine. - let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout - return! CreateAsyncResultAsync result - } - - let timeout msec cancellationToken = - if msec < 0 then - MakeAsync (fun _ -> FakeUnit) // "block" forever - else - let resultCell = new ResultCell<_>() - Async.StartWithContinuations( - computation=Async.Sleep(msec), - continuation=(fun () -> resultCell.RegisterResult((), reuseThread = false) |> unfake), - exceptionContinuation=ignore, - cancellationContinuation=ignore, - cancellationToken = cancellationToken) - // Note: It is ok to use "NoDirectCancel" here because the started computations use the same - // cancellation token and will register a cancelled result if cancellation occurs. - // Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout. - resultCell.AwaitResult_NoDirectCancelOrTimeout - - [] - [] - type Mailbox<'Msg>(cancellationSupported: bool) = - let mutable inboxStore = null - let mutable arrivals = new Queue<'Msg>() - let syncRoot = arrivals - - // Control elements indicating the state of the reader. When the reader is "blocked" at an - // asynchronous receive, either - // -- "cont" is non-null and the reader is "activated" by re-scheduling cont in the thread pool; or - // -- "pulse" is non-null and the reader is "activated" by setting this event - let mutable savedCont : ((bool -> AsyncReturn) * TrampolineHolder) option = None - - // Readers who have a timeout use this event - let mutable pulse : AutoResetEvent = null - - // Make sure that the "pulse" value is created - let ensurePulse() = - match pulse with - | null -> - pulse <- new AutoResetEvent(false); - | _ -> - () - pulse - - let waitOneNoTimeoutOrCancellation = - MakeAsync (fun ctxt -> - match savedCont with - | None -> - let descheduled = - // An arrival may have happened while we're preparing to deschedule - lock syncRoot (fun () -> - if arrivals.Count = 0 then - // OK, no arrival so deschedule - savedCont <- Some(ctxt.cont, ctxt.aux.trampolineHolder); - true - else - false) - if descheduled then - FakeUnit - else - // If we didn't deschedule then run the continuation immediately - ctxt.cont true - | Some _ -> - failwith "multiple waiting reader continuations for mailbox") - - let waitOneWithCancellation timeout = - Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout) - - let waitOne timeout = - if timeout < 0 && not cancellationSupported then - waitOneNoTimeoutOrCancellation - else - waitOneWithCancellation(timeout) - - member __.inbox = - match inboxStore with - | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1) - | _ -> () - inboxStore - - member x.CurrentQueueLength = - lock syncRoot (fun () -> x.inbox.Count + arrivals.Count) - - member x.ScanArrivalsUnsafe(f) = - if arrivals.Count = 0 then - None - else - let msg = arrivals.Dequeue() - match f msg with - | None -> - x.inbox.Add(msg) - x.ScanArrivalsUnsafe(f) - | res -> res - - // Lock the arrivals queue while we scan that - member x.ScanArrivals(f) = - lock syncRoot (fun () -> x.ScanArrivalsUnsafe(f)) - - member x.ScanInbox(f,n) = - match inboxStore with - | null -> None - | inbox -> - if n >= inbox.Count - then None - else - let msg = inbox.[n] - match f msg with - | None -> x.ScanInbox (f,n+1) - | res -> inbox.RemoveAt(n); res - - member x.ReceiveFromArrivalsUnsafe() = - if arrivals.Count = 0 then - None - else - Some(arrivals.Dequeue()) - - member x.ReceiveFromArrivals() = - lock syncRoot (fun () -> x.ReceiveFromArrivalsUnsafe()) - - member x.ReceiveFromInbox() = - match inboxStore with - | null -> None - | inbox -> - if inbox.Count = 0 then - None - else - let x = inbox.[0] - inbox.RemoveAt(0) - Some(x) - - member x.Post(msg) = - lock syncRoot (fun () -> - - // Add the message to the arrivals queue - arrivals.Enqueue(msg) - - // Cooperatively unblock any waiting reader. If there is no waiting - // reader we just leave the message in the incoming queue - match savedCont with - | None -> - match pulse with - | null -> - () // no one waiting, leaving the message in the queue is sufficient - | ev -> - // someone is waiting on the wait handle - ev.Set() |> ignore - - | Some (action, trampolineHolder) -> - savedCont <- None - trampolineHolder.QueueWorkItemWithTrampoline(fun () -> action true) |> unfake) - - member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> = - let rec scan timeoutAsync (timeoutCts:CancellationTokenSource) = - async { - match x.ScanArrivals(f) with - | None -> - // Deschedule and wait for a message. When it comes, rescan the arrivals - let! ok = AsyncHelpers.awaitEither waitOneNoTimeoutOrCancellation timeoutAsync - match ok with - | Choice1Of2 true -> - return! scan timeoutAsync timeoutCts - | Choice1Of2 false -> - return failwith "should not happen - waitOneNoTimeoutOrCancellation always returns true" - | Choice2Of2 () -> - lock syncRoot (fun () -> - // Cancel the outstanding wait for messages installed by waitOneWithCancellation - // - // HERE BE DRAGONS. This is bestowed on us because we only support - // a single mailbox reader at any one time. - // If awaitEither returned control because timeoutAsync has terminated, waitOneNoTimeoutOrCancellation - // might still be in-flight. In practical terms, it means that the push-to-async-result-cell - // continuation that awaitEither registered on it is still pending, i.e. it is still in savedCont. - // That continuation is a no-op now, but it is still a registered reader for arriving messages. - // Therefore we just abandon it - a brutal way of canceling. - // This ugly non-compositionality is only needed because we only support a single mailbox reader - // (i.e. the user is not allowed to run several Receive/TryReceive/Scan/TryScan in parallel) - otherwise - // we would just have an extra no-op reader in the queue. - savedCont <- None) - - return None - | Some resP -> - timeoutCts.Cancel() // cancel the timeout watcher - let! res = resP - return Some res - } - let rec scanNoTimeout () = - async { - match x.ScanArrivals(f) with - | None -> - let! ok = waitOne(Timeout.Infinite) - if ok then - return! scanNoTimeout() - else - return (failwith "Timed out with infinite timeout??") - | Some resP -> - let! res = resP - return Some res - } - - // Look in the inbox first - async { - match x.ScanInbox(f,0) with - | None when timeout < 0 -> - return! scanNoTimeout() - | None -> - let! cancellationToken = Async.CancellationToken - let timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None) - let timeoutAsync = AsyncHelpers.timeout timeout timeoutCts.Token - return! scan timeoutAsync timeoutCts - | Some resP -> - let! res = resP - return Some res - } - - member x.Scan((f: 'Msg -> (Async<'T>) option), timeout) = - async { - let! resOpt = x.TryScan(f,timeout) - match resOpt with - | None -> return raise(TimeoutException(SR.GetString(SR.mailboxScanTimedOut))) - | Some res -> return res - } - - member x.TryReceive(timeout) = - let rec processFirstArrival() = - async { - match x.ReceiveFromArrivals() with - | None -> - // Make sure the pulse is created if it is going to be needed. - // If it isn't, then create it, and go back to the start to - // check arrivals again. - match pulse with - | null when timeout >= 0 || cancellationSupported -> - ensurePulse() |> ignore - return! processFirstArrival() - | _ -> - // Wait until we have been notified about a message. When that happens, rescan the arrivals - let! ok = waitOne(timeout) - if ok then - return! processFirstArrival() - else - return None - | res -> return res - } - - // look in the inbox first - async { - match x.ReceiveFromInbox() with - | None -> return! processFirstArrival() - | res -> return res - } - - member x.Receive(timeout) = - - let rec processFirstArrival() = - async { - match x.ReceiveFromArrivals() with - | None -> - // Make sure the pulse is created if it is going to be needed. - // If it isn't, then create it, and go back to the start to - // check arrivals again. - match pulse with - | null when timeout >= 0 || cancellationSupported -> - ensurePulse() |> ignore - return! processFirstArrival() - | _ -> - // Wait until we have been notified about a message. When that happens, rescan the arrivals - let! ok = waitOne(timeout) - if ok then - return! processFirstArrival() - else - return raise(TimeoutException(SR.GetString(SR.mailboxReceiveTimedOut))) - | Some res -> return res - } - - // look in the inbox first - async { - match x.ReceiveFromInbox() with - | None -> return! processFirstArrival() - | Some res -> return res - } - - interface System.IDisposable with - member __.Dispose() = - if isNotNull pulse then (pulse :> IDisposable).Dispose() - -#if DEBUG - member x.UnsafeContents = - (x.inbox,arrivals,pulse,savedCont) |> box -#endif - - - [] - [] - type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) = - member x.Reply(value) = replyf(value) - - [] - [] - [] - type MailboxProcessor<'Msg>(body, ?cancellationToken) = - - let cancellationSupported = cancellationToken.IsSome - let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken - let mailbox = new Mailbox<'Msg>(cancellationSupported) - let mutable defaultTimeout = Threading.Timeout.Infinite - let mutable started = false - let errorEvent = new Event() - - member __.CurrentQueueLength = mailbox.CurrentQueueLength // nb. unprotected access gives an approximation of the queue length - - member __.DefaultTimeout - with get() = defaultTimeout - and set(v) = defaultTimeout <- v - - [] - member __.Error = errorEvent.Publish - -#if DEBUG - member __.UnsafeMessageQueueContents = mailbox.UnsafeContents -#endif - - member x.Start() = - if started then - raise (new InvalidOperationException(SR.GetString(SR.mailboxProcessorAlreadyStarted))) - else - started <- true - - // Protect the execution and send errors to the event. - // Note that exception stack traces are lost in this design - in an extended design - // the event could propagate an ExceptionDispatchInfo instead of an Exception. - let p = - async { try - do! body x - with exn -> - errorEvent.Trigger exn } - - Async.Start(computation=p, cancellationToken=cancellationToken) - - member __.Post(message) = mailbox.Post(message) - - member __.TryPostAndReply(buildMessage : (_ -> 'Msg), ?timeout) : 'Reply option = - let timeout = defaultArg timeout defaultTimeout - use resultCell = new ResultCell<_>() - let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> - // Note the ResultCell may have been disposed if the operation - // timed out. In this case RegisterResult drops the result on the floor. - resultCell.RegisterResult(reply,reuseThread=false) |> unfake)) - mailbox.Post(msg) - resultCell.TryWaitForResultSynchronously(timeout=timeout) - - member x.PostAndReply(buildMessage, ?timeout) : 'Reply = - match x.TryPostAndReply(buildMessage,?timeout=timeout) with - | None -> raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndReplyTimedOut))) - | Some res -> res - - member __.PostAndTryAsyncReply(buildMessage, ?timeout) : Async<'Reply option> = - let timeout = defaultArg timeout defaultTimeout - let resultCell = new ResultCell<_>() - let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> - // Note the ResultCell may have been disposed if the operation - // timed out. In this case RegisterResult drops the result on the floor. - resultCell.RegisterResult(reply, reuseThread=false) |> unfake)) - mailbox.Post(msg) - match timeout with - | Threading.Timeout.Infinite when not cancellationSupported -> - async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout - return Some result } - - | _ -> - async { use _disposeCell = resultCell - let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout) - let res = (if ok then Some(resultCell.GrabResult()) else None) - return res } - - member x.PostAndAsyncReply(buildMessage, ?timeout:int) = - let timeout = defaultArg timeout defaultTimeout - match timeout with - | Threading.Timeout.Infinite when not cancellationSupported -> - // Nothing to dispose, no wait handles used - let resultCell = new ResultCell<_>() - let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake)) - mailbox.Post(msg) - resultCell.AwaitResult_NoDirectCancelOrTimeout - | _ -> - let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout) - async { let! res = asyncReply - match res with - | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut))) - | Some res -> return res } - - member __.Receive(?timeout) = - mailbox.Receive(timeout=defaultArg timeout defaultTimeout) - - member __.TryReceive(?timeout) = - mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout) - - member __.Scan(scanner: 'Msg -> (Async<'T>) option,?timeout) = - mailbox.Scan(scanner,timeout=defaultArg timeout defaultTimeout) - - member __.TryScan(scanner: 'Msg -> (Async<'T>) option,?timeout) = - mailbox.TryScan(scanner,timeout=defaultArg timeout defaultTimeout) - - interface System.IDisposable with - member __.Dispose() = (mailbox :> IDisposable).Dispose() - - static member Start(body,?cancellationToken) = - let mailboxProcessor = new MailboxProcessor<'Msg>(body,?cancellationToken=cancellationToken) - mailboxProcessor.Start() - mailboxProcessor - - [] - [] - module Event = - [] - let create<'T>() = - let ev = new Event<'T>() - ev.Trigger, ev.Publish - - [] - let map mapping (sourceEvent: IEvent<'Delegate,'T>) = - let ev = new Event<_>() - sourceEvent.Add(fun x -> ev.Trigger(mapping x)); - ev.Publish - - [] - let filter predicate (sourceEvent: IEvent<'Delegate,'T>) = - let ev = new Event<_>() - sourceEvent.Add(fun x -> if predicate x then ev.Trigger x); - ev.Publish - - [] - let partition predicate (sourceEvent: IEvent<'Delegate,'T>) = - let ev1 = new Event<_>() - let ev2 = new Event<_>() - sourceEvent.Add(fun x -> if predicate x then ev1.Trigger x else ev2.Trigger x); - ev1.Publish,ev2.Publish - - [] - let choose chooser (sourceEvent: IEvent<'Delegate,'T>) = - let ev = new Event<_>() - sourceEvent.Add(fun x -> match chooser x with None -> () | Some r -> ev.Trigger r); - ev.Publish - - [] - let scan collector state (sourceEvent: IEvent<'Delegate,'T>) = - let state = ref state - let ev = new Event<_>() - sourceEvent.Add(fun msg -> - let z = !state - let z = collector z msg - state := z; - ev.Trigger(z)); - ev.Publish - - [] - let add callback (sourceEvent: IEvent<'Delegate,'T>) = sourceEvent.Add(callback) - - [] - let pairwise (sourceEvent : IEvent<'Delegate,'T>) : IEvent<'T * 'T> = - let ev = new Event<'T * 'T>() - let lastArgs = ref None - sourceEvent.Add(fun args2 -> - (match !lastArgs with - | None -> () - | Some args1 -> ev.Trigger(args1,args2)) - lastArgs := Some args2) - - ev.Publish - - [] - let merge (event1: IEvent<'Del1,'T>) (event2: IEvent<'Del2,'T>) = - let ev = new Event<_>() - event1.Add(fun x -> ev.Trigger(x)) - event2.Add(fun x -> ev.Trigger(x)) - ev.Publish - - [] - let split (splitter : 'T -> Choice<'U1,'U2>) (sourceEvent: IEvent<'Delegate,'T>) = - let ev1 = new Event<_>() - let ev2 = new Event<_>() - sourceEvent.Add(fun x -> match splitter x with Choice1Of2 y -> ev1.Trigger(y) | Choice2Of2 z -> ev2.Trigger(z)); - ev1.Publish,ev2.Publish - - - [] - [] - module Observable = - let obs x = (x :> IObservable<_>) - - - let inline protect f succeed fail = - match (try Choice1Of2 (f ()) with e -> Choice2Of2 e) with - | Choice1Of2 x -> (succeed x) - | Choice2Of2 e -> (fail e) - - [] - type BasicObserver<'T>() = - let mutable stopped = false - abstract Next : value : 'T -> unit - abstract Error : error : exn -> unit - abstract Completed : unit -> unit - interface IObserver<'T> with - member x.OnNext value = if not stopped then x.Next value - member x.OnError e = if not stopped then stopped <- true - x.Error e - member x.OnCompleted () = if not stopped then stopped <- true - x.Completed () - - [] - let map mapping (source: IObservable<'T>) = - { new IObservable<'U> with - member x.Subscribe(observer) = - source.Subscribe { new BasicObserver<'T>() with - member x.Next(v) = - protect (fun () -> mapping v) observer.OnNext observer.OnError - member x.Error(e) = observer.OnError(e) - member x.Completed() = observer.OnCompleted() } } - - [] - let choose chooser (source: IObservable<'T>) = - { new IObservable<'U> with - member x.Subscribe(observer) = - source.Subscribe { new BasicObserver<'T>() with - member x.Next(v) = - protect (fun () -> chooser v) (function None -> () | Some v2 -> observer.OnNext v2) observer.OnError - member x.Error(e) = observer.OnError(e) - member x.Completed() = observer.OnCompleted() } } - - [] - let filter predicate (source: IObservable<'T>) = - choose (fun x -> if predicate x then Some x else None) source - - [] - let partition predicate (source: IObservable<'T>) = - filter predicate source, filter (predicate >> not) source - - - [] - let scan collector state (source: IObservable<'T>) = - { new IObservable<'U> with - member x.Subscribe(observer) = - let state = ref state - source.Subscribe { new BasicObserver<'T>() with - member x.Next(v) = - let z = !state - protect (fun () -> collector z v) (fun z -> - state := z - observer.OnNext z) observer.OnError - - member x.Error(e) = observer.OnError(e) - member x.Completed() = observer.OnCompleted() } } - - [] - let add callback (source: IObservable<'T>) = source.Add(callback) - - [] - let subscribe (callback: 'T -> unit) (source: IObservable<'T>) = source.Subscribe(callback) - - [] - let pairwise (source : IObservable<'T>) : IObservable<'T * 'T> = - { new IObservable<_> with - member x.Subscribe(observer) = - let lastArgs = ref None - source.Subscribe { new BasicObserver<'T>() with - member x.Next(args2) = - match !lastArgs with - | None -> () - | Some args1 -> observer.OnNext (args1,args2) - lastArgs := Some args2 - member x.Error(e) = observer.OnError(e) - member x.Completed() = observer.OnCompleted() } } - - - [] - let merge (source1: IObservable<'T>) (source2: IObservable<'T>) = - { new IObservable<_> with - member x.Subscribe(observer) = - let stopped = ref false - let completed1 = ref false - let completed2 = ref false - let h1 = - source1.Subscribe { new IObserver<'T> with - member x.OnNext(v) = - if not !stopped then - observer.OnNext v - member x.OnError(e) = - if not !stopped then - stopped := true; - observer.OnError(e) - member x.OnCompleted() = - if not !stopped then - completed1 := true; - if !completed1 && !completed2 then - stopped := true - observer.OnCompleted() } - let h2 = - source2.Subscribe { new IObserver<'T> with - member x.OnNext(v) = - if not !stopped then - observer.OnNext v - member x.OnError(e) = - if not !stopped then - stopped := true; - observer.OnError(e) - member x.OnCompleted() = - if not !stopped then - completed2 := true; - if !completed1 && !completed2 then - stopped := true - observer.OnCompleted() } - - { new IDisposable with - member x.Dispose() = - h1.Dispose(); - h2.Dispose() } } - - [] - let split (splitter : 'T -> Choice<'U1,'U2>) (source: IObservable<'T>) = - choose (fun v -> match splitter v with Choice1Of2 x -> Some x | _ -> None) source, - choose (fun v -> match splitter v with Choice2Of2 x -> Some x | _ -> None) source - diff --git a/src/fsharp/FSharp.Core/control.fsi b/src/fsharp/FSharp.Core/async.fsi similarity index 64% rename from src/fsharp/FSharp.Core/control.fsi rename to src/fsharp/FSharp.Core/async.fsi index f69a663d823..9ce73b10cc2 100644 --- a/src/fsharp/FSharp.Core/control.fsi +++ b/src/fsharp/FSharp.Core/async.fsi @@ -5,14 +5,12 @@ namespace Microsoft.FSharp.Control open System open System.Threading open System.Threading.Tasks - open System.Runtime.CompilerServices + open System.Runtime.ExceptionServices open Microsoft.FSharp.Core - open Microsoft.FSharp.Core.Operators open Microsoft.FSharp.Control open Microsoft.FSharp.Collections - /// A compositional asynchronous computation, which, when run, will eventually produce a value /// of type T, or else raises an exception. /// @@ -405,6 +403,11 @@ namespace Microsoft.FSharp.Control continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken-> unit + static member internal StartWithContinuationsUsingDispatchInfo: + computation:Async<'T> * + continuation:('T -> unit) * exceptionContinuation:(ExceptionDispatchInfo -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * + ?cancellationToken:CancellationToken-> unit + /// Runs an asynchronous computation, starting immediately on the current operating system /// thread. /// If no cancellation token is provided then the default cancellation token is used. @@ -574,7 +577,6 @@ namespace Microsoft.FSharp.Control [] /// A module of extension members providing asynchronous operations for some basic CLI types related to concurrency and I/O. module CommonExtensions = - open System.IO type System.IO.Stream with @@ -626,7 +628,6 @@ namespace Microsoft.FSharp.Control /// A module of extension members providing asynchronous operations for some basic Web operations. [] module WebExtensions = - begin type System.Net.WebRequest with /// Returns an asynchronous computation that, when run, will wait for a response to the given WebRequest. @@ -636,16 +637,19 @@ namespace Microsoft.FSharp.Control #if !FX_NO_WEB_CLIENT type System.Net.WebClient with + /// Returns an asynchronous computation that, when run, will wait for the download of the given URI. /// The URI to retrieve. /// An asynchronous computation that will wait for the download of the URI. [] // give the extension member a nice, unmangled compiled name, unique within this module member AsyncDownloadString : address:System.Uri -> Async + /// Returns an asynchronous computation that, when run, will wait for the download of the given URI. /// The URI to retrieve. /// An asynchronous computation that will wait for the download of the URI. [] // give the extension member a nice, unmangled compiled name, unique within this module member AsyncDownloadData : address:System.Uri -> Async + /// Returns an asynchronous computation that, when run, will wait for the download of the given URI to specified file. /// The URI to retrieve. /// The filename to save download to. @@ -654,373 +658,42 @@ namespace Microsoft.FSharp.Control member AsyncDownloadFile : address:System.Uri * fileName: string -> Async #endif - end - - - [] - [] - /// A handle to a capability to reply to a PostAndReply message. - type AsyncReplyChannel<'Reply> = - /// Sends a reply to a PostAndReply message. - /// The value to send. - member Reply : value:'Reply -> unit - - - /// A message-processing agent which executes an asynchronous computation. - /// - /// The agent encapsulates a message queue that supports multiple-writers and - /// a single reader agent. Writers send messages to the agent by using the Post - /// method and its variations. - /// - /// The agent may wait for messages using the Receive or TryReceive methods or - /// scan through all available messages using the Scan or TryScan method. - - [] - [] - [] - type MailboxProcessor<'Msg> = - - /// Creates an agent. The body function is used to generate the asynchronous - /// computation executed by the agent. This function is not executed until - /// Start is called. - /// The function to produce an asynchronous computation that will be executed - /// as the read loop for the MailboxProcessor when Start is called. - /// An optional cancellation token for the body. - /// Defaults to Async.DefaultCancellationToken. - /// The created MailboxProcessor. - new : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg> - - /// Creates and starts an agent. The body function is used to generate the asynchronous - /// computation executed by the agent. - /// The function to produce an asynchronous computation that will be executed - /// as the read loop for the MailboxProcessor when Start is called. - /// An optional cancellation token for the body. - /// Defaults to Async.DefaultCancellationToken. - /// The created MailboxProcessor. - static member Start : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg> - - /// Posts a message to the message queue of the MailboxProcessor, asynchronously. - /// The message to post. - member Post : message:'Msg -> unit - - /// Posts a message to an agent and await a reply on the channel, synchronously. - /// - /// The message is generated by applying buildMessage to a new reply channel - /// to be incorporated into the message. The receiving agent must process this - /// message and invoke the Reply method on this reply channel precisely once. - /// The function to incorporate the AsyncReplyChannel into - /// the message to be sent. - /// An optional timeout parameter (in milliseconds) to wait for a reply message. - /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. - /// The reply from the agent. - member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply - - /// Posts a message to an agent and await a reply on the channel, asynchronously. - /// - /// The message is generated by applying buildMessage to a new reply channel - /// to be incorporated into the message. The receiving agent must process this - /// message and invoke the Reply method on this reply channel precisely once. - /// The function to incorporate the AsyncReplyChannel into - /// the message to be sent. - /// An optional timeout parameter (in milliseconds) to wait for a reply message. - /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. - /// An asynchronous computation that will wait for the reply from the agent. - member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply> - - /// Like PostAndReply, but returns None if no reply within the timeout period. - /// The function to incorporate the AsyncReplyChannel into - /// the message to be sent. - /// An optional timeout parameter (in milliseconds) to wait for a reply message. - /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. - /// The reply from the agent or None if the timeout expires. - member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply option - - /// Like AsyncPostAndReply, but returns None if no reply within the timeout period. - /// The function to incorporate the AsyncReplyChannel into - /// the message to be sent. - /// An optional timeout parameter (in milliseconds) to wait for a reply message. - /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. - /// An asynchronous computation that will return the reply or None if the timeout expires. - member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply option> - - /// Waits for a message. This will consume the first message in arrival order. - /// - /// This method is for use within the body of the agent. - /// - /// This method is for use within the body of the agent. For each agent, at most - /// one concurrent reader may be active, so no more than one concurrent call to - /// Receive, TryReceive, Scan and/or TryScan may be active. - /// An optional timeout in milliseconds. Defaults to -1 which corresponds - /// to System.Threading.Timeout.Infinite. - /// An asynchronous computation that returns the received message. - /// Thrown when the timeout is exceeded. - member Receive : ?timeout:int -> Async<'Msg> - - /// Waits for a message. This will consume the first message in arrival order. - /// - /// This method is for use within the body of the agent. - /// - /// Returns None if a timeout is given and the timeout is exceeded. - /// - /// This method is for use within the body of the agent. For each agent, at most - /// one concurrent reader may be active, so no more than one concurrent call to - /// Receive, TryReceive, Scan and/or TryScan may be active. - /// An optional timeout in milliseconds. Defaults to -1 which - /// corresponds to System.Threading.Timeout.Infinite. - /// An asynchronous computation that returns the received message or - /// None if the timeout is exceeded. - member TryReceive : ?timeout:int -> Async<'Msg option> - - /// Scans for a message by looking through messages in arrival order until scanner - /// returns a Some value. Other messages remain in the queue. - /// - /// Returns None if a timeout is given and the timeout is exceeded. - /// - /// This method is for use within the body of the agent. For each agent, at most - /// one concurrent reader may be active, so no more than one concurrent call to - /// Receive, TryReceive, Scan and/or TryScan may be active. - /// The function to return None if the message is to be skipped - /// or Some if the message is to be processed and removed from the queue. - /// An optional timeout in milliseconds. Defaults to -1 which corresponds - /// to System.Threading.Timeout.Infinite. - /// An asynchronous computation that scanner built off the read message. - /// Thrown when the timeout is exceeded. - member Scan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T> - - /// Scans for a message by looking through messages in arrival order until scanner - /// returns a Some value. Other messages remain in the queue. - /// - /// This method is for use within the body of the agent. For each agent, at most - /// one concurrent reader may be active, so no more than one concurrent call to - /// Receive, TryReceive, Scan and/or TryScan may be active. - /// The function to return None if the message is to be skipped - /// or Some if the message is to be processed and removed from the queue. - /// An optional timeout in milliseconds. Defaults to -1 which corresponds - /// to System.Threading.Timeout.Infinite. - /// An asynchronous computation that scanner built off the read message. - member TryScan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T option> - - /// Starts the agent. - member Start : unit -> unit - - /// Raises a timeout exception if a message not received in this amount of time. By default - /// no timeout is used. - member DefaultTimeout : int with get, set - - /// Occurs when the execution of the agent results in an exception. - [] - member Error : IEvent - - interface System.IDisposable - - /// Returns the number of unprocessed messages in the message queue of the agent. - member CurrentQueueLength : int + // Internals used by MailboxProcessor + module internal AsyncImpl = + val async : AsyncBuilder + [] + // Internals used by MailboxProcessor + type internal AsyncReturn - [] - [] - /// Basic operations on first class event and other observable objects. - module Observable = - - /// Returns an observable for the merged observations from the sources. - /// The returned object propagates success and error values arising - /// from either source and completes when both the sources have completed. - /// - /// For each observer, the registered intermediate observing object is not - /// thread safe. That is, observations arising from the sources must not - /// be triggered concurrently on different threads. - /// The first Observable. - /// The second Observable. - /// An Observable that propagates information from both sources. - [] - val merge: source1:IObservable<'T> -> source2:IObservable<'T> -> IObservable<'T> - - /// Returns an observable which transforms the observations of the source by the - /// given function. The transformation function is executed once for each - /// subscribed observer. The returned object also propagates error observations - /// arising from the source and completes when the source completes. - /// The function applied to observations from the source. - /// The input Observable. - /// An Observable of the type specified by mapping. - [] - val map: mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U> - - /// Returns an observable which filters the observations of the source - /// by the given function. The observable will see only those observations - /// for which the predicate returns true. The predicate is executed once for - /// each subscribed observer. The returned object also propagates error - /// observations arising from the source and completes when the source completes. - /// The function to apply to observations to determine if it should - /// be kept. - /// The input Observable. - /// An Observable that filters observations based on filter. - [] - val filter: predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T> - - /// Returns two observables which partition the observations of the source by - /// the given function. The first will trigger observations for those values - /// for which the predicate returns true. The second will trigger observations - /// for those values where the predicate returns false. The predicate is - /// executed once for each subscribed observer. Both also propagate all error - /// observations arising from the source and each completes when the source - /// completes. - /// The function to determine which output Observable will trigger - /// a particular observation. - /// The input Observable. - /// A tuple of Observables. The first triggers when the predicate returns true, and - /// the second triggers when the predicate returns false. - [] - val partition: predicate:('T -> bool) -> source:IObservable<'T> -> (IObservable<'T> * IObservable<'T>) - - /// Returns two observables which split the observations of the source by the - /// given function. The first will trigger observations x for which the - /// splitter returns Choice1Of2 x. The second will trigger observations - /// y for which the splitter returns Choice2Of2 y The splitter is - /// executed once for each subscribed observer. Both also propagate error - /// observations arising from the source and each completes when the source - /// completes. - /// The function that takes an observation an transforms - /// it into one of the two output Choice types. - /// The input Observable. - /// A tuple of Observables. The first triggers when splitter returns Choice1of2 - /// and the second triggers when splitter returns Choice2of2. - [] - val split: splitter:('T -> Choice<'U1,'U2>) -> source:IObservable<'T> -> (IObservable<'U1> * IObservable<'U2>) - - /// Returns an observable which chooses a projection of observations from the source - /// using the given function. The returned object will trigger observations x - /// for which the splitter returns Some x. The returned object also propagates - /// all errors arising from the source and completes when the source completes. - /// The function that returns Some for observations to be propagated - /// and None for observations to ignore. - /// The input Observable. - /// An Observable that only propagates some of the observations from the source. - [] - val choose: chooser:('T -> 'U option) -> source:IObservable<'T> -> IObservable<'U> - - /// Returns an observable which, for each observer, allocates an item of state - /// and applies the given accumulating function to successive values arising from - /// the input. The returned object will trigger observations for each computed - /// state value, excluding the initial value. The returned object propagates - /// all errors arising from the source and completes when the source completes. - /// - /// For each observer, the registered intermediate observing object is not thread safe. - /// That is, observations arising from the source must not be triggered concurrently - /// on different threads. - /// The function to update the state with each observation. - /// The initial state. - /// The input Observable. - /// An Observable that triggers on the updated state values. - [] - val scan: collector:('U -> 'T -> 'U) -> state:'U -> source:IObservable<'T> -> IObservable<'U> - - /// Creates an observer which permanently subscribes to the given observable and which calls - /// the given function for each observation. - /// The function to be called on each observation. - /// The input Observable. - [] - val add : callback:('T -> unit) -> source:IObservable<'T> -> unit - - /// Creates an observer which subscribes to the given observable and which calls - /// the given function for each observation. - /// The function to be called on each observation. - /// The input Observable. - /// An object that will remove the callback if disposed. - [] - val subscribe : callback:('T -> unit) -> source:IObservable<'T> -> System.IDisposable - - /// Returns a new observable that triggers on the second and subsequent triggerings of the input observable. - /// The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as - /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the - /// Nth triggering occurs. - /// - /// For each observer, the registered intermediate observing object is not thread safe. - /// That is, observations arising from the source must not be triggered concurrently - /// on different threads. - /// The input Observable. - /// An Observable that triggers on successive pairs of observations from the input Observable. - [] - val pairwise: source:IObservable<'T> -> IObservable<'T * 'T> - - [] - [] - module Event = - - /// Fires the output event when either of the input events fire. - /// The first input event. - /// The second input event. - /// An event that fires when either of the input events fire. - [] - val merge: event1:IEvent<'Del1,'T> -> event2:IEvent<'Del2,'T> -> IEvent<'T> - - /// Returns a new event that passes values transformed by the given function. - /// The function to transform event values. - /// The input event. - /// An event that passes the transformed values. - [] - val map: mapping:('T -> 'U) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> - - /// Returns a new event that listens to the original event and triggers the resulting - /// event only when the argument to the event passes the given function. - /// The function to determine which triggers from the event to propagate. - /// The input event. - /// An event that only passes values that pass the predicate. - [] - val filter: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'T> - - /// Returns a new event that listens to the original event and triggers the - /// first resulting event if the application of the predicate to the event arguments - /// returned true, and the second event if it returned false. - /// The function to determine which output event to trigger. - /// The input event. - /// A tuple of events. The first is triggered when the predicate evaluates to true - /// and the second when the predicate evaluates to false. - [] - val partition: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'T> * IEvent<'T>) - - /// Returns a new event that listens to the original event and triggers the - /// first resulting event if the application of the function to the event arguments - /// returned a Choice1Of2, and the second event if it returns a Choice2Of2. - /// The function to transform event values into one of two types. - /// The input event. - /// A tuple of events. The first fires whenever splitter evaluates to Choice1of1 and - /// the second fires whenever splitter evaluates to Choice2of2. - [] - val split: splitter:('T -> Choice<'U1,'U2>) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'U1> * IEvent<'U2>) - - /// Returns a new event which fires on a selection of messages from the original event. - /// The selection function takes an original message to an optional new message. - /// The function to select and transform event values to pass on. - /// The input event. - /// An event that fires only when the chooser returns Some. - [] - val choose: chooser:('T -> 'U option) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> - - [] - /// Returns a new event consisting of the results of applying the given accumulating function - /// to successive values triggered on the input event. An item of internal state - /// records the current value of the state parameter. The internal state is not locked during the - /// execution of the accumulation function, so care should be taken that the - /// input IEvent not triggered by multiple threads simultaneously. - /// The function to update the state with each event value. - /// The initial state. - /// The input event. - /// An event that fires on the updated state values. - val scan: collector:('U -> 'T -> 'U) -> state:'U -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> - - /// Runs the given function each time the given event is triggered. - /// The function to call when the event is triggered. - /// The input event. - [] - val add : callback:('T -> unit) -> sourceEvent:IEvent<'Del,'T> -> unit - - /// Returns a new event that triggers on the second and subsequent triggerings of the input event. - /// The Nth triggering of the input event passes the arguments from the N-1th and Nth triggering as - /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the - /// Nth triggering occurs. - /// The input event. - /// An event that triggers on pairs of consecutive values passed from the source event. - [] - val pairwise: sourceEvent:IEvent<'Del,'T> -> IEvent<'T * 'T> - + [] + // Internals used by MailboxProcessor + type internal AsyncActivation<'T> = + member QueueContinuationWithTrampoline: 'T -> AsyncReturn + member CallContinuation: 'T -> AsyncReturn + [] + // Internals used by MailboxProcessor + type internal AsyncResult<'T> = + | Ok of 'T + | Error of ExceptionDispatchInfo + | Canceled of OperationCanceledException + + // Internals used by MailboxProcessor + module internal AsyncPrimitives = + + [] + type internal ResultCell<'T> = + new : unit -> ResultCell<'T> + member GetWaitHandle: unit -> WaitHandle + member Close: unit -> unit + interface IDisposable + member RegisterResult: 'T * reuseThread: bool -> AsyncReturn + member GrabResult: unit -> 'T + member ResultAvailable : bool + member AwaitResult_NoDirectCancelOrTimeout : Async<'T> + member TryWaitForResultSynchronously: ?timeout: int -> 'T option + + val CreateAsyncResultAsync : AsyncResult<'T> -> Async<'T> + + val MakeAsync : (AsyncActivation<'T> -> AsyncReturn) -> Async<'T> diff --git a/src/fsharp/FSharp.Core/event.fs b/src/fsharp/FSharp.Core/event.fs index 19b94a36697..4489c325d5a 100644 --- a/src/fsharp/FSharp.Core/event.fs +++ b/src/fsharp/FSharp.Core/event.fs @@ -147,5 +147,3 @@ namespace Microsoft.FSharp.Control (e :?> IEvent<_,_>).AddHandler(h) { new System.IDisposable with member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } } - - diff --git a/src/fsharp/FSharp.Core/event.fsi b/src/fsharp/FSharp.Core/event.fsi index 2ade4ac205c..23bc85ceea6 100644 --- a/src/fsharp/FSharp.Core/event.fsi +++ b/src/fsharp/FSharp.Core/event.fsi @@ -8,7 +8,6 @@ namespace Microsoft.FSharp.Control open Microsoft.FSharp.Control open Microsoft.FSharp.Collections - /// Event implementations for an arbitrary type of delegate. [] type DelegateEvent<'Delegate when 'Delegate :> System.Delegate> = @@ -21,7 +20,6 @@ namespace Microsoft.FSharp.Control /// Publishes the event as a first class event value. member Publish : IDelegateEvent<'Delegate> - /// Event implementations for a delegate types following the standard .NET Framework convention of a first 'sender' argument. [] type Event<'Delegate,'Args when 'Delegate : delegate<'Args,unit> and 'Delegate :> System.Delegate > = diff --git a/src/fsharp/FSharp.Core/eventmodule.fs b/src/fsharp/FSharp.Core/eventmodule.fs new file mode 100644 index 00000000000..da359843d5c --- /dev/null +++ b/src/fsharp/FSharp.Core/eventmodule.fs @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open Microsoft.FSharp.Core + open Microsoft.FSharp.Control + + [] + [] + module Event = + [] + let create<'T>() = + let ev = new Event<'T>() + ev.Trigger, ev.Publish + + [] + let map mapping (sourceEvent: IEvent<'Delegate,'T>) = + let ev = new Event<_>() + sourceEvent.Add(fun x -> ev.Trigger(mapping x)); + ev.Publish + + [] + let filter predicate (sourceEvent: IEvent<'Delegate,'T>) = + let ev = new Event<_>() + sourceEvent.Add(fun x -> if predicate x then ev.Trigger x); + ev.Publish + + [] + let partition predicate (sourceEvent: IEvent<'Delegate,'T>) = + let ev1 = new Event<_>() + let ev2 = new Event<_>() + sourceEvent.Add(fun x -> if predicate x then ev1.Trigger x else ev2.Trigger x); + ev1.Publish,ev2.Publish + + [] + let choose chooser (sourceEvent: IEvent<'Delegate,'T>) = + let ev = new Event<_>() + sourceEvent.Add(fun x -> match chooser x with None -> () | Some r -> ev.Trigger r); + ev.Publish + + [] + let scan collector state (sourceEvent: IEvent<'Delegate,'T>) = + let state = ref state + let ev = new Event<_>() + sourceEvent.Add(fun msg -> + let z = !state + let z = collector z msg + state := z; + ev.Trigger(z)); + ev.Publish + + [] + let add callback (sourceEvent: IEvent<'Delegate,'T>) = sourceEvent.Add(callback) + + [] + let pairwise (sourceEvent : IEvent<'Delegate,'T>) : IEvent<'T * 'T> = + let ev = new Event<'T * 'T>() + let lastArgs = ref None + sourceEvent.Add(fun args2 -> + (match !lastArgs with + | None -> () + | Some args1 -> ev.Trigger(args1,args2)) + lastArgs := Some args2) + + ev.Publish + + [] + let merge (event1: IEvent<'Del1,'T>) (event2: IEvent<'Del2,'T>) = + let ev = new Event<_>() + event1.Add(fun x -> ev.Trigger(x)) + event2.Add(fun x -> ev.Trigger(x)) + ev.Publish + + [] + let split (splitter : 'T -> Choice<'U1,'U2>) (sourceEvent: IEvent<'Delegate,'T>) = + let ev1 = new Event<_>() + let ev2 = new Event<_>() + sourceEvent.Add(fun x -> match splitter x with Choice1Of2 y -> ev1.Trigger(y) | Choice2Of2 z -> ev2.Trigger(z)); + ev1.Publish,ev2.Publish + + diff --git a/src/fsharp/FSharp.Core/eventmodule.fsi b/src/fsharp/FSharp.Core/eventmodule.fsi new file mode 100644 index 00000000000..c3f5f3cc9fb --- /dev/null +++ b/src/fsharp/FSharp.Core/eventmodule.fsi @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open Microsoft.FSharp.Core + open Microsoft.FSharp.Control + + [] + [] + module Event = + + /// Fires the output event when either of the input events fire. + /// The first input event. + /// The second input event. + /// An event that fires when either of the input events fire. + [] + val merge: event1:IEvent<'Del1,'T> -> event2:IEvent<'Del2,'T> -> IEvent<'T> + + /// Returns a new event that passes values transformed by the given function. + /// The function to transform event values. + /// The input event. + /// An event that passes the transformed values. + [] + val map: mapping:('T -> 'U) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> + + /// Returns a new event that listens to the original event and triggers the resulting + /// event only when the argument to the event passes the given function. + /// The function to determine which triggers from the event to propagate. + /// The input event. + /// An event that only passes values that pass the predicate. + [] + val filter: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'T> + + /// Returns a new event that listens to the original event and triggers the + /// first resulting event if the application of the predicate to the event arguments + /// returned true, and the second event if it returned false. + /// The function to determine which output event to trigger. + /// The input event. + /// A tuple of events. The first is triggered when the predicate evaluates to true + /// and the second when the predicate evaluates to false. + [] + val partition: predicate:('T -> bool) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'T> * IEvent<'T>) + + /// Returns a new event that listens to the original event and triggers the + /// first resulting event if the application of the function to the event arguments + /// returned a Choice1Of2, and the second event if it returns a Choice2Of2. + /// The function to transform event values into one of two types. + /// The input event. + /// A tuple of events. The first fires whenever splitter evaluates to Choice1of1 and + /// the second fires whenever splitter evaluates to Choice2of2. + [] + val split: splitter:('T -> Choice<'U1,'U2>) -> sourceEvent:IEvent<'Del,'T> -> (IEvent<'U1> * IEvent<'U2>) + + /// Returns a new event which fires on a selection of messages from the original event. + /// The selection function takes an original message to an optional new message. + /// The function to select and transform event values to pass on. + /// The input event. + /// An event that fires only when the chooser returns Some. + [] + val choose: chooser:('T -> 'U option) -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> + + [] + /// Returns a new event consisting of the results of applying the given accumulating function + /// to successive values triggered on the input event. An item of internal state + /// records the current value of the state parameter. The internal state is not locked during the + /// execution of the accumulation function, so care should be taken that the + /// input IEvent not triggered by multiple threads simultaneously. + /// The function to update the state with each event value. + /// The initial state. + /// The input event. + /// An event that fires on the updated state values. + val scan: collector:('U -> 'T -> 'U) -> state:'U -> sourceEvent:IEvent<'Del,'T> -> IEvent<'U> + + /// Runs the given function each time the given event is triggered. + /// The function to call when the event is triggered. + /// The input event. + [] + val add : callback:('T -> unit) -> sourceEvent:IEvent<'Del,'T> -> unit + + /// Returns a new event that triggers on the second and subsequent triggerings of the input event. + /// The Nth triggering of the input event passes the arguments from the N-1th and Nth triggering as + /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the + /// Nth triggering occurs. + /// The input event. + /// An event that triggers on pairs of consecutive values passed from the source event. + [] + val pairwise: sourceEvent:IEvent<'Del,'T> -> IEvent<'T * 'T> + + From 9a7c7f17bd9a376320b1aa6950cf857511dce7bd Mon Sep 17 00:00:00 2001 From: Don Syme Date: Sat, 12 May 2018 17:07:37 +0100 Subject: [PATCH 2/2] add missing files and adjust indentation --- src/fsharp/FSharp.Core/async.fs | 4 + src/fsharp/FSharp.Core/mailbox.fs | 481 ++++++++++++++++++++++++++ src/fsharp/FSharp.Core/mailbox.fsi | 162 +++++++++ src/fsharp/FSharp.Core/observable.fs | 177 ++++++++++ src/fsharp/FSharp.Core/observable.fsi | 131 +++++++ 5 files changed, 955 insertions(+) create mode 100644 src/fsharp/FSharp.Core/mailbox.fs create mode 100644 src/fsharp/FSharp.Core/mailbox.fsi create mode 100644 src/fsharp/FSharp.Core/observable.fs create mode 100644 src/fsharp/FSharp.Core/observable.fsi diff --git a/src/fsharp/FSharp.Core/async.fs b/src/fsharp/FSharp.Core/async.fs index 25c2e3937a5..e2f87857f41 100644 --- a/src/fsharp/FSharp.Core/async.fs +++ b/src/fsharp/FSharp.Core/async.fs @@ -17,6 +17,10 @@ namespace Microsoft.FSharp.Control open Microsoft.FSharp.Control open Microsoft.FSharp.Collections +#if FX_RESHAPED_REFLECTION + open ReflectionAdapters +#endif + type LinkedSubSource(cancellationToken : CancellationToken) = let failureCTS = new CancellationTokenSource() diff --git a/src/fsharp/FSharp.Core/mailbox.fs b/src/fsharp/FSharp.Core/mailbox.fs new file mode 100644 index 00000000000..c83bc49e5fb --- /dev/null +++ b/src/fsharp/FSharp.Core/mailbox.fs @@ -0,0 +1,481 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open System + open System.Threading + open Microsoft.FSharp.Core + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + open Microsoft.FSharp.Control + open Microsoft.FSharp.Control.AsyncImpl + open Microsoft.FSharp.Control.AsyncPrimitives + open Microsoft.FSharp.Collections + + /// We use our own internal implementation of queues to avoid a dependency on System.dll + type Queue<'T>() = + + let mutable array = [| |] + let mutable head = 0 + let mutable size = 0 + let mutable tail = 0 + + let SetCapacity(capacity) = + let destinationArray = Array.zeroCreate capacity + if (size > 0) then + if (head < tail) then + Array.Copy(array, head, destinationArray, 0, size) + + else + Array.Copy(array, head, destinationArray, 0, array.Length - head) + Array.Copy(array, 0, destinationArray, array.Length - head, tail) + array <- destinationArray + head <- 0 + tail <- if (size = capacity) then 0 else size + + member x.Dequeue() = + if (size = 0) then + failwith "Dequeue" + let local = array.[head] + array.[head] <- Unchecked.defaultof<'T> + head <- (head + 1) % array.Length + size <- size - 1 + local + + member this.Enqueue(item) = + if (size = array.Length) then + let capacity = int ((int64 array.Length * 200L) / 100L) + let capacity = max capacity (array.Length + 4) + SetCapacity(capacity) + array.[tail] <- item + tail <- (tail + 1) % array.Length + size <- size + 1 + + member x.Count = size + + + module AsyncHelpers = + + let awaitEither a1 a2 = + async { + let resultCell = new ResultCell<_>() + let! cancellationToken = Async.CancellationToken + let start a f = + Async.StartWithContinuationsUsingDispatchInfo(a, + (fun res -> resultCell.RegisterResult(f res |> AsyncResult.Ok, reuseThread=false) |> ignore), + (fun edi -> resultCell.RegisterResult(edi |> AsyncResult.Error, reuseThread=false) |> ignore), + (fun oce -> resultCell.RegisterResult(oce |> AsyncResult.Canceled, reuseThread=false) |> ignore), + cancellationToken = cancellationToken + ) + start a1 Choice1Of2 + start a2 Choice2Of2 + // Note: It is ok to use "NoDirectCancel" here because the started computations use the same + // cancellation token and will register a cancelled result if cancellation occurs. + // Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine. + let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout + return! CreateAsyncResultAsync result + } + + let timeout msec cancellationToken = + assert (msec >= 0) + let resultCell = new ResultCell<_>() + Async.StartWithContinuations( + computation=Async.Sleep(msec), + continuation=(fun () -> resultCell.RegisterResult((), reuseThread = false) |> ignore), + exceptionContinuation=ignore, + cancellationContinuation=ignore, + cancellationToken = cancellationToken) + // Note: It is ok to use "NoDirectCancel" here because the started computations use the same + // cancellation token and will register a cancelled result if cancellation occurs. + // Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout. + resultCell.AwaitResult_NoDirectCancelOrTimeout + + [] + [] + type Mailbox<'Msg>(cancellationSupported: bool) = + let mutable inboxStore = null + let mutable arrivals = new Queue<'Msg>() + let syncRoot = arrivals + + // Control elements indicating the state of the reader. When the reader is "blocked" at an + // asynchronous receive, either + // -- "cont" is non-null and the reader is "activated" by re-scheduling cont in the thread pool; or + // -- "pulse" is non-null and the reader is "activated" by setting this event + let mutable savedCont : (bool -> AsyncReturn) option = None + + // Readers who have a timeout use this event + let mutable pulse : AutoResetEvent = null + + // Make sure that the "pulse" value is created + let ensurePulse() = + match pulse with + | null -> + pulse <- new AutoResetEvent(false); + | _ -> + () + pulse + + let waitOneNoTimeoutOrCancellation = + MakeAsync (fun ctxt -> + match savedCont with + | None -> + let descheduled = + // An arrival may have happened while we're preparing to deschedule + lock syncRoot (fun () -> + if arrivals.Count = 0 then + // OK, no arrival so deschedule + savedCont <- Some(fun res -> ctxt.QueueContinuationWithTrampoline(res)) + true + else + false) + if descheduled then + Unchecked.defaultof<_> + else + // If we didn't deschedule then run the continuation immediately + ctxt.CallContinuation true + | Some _ -> + failwith "multiple waiting reader continuations for mailbox") + + let waitOneWithCancellation timeout = + Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout) + + let waitOne timeout = + if timeout < 0 && not cancellationSupported then + waitOneNoTimeoutOrCancellation + else + waitOneWithCancellation(timeout) + + member __.inbox = + match inboxStore with + | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1) + | _ -> () + inboxStore + + member x.CurrentQueueLength = + lock syncRoot (fun () -> x.inbox.Count + arrivals.Count) + + member x.ScanArrivalsUnsafe(f) = + if arrivals.Count = 0 then + None + else + let msg = arrivals.Dequeue() + match f msg with + | None -> + x.inbox.Add(msg) + x.ScanArrivalsUnsafe(f) + | res -> res + + // Lock the arrivals queue while we scan that + member x.ScanArrivals(f) = + lock syncRoot (fun () -> x.ScanArrivalsUnsafe(f)) + + member x.ScanInbox(f,n) = + match inboxStore with + | null -> None + | inbox -> + if n >= inbox.Count + then None + else + let msg = inbox.[n] + match f msg with + | None -> x.ScanInbox (f,n+1) + | res -> inbox.RemoveAt(n); res + + member x.ReceiveFromArrivalsUnsafe() = + if arrivals.Count = 0 then + None + else + Some(arrivals.Dequeue()) + + member x.ReceiveFromArrivals() = + lock syncRoot (fun () -> x.ReceiveFromArrivalsUnsafe()) + + member x.ReceiveFromInbox() = + match inboxStore with + | null -> None + | inbox -> + if inbox.Count = 0 then + None + else + let x = inbox.[0] + inbox.RemoveAt(0) + Some(x) + + member x.Post(msg) = + lock syncRoot (fun () -> + + // Add the message to the arrivals queue + arrivals.Enqueue(msg) + + // Cooperatively unblock any waiting reader. If there is no waiting + // reader we just leave the message in the incoming queue + match savedCont with + | None -> + match pulse with + | null -> + () // no one waiting, leaving the message in the queue is sufficient + | ev -> + // someone is waiting on the wait handle + ev.Set() |> ignore + + | Some action -> + savedCont <- None + action true |> ignore) + + member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> = + let rec scan timeoutAsync (timeoutCts:CancellationTokenSource) = + async { + match x.ScanArrivals(f) with + | None -> + // Deschedule and wait for a message. When it comes, rescan the arrivals + let! ok = AsyncHelpers.awaitEither waitOneNoTimeoutOrCancellation timeoutAsync + match ok with + | Choice1Of2 true -> + return! scan timeoutAsync timeoutCts + | Choice1Of2 false -> + return failwith "should not happen - waitOneNoTimeoutOrCancellation always returns true" + | Choice2Of2 () -> + lock syncRoot (fun () -> + // Cancel the outstanding wait for messages installed by waitOneWithCancellation + // + // HERE BE DRAGONS. This is bestowed on us because we only support + // a single mailbox reader at any one time. + // If awaitEither returned control because timeoutAsync has terminated, waitOneNoTimeoutOrCancellation + // might still be in-flight. In practical terms, it means that the push-to-async-result-cell + // continuation that awaitEither registered on it is still pending, i.e. it is still in savedCont. + // That continuation is a no-op now, but it is still a registered reader for arriving messages. + // Therefore we just abandon it - a brutal way of canceling. + // This ugly non-compositionality is only needed because we only support a single mailbox reader + // (i.e. the user is not allowed to run several Receive/TryReceive/Scan/TryScan in parallel) - otherwise + // we would just have an extra no-op reader in the queue. + savedCont <- None) + + return None + | Some resP -> + timeoutCts.Cancel() // cancel the timeout watcher + let! res = resP + return Some res + } + let rec scanNoTimeout () = + async { + match x.ScanArrivals(f) with + | None -> + let! ok = waitOne(Timeout.Infinite) + if ok then + return! scanNoTimeout() + else + return (failwith "Timed out with infinite timeout??") + | Some resP -> + let! res = resP + return Some res + } + + // Look in the inbox first + async { + match x.ScanInbox(f,0) with + | None when timeout < 0 -> + return! scanNoTimeout() + | None -> + let! cancellationToken = Async.CancellationToken + let timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None) + let timeoutAsync = AsyncHelpers.timeout timeout timeoutCts.Token + return! scan timeoutAsync timeoutCts + | Some resP -> + let! res = resP + return Some res + } + + member x.Scan((f: 'Msg -> (Async<'T>) option), timeout) = + async { + let! resOpt = x.TryScan(f,timeout) + match resOpt with + | None -> return raise(TimeoutException(SR.GetString(SR.mailboxScanTimedOut))) + | Some res -> return res + } + + member x.TryReceive(timeout) = + let rec processFirstArrival() = + async { + match x.ReceiveFromArrivals() with + | None -> + // Make sure the pulse is created if it is going to be needed. + // If it isn't, then create it, and go back to the start to + // check arrivals again. + match pulse with + | null when timeout >= 0 || cancellationSupported -> + ensurePulse() |> ignore + return! processFirstArrival() + | _ -> + // Wait until we have been notified about a message. When that happens, rescan the arrivals + let! ok = waitOne(timeout) + if ok then + return! processFirstArrival() + else + return None + | res -> return res + } + + // look in the inbox first + async { + match x.ReceiveFromInbox() with + | None -> return! processFirstArrival() + | res -> return res + } + + member x.Receive(timeout) = + + let rec processFirstArrival() = + async { + match x.ReceiveFromArrivals() with + | None -> + // Make sure the pulse is created if it is going to be needed. + // If it isn't, then create it, and go back to the start to + // check arrivals again. + match pulse with + | null when timeout >= 0 || cancellationSupported -> + ensurePulse() |> ignore + return! processFirstArrival() + | _ -> + // Wait until we have been notified about a message. When that happens, rescan the arrivals + let! ok = waitOne(timeout) + if ok then + return! processFirstArrival() + else + return raise(TimeoutException(SR.GetString(SR.mailboxReceiveTimedOut))) + | Some res -> return res + } + + // look in the inbox first + async { + match x.ReceiveFromInbox() with + | None -> return! processFirstArrival() + | Some res -> return res + } + + interface System.IDisposable with + member __.Dispose() = + if isNotNull pulse then (pulse :> IDisposable).Dispose() + +#if DEBUG + member x.UnsafeContents = + (x.inbox,arrivals,pulse,savedCont) |> box +#endif + + + [] + [] + type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) = + member x.Reply(value) = replyf(value) + + [] + [] + [] + type MailboxProcessor<'Msg>(body, ?cancellationToken) = + + let cancellationSupported = cancellationToken.IsSome + let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken + let mailbox = new Mailbox<'Msg>(cancellationSupported) + let mutable defaultTimeout = Threading.Timeout.Infinite + let mutable started = false + let errorEvent = new Event() + + member __.CurrentQueueLength = mailbox.CurrentQueueLength // nb. unprotected access gives an approximation of the queue length + + member __.DefaultTimeout + with get() = defaultTimeout + and set(v) = defaultTimeout <- v + + [] + member __.Error = errorEvent.Publish + +#if DEBUG + member __.UnsafeMessageQueueContents = mailbox.UnsafeContents +#endif + + member x.Start() = + if started then + raise (new InvalidOperationException(SR.GetString(SR.mailboxProcessorAlreadyStarted))) + else + started <- true + + // Protect the execution and send errors to the event. + // Note that exception stack traces are lost in this design - in an extended design + // the event could propagate an ExceptionDispatchInfo instead of an Exception. + let p = + async { try + do! body x + with exn -> + errorEvent.Trigger exn } + + Async.Start(computation=p, cancellationToken=cancellationToken) + + member __.Post(message) = mailbox.Post(message) + + member __.TryPostAndReply(buildMessage : (_ -> 'Msg), ?timeout) : 'Reply option = + let timeout = defaultArg timeout defaultTimeout + use resultCell = new ResultCell<_>() + let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> + // Note the ResultCell may have been disposed if the operation + // timed out. In this case RegisterResult drops the result on the floor. + resultCell.RegisterResult(reply,reuseThread=false) |> ignore)) + mailbox.Post(msg) + resultCell.TryWaitForResultSynchronously(timeout=timeout) + + member x.PostAndReply(buildMessage, ?timeout) : 'Reply = + match x.TryPostAndReply(buildMessage,?timeout=timeout) with + | None -> raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndReplyTimedOut))) + | Some res -> res + + member __.PostAndTryAsyncReply(buildMessage, ?timeout) : Async<'Reply option> = + let timeout = defaultArg timeout defaultTimeout + let resultCell = new ResultCell<_>() + let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> + // Note the ResultCell may have been disposed if the operation + // timed out. In this case RegisterResult drops the result on the floor. + resultCell.RegisterResult(reply, reuseThread=false) |> ignore)) + mailbox.Post(msg) + match timeout with + | Threading.Timeout.Infinite when not cancellationSupported -> + async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout + return Some result } + + | _ -> + async { use _disposeCell = resultCell + let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout) + let res = (if ok then Some(resultCell.GrabResult()) else None) + return res } + + member x.PostAndAsyncReply(buildMessage, ?timeout:int) = + let timeout = defaultArg timeout defaultTimeout + match timeout with + | Threading.Timeout.Infinite when not cancellationSupported -> + // Nothing to dispose, no wait handles used + let resultCell = new ResultCell<_>() + let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> ignore)) + mailbox.Post(msg) + resultCell.AwaitResult_NoDirectCancelOrTimeout + | _ -> + let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout) + async { let! res = asyncReply + match res with + | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut))) + | Some res -> return res } + + member __.Receive(?timeout) = + mailbox.Receive(timeout=defaultArg timeout defaultTimeout) + + member __.TryReceive(?timeout) = + mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout) + + member __.Scan(scanner: 'Msg -> (Async<'T>) option,?timeout) = + mailbox.Scan(scanner,timeout=defaultArg timeout defaultTimeout) + + member __.TryScan(scanner: 'Msg -> (Async<'T>) option,?timeout) = + mailbox.TryScan(scanner,timeout=defaultArg timeout defaultTimeout) + + interface System.IDisposable with + member __.Dispose() = (mailbox :> IDisposable).Dispose() + + static member Start(body,?cancellationToken) = + let mailboxProcessor = new MailboxProcessor<'Msg>(body,?cancellationToken=cancellationToken) + mailboxProcessor.Start() + mailboxProcessor diff --git a/src/fsharp/FSharp.Core/mailbox.fsi b/src/fsharp/FSharp.Core/mailbox.fsi new file mode 100644 index 00000000000..6d818299e06 --- /dev/null +++ b/src/fsharp/FSharp.Core/mailbox.fsi @@ -0,0 +1,162 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open System.Threading + + open Microsoft.FSharp.Core + open Microsoft.FSharp.Control + + [] + /// A handle to a capability to reply to a PostAndReply message. + type AsyncReplyChannel<'Reply> = + /// Sends a reply to a PostAndReply message. + /// The value to send. + member Reply : value:'Reply -> unit + + /// A message-processing agent which executes an asynchronous computation. + /// + /// The agent encapsulates a message queue that supports multiple-writers and + /// a single reader agent. Writers send messages to the agent by using the Post + /// method and its variations. + /// + /// The agent may wait for messages using the Receive or TryReceive methods or + /// scan through all available messages using the Scan or TryScan method. + [] + type MailboxProcessor<'Msg> = + + /// Creates an agent. The body function is used to generate the asynchronous + /// computation executed by the agent. This function is not executed until + /// Start is called. + /// The function to produce an asynchronous computation that will be executed + /// as the read loop for the MailboxProcessor when Start is called. + /// An optional cancellation token for the body. + /// Defaults to Async.DefaultCancellationToken. + /// The created MailboxProcessor. + new : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg> + + /// Creates and starts an agent. The body function is used to generate the asynchronous + /// computation executed by the agent. + /// The function to produce an asynchronous computation that will be executed + /// as the read loop for the MailboxProcessor when Start is called. + /// An optional cancellation token for the body. + /// Defaults to Async.DefaultCancellationToken. + /// The created MailboxProcessor. + static member Start : body:(MailboxProcessor<'Msg> -> Async) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg> + + /// Posts a message to the message queue of the MailboxProcessor, asynchronously. + /// The message to post. + member Post : message:'Msg -> unit + + /// Posts a message to an agent and await a reply on the channel, synchronously. + /// + /// The message is generated by applying buildMessage to a new reply channel + /// to be incorporated into the message. The receiving agent must process this + /// message and invoke the Reply method on this reply channel precisely once. + /// The function to incorporate the AsyncReplyChannel into + /// the message to be sent. + /// An optional timeout parameter (in milliseconds) to wait for a reply message. + /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. + /// The reply from the agent. + member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply + + /// Posts a message to an agent and await a reply on the channel, asynchronously. + /// + /// The message is generated by applying buildMessage to a new reply channel + /// to be incorporated into the message. The receiving agent must process this + /// message and invoke the Reply method on this reply channel precisely once. + /// The function to incorporate the AsyncReplyChannel into + /// the message to be sent. + /// An optional timeout parameter (in milliseconds) to wait for a reply message. + /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. + /// An asynchronous computation that will wait for the reply from the agent. + member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply> + + /// Like PostAndReply, but returns None if no reply within the timeout period. + /// The function to incorporate the AsyncReplyChannel into + /// the message to be sent. + /// An optional timeout parameter (in milliseconds) to wait for a reply message. + /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. + /// The reply from the agent or None if the timeout expires. + member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> 'Reply option + + /// Like AsyncPostAndReply, but returns None if no reply within the timeout period. + /// The function to incorporate the AsyncReplyChannel into + /// the message to be sent. + /// An optional timeout parameter (in milliseconds) to wait for a reply message. + /// Defaults to -1 which corresponds to System.Threading.Timeout.Infinite. + /// An asynchronous computation that will return the reply or None if the timeout expires. + member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout : int -> Async<'Reply option> + + /// Waits for a message. This will consume the first message in arrival order. + /// + /// This method is for use within the body of the agent. + /// + /// This method is for use within the body of the agent. For each agent, at most + /// one concurrent reader may be active, so no more than one concurrent call to + /// Receive, TryReceive, Scan and/or TryScan may be active. + /// An optional timeout in milliseconds. Defaults to -1 which corresponds + /// to System.Threading.Timeout.Infinite. + /// An asynchronous computation that returns the received message. + /// Thrown when the timeout is exceeded. + member Receive : ?timeout:int -> Async<'Msg> + + /// Waits for a message. This will consume the first message in arrival order. + /// + /// This method is for use within the body of the agent. + /// + /// Returns None if a timeout is given and the timeout is exceeded. + /// + /// This method is for use within the body of the agent. For each agent, at most + /// one concurrent reader may be active, so no more than one concurrent call to + /// Receive, TryReceive, Scan and/or TryScan may be active. + /// An optional timeout in milliseconds. Defaults to -1 which + /// corresponds to System.Threading.Timeout.Infinite. + /// An asynchronous computation that returns the received message or + /// None if the timeout is exceeded. + member TryReceive : ?timeout:int -> Async<'Msg option> + + /// Scans for a message by looking through messages in arrival order until scanner + /// returns a Some value. Other messages remain in the queue. + /// + /// Returns None if a timeout is given and the timeout is exceeded. + /// + /// This method is for use within the body of the agent. For each agent, at most + /// one concurrent reader may be active, so no more than one concurrent call to + /// Receive, TryReceive, Scan and/or TryScan may be active. + /// The function to return None if the message is to be skipped + /// or Some if the message is to be processed and removed from the queue. + /// An optional timeout in milliseconds. Defaults to -1 which corresponds + /// to System.Threading.Timeout.Infinite. + /// An asynchronous computation that scanner built off the read message. + /// Thrown when the timeout is exceeded. + member Scan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T> + + /// Scans for a message by looking through messages in arrival order until scanner + /// returns a Some value. Other messages remain in the queue. + /// + /// This method is for use within the body of the agent. For each agent, at most + /// one concurrent reader may be active, so no more than one concurrent call to + /// Receive, TryReceive, Scan and/or TryScan may be active. + /// The function to return None if the message is to be skipped + /// or Some if the message is to be processed and removed from the queue. + /// An optional timeout in milliseconds. Defaults to -1 which corresponds + /// to System.Threading.Timeout.Infinite. + /// An asynchronous computation that scanner built off the read message. + member TryScan : scanner:('Msg -> (Async<'T>) option) * ?timeout:int -> Async<'T option> + + /// Starts the agent. + member Start : unit -> unit + + /// Raises a timeout exception if a message not received in this amount of time. By default + /// no timeout is used. + member DefaultTimeout : int with get, set + + /// Occurs when the execution of the agent results in an exception. + [] + member Error : IEvent + + interface System.IDisposable + + /// Returns the number of unprocessed messages in the message queue of the agent. + member CurrentQueueLength : int diff --git a/src/fsharp/FSharp.Core/observable.fs b/src/fsharp/FSharp.Core/observable.fs new file mode 100644 index 00000000000..75ef1d081b1 --- /dev/null +++ b/src/fsharp/FSharp.Core/observable.fs @@ -0,0 +1,177 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open System + open Microsoft.FSharp.Core + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + open Microsoft.FSharp.Control + + [] + [] + module Observable = + + let inline protect f succeed fail = + match (try Choice1Of2 (f ()) with e -> Choice2Of2 e) with + | Choice1Of2 x -> (succeed x) + | Choice2Of2 e -> (fail e) + + [] + type BasicObserver<'T>() = + + let mutable stopped = false + + abstract Next : value : 'T -> unit + + abstract Error : error : exn -> unit + + abstract Completed : unit -> unit + + interface IObserver<'T> with + + member x.OnNext value = + if not stopped then + x.Next value + + member x.OnError e = + if not stopped then + stopped <- true + x.Error e + + member x.OnCompleted () = + if not stopped then + stopped <- true + x.Completed () + + [] + let map mapping (source: IObservable<'T>) = + { new IObservable<'U> with + member x.Subscribe(observer) = + source.Subscribe + { new BasicObserver<'T>() with + + member x.Next(v) = + protect (fun () -> mapping v) observer.OnNext observer.OnError + + member x.Error(e) = observer.OnError(e) + + member x.Completed() = observer.OnCompleted() } } + + [] + let choose chooser (source: IObservable<'T>) = + { new IObservable<'U> with + member x.Subscribe(observer) = + source.Subscribe + { new BasicObserver<'T>() with + + member x.Next(v) = + protect (fun () -> chooser v) (function None -> () | Some v2 -> observer.OnNext v2) observer.OnError + + member x.Error(e) = observer.OnError(e) + + member x.Completed() = observer.OnCompleted() } } + + [] + let filter predicate (source: IObservable<'T>) = + choose (fun x -> if predicate x then Some x else None) source + + [] + let partition predicate (source: IObservable<'T>) = + filter predicate source, filter (predicate >> not) source + + [] + let scan collector state (source: IObservable<'T>) = + { new IObservable<'U> with + member x.Subscribe(observer) = + let mutable state = state + source.Subscribe + { new BasicObserver<'T>() with + + member x.Next(v) = + let z = state + protect (fun () -> collector z v) (fun z -> + state <- z + observer.OnNext z) observer.OnError + + member x.Error(e) = observer.OnError(e) + + member x.Completed() = observer.OnCompleted() } } + + [] + let add callback (source: IObservable<'T>) = source.Add(callback) + + [] + let subscribe (callback: 'T -> unit) (source: IObservable<'T>) = source.Subscribe(callback) + + [] + let pairwise (source : IObservable<'T>) : IObservable<'T * 'T> = + { new IObservable<_> with + member x.Subscribe(observer) = + let mutable lastArgs = None + source.Subscribe + { new BasicObserver<'T>() with + + member x.Next(args2) = + match lastArgs with + | None -> () + | Some args1 -> observer.OnNext (args1,args2) + lastArgs <- Some args2 + + member x.Error(e) = observer.OnError(e) + + member x.Completed() = observer.OnCompleted() } } + + [] + let merge (source1: IObservable<'T>) (source2: IObservable<'T>) = + { new IObservable<_> with + member x.Subscribe(observer) = + let mutable stopped = false + let mutable completed1 = false + let mutable completed2 = false + let h1 = + source1.Subscribe + { new IObserver<'T> with + member x.OnNext(v) = + if not stopped then + observer.OnNext v + + member x.OnError(e) = + if not stopped then + stopped <- true + observer.OnError(e) + + member x.OnCompleted() = + if not stopped then + completed1 <- true + if completed1 && completed2 then + stopped <- true + observer.OnCompleted() } + let h2 = + source2.Subscribe + { new IObserver<'T> with + member x.OnNext(v) = + if not stopped then + observer.OnNext v + + member x.OnError(e) = + if not stopped then + stopped <- true + observer.OnError(e) + + member x.OnCompleted() = + if not stopped then + completed2 <- true + if completed1 && completed2 then + stopped <- true + observer.OnCompleted() } + + { new IDisposable with + member x.Dispose() = + h1.Dispose() + h2.Dispose() } } + + [] + let split (splitter : 'T -> Choice<'U1,'U2>) (source: IObservable<'T>) = + choose (fun v -> match splitter v with Choice1Of2 x -> Some x | _ -> None) source, + choose (fun v -> match splitter v with Choice2Of2 x -> Some x | _ -> None) source + diff --git a/src/fsharp/FSharp.Core/observable.fsi b/src/fsharp/FSharp.Core/observable.fsi new file mode 100644 index 00000000000..36cbee5700d --- /dev/null +++ b/src/fsharp/FSharp.Core/observable.fsi @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information. + +namespace Microsoft.FSharp.Control + + open System + open Microsoft.FSharp.Core + + [] + [] + /// Basic operations on first class event and other observable objects. + module Observable = + + /// Returns an observable for the merged observations from the sources. + /// The returned object propagates success and error values arising + /// from either source and completes when both the sources have completed. + /// + /// For each observer, the registered intermediate observing object is not + /// thread safe. That is, observations arising from the sources must not + /// be triggered concurrently on different threads. + /// The first Observable. + /// The second Observable. + /// An Observable that propagates information from both sources. + [] + val merge: source1:IObservable<'T> -> source2:IObservable<'T> -> IObservable<'T> + + /// Returns an observable which transforms the observations of the source by the + /// given function. The transformation function is executed once for each + /// subscribed observer. The returned object also propagates error observations + /// arising from the source and completes when the source completes. + /// The function applied to observations from the source. + /// The input Observable. + /// An Observable of the type specified by mapping. + [] + val map: mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U> + + /// Returns an observable which filters the observations of the source + /// by the given function. The observable will see only those observations + /// for which the predicate returns true. The predicate is executed once for + /// each subscribed observer. The returned object also propagates error + /// observations arising from the source and completes when the source completes. + /// The function to apply to observations to determine if it should + /// be kept. + /// The input Observable. + /// An Observable that filters observations based on filter. + [] + val filter: predicate:('T -> bool) -> source:IObservable<'T> -> IObservable<'T> + + /// Returns two observables which partition the observations of the source by + /// the given function. The first will trigger observations for those values + /// for which the predicate returns true. The second will trigger observations + /// for those values where the predicate returns false. The predicate is + /// executed once for each subscribed observer. Both also propagate all error + /// observations arising from the source and each completes when the source + /// completes. + /// The function to determine which output Observable will trigger + /// a particular observation. + /// The input Observable. + /// A tuple of Observables. The first triggers when the predicate returns true, and + /// the second triggers when the predicate returns false. + [] + val partition: predicate:('T -> bool) -> source:IObservable<'T> -> (IObservable<'T> * IObservable<'T>) + + /// Returns two observables which split the observations of the source by the + /// given function. The first will trigger observations x for which the + /// splitter returns Choice1Of2 x. The second will trigger observations + /// y for which the splitter returns Choice2Of2 y The splitter is + /// executed once for each subscribed observer. Both also propagate error + /// observations arising from the source and each completes when the source + /// completes. + /// The function that takes an observation an transforms + /// it into one of the two output Choice types. + /// The input Observable. + /// A tuple of Observables. The first triggers when splitter returns Choice1of2 + /// and the second triggers when splitter returns Choice2of2. + [] + val split: splitter:('T -> Choice<'U1,'U2>) -> source:IObservable<'T> -> (IObservable<'U1> * IObservable<'U2>) + + /// Returns an observable which chooses a projection of observations from the source + /// using the given function. The returned object will trigger observations x + /// for which the splitter returns Some x. The returned object also propagates + /// all errors arising from the source and completes when the source completes. + /// The function that returns Some for observations to be propagated + /// and None for observations to ignore. + /// The input Observable. + /// An Observable that only propagates some of the observations from the source. + [] + val choose: chooser:('T -> 'U option) -> source:IObservable<'T> -> IObservable<'U> + + /// Returns an observable which, for each observer, allocates an item of state + /// and applies the given accumulating function to successive values arising from + /// the input. The returned object will trigger observations for each computed + /// state value, excluding the initial value. The returned object propagates + /// all errors arising from the source and completes when the source completes. + /// + /// For each observer, the registered intermediate observing object is not thread safe. + /// That is, observations arising from the source must not be triggered concurrently + /// on different threads. + /// The function to update the state with each observation. + /// The initial state. + /// The input Observable. + /// An Observable that triggers on the updated state values. + [] + val scan: collector:('U -> 'T -> 'U) -> state:'U -> source:IObservable<'T> -> IObservable<'U> + + /// Creates an observer which permanently subscribes to the given observable and which calls + /// the given function for each observation. + /// The function to be called on each observation. + /// The input Observable. + [] + val add : callback:('T -> unit) -> source:IObservable<'T> -> unit + + /// Creates an observer which subscribes to the given observable and which calls + /// the given function for each observation. + /// The function to be called on each observation. + /// The input Observable. + /// An object that will remove the callback if disposed. + [] + val subscribe : callback:('T -> unit) -> source:IObservable<'T> -> System.IDisposable + + /// Returns a new observable that triggers on the second and subsequent triggerings of the input observable. + /// The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as + /// a pair. The argument passed to the N-1th triggering is held in hidden internal state until the + /// Nth triggering occurs. + /// + /// For each observer, the registered intermediate observing object is not thread safe. + /// That is, observations arising from the source must not be triggered concurrently + /// on different threads. + /// The input Observable. + /// An Observable that triggers on successive pairs of observations from the input Observable. + [] + val pairwise: source:IObservable<'T> -> IObservable<'T * 'T>