diff --git a/Project.toml b/Project.toml index 7bfa4da6..8ef5035d 100644 --- a/Project.toml +++ b/Project.toml @@ -8,6 +8,7 @@ ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197" BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66" ConstructionBase = "187b0558-2788-49d3-abe0-74a17ed4e7c9" InitialValues = "22cec73e-a1b8-11e9-2c92-598750a2cf9c" +MicroCollections = "128add7d-3638-4c79-886c-908ea0c25c34" Referenceables = "42d2dcc6-99eb-4e98-b66c-637b7d73030e" Setfield = "efcf1570-3423-57d1-acb7-fd33fddbac46" SplittablesBase = "171d559e-b47b-412a-8079-5efa626c420e" @@ -18,10 +19,11 @@ ArgCheck = "1.0, 2.0" BangBang = "0.3.17" ConstructionBase = "0.1.0, 1.0" InitialValues = "0.2" +MicroCollections = "0.1" Referenceables = "0.1" Setfield = "0.3, 0.4, 0.5, 0.6, 0.7" SplittablesBase = "0.1.8" -Transducers = "0.4.39" +Transducers = "0.4.52" julia = "1" [extras] diff --git a/benchmark/Manifest.toml b/benchmark/Manifest.toml index 32d49a72..7f2cab27 100644 --- a/benchmark/Manifest.toml +++ b/benchmark/Manifest.toml @@ -126,6 +126,12 @@ version = "0.5.5" deps = ["Base64"] uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" +[[MicroCollections]] +deps = ["BangBang", "Setfield"] +git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03" +uuid = "128add7d-3638-4c79-886c-908ea0c25c34" +version = "0.1.0" + [[Mmap]] uuid = "a63ad114-7e13-5084-954f-fe012c677804" @@ -232,18 +238,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"] uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [[ThreadsX]] -deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] +deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] path = ".." uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d" version = "0.1.3-DEV" [[Transducers]] -deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"] -git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db" +deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"] +git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650" repo-rev = "master" repo-url = "https://github.com/JuliaFolds/Transducers.jl.git" uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999" -version = "0.4.41-DEV" +version = "0.4.52" [[UUIDs]] deps = ["Random", "SHA"] diff --git a/benchmark/bench_mapi.jl b/benchmark/bench_mapi.jl new file mode 100644 index 00000000..cc010296 --- /dev/null +++ b/benchmark/bench_mapi.jl @@ -0,0 +1,52 @@ +module BenchMapi + +using BenchmarkTools +using ThreadsX + +collatz(x) = + if iseven(x) + x ÷ 2 + else + 3x + 1 + end + +function collatz_stopping_time(x) + n = 0 + while true + x == 1 && return n + n += 1 + x = collatz(x) + end +end + +function consume(ns) + t0 = time_ns() + d = 0 + while d < ns + d = Int(time_ns() - t0) + end + return d +end + +constant(x) = _ -> x + +const SUITE = BenchmarkGroup() + +let s0 = SUITE["collatz"] = BenchmarkGroup() + s0["base"] = @benchmarkable map(collatz_stopping_time, 1:1000_000) + s0["tx"] = + @benchmarkable ThreadsX.mapi(collatz_stopping_time, 1:1000_000; basesize = 12500) +end + +let s0 = SUITE["consume-1ms"] = BenchmarkGroup() + s0["base"] = @benchmarkable map(consume ∘ constant(1_000_000), 1:20) + s0["tx"] = @benchmarkable ThreadsX.mapi(consume ∘ constant(1_000_000), 1:20) +end + +let s0 = SUITE["consume-100us"] = BenchmarkGroup() + s0["base"] = @benchmarkable map(consume ∘ constant(100_000), 1:20) + s0["tx"] = @benchmarkable ThreadsX.mapi(consume ∘ constant(100_000), 1:20) +end + +end # module +BenchMapi.SUITE diff --git a/docs/Manifest.toml b/docs/Manifest.toml index 5a226d59..e00222a2 100644 --- a/docs/Manifest.toml +++ b/docs/Manifest.toml @@ -132,6 +132,12 @@ version = "0.5.5" deps = ["Base64"] uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" +[[MicroCollections]] +deps = ["BangBang", "Setfield"] +git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03" +uuid = "128add7d-3638-4c79-886c-908ea0c25c34" +version = "0.1.0" + [[Missings]] deps = ["DataAPI"] git-tree-sha1 = "de0a5ce9e5289f27df672ffabef4d1e5861247d5" @@ -266,18 +272,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"] uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [[ThreadsX]] -deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] +deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] path = ".." uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d" version = "0.1.3-DEV" [[Transducers]] -deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"] -git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db" +deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"] +git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650" repo-rev = "master" repo-url = "https://github.com/JuliaFolds/Transducers.jl.git" uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999" -version = "0.4.41-DEV" +version = "0.4.52" [[UUIDs]] deps = ["Random", "SHA"] diff --git a/docs/src/index.md b/docs/src/index.md index c19836b8..a8bce5b3 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -3,6 +3,8 @@ ```@docs ThreadsX ThreadsX.foreach +ThreadsX.map +ThreadsX.mapi ThreadsX.map! ThreadsX.sort! ThreadsX.sort diff --git a/src/ThreadsX.jl b/src/ThreadsX.jl index 1ea6a2d0..3ca438d7 100644 --- a/src/ThreadsX.jl +++ b/src/ThreadsX.jl @@ -2,6 +2,7 @@ baremodule ThreadsX function collect end function map end +function mapi end function mapreduce end function reduce end @@ -42,6 +43,7 @@ using Base: HasShape, IteratorSize, Ordering, add_sum, mapreduce_empty, mul_prod, reduce_empty using ConstructionBase: setproperties using InitialValues: asmonoid +using MicroCollections: EmptyVector using Referenceables: referenceable using Setfield: @set using Transducers: @@ -51,10 +53,12 @@ using Transducers: Init, Map, MapSplat, + NondeterministicThreading, OnInit, ReduceIf, Transducers, extract_transducer, + foldxl, opcompose, reduced, right, diff --git a/src/docs/map.md b/src/docs/map.md new file mode 100644 index 00000000..46516ad2 --- /dev/null +++ b/src/docs/map.md @@ -0,0 +1,4 @@ + ThreadsX.mapi(f, iterators...; basesize) + +Parallelized `map(f, iterators...)`. Input collections `iterators` +must support `SplittablesBase.halve` diff --git a/src/docs/mapi.md b/src/docs/mapi.md new file mode 100644 index 00000000..859db4c2 --- /dev/null +++ b/src/docs/mapi.md @@ -0,0 +1,21 @@ + ThreadsX.mapi(f, iterators...; basesize, ntasks) + +Parallelized `map(f, iterators...)` that works with purely sequential +`iterators`. + +Note that calls to `iterate` on `iterators` are *not* parallelized. +Only `f` may be called in parallel. See also +`Transducers.NondeterministicThreading` for more information. + +!!! note + Currently, the default `basesize` is 1. However, it may be + changed in the future (e.g. it may be automatically tuned at + run-time). + +# Keyword Arguments +- `basesize::Integer`: The number of input elements to be accumulated + in a buffer before sent to a task. +- `ntasks::Integer`: The number of tasks `@spawn`ed. The default + value is `Threads.nthreads()`. A number larger than + `Threads.nthreads()` may be useful if the inner reducing function + contains I/O and does not consume too much resource (e.g., memory). diff --git a/src/map.jl b/src/map.jl index 27b9b98a..0958d402 100644 --- a/src/map.jl +++ b/src/map.jl @@ -49,3 +49,16 @@ ThreadsX.collect(::Type{T}, itr; kwargs...) where {T} = reshape_as( ThreadsX.collect(itr; kwargs...) = reshape_as(tcollect(itr; basesize = default_basesize(itr), kwargs...), itr) + + +ThreadsX.mapi(f, itr; kwargs...) = reshape_as( + itr |> + NondeterministicThreading(; kwargs...) |> + Map(f) |> + Map(SingletonVector) |> + foldxl(append!!; init = EmptyVector()), + itr, +) + +ThreadsX.mapi(f, itr1, itrs...; kwargs...) = + ThreadsX.mapi(Base.splat(f), zip(itr1, itrs...); kwargs...) diff --git a/test/environments/main/Manifest.toml b/test/environments/main/Manifest.toml index 2272c941..03fa8a2b 100644 --- a/test/environments/main/Manifest.toml +++ b/test/environments/main/Manifest.toml @@ -138,6 +138,12 @@ version = "0.5.5" deps = ["Base64"] uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" +[[MicroCollections]] +deps = ["BangBang", "Setfield"] +git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03" +uuid = "128add7d-3638-4c79-886c-908ea0c25c34" +version = "0.1.0" + [[Missings]] deps = ["DataAPI"] git-tree-sha1 = "de0a5ce9e5289f27df672ffabef4d1e5861247d5" @@ -272,18 +278,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"] uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [[ThreadsX]] -deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] +deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"] path = "../../.." uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d" version = "0.1.3-DEV" [[Transducers]] -deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"] -git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db" +deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"] +git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650" repo-rev = "master" repo-url = "https://github.com/JuliaFolds/Transducers.jl.git" uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999" -version = "0.4.41-DEV" +version = "0.4.52" [[UUIDs]] deps = ["Random", "SHA"] diff --git a/test/test_with_base.jl b/test/test_with_base.jl index d135dbb5..a5a6ad06 100644 --- a/test/test_with_base.jl +++ b/test/test_with_base.jl @@ -8,6 +8,8 @@ args_and_kwargs(args...; kwargs...) = args, (; kwargs...) inc(x) = x + 1 +mapi(args...) = map(args...) + raw_testdata = """ collect(1:10) collect(Float64, 1:10) @@ -103,12 +105,17 @@ Set(x^2 for x in [1, -1, -3, 4, 3]) """ # An array of `(label, (f, args, kwargs))` -testdata = map(split(raw_testdata, "\n", keepempty = false)) do x +testdata = mapreduce(vcat, split(raw_testdata, "\n", keepempty = false)) do x @debug "Parsing: $x" f, rest = split(x, "(", limit = 2) ex = Meta.parse("DUMMY($rest") ex.args[1] = args_and_kwargs - @eval ($x, ($(Symbol(f)), $ex...)) + tests = Any[@eval ($x, ($(Symbol(f)), $ex...))] + if f == "map" + y = replace(x, r"^map" => "mapi") + push!(tests, @eval ($y, (mapi, $ex...))) + end + return tests end @testset "$label" for (label, (f, args, kwargs)) in testdata