Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
ConstructionBase = "187b0558-2788-49d3-abe0-74a17ed4e7c9"
InitialValues = "22cec73e-a1b8-11e9-2c92-598750a2cf9c"
MicroCollections = "128add7d-3638-4c79-886c-908ea0c25c34"
Referenceables = "42d2dcc6-99eb-4e98-b66c-637b7d73030e"
Setfield = "efcf1570-3423-57d1-acb7-fd33fddbac46"
SplittablesBase = "171d559e-b47b-412a-8079-5efa626c420e"
Expand All @@ -18,10 +19,11 @@ ArgCheck = "1.0, 2.0"
BangBang = "0.3.17"
ConstructionBase = "0.1.0, 1.0"
InitialValues = "0.2"
MicroCollections = "0.1"
Referenceables = "0.1"
Setfield = "0.3, 0.4, 0.5, 0.6, 0.7"
SplittablesBase = "0.1.8"
Transducers = "0.4.39"
Transducers = "0.4.52"
julia = "1"

[extras]
Expand Down
14 changes: 10 additions & 4 deletions benchmark/Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ version = "0.5.5"
deps = ["Base64"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"

[[MicroCollections]]
deps = ["BangBang", "Setfield"]
git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03"
uuid = "128add7d-3638-4c79-886c-908ea0c25c34"
version = "0.1.0"

[[Mmap]]
uuid = "a63ad114-7e13-5084-954f-fe012c677804"

Expand Down Expand Up @@ -232,18 +238,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[[ThreadsX]]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
path = ".."
uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d"
version = "0.1.3-DEV"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db"
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650"
repo-rev = "master"
repo-url = "https://github.com/JuliaFolds/Transducers.jl.git"
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.41-DEV"
version = "0.4.52"

[[UUIDs]]
deps = ["Random", "SHA"]
Expand Down
52 changes: 52 additions & 0 deletions benchmark/bench_mapi.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
module BenchMapi

using BenchmarkTools
using ThreadsX

collatz(x) =
if iseven(x)
x ÷ 2
else
3x + 1
end

function collatz_stopping_time(x)
n = 0
while true
x == 1 && return n
n += 1
x = collatz(x)
end
end

function consume(ns)
t0 = time_ns()
d = 0
while d < ns
d = Int(time_ns() - t0)
end
return d
end

constant(x) = _ -> x

const SUITE = BenchmarkGroup()

let s0 = SUITE["collatz"] = BenchmarkGroup()
s0["base"] = @benchmarkable map(collatz_stopping_time, 1:1000_000)
s0["tx"] =
@benchmarkable ThreadsX.mapi(collatz_stopping_time, 1:1000_000; basesize = 12500)
end

let s0 = SUITE["consume-1ms"] = BenchmarkGroup()
s0["base"] = @benchmarkable map(consume ∘ constant(1_000_000), 1:20)
s0["tx"] = @benchmarkable ThreadsX.mapi(consume ∘ constant(1_000_000), 1:20)
end

let s0 = SUITE["consume-100us"] = BenchmarkGroup()
s0["base"] = @benchmarkable map(consume ∘ constant(100_000), 1:20)
s0["tx"] = @benchmarkable ThreadsX.mapi(consume ∘ constant(100_000), 1:20)
end

end # module
BenchMapi.SUITE
14 changes: 10 additions & 4 deletions docs/Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ version = "0.5.5"
deps = ["Base64"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"

[[MicroCollections]]
deps = ["BangBang", "Setfield"]
git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03"
uuid = "128add7d-3638-4c79-886c-908ea0c25c34"
version = "0.1.0"

[[Missings]]
deps = ["DataAPI"]
git-tree-sha1 = "de0a5ce9e5289f27df672ffabef4d1e5861247d5"
Expand Down Expand Up @@ -266,18 +272,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[[ThreadsX]]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
path = ".."
uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d"
version = "0.1.3-DEV"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db"
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650"
repo-rev = "master"
repo-url = "https://github.com/JuliaFolds/Transducers.jl.git"
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.41-DEV"
version = "0.4.52"

[[UUIDs]]
deps = ["Random", "SHA"]
Expand Down
2 changes: 2 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
```@docs
ThreadsX
ThreadsX.foreach
ThreadsX.map
ThreadsX.mapi
ThreadsX.map!
ThreadsX.sort!
ThreadsX.sort
Expand Down
4 changes: 4 additions & 0 deletions src/ThreadsX.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ baremodule ThreadsX

function collect end
function map end
function mapi end

function mapreduce end
function reduce end
Expand Down Expand Up @@ -42,6 +43,7 @@ using Base:
HasShape, IteratorSize, Ordering, add_sum, mapreduce_empty, mul_prod, reduce_empty
using ConstructionBase: setproperties
using InitialValues: asmonoid
using MicroCollections: EmptyVector
using Referenceables: referenceable
using Setfield: @set
using Transducers:
Expand All @@ -51,10 +53,12 @@ using Transducers:
Init,
Map,
MapSplat,
NondeterministicThreading,
OnInit,
ReduceIf,
Transducers,
extract_transducer,
foldxl,
opcompose,
reduced,
right,
Expand Down
4 changes: 4 additions & 0 deletions src/docs/map.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ThreadsX.mapi(f, iterators...; basesize)

Parallelized `map(f, iterators...)`. Input collections `iterators`
must support `SplittablesBase.halve`
21 changes: 21 additions & 0 deletions src/docs/mapi.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
ThreadsX.mapi(f, iterators...; basesize, ntasks)

Parallelized `map(f, iterators...)` that works with purely sequential
`iterators`.

Note that calls to `iterate` on `iterators` are *not* parallelized.
Only `f` may be called in parallel. See also
`Transducers.NondeterministicThreading` for more information.

!!! note
Currently, the default `basesize` is 1. However, it may be
changed in the future (e.g. it may be automatically tuned at
run-time).

# Keyword Arguments
- `basesize::Integer`: The number of input elements to be accumulated
in a buffer before sent to a task.
- `ntasks::Integer`: The number of tasks `@spawn`ed. The default
value is `Threads.nthreads()`. A number larger than
`Threads.nthreads()` may be useful if the inner reducing function
contains I/O and does not consume too much resource (e.g., memory).
13 changes: 13 additions & 0 deletions src/map.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,16 @@ ThreadsX.collect(::Type{T}, itr; kwargs...) where {T} = reshape_as(

ThreadsX.collect(itr; kwargs...) =
reshape_as(tcollect(itr; basesize = default_basesize(itr), kwargs...), itr)


ThreadsX.mapi(f, itr; kwargs...) = reshape_as(
itr |>
NondeterministicThreading(; kwargs...) |>
Map(f) |>
Map(SingletonVector) |>
foldxl(append!!; init = EmptyVector()),
itr,
)

ThreadsX.mapi(f, itr1, itrs...; kwargs...) =
ThreadsX.mapi(Base.splat(f), zip(itr1, itrs...); kwargs...)
14 changes: 10 additions & 4 deletions test/environments/main/Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ version = "0.5.5"
deps = ["Base64"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"

[[MicroCollections]]
deps = ["BangBang", "Setfield"]
git-tree-sha1 = "e991b6a9d38091c4a0d7cd051fcb57c05f98ac03"
uuid = "128add7d-3638-4c79-886c-908ea0c25c34"
version = "0.1.0"

[[Missings]]
deps = ["DataAPI"]
git-tree-sha1 = "de0a5ce9e5289f27df672ffabef4d1e5861247d5"
Expand Down Expand Up @@ -272,18 +278,18 @@ deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[[ThreadsX]]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
deps = ["ArgCheck", "BangBang", "ConstructionBase", "InitialValues", "MicroCollections", "Referenceables", "Setfield", "SplittablesBase", "Transducers"]
path = "../../.."
uuid = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d"
version = "0.1.3-DEV"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "d9bfa17064d3ea0da8213aa315d55cb8c84b60db"
deps = ["ArgCheck", "BangBang", "CompositionsBase", "DefineSingletons", "Distributed", "InitialValues", "Logging", "Markdown", "MicroCollections", "Requires", "Setfield", "SplittablesBase", "Tables"]
git-tree-sha1 = "a2d46274b3a58d9117681d7effdf706022d44650"
repo-rev = "master"
repo-url = "https://github.com/JuliaFolds/Transducers.jl.git"
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.41-DEV"
version = "0.4.52"

[[UUIDs]]
deps = ["Random", "SHA"]
Expand Down
11 changes: 9 additions & 2 deletions test/test_with_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ args_and_kwargs(args...; kwargs...) = args, (; kwargs...)

inc(x) = x + 1

mapi(args...) = map(args...)

raw_testdata = """
collect(1:10)
collect(Float64, 1:10)
Expand Down Expand Up @@ -103,12 +105,17 @@ Set(x^2 for x in [1, -1, -3, 4, 3])
"""

# An array of `(label, (f, args, kwargs))`
testdata = map(split(raw_testdata, "\n", keepempty = false)) do x
testdata = mapreduce(vcat, split(raw_testdata, "\n", keepempty = false)) do x
@debug "Parsing: $x"
f, rest = split(x, "(", limit = 2)
ex = Meta.parse("DUMMY($rest")
ex.args[1] = args_and_kwargs
@eval ($x, ($(Symbol(f)), $ex...))
tests = Any[@eval ($x, ($(Symbol(f)), $ex...))]
if f == "map"
y = replace(x, r"^map" => "mapi")
push!(tests, @eval ($y, (mapi, $ex...)))
end
return tests
end

@testset "$label" for (label, (f, args, kwargs)) in testdata
Expand Down