Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 128 additions & 60 deletions src/Streamly/Data/Stream/MkType.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,146 +16,214 @@
-- >>> :set -XTemplateHaskell
-- >>> :set -XTypeFamilies
-- >>> :set -XUndecidableInstances
-- >>> :set -XFlexibleContexts
-- >>> :set -XFlexibleInstances
-- >>> :set -XGeneralizedNewtypeDeriving
-- >>> :set -XTypeOperators
--
-- Import this module unqualified to bring everything needed in scope without
-- having to import several other modules. Also, "Streamly.Data.Stream" or
-- "Streamly.Data.Stream.Prelude" must be imported @as Stream@.
--
-- >>> import Streamly.Data.Stream.MkType
-- >>> import qualified Streamly.Data.Stream.Prelude as Stream
-- >>> import Streamly.Data.Stream.Prelude (Stream, MonadAsync)
--
-- We are describing below many useful types that can be created using macros
-- in this module and the behavior of those types. These could be useful if you
-- like to program using the monad \"do notation\" instead of using concatMap
-- like operations.
-- like operations explicitly.
--
-- == Parallel
--
-- A newtype wrapper over the 'Stream' type; the Applicative and Monad
-- instances generate a cross product of the two streams in a concurrent
-- manner. The order in which the stream elements are produced is not
-- deterministic, this is supposed to be used if order does not matter.

