Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/whats_new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ Current
Changelog
~~~~~~~~~

- Add ``chunk_duration`` parameter to :func:`mne.events_from_annotations` to allow multiple events from a single annotation by `Joan Massich`_

- :func:`mne.io.read_raw_edf` now detects analog stim channels labeled ``'STATUS'`` and sets them as stim channel. :func:`mne.io.read_raw_edf` no longer synthesize TAL annotations into stim channel but stores them in ``raw.annotations`` when reading by `Joan Massich`_

- Add ``drop_refs=True`` parameter to :func:`set_bipolar_reference` to drop/keep anode and cathode channels after applying the reference by `Cristóbal Moënne-Loccoz`_.
- Add ``drop_refs=True`` parameter to :func:`set_bipolar_reference` to drop/keep anode and cathode channels after applying the reference by `Cristóbal Moënne-Loccoz`_.

- Add 448-labels subdivided aparc cortical parcellation by `Denis Engemann`_ and `Sheraz Khan`_

Expand Down
111 changes: 71 additions & 40 deletions mne/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,9 +701,46 @@ def _ensure_annotation_object(obj):
'mne.Annotations. Got %s.' % obj)


def _select_annotations_based_on_description(descriptions, event_id, regexp):
"""Get a collection of descriptions and returns index of selected."""
regexp_comp = re.compile('.*' if regexp is None else regexp)

if event_id is None:
event_id = Counter()

event_id_ = dict()
dropped = []
for desc in descriptions:
if desc in event_id_:
continue

if regexp_comp.match(desc) is None:
continue

if isinstance(event_id, dict):
if desc in event_id:
event_id_[desc] = event_id[desc]
else:
continue
else:
trigger = event_id(desc)
if trigger is not None:
event_id_[desc] = trigger
else:
dropped.append(desc)

event_sel = [ii for ii, kk in enumerate(descriptions)
if kk in event_id_]

if len(event_sel) == 0 and regexp is not None:
raise ValueError('Could not find any of the events you specified.')

return event_sel, event_id_


@verbose
def events_from_annotations(raw, event_id=None, regexp=None, use_rounding=True,
verbose=None):
chunk_duration=None, verbose=None):
"""Get events and event_id from an Annotations object.

Parameters
Expand All @@ -724,6 +761,12 @@ def events_from_annotations(raw, event_id=None, regexp=None, use_rounding=True,
use_rounding : boolean
If True, use rounding (instead of truncation) when converting
times to indices. This can help avoid non-unique indices.
chunk_duration: float | None
If chunk_duration parameter in events_from_annotations is None, events
correspond to the annotation onsets.
If not, :func:`mne.events_from_annotations` returns as many events as
they fit within the annotation duration spaced according to
`chunk_duration`, which is given in seconds.
verbose : bool, str, int, or None
If not None, override default verbose level (see
:func:`mne.verbose` and :ref:`Logging documentation <tut_logging>`
Expand All @@ -741,47 +784,35 @@ def events_from_annotations(raw, event_id=None, regexp=None, use_rounding=True,

annotations = raw.annotations

inds = raw.time_as_index(annotations.onset, use_rounding=use_rounding,
origin=annotations.orig_time) + raw.first_samp

# Filter out the annotations that do not match regexp
regexp_comp = re.compile('.*' if regexp is None else regexp)

if event_id is None:
event_id = Counter()

event_id_ = dict()
dropped = []
for desc in annotations.description:
if desc in event_id_:
continue
event_sel, event_id_ = _select_annotations_based_on_description(
annotations.description, event_id=event_id, regexp=regexp)

if regexp_comp.match(desc) is None:
continue
if chunk_duration is None:
inds = raw.time_as_index(annotations.onset, use_rounding=use_rounding,
origin=annotations.orig_time) + raw.first_samp

if isinstance(event_id, dict):
if desc in event_id:
event_id_[desc] = event_id[desc]
else:
continue
else:
trigger = event_id(desc)
if trigger is not None:
event_id_[desc] = trigger
else:
dropped.append(desc)

event_sel = [ii for ii, kk in enumerate(annotations.description)
if kk in event_id_]

if len(event_sel) == 0 and regexp is not None:
raise ValueError('Could not find any of the events you specified.')

values = [event_id_[kk] for kk in
annotations.description[event_sel]]
previous_value = np.zeros(len(event_sel))
inds = inds[event_sel]
events = np.c_[inds, previous_value, values].astype(int)
values = [event_id_[kk] for kk in annotations.description[event_sel]]
inds = inds[event_sel]
else:
inds = values = np.array([]).astype(int)
iterator = list(zip(annotations.onset[event_sel],
annotations.duration[event_sel],
annotations.description[event_sel]))

for onset, duration, description in iterator:
_onsets = np.arange(start=onset, stop=(onset + duration),
step=chunk_duration)
_inds = raw.time_as_index(_onsets,
use_rounding=use_rounding,
origin=annotations.orig_time)
_inds += raw.first_samp
inds = np.append(inds, _inds)
_values = np.full(shape=len(_inds),
fill_value=event_id_[description],
dtype=int)
values = np.append(values, _values)

events = np.c_[inds, np.zeros(len(inds)), values].astype(int)

logger.info('Used Annotations descriptions: %s' %
(list(event_id_.keys()),))
Expand Down
21 changes: 21 additions & 0 deletions mne/tests/test_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,27 @@ def test_crop():
assert len(raw_read.annotations.onset) == 0 # XXX to be fixed in #5416


def test_chunk_duration():
"""Test chunk_duration."""
# create dummy raw
raw = RawArray(data=np.empty([10, 10], dtype=np.float64),
info=create_info(ch_names=10, sfreq=1.),
first_samp=0)
raw.info['meas_date'] = 0
raw.set_annotations(Annotations(description='foo', onset=[0],
duration=[10], orig_time=None))

# expected_events = [[0, 0, 1], [0, 0, 1], [1, 0, 1], [1, 0, 1], ..
# [9, 0, 1], [9, 0, 1]]
expected_events = np.atleast_2d(np.repeat(range(10), repeats=2)).T
expected_events = np.insert(expected_events, 1, 0, axis=1)
expected_events = np.insert(expected_events, 2, 1, axis=1)

events, events_id = events_from_annotations(raw, chunk_duration=.5,
use_rounding=False)
assert_array_equal(events, expected_events)


def test_crop_more():
"""Test more cropping."""
raw = mne.io.read_raw_fif(fif_fname).crop(0, 11).load_data()
Expand Down