diff --git a/src/Streamly/Data/Stream/MkType.hs b/src/Streamly/Data/Stream/MkType.hs index 97092cae0c..7f82f7e3e6 100644 --- a/src/Streamly/Data/Stream/MkType.hs +++ b/src/Streamly/Data/Stream/MkType.hs @@ -16,6 +16,10 @@ -- >>> :set -XTemplateHaskell -- >>> :set -XTypeFamilies -- >>> :set -XUndecidableInstances +-- >>> :set -XFlexibleContexts +-- >>> :set -XFlexibleInstances +-- >>> :set -XGeneralizedNewtypeDeriving +-- >>> :set -XTypeOperators -- -- Import this module unqualified to bring everything needed in scope without -- having to import several other modules. Also, "Streamly.Data.Stream" or @@ -23,48 +27,93 @@ -- -- >>> import Streamly.Data.Stream.MkType -- >>> import qualified Streamly.Data.Stream.Prelude as Stream +-- >>> import Streamly.Data.Stream.Prelude (Stream, MonadAsync) -- -- We are describing below many useful types that can be created using macros -- in this module and the behavior of those types. These could be useful if you -- like to program using the monad \"do notation\" instead of using concatMap --- like operations. +-- like operations explicitly. -- -- == Parallel -- --- A newtype wrapper over the 'Stream' type; the Applicative and Monad --- instances generate a cross product of the two streams in a concurrent --- manner. The order in which the stream elements are produced is not --- deterministic, this is supposed to be used if order does not matter. - --- Loops over the outer stream, generating multiple elements concurrently; for --- each outer stream element, loop over the inner stream concurrently. More --- outer iterations are started only if the existing inner iterations are not --- saturating the resources. --- --- Use 'mkParallel' to construct from 'Stream' type and 'unParallel' to --- deconstruct back to 'Stream'. --- -- >>> :{ -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b -- bind = flip (Stream.parConcatMap id) -- $(mkCrossType "Parallel" "bind" True) -- :} -- --- This is a bounded concurrent, unordered list-transformer (ListT) monad. +-- A newtype wrapper over 'Stream' that provides a +-- __bounded concurrent ListT-like monad__, running computations +-- __concurrently__. Prefer this type when you want high throughput and do not +-- require ordering guarantees. +-- +-- Combining streams with 'Applicative' or 'Monad' does not process them +-- element-by-element in sequence. Instead, multiple elements are produced +-- and consumed __at the same time__. +-- +-- For example, in 'Applicative', both sides are evaluated concurrently: +-- +-- > pure (,) <*> a <*> b +-- +-- elements from @a@ and @b@ may be generated in parallel. +-- +-- In 'Monad', each element of the outer stream can start its own +-- concurrent computation: +-- +-- > do +-- > x <- a +-- > y <- f x +-- +-- multiple @x@ values may be generated at once, and for each @x@, +-- multiple @y@ values may be produced concurrently. +-- +-- A useful way to think about this is nested loops: +-- +-- * Many iterations of the outer loop can run at the same time +-- * For each outer iteration, many inner iterations can also run at the same +-- time +-- +-- Conceptually, this behaves like a concurrent list transformer. +-- +-- Results are emitted in completion order (first-come-first-served), +-- not in the original stream order: +-- +-- * Values from the outer stream (@x@) may be produced out of order +-- * Values from the inner stream (@y@) may also be produced out of order +-- +-- For deterministic ordering, see 'OrderedParallel'. +-- +-- Concurrency is __bounded__: new outer iterations are scheduled only when +-- existing inner computations are not saturating available resources. +-- +-- === Construction +-- +-- * Use 'mkParallel' to wrap a 'Stream' as 'Parallel' +-- * Use 'unParallel' to convert back to 'Stream' +-- +-- === Laws and Caveats +-- +-- /Non-associative Monad/ +-- +-- Due to concurrent, completion-order scheduling: +-- +-- * '>>=' is __not associative__ +-- * The order of effects is __not deterministic__ +-- * The order of results is __not deterministic__ +-- +-- This means: +-- +-- > (m >>= f) >>= g ≠ m >>= (\x -> f x >>= g) -- --- WARNING! By design, monad bind of this type is not associative, because of --- concurrency, order of effects as well as results is non-deterministic. +-- Programs relying on ordering or sequencing of effects may behave +-- differently under reassociation. -- --- Serves the same purpose as the 'Streamly.Prelude.AsyncT' type in older +-- This type serves a similar role as 'Streamly.Prelude.AsyncT' in older -- releases. -- -- == FairParallel -- --- Like Parallel but strikes a balance between going deeper into existing --- iterations of the loop and starting new outer loop iterations. --- --- Use 'mkFairParallel' to construct from 'Stream' type and 'unFairParallel' to --- deconstruct back to 'Stream'. +-- A __bounded concurrent, fair LogicT-like (logic programming) monad__. -- -- >>> :{ -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b @@ -72,46 +121,58 @@ -- $(mkCrossType "FairParallel" "bind" True) -- :} -- --- This is a bounded concurrent, fair logic programming (LogicT) monad. +-- Like 'Parallel', but uses __fair scheduling__: instead of prioritizing +-- existing outer iterations by running their inner computations further, +-- it interleaves them with starting new outer iterations. This avoids +-- starvation and ensures more outer branches make progress, even if some +-- inner computations are infinite. -- --- WARNING! By design, monad bind of this type is not associative, because of --- concurrency, order of effects as well as results may be unpredictable. +-- Results are still emitted in completion order (first-come-first-served), and +-- may be out of order. -- --- Serves the same purpose as the 'Streamly.Prelude.WAsyncT' type in older --- releases. +-- Use 'mkFairParallel' to construct from 'Stream' and 'unFairParallel' to +-- convert back to 'Stream'. -- --- == EagerParallel +-- WARNING! '>>=' is __not associative__ due to concurrent, +-- completion-order scheduling; effects and results may be observed in +-- different orders. -- --- Like Parallel, but executes as many actions concurrently as possible. This --- is useful if you want all actions to be scheduled at the same time so that --- something does not get starved due to others. +-- Serves a similar role as 'Streamly.Prelude.WAsyncT' in older releases. -- --- Use 'mkEagerParallel' to construct from 'Stream' type and 'unEagerParallel' --- to deconstruct back to 'Stream'. +-- == EagerParallel +-- +-- An __unbounded concurrent, unordered ListT-like (list transformer) monad__. -- -- >>> :{ -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b --- parBind = flip (Stream.parConcatMap (Stream.eager True)) --- $(mkCrossType "EagerParallel" "parBind" True) +-- bind = flip (Stream.parConcatMap (Stream.eager True)) +-- $(mkCrossType "EagerParallel" "bind" True) -- :} -- --- This is an unbounded concurrent, unordered list transformer (ListT) monad. +-- Like 'Parallel', but uses __eager scheduling__: it starts all possible +-- concurrent actions immediately, without waiting for existing ones to make +-- progress. -- --- WARNING! By design, monad bind of this type is not associative, because of --- concurrency order of effects as well as results may be unpredictable. +-- This ensures that all computations begin execution (e.g. timers or other +-- effects start promptly), and avoids starvation due to delayed scheduling. +-- It is primarily about semantics rather than performance, and may reduce +-- throughput due to excessive concurrency. -- --- Serves the same purpose as the 'Streamly.Prelude.ParallelT' type in older --- releases. +-- Results are emitted in completion order (first-come-first-served), and may +-- be out of order. -- --- == OrderedParallel +-- Use 'mkEagerParallel' to construct from 'Stream' and 'unEagerParallel' to +-- convert back to 'Stream'. +-- +-- WARNING! '>>=' is __not associative__ due to concurrent, +-- completion-order scheduling; effects and results may be observed in +-- different orders. +-- +-- Serves a similar role as 'Streamly.Prelude.ParallelT' in older releases. -- --- Like Parallel, runs many iterations concurrently, but stages the results --- such that the results of iterations are presented in the same order as --- specified in the code. This is closest to the serial Nested type in behavior --- among all the concurrent types. +-- == OrderedParallel -- --- Use 'mkOrderedParallel' to construct from 'Stream' type and --- 'unOrderedParallel' to deconstruct back to 'Stream'. +-- A __bounded concurrent, ordered ListT-like monad__. -- -- >>> :{ -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b @@ -119,43 +180,50 @@ -- $(mkCrossType "OrderedParallel" "bind" True) -- :} -- --- This is a bounded concurrent, ordered list transformer (ListT) monad. +-- Like 'Parallel', runs many iterations concurrently, but +-- __preserves the original stream order__ by staging results. Results are +-- yielded in the same order as specified in the code. -- --- WARNING! Monad bind of this type is associative for values, but because of --- concurrency, order of effects may be unpredictable. +-- This is closest in behavior to the serial list transformer monad (ListT) +-- type among the concurrent types. -- --- Serves the same purpose as the 'Streamly.Prelude.AheadT' type in older --- releases. +-- Use 'mkOrderedParallel' to construct from 'Stream' and +-- 'unOrderedParallel' to convert back to 'Stream'. +-- +-- WARNING! '>>=' is associative for values, but due to concurrency, +-- the order of effects may be unpredictable. +-- +-- Serves a similar role as 'Streamly.Prelude.AheadT' in older releases. -- -- == Zip -- -- A newtype wrapper over the 'Stream' type, the applicative instance zips two -- streams. -- --- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back --- to 'Stream'. --- -- >>> :{ -- zipApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b -- zipApply = Stream.zipWith ($) -- $(mkZipType "Zip" "zipApply" False) -- :} -- --- Same as the deprcated 'Streamly.Prelude.ZipSerialM' type. +-- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back +-- to 'Stream'. +-- +-- Same as the deprecated 'Streamly.Prelude.ZipSerialM'. -- -- == ZipParallel -- -- Like Zip but evaluates the streams being zipped concurrently. -- --- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to --- deconstruct back to 'Stream'. --- -- >>> :{ -- parZipApply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b -- parZipApply = Stream.parZipWith id id -- $(mkZipType "ZipParallel" "parZipApply" True) -- :} -- +-- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to +-- deconstruct back to 'Stream'. +-- -- Same as the deprecated 'Streamly.Prelude.ZipAsync' type. -- -- == Avoiding Template Haskell diff --git a/src/Streamly/Internal/FileSystem/Event/Linux.hs b/src/Streamly/Internal/FileSystem/Event/Linux.hs index b77dee7fbe..3d5a537071 100644 --- a/src/Streamly/Internal/FileSystem/Event/Linux.hs +++ b/src/Streamly/Internal/FileSystem/Event/Linux.hs @@ -199,6 +199,9 @@ import qualified Streamly.Internal.FileSystem.DirIO as Dir import qualified Streamly.Internal.Data.Parser as PR (takeEQ, fromEffect, fromFold) +-- $setup +-- >>> :set -Wno-deprecations + ------------------------------------------------------------------------------- -- Subscription to events -------------------------------------------------------------------------------