-- Loops over the outer stream, generating multiple elements concurrently; for
-- each outer stream element, loop over the inner stream concurrently. More
-- outer iterations are started only if the existing inner iterations are not
-- saturating the resources.
--
-- Use 'mkParallel' to construct from 'Stream' type and 'unParallel' to
-- deconstruct back to 'Stream'.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap id)
-- $(mkCrossType "Parallel" "bind" True)
-- :}
--
-- This is a bounded concurrent, unordered list-transformer (ListT) monad.
-- A newtype wrapper over 'Stream' that provides a
-- __bounded concurrent ListT-like monad__, running computations
-- __concurrently__. Prefer this type when you want high throughput and do not
-- require ordering guarantees.
--
-- Combining streams with 'Applicative' or 'Monad' does not process them
-- element-by-element in sequence. Instead, multiple elements are produced
-- and consumed __at the same time__.
--
-- For example, in 'Applicative', both sides are evaluated concurrently:
--
-- > pure (,) <*> a <*> b
--
-- elements from @a@ and @b@ may be generated in parallel.
--
-- In 'Monad', each element of the outer stream can start its own
-- concurrent computation:
--
-- > do
-- > x <- a
-- > y <- f x
--
-- multiple @x@ values may be generated at once, and for each @x@,
-- multiple @y@ values may be produced concurrently.
--
-- A useful way to think about this is nested loops:
--
-- * Many iterations of the outer loop can run at the same time
-- * For each outer iteration, many inner iterations can also run at the same
-- time
--
-- Conceptually, this behaves like a concurrent list transformer.
--
-- Results are emitted in completion order (first-come-first-served),
-- not in the original stream order:
--
-- * Values from the outer stream (@x@) may be produced out of order
-- * Values from the inner stream (@y@) may also be produced out of order
--
-- For deterministic ordering, see 'OrderedParallel'.
--
-- Concurrency is __bounded__: new outer iterations are scheduled only when
-- existing inner computations are not saturating available resources.
--
-- === Construction
--
-- * Use 'mkParallel' to wrap a 'Stream' as 'Parallel'
-- * Use 'unParallel' to convert back to 'Stream'
--
-- === Laws and Caveats
--
-- /Non-associative Monad/
--
-- Due to concurrent, completion-order scheduling:
--
-- * '>>=' is __not associative__
-- * The order of effects is __not deterministic__
-- * The order of results is __not deterministic__
--
-- This means:
--
-- > (m >>= f) >>= g ≠ m >>= (\x -> f x >>= g)
--
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency, order of effects as well as results is non-deterministic.
-- Programs relying on ordering or sequencing of effects may behave
-- differently under reassociation.
--
-- Serves the same purpose as the 'Streamly.Prelude.AsyncT' type in older
-- This type serves a similar role as 'Streamly.Prelude.AsyncT' in older
-- releases.
--
-- == FairParallel
--
-- Like Parallel but strikes a balance between going deeper into existing
-- iterations of the loop and starting new outer loop iterations.
--
-- Use 'mkFairParallel' to construct from 'Stream' type and 'unFairParallel' to
-- deconstruct back to 'Stream'.
-- A __bounded concurrent, fair LogicT-like (logic programming) monad__.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap (Stream.interleaved True))
-- $(mkCrossType "FairParallel" "bind" True)
-- :}
--
-- This is a bounded concurrent, fair logic programming (LogicT) monad.
-- Like 'Parallel', but uses __fair scheduling__: instead of prioritizing
-- existing outer iterations by running their inner computations further,
-- it interleaves them with starting new outer iterations. This avoids
-- starvation and ensures more outer branches make progress, even if some
-- inner computations are infinite.
--
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency, order of effects as well as results may be unpredictable.
-- Results are still emitted in completion order (first-come-first-served), and
-- may be out of order.
--
-- Serves the same purpose as the 'Streamly.Prelude.WAsyncT' type in older
-- releases.
-- Use 'mkFairParallel' to construct from 'Stream' and 'unFairParallel' to
-- convert back to 'Stream'.
--
-- == EagerParallel
-- WARNING! '>>=' is __not associative__ due to concurrent,
-- completion-order scheduling; effects and results may be observed in
-- different orders.
--
-- Like Parallel, but executes as many actions concurrently as possible. This
-- is useful if you want all actions to be scheduled at the same time so that
-- something does not get starved due to others.
-- Serves a similar role as 'Streamly.Prelude.WAsyncT' in older releases.
--
-- Use 'mkEagerParallel' to construct from 'Stream' type and 'unEagerParallel'
-- to deconstruct back to 'Stream'.
-- == EagerParallel
--
-- An __unbounded concurrent, unordered ListT-like (list transformer) monad__.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- parBind = flip (Stream.parConcatMap (Stream.eager True))
-- $(mkCrossType "EagerParallel" "parBind" True)
-- bind = flip (Stream.parConcatMap (Stream.eager True))
-- $(mkCrossType "EagerParallel" "bind" True)
-- :}
--
-- This is an unbounded concurrent, unordered list transformer (ListT) monad.
-- Like 'Parallel', but uses __eager scheduling__: it starts all possible
-- concurrent actions immediately, without waiting for existing ones to make
-- progress.
--
-- WARNING! By design, monad bind of this type is not associative, because of
-- concurrency order of effects as well as results may be unpredictable.
-- This ensures that all computations begin execution (e.g. timers or other
-- effects start promptly), and avoids starvation due to delayed scheduling.
-- It is primarily about semantics rather than performance, and may reduce
-- throughput due to excessive concurrency.
--
-- Serves the same purpose as the 'Streamly.Prelude.ParallelT' type in older
-- releases.
-- Results are emitted in completion order (first-come-first-served), and may
-- be out of order.
--
-- == OrderedParallel
-- Use 'mkEagerParallel' to construct from 'Stream' and 'unEagerParallel' to
-- convert back to 'Stream'.
--
-- WARNING! '>>=' is __not associative__ due to concurrent,
-- completion-order scheduling; effects and results may be observed in
-- different orders.
--
-- Serves a similar role as 'Streamly.Prelude.ParallelT' in older releases.
--
-- Like Parallel, runs many iterations concurrently, but stages the results
-- such that the results of iterations are presented in the same order as
-- specified in the code. This is closest to the serial Nested type in behavior
-- among all the concurrent types.
-- == OrderedParallel
--
-- Use 'mkOrderedParallel' to construct from 'Stream' type and
-- 'unOrderedParallel' to deconstruct back to 'Stream'.
-- A __bounded concurrent, ordered ListT-like monad__.
--
-- >>> :{
-- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b
-- bind = flip (Stream.parConcatMap (Stream.ordered True))
-- $(mkCrossType "OrderedParallel" "bind" True)
-- :}
--
-- This is a bounded concurrent, ordered list transformer (ListT) monad.
-- Like 'Parallel', runs many iterations concurrently, but
-- __preserves the original stream order__ by staging results. Results are
-- yielded in the same order as specified in the code.
--
-- WARNING! Monad bind of this type is associative for values, but because of
-- concurrency, order of effects may be unpredictable.
-- This is closest in behavior to the serial list transformer monad (ListT)
-- type among the concurrent types.
--
-- Serves the same purpose as the 'Streamly.Prelude.AheadT' type in older
-- releases.
-- Use 'mkOrderedParallel' to construct from 'Stream' and
-- 'unOrderedParallel' to convert back to 'Stream'.
--
-- WARNING! '>>=' is associative for values, but due to concurrency,
-- the order of effects may be unpredictable.
--
-- Serves a similar role as 'Streamly.Prelude.AheadT' in older releases.
--
-- == Zip
--
-- A newtype wrapper over the 'Stream' type, the applicative instance zips two
-- streams.
--
-- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back
-- to 'Stream'.
--
-- >>> :{
-- zipApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b
-- zipApply = Stream.zipWith ($)
-- $(mkZipType "Zip" "zipApply" False)
-- :}
--
-- Same as the deprcated 'Streamly.Prelude.ZipSerialM' type.
-- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back
-- to 'Stream'.
--
-- Same as the deprecated 'Streamly.Prelude.ZipSerialM'.
--
-- == ZipParallel
--
-- Like Zip but evaluates the streams being zipped concurrently.
--
-- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to
-- deconstruct back to 'Stream'.
--
-- >>> :{
-- parZipApply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b
-- parZipApply = Stream.parZipWith id id
-- $(mkZipType "ZipParallel" "parZipApply" True)
-- :}
--
-- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to
-- deconstruct back to 'Stream'.
--
-- Same as the deprecated 'Streamly.Prelude.ZipAsync' type.
--
-- == Avoiding Template Haskell
Expand Down
3 changes: 3 additions & 0 deletions src/Streamly/Internal/FileSystem/Event/Linux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ import qualified Streamly.Internal.FileSystem.DirIO as Dir
import qualified Streamly.Internal.Data.Parser as PR
(takeEQ, fromEffect, fromFold)

-- $setup
-- >>> :set -Wno-deprecations

-------------------------------------------------------------------------------
-- Subscription to events
-------------------------------------------------------------------------------
Expand Down
Loading