Skip to content

Commit bc3ced5

Browse files
andyross, marc-hb
authored and committed
Simple dependency-free ALSA test rig for PCM capture analysis.
Just drop this script on a test device to run it. No tools to build, no dependencies to install. Confirmed to run on Python 3.8+ with nothing more than the core libraries and a working libasound.so.2 visible to the runtime linker. When run without arguments, the tool will record from the capture device for the specified duration, then emit the resulting samples back out the playback device without processing (except potentially to convert the sample format from s32_le to s16_le if needed, and to discard any channels beyond those supported by the playback device). Passing --chirp-test enables a playback-to-capture latency detector: the tool will emit a short ~6 kHz wave packet via ALSA's mmap interface (which allows measuring and correcting for the buffer latency from the userspace process) and simultaneously loop on short reads from the capture device looking for the moment it arrives. Passing --echo-test enables a capture-while-playback test. The script will play a specified .wav file ("noise.wav" by default) for the specified duration, while simultaneously capturing, and report the "power" (in essentially arbitrary units, but it's linear with actual signal energy assuming the sample space is itself linear) of the captured data to stdout at the end of the test. Signed-off-by: Andy Ross <andyross@google.com>
1 parent a4d3f24 commit bc3ced5

File tree

1 file changed

+397
-0
lines changed

1 file changed

+397
-0
lines changed

tools/capture-test.py

Lines changed: 397 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,397 @@
1+
#!/usr/bin/env python3
2+
# SPDX-License-Identifier: BSD-3-Clause
3+
# Copyright 2024 Google LLC
4+
# Author: Andy Ross <andyross@google.com>
5+
import os
6+
import re
7+
import sys
8+
import time
9+
import struct
10+
import random
11+
import argparse
12+
import ctypes as C
13+
14+
HELP_TEXT="""
15+
Simple dependency-free ALSA test rig for PCM capture analysis.
16+
17+
Just drop this script on a test device to run it. No tools to build,
18+
no dependencies to install. Confirmed to run on Python 3.8+ with
19+
nothing more than the core libraries and a working libasound.so.2
20+
visible to the runtime linker.
21+
22+
When run without arguments, the tool will record from the capture
23+
device for the specified duration, then emit the resulting samples
24+
back out the playback device without processing (except potentially to
25+
convert the sample format from s32_le to s16_le if needed, and to
26+
discard any channels beyond those supported by the playback device).
27+
28+
Passing --chirp-test enables a playback-to-capture latency detector:
29+
the tool will emit a short ~6 kHz wave packet via ALSA's mmap
30+
interface (which allows measuring and correcting for the buffer
31+
latency from the userspace process) and simultaneously loop on short
32+
reads from the capture device looking for the moment it arrives.
33+
34+
Passing --echo-test enables a capture-while-playback test. The script
35+
will play a specified .wav file ("noise.wav" by default) for the
36+
specified duration, while simultaneously capturing, and report the
37+
"power" (in essentially arbitrary units, but it's linear with actual
38+
signal energy assuming the sample space is itself linear) of the
39+
captured data to stdout at the end of the test.
40+
41+
The tool supports a "--disable-rtnr" for the specific case of RTNR
42+
noise reduction, which has been observed to suppress the chirp test
43+
occasionally. It does not otherwise change the runtime configuration
44+
of the ALSA device and expects it to be configured by the user for the
45+
specific case under test.
46+
"""
47+
48+
def parse_opts():
    """
    Parses sys.argv and stores the result in the module-global "opts".

    Two derived settings are filled in after parsing:
      * opts.capchan falls back to opts.chan when not given (or zero)
      * opts.base_test is true only when neither --chirp-test nor
        --echo-test was requested

    argparse exits the process itself on a usage error, including a
    --capbits value other than 16 or 32.
    """
    global opts
    ap = argparse.ArgumentParser(description=HELP_TEXT,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--disable-rtnr", action="store_true", help="Disable RTNR noise reduction")
    ap.add_argument("-c", "--card", type=int, default=0, help="ALSA card index")
    ap.add_argument("--pcm", type=int, default=16, help="Output ALSA PCM index")
    ap.add_argument("--cap", type=int, default=18, help="Capture ALSA PCM index")
    ap.add_argument("--rate", type=int, default=48000, help="Sample rate")
    ap.add_argument("--chan", type=int, default=2, help="Output channel count")
    ap.add_argument("--capchan", type=int,
                    help="Capture channel count (if different from output)")
    # Only these two widths are handled consistently by the capture and
    # conversion code (anything else was silently captured as 16 bit
    # but scaled as if it were the requested width), so reject other
    # values up front.
    ap.add_argument("--capbits", type=int, default=16, choices=(16, 32),
                    help="Capture sample bits (16 or 32)")
    ap.add_argument("--noise", default="noise.wav",
                    help="WAV file containing 'noise' for capture")
    ap.add_argument("--duration", type=int, default=3, help="Capture duration (seconds)")
    ap.add_argument("--chirpcyc", type=int, default=120, help="Repetitions of chirp waveform")
    ap.add_argument("--chirp-test", action="store_true",
                    help="Test latency with synthesized audio")
    ap.add_argument("--echo-test", action="store_true", help="Test simultaneous capture/playback")

    opts = ap.parse_args()
    if not opts.capchan:
        opts.capchan = opts.chan
    opts.base_test = not (opts.chirp_test or opts.echo_test)
73+
74+
class ALSA:
    """
    Tiny ctypes stub.  Wraps the alsa API such that errno returns (at
    least ones that look like an errno) become OSErrors and don't need
    to be checked.  Includes a generalized alloc() that wraps all the
    _sizeof() predicates and allocates from the (safe/collected) python
    heap.  Provides a simple spot for putting (manually-derived)
    constants.  The ALSA C API is mostly-structless and quite simple, so
    this tends to work well without a lot of ctypes use except for an
    occasional constructed integer or byref() pointer.
    """
    PCM_STREAM_PLAYBACK = 0
    PCM_STREAM_CAPTURE = 1
    PCM_FORMAT_S16_LE = 2
    PCM_FORMAT_S32_LE = 10
    PCM_ACCESS_MMAP_INTERLEAVED = 0
    PCM_ACCESS_RW_INTERLEAVED = 3

    def __init__(self):
        self.lib = C.cdll.LoadLibrary("libasound.so.2")

    def __getattr__(self, name):
        """
        Resolves any attribute not found normally as a libasound
        function and returns a callable wrapper for it: functions whose
        name ends in "_name" return a decoded string, everything else
        has its integer return passed through err_wrap().
        """
        # __getattr__ only runs when normal lookup fails, so reaching
        # here with "lib" means the library was never loaded; bail out
        # instead of recursing forever via self.lib below.
        if name == "lib":
            raise AttributeError(name)
        fn = getattr(self.lib, name)
        if name.endswith("_name"): # These return strings!
            fn.restype = C.c_char_p
            return lambda *args: fn(*args).decode("utf-8")
        return lambda *args: ALSA.err_wrap(fn(*args))

    @staticmethod
    def err_wrap(ret):
        """
        Returns "ret" unchanged unless it lies in the open interval
        (-200, 0), in which case it is treated as a negated errno and an
        OSError carrying that errno and its strerror() text is raised.
        """
        if -200 < ret < 0:
            # Two-argument form so callers can inspect e.errno
            raise OSError(-ret, os.strerror(-ret))
        return ret

    def alloc(self, typ):
        """
        Allocates a zeroed byte array sized by calling the library's
        snd_<typ>_sizeof() function.
        """
        return (C.c_byte * getattr(self.lib, f"snd_{typ}_sizeof")())()

    class pcm_channel_area_t(C.Structure):
        _fields_ = [("addr", C.c_ulong), ("first", C.c_int), ("step", C.c_int)]
108+
109+
def pcm_init_stream(pcm, rate, chans, fmt, access):
    """
    Applies a hardware configuration to an already-opened PCM handle:
    sample format, channel count, sample rate and access type, in that
    order, then installs the parameters on the device.

    Errors from any of the ALSA calls surface as OSError via the alsa
    wrapper.
    """
    hwp = alsa.alloc("pcm_hw_params")
    alsa.snd_pcm_hw_params_any(pcm, hwp)
    alsa.snd_pcm_hw_params_set_format(pcm, hwp, fmt)
    alsa.snd_pcm_hw_params_set_channels(pcm, hwp, chans)
    # The last argument is the "dir" (sub-unit direction) selector, not
    # a stream direction: 0 asks for exactly "rate".  This used to pass
    # PCM_STREAM_PLAYBACK, which only worked because it happens to be 0.
    alsa.snd_pcm_hw_params_set_rate(pcm, hwp, rate, 0)
    alsa.snd_pcm_hw_params_set_access(pcm, hwp, access)
    alsa.snd_pcm_hw_params(pcm, hwp)
117+
118+
def ctl_disable_rtnr():
    """
    Noise reduction likes to squash our chirp on capture.  Walk the list
    of controls, looking for an RTNR enable control, if one exists, and
    set it to false.  Unbelievably cumbersome API to do this: call
    elem_list once on an empty struct to get the element count, then
    allocate, then call it again.  Then for each element we can check
    the name directly, but need to allocate an "id" struct to query an
    abstract identifier, that we use with a separately-allocated "value"
    (on which we set the dynamically typed data) to send the command to
    the kernel.

    The control handle is closed and the element list space released
    even if one of the ALSA calls raises.
    """
    rtnr_enable = re.compile(r'RTNR.*\s+rtnr_enable.*')
    dev = f"hw:{opts.card}".encode("ascii")
    ctl = C.c_ulong()
    alsa.snd_ctl_open(C.byref(ctl), dev, 0)
    try:
        elist = alsa.alloc("ctl_elem_list")
        try:
            alsa.snd_ctl_elem_list(ctl, elist)
            nelem = alsa.snd_ctl_elem_list_get_count(elist)
            alsa.snd_ctl_elem_list_alloc_space(elist, nelem)
            alsa.snd_ctl_elem_list(ctl, elist)
            for i in range(nelem):
                name = alsa.snd_ctl_elem_list_get_name(elist, i)
                if rtnr_enable.match(name):
                    print(f"Disabling control: {name}")
                    eid = alsa.alloc("ctl_elem_id")
                    val = alsa.alloc("ctl_elem_value")
                    alsa.snd_ctl_elem_list_get_id(elist, i, C.byref(eid))
                    alsa.snd_ctl_elem_value_set_id(val, eid)
                    alsa.snd_ctl_elem_value_set_boolean(val, 0, False)
                    alsa.snd_ctl_elem_write(ctl, val)
        finally:
            # Pairs with snd_ctl_elem_list_alloc_space().  Called on the
            # raw library handle because this function returns void, so
            # there is no status value to feed through err_wrap().
            alsa.lib.snd_ctl_elem_list_free_space(elist)
    finally:
        alsa.snd_ctl_close(ctl)
149+
150+
def pcm_play_buf(data):
    """
    Plays a bytes-like object of interleaved s16_le frames (opts.chan
    channels) on the playback PCM, blocking until it has drained.  At
    most opts.duration seconds worth of frames are played; any trailing
    partial frame is ignored.  Does nothing if there is not at least
    one whole frame to play.
    """
    frame_bytes = 2 * opts.chan
    n = min(len(data) // frame_bytes, opts.rate * opts.duration)
    if n <= 0:
        # Nothing to play (and from_buffer() below rejects empty buffers)
        return

    data = bytearray(data)
    addr = C.addressof((C.c_byte * 1).from_buffer(data))
    off = 0

    pcm = C.c_long(0)
    dev = f"hw:{opts.card},{opts.pcm}".encode("ascii")
    alsa.snd_pcm_open(C.byref(pcm), dev, alsa.PCM_STREAM_PLAYBACK, 0)
    try:
        pcm_init_stream(pcm, opts.rate, opts.chan, alsa.PCM_FORMAT_S16_LE,
                        alsa.PCM_ACCESS_RW_INTERLEAVED)
        while n > 0:
            f = alsa.snd_pcm_writei(pcm, C.c_ulong(addr + off), n)
            n -= f
            # writei counts frames, but "off" is a byte offset into data
            off += f * frame_bytes
        alsa.snd_pcm_drain(pcm)
    finally:
        alsa.snd_pcm_close(pcm)
168+
169+
def pcm_play_chirp():
    """
    Opens the playback PCM in mmap-interleaved mode, plays the chirp
    from gen_chirp_s16le() followed by one ring buffer's worth of
    silence, and returns the time.perf_counter() timestamp taken right
    after the chirp was written, advanced by the playback time of the
    data that was still queued ahead of it.
    """
    pcm = C.c_long(0)
    dev = f"hw:{opts.card},{opts.pcm}".encode("ascii")
    alsa.snd_pcm_open(C.byref(pcm), dev, alsa.PCM_STREAM_PLAYBACK, 0)
    pcm_init_stream(pcm, opts.rate, opts.chan, alsa.PCM_FORMAT_S16_LE,
                    alsa.PCM_ACCESS_MMAP_INTERLEAVED)

    (chirp, chirp_frames) = gen_chirp_s16le()

    # Reset the stream and queue up as much data as will fit in the
    # ring buffer
    # NOTE(review): ALSA documents the "areas" argument of
    # snd_pcm_mmap_begin() as a pointer-to-pointer; "area" is never read
    # here and looks like it only serves as scratch space for that
    # output -- confirm.
    area = alsa.pcm_channel_area_t()
    offset = C.c_ulong()
    frames = C.c_ulong(opts.rate)
    ring_frames = 0
    alsa.snd_pcm_prepare(pcm)
    alsa.snd_pcm_reset(pcm)
    # Nothing is written into the mapped area: each pass just commits
    # whatever the ring already holds (presumably silence -- confirm)
    # and the loop ends once a commit reports zero frames.  The total
    # committed is used below as the ring size.
    while True:
        alsa.snd_pcm_avail_update(pcm)
        alsa.snd_pcm_mmap_begin(pcm, C.byref(area), C.byref(offset), C.byref(frames))
        committed = alsa.snd_pcm_mmap_commit(pcm, offset, frames)
        ring_frames += committed
        if committed == 0:
            break

    # One ring's worth of zeroed s16 frames
    silence = bytes(2 * opts.chan * ring_frames)

    # Start up the stream, spin until there is space in the buffer,
    # write the chirp.  This minimizes client-side overhead like
    # stream startup.  Then immediately take a timestamp and write
    # silence for one full cycle (to be 100% sure the buffer can't
    # wrap and chirp twice).
    alsa.snd_pcm_start(pcm)
    while alsa.snd_pcm_avail(pcm) < chirp_frames:
        pass
    # Frames still queued ahead of the chirp at the moment it is written
    pre_buffered = ring_frames - alsa.snd_pcm_avail(pcm)
    f = alsa.snd_pcm_mmap_writei(pcm, chirp, chirp_frames)
    chirp_sent = time.perf_counter()
    assert f == chirp_frames

    # Every write starts at the beginning of "silence"; that is fine
    # because the buffer is all zeros.
    n = 0
    while n < ring_frames:
        n += alsa.snd_pcm_mmap_writei(pcm, silence, ring_frames)
    alsa.snd_pcm_drain(pcm)
    alsa.snd_pcm_close(pcm)

    # Correct chirp_sent for buffered data!
    chirp_sent += pre_buffered / opts.rate
    return chirp_sent
218+
219+
def pcm_do_capture(duration):
    """
    Captures "duration" seconds of audio from the capture PCM in 1ms
    reads.  Returns an array of tuples of (timestamp, bytes), where the
    timestamp is time.perf_counter() taken right after the read
    returned.  No processing is done here for performance reasons, just
    one heap allocation and copy per block.

    The capture device is closed before returning (or raising).
    """
    pcm = C.c_long(0)
    fmt = alsa.PCM_FORMAT_S32_LE if opts.capbits == 32 else alsa.PCM_FORMAT_S16_LE
    capsz = 4 if opts.capbits == 32 else 2
    dev = f"hw:{opts.card},{opts.cap}".encode("ascii")
    alsa.snd_pcm_open(C.byref(pcm), dev, alsa.PCM_STREAM_CAPTURE, 0)
    try:
        pcm_init_stream(pcm, opts.rate, opts.capchan, fmt, alsa.PCM_ACCESS_RW_INTERLEAVED)
        frames_remaining = duration * opts.rate
        buf_frames = int(opts.rate / 1000) # 1ms blocks
        fsz = opts.capchan * capsz
        buf = bytearray(fsz * buf_frames)
        addr = C.c_ulong(C.addressof((C.c_byte * 1).from_buffer(buf)))
        # Slicing the view (rather than the bytearray) avoids an
        # intermediate copy when each block is snapshotted below
        view = memoryview(buf)
        buflist = []
        buf_frames = C.c_ulong(buf_frames)
        while frames_remaining > 0:
            f = alsa.snd_pcm_readi(pcm, addr, buf_frames)
            t = time.perf_counter()
            frames_remaining -= f
            buflist.append((t, bytes(view[:f * fsz])))
        return buflist
    finally:
        # Don't leave the capture stream open (and running) behind us
        alsa.snd_pcm_close(pcm)
243+
244+
def gen_chirp_s16le():
    """
    A programmatically-detectable chirp/pop signal for testing latency.
    To minimize latency, we want the chirp to be low duration, high
    energy and high frequency.  This repeats an 8-sample square wave (6
    kHz at 48k sample rate).  Some devices can reproduce this well with
    as few as 8 repetitions (1.3ms), but on at least one mt8195 device
    it's unreliably audible unless repeated 128 times!  It's not caused
    by software in the DSP, more like a codec/amp feature (possibly
    related to power management, if we don't play other audio
    immediately before, it's even less reliable).

    Returns a tuple of (bytes, frame count): opts.chirpcyc cycles of
    interleaved s16_le data for opts.chan channels, and the number of
    frames those bytes contain.
    """
    reps = 4
    n = opts.chan * reps
    # One cycle: "reps" frames at full negative then "reps" frames at
    # full positive, on every channel
    cycle = struct.pack(f"<{2 * n}h", *([-0x8000] * n + [0x7fff] * n))
    chirp = cycle * opts.chirpcyc
    # Each cycle is 2 * reps frames long (low half plus high half).
    # The count returned here used to cover only half of the data.
    return (chirp, 2 * reps * opts.chirpcyc)
263+
264+
def cap_to_playback(buf):
    """
    Converts a byte array containing capture frames (which can have
    different sample format and channel count) to the playback format
    (always s16_le).  Also computes an "energy" value as the sum of
    absolute sample differences (in units of +/-1.0) over all result
    channels.  Returns both as a tuple.

    Only the first opts.chan channels of each capture frame are kept.
    "buf" must hold a whole number of capture frames, otherwise
    struct.error is raised.
    """
    # Both the capture data and the playback data are little endian by
    # definition of the PCM formats, so say so explicitly instead of
    # relying on the byte order of the host.
    code = 'i' if opts.capbits == 32 else 'h'
    cap_frame = struct.Struct(f"<{opts.capchan}{code}")
    play_frame = struct.Struct(f"<{opts.chan}h")
    scale = 1 / (1 << (opts.capbits - 1))
    last_frame = None
    delta_sum = 0
    out_frames = []

    # NOTE: should consider low-passing the energy computation by
    # averaging ~N recent samples.  Otherwise high frequency noise can
    # dominate, which we don't really care about measuring (AEC can't
    # treat it, and it can plausibly create false positive chirp signals
    # loud enough).
    for samples in cap_frame.iter_unpack(buf):
        frame = [scale * x for x in samples[:opts.chan]]
        if last_frame is not None:
            delta_sum += sum(abs(prev - cur) for prev, cur in zip(last_frame, frame))
        last_frame = frame
        # Back to 16 bit integers, clamped to the s16 range
        iframe = [int(min(0x7fff, max(-0x8000, (1 << 15) * e))) for e in frame]
        out_frames.append(play_frame.pack(*iframe))
    return (b''.join(out_frames), delta_sum)
292+
293+
def chirp_child(wpipe):
    """
    Capture-side half of the chirp test.  Records for opts.duration
    seconds, then scans the 1ms blocks in order and writes the
    timestamp of the first one whose energy crosses the detection
    threshold to "wpipe" (as ASCII text).  Writes nothing if no block
    qualifies.
    """
    # Energy is normalized as "half-swing per sample"; the threshold
    # triggers on a 0.1 unit swing over the 8-sample chirp waveform.
    threshold = 0.1/8

    # NOTE: would be possible to do this analysis at the individual
    # sample layer for better time fidelity instead of in 1ms chunks.
    for stamp, raw in pcm_do_capture(opts.duration):
        converted, swing = cap_to_playback(raw)
        nframes = len(converted) / (2 * opts.chan)
        if swing / (nframes * opts.chan) > threshold:
            os.write(wpipe, f"{stamp}".encode("ascii"))
            return
310+
311+
def echo_child(wpipe):
    """
    Capture-side half of the echo test.  Records for opts.duration
    seconds, totals the energy of every captured block and writes the
    normalized result to "wpipe" as ASCII text with three decimals.
    """
    total = sum(cap_to_playback(raw)[1] for _, raw in pcm_do_capture(opts.duration))

    # Dividing by duration and channel count gives "half-swing per
    # second", which only serves to make essentially arbitrary numbers
    # prettier (e.g. a typical pop music track results in ~few-hundred
    # values for "energy")
    normalized = total / (opts.duration * opts.chan)
    os.write(wpipe, f"{normalized:.3f}".encode("ascii"))
321+
322+
def chirp_test():
    """
    Forks a child process to listen for the chirp and write back a
    time.perf_counter() value (which is an invariant clock across
    processes) through a pipe, plays the chirp from the parent, and
    prints the difference between the two timestamps in milliseconds.

    Exits with an error message if the child finished without
    reporting a detection.
    """
    (rfd, wfd) = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(rfd)
        chirp_child(wfd)
        sys.exit(0)

    # The parent must not hold the write end: otherwise the read below
    # can never see EOF and blocks forever when the child wrote nothing.
    os.close(wfd)

    # Randomly sleep for a bit to make aliasing bugs (e.g. noise being
    # detected as a chirp) visible as unreliable output.
    time.sleep(random.randint(1000, 2000)/1000)
    chirp_sent = pcm_play_chirp()

    os.waitpid(pid, 0)
    msg = os.read(rfd, 9999).decode("ascii")
    os.close(rfd)
    if not msg:
        sys.exit("Chirp not detected on capture device")
    chirp_detected = float(msg)

    lat_ms = (chirp_detected - chirp_sent) * 1000
    print(f"Chirp latency: {lat_ms:.1f} ms")
345+
346+
def echo_test():
    """
    Similar to chirp test, but plays a .wav file while the child
    captures, and reports average capture energy (useful for testing mic
    gain and echo cancellation performance).

    Exits with an error message if the child finished without
    reporting a value.
    """
    # Just slurps in the wav file and chops off the header, assuming
    # the user got the format and sampling rate correct.
    WAV_HDR_LEN = 44
    with open(opts.noise, "rb") as wav:
        buf = wav.read()[WAV_HDR_LEN:]

    (rfd, wfd) = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(rfd)
        echo_child(wfd)
        sys.exit(0)

    # The parent must not hold the write end: otherwise the read below
    # can never see EOF and blocks forever if the child died early.
    os.close(wfd)

    pcm_play_buf(buf)

    os.waitpid(pid, 0)
    msg = os.read(rfd, 9999).decode("ascii")
    os.close(rfd)
    if not msg:
        sys.exit("Capture process exited without reporting an energy value")
    print("Capture energy:", msg)
368+
369+
def base_test():
    """
    Loopback test with no analysis beyond an energy figure: record
    opts.duration seconds, convert every captured block to the playback
    format, play the concatenated result, then print the summed energy.
    """
    converted = [cap_to_playback(raw) for _, raw in pcm_do_capture(opts.duration)]
    energy = sum(nrg for _, nrg in converted)
    pcm_play_buf(b''.join(pcm for pcm, _ in converted))
    print(f"Energy {energy}")
382+
383+
def main():
    """
    Entry point: parse the command line, optionally switch off RTNR,
    then run whichever tests were selected, in the fixed order base,
    chirp, echo.
    """
    parse_opts()
    if opts.disable_rtnr:
        ctl_disable_rtnr()

    # (option attribute, test function); each flag is looked up right
    # before its test would run.
    selectable = (
        ("base_test", base_test),
        ("chirp_test", chirp_test),
        ("echo_test", echo_test),
    )
    for flag, run in selectable:
        if getattr(opts, flag):
            run()
393+
394+
# Parsed command line options; assigned by parse_opts().
opts = None
# Shared libasound wrapper.  Constructing it loads libasound.so.2, so
# this happens as soon as the module is loaded, not only when run as a
# script.
alsa = ALSA()
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)