A lock-free single-producer single-consumer (SPSC) channel for Julia.
Julia's built-in `Channel` type is excellent for general-purpose concurrent programming, but it allocates memory on every `put!` and `take!` operation due to its task scheduling and condition variable infrastructure. This is usually fine, but in real-time applications where consistent latency matters (such as SDR signal processing, audio processing, or high-frequency trading), these allocations can trigger garbage collection pauses at unpredictable times.
PipeChannel solves this by using a lock-free ring buffer with atomic operations. The trade-off is that it only supports exactly one producer thread and one consumer thread - but for many streaming pipelines, this is exactly the pattern you need.
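For intuition, the core idea looks roughly like the sketch below. This is a simplified illustration of a lock-free SPSC ring buffer, not PipeChannel's actual source; the names are made up, and it uses Julia's `@atomic` field support (Julia 1.7+).

```julia
# Illustrative SPSC ring buffer: the producer only advances `tail`,
# the consumer only advances `head`, so no locks are needed.
mutable struct RingBuffer{T}
    buf::Vector{T}
    @atomic head::Int   # number of items consumed so far
    @atomic tail::Int   # number of items produced so far
end

RingBuffer{T}(capacity::Integer) where {T} =
    RingBuffer{T}(Vector{T}(undef, capacity), 0, 0)

# Producer-only. Returns false when the buffer is full.
function try_push!(rb::RingBuffer{T}, x::T) where {T}
    tail = @atomic rb.tail
    head = @atomic rb.head           # may be stale; only underestimates free space
    tail - head == length(rb.buf) && return false
    rb.buf[mod1(tail + 1, length(rb.buf))] = x
    @atomic rb.tail = tail + 1       # publish the new item to the consumer
    return true
end

# Consumer-only. Returns nothing when the buffer is empty.
function try_pop!(rb::RingBuffer)
    head = @atomic rb.head
    tail = @atomic rb.tail           # may be stale; only underestimates available items
    head == tail && return nothing
    x = rb.buf[mod1(head + 1, length(rb.buf))]
    @atomic rb.head = head + 1       # free the slot for the producer
    return x
end
```

A blocking `put!`/`take!` can then be layered on top by yielding until the try-variant succeeds.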
For even higher throughput, PipeChannel supports batch operations that amortize the cost of the atomic operations across multiple items. With a batch size of 64, throughput increases by 15-20x compared to single-item operations, achieving over 400 million items/second.
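To see why batching helps, here is a batch variant of `try_push!` for the illustrative ring buffer sketched above (again an assumption about the general technique, not PipeChannel's actual code): the element copies are plain stores, and a single atomic update of the tail index publishes the whole batch, so the synchronization cost is paid once per batch instead of once per item.

```julia
# Batch push for the illustrative RingBuffer above: copy all items with
# ordinary stores, then publish them with one atomic store of the tail.
function try_push_batch!(rb::RingBuffer{T}, xs::AbstractVector{T}) where {T}
    tail = @atomic rb.tail
    head = @atomic rb.head
    cap  = length(rb.buf)
    cap - (tail - head) < length(xs) && return false  # not enough free slots
    for (i, x) in enumerate(xs)
        rb.buf[mod1(tail + i, cap)] = x               # plain, non-atomic copies
    end
    @atomic rb.tail = tail + length(xs)               # one publish per batch
    return true
end
```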
Install with:

```julia
using Pkg
Pkg.add("PipeChannels")
```

The API matches Julia's `Channel`:
```julia
using PipeChannels

# Create a channel with capacity 16
ch = PipeChannel{Int}(16)

# Producer thread
producer = Threads.@spawn begin
    for i in 1:100
        put!(ch, i)
    end
    close(ch)
end

# Consumer thread
consumer = Threads.@spawn begin
    for value in ch
        println("Received: ", value)
    end
end

wait(producer)
wait(consumer)
```

For high-throughput scenarios, use batch `put!` and `take!` to transfer multiple items at once:
```julia
ch = PipeChannel{Int}(1024)

# Producer: write batches
producer = Threads.@spawn begin
    data = collect(1:64)
    for _ in 1:1000
        put!(ch, data)  # Blocks until all 64 items are written
    end
    close(ch)
end

# Consumer: read into pre-allocated buffer
consumer = Threads.@spawn begin
    buffer = Vector{Int}(undef, 64)
    while isopen(ch) || !isempty(ch)
        try
            take!(ch, buffer)  # Blocks until buffer is filled
            # process buffer...
        catch e
            e isa InvalidStateException && break
            rethrow()
        end
    end
end

wait(producer)
wait(consumer)
```
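To get a sense of where the batch numbers quoted earlier come from, a comparison along these lines can be run. The `throughput` helper below is not part of the package, just a rough sketch; start Julia with at least two threads (e.g. `julia -t 2`), and expect absolute numbers to vary with hardware.

```julia
using PipeChannels

# Rough single-item vs. batched throughput comparison (items per second).
function throughput(n, batchsize)
    ch = PipeChannel{Int}(1024)
    producer = Threads.@spawn begin
        data = collect(1:batchsize)
        for _ in 1:(n ÷ batchsize)
            batchsize == 1 ? put!(ch, 1) : put!(ch, data)
        end
        close(ch)
    end
    buffer = Vector{Int}(undef, batchsize)
    elapsed = @elapsed try
        while true
            batchsize == 1 ? take!(ch) : take!(ch, buffer)
        end
    catch e
        e isa InvalidStateException || rethrow()  # channel closed and drained
    end
    wait(producer)
    return n / elapsed
end

println("single items:  ", throughput(10_000_000, 1))
println("batches of 64: ", throughput(10_000_000, 64))
```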
Just like `Channel`, you can bind a task to automatically close the channel and propagate errors:

```julia
ch = PipeChannel{Int}(16)

task = @async begin
    for i in 1:10
        put!(ch, i)
    end
    close(ch)
end
bind(ch, task)

# If task fails, the exception propagates to consumers
for x in ch
    println(x)
end
```
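To make the error-propagation behaviour concrete, a failing bound task could be handled roughly as follows; this is only a sketch, and the exact exception type that reaches the consumer may differ.

```julia
ch = PipeChannel{Int}(16)

task = @async begin
    put!(ch, 1)
    error("producer failed")   # the bound channel is closed with this error
end
bind(ch, task)

try
    for x in ch
        println(x)
    end
catch err
    @warn "channel closed because the bound task failed" exception = err
end
```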
The full API:

- `PipeChannel{T}(capacity::Integer)` - Create a channel with the given buffer capacity
- `put!(ch, value)` - Add an element (blocks if full)
- `put!(ch, values::AbstractVector)` - Add multiple elements (blocks until all are written)
- `take!(ch)` - Remove and return an element (blocks if empty)
- `take!(ch, n::Integer)` - Remove and return exactly `n` elements as a new vector
- `take!(ch, buffer::AbstractVector)` - Fill `buffer` with elements (blocks until full)
- `close(ch)` - Close the channel
- `bind(ch, task)` - Bind a task for automatic close and error propagation
- `isopen(ch)` - Check if the channel is open
- `isready(ch)` - Check if data is available
- `isempty(ch)` - Check if the buffer is empty
- `isfull(ch)` - Check if the buffer is full
- `n_avail(ch)` - Number of elements available to read
- `eltype(ch)` - Element type of the channel
- `for x in ch` - Iterate until the channel is closed and empty
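The query functions and the `take!(ch, n)` form are not exercised in the examples above, so here is a short single-task tour; the values in the comments follow from the semantics listed above.

```julia
ch = PipeChannel{Float64}(8)

eltype(ch)          # Float64
isopen(ch)          # true
isempty(ch)         # true
isready(ch)         # false - nothing to read yet

put!(ch, [1.0, 2.0, 3.0])   # batch put! of three elements
isready(ch)         # true
n_avail(ch)         # 3
isfull(ch)          # false - capacity is 8

xs = take!(ch, 2)   # returns [1.0, 2.0] as a new vector
n_avail(ch)         # 1

close(ch)
isopen(ch)          # false
```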
Important constraints:
- Exactly ONE producer thread may call `put!`
- Exactly ONE consumer thread may call `take!`
- Multiple producers or consumers will cause data races
This is not a limitation for most streaming pipelines where you have a single data source feeding a single processor.
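For example, a multi-stage pipeline stays within the SPSC contract by giving each stage its own channel. The sketch below (stage names and the processing step are illustrative) chains two PipeChannels; each channel still has exactly one producer and one consumer.

```julia
using PipeChannels

raw      = PipeChannel{Float64}(1024)   # source -> filter
filtered = PipeChannel{Float64}(1024)   # filter -> sink

source = Threads.@spawn begin
    for _ in 1:10_000
        put!(raw, randn())
    end
    close(raw)
end

filter_stage = Threads.@spawn begin
    for x in raw
        put!(filtered, 0.5 * x)   # stand-in for real processing
    end
    close(filtered)
end

sink = Threads.@spawn begin
    total = 0.0
    for x in filtered
        total += x
    end
    println("sum = ", total)
end

foreach(wait, (source, filter_stage, sink))
```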
MIT License - see LICENSE for details.

