diff --git a/mne/io/cnt/__init__.py b/mne/io/cnt/__init__.py index d85b6bce1db..7ac2b38e1b0 100644 --- a/mne/io/cnt/__init__.py +++ b/mne/io/cnt/__init__.py @@ -2,4 +2,4 @@ # Copyright the MNE-Python contributors. """CNT data reader.""" -from .cnt import read_raw_cnt +from .cnt import read_raw_cnt, read_epochs_cnt, read_evoked_cnt diff --git a/mne/io/cnt/_utils.py b/mne/io/cnt/_utils.py index 86842ad60c6..30135c6e09c 100644 --- a/mne/io/cnt/_utils.py +++ b/mne/io/cnt/_utils.py @@ -73,6 +73,18 @@ def _read_teeg(f, teeg_offset): ) +EpochHeader = namedtuple( + "EpochHeader", + ("Accept Ttype Correct Rt Response Reserved"), +) +# char Accept; /* accept byte */ +# short Ttype; /* trial type */ +# short Correct; /* accuracy */ +# float Rt; /* reaction time */ +# short Response; /* response type */ +# short Reserved; /* not used */ + + def _get_event_parser(event_type): if event_type == 1: event_maker = CNTEventType1 @@ -83,8 +95,11 @@ def _get_event_parser(event_type): elif event_type == 3: event_maker = CNTEventType3 struct_pattern = " "RawCNT": + """Reader function for Neuroscan ``.eeg`` epochs files. + + Parameters + ---------- + input_fname : path-like + Path to the Neuroscan ``.eeg`` file. + events : ndarray, shape (n_events, 3) | None + Path to events file. If array, it is the events typically returned + by the read_events function. If some events don't match the events + of interest as specified by event_id, they will be marked as 'IGNORED' + in the drop log. If None, it is constructed from the Neuroscan (.eeg) + file with each unique event encoded with a different integer. + event_id : int | list of int | dict | None + The id of the event to consider. If dict, the keys can later be used + to access associated events. + Example:: + + {"auditory":1, "visual":3} + + If int, a dict will be created with + the id as string. If a list, all events with the IDs specified + in the list are used. If None, the event_id is constructed from the + Neuroscan (.eeg) file with each descriptions copied from ``eventtype``. + eog : list | tuple | 'auto' + Names or indices of channels that should be designated EOG channels. + If 'auto', the channel names containing ``EOG`` or ``EYE`` are used. + Defaults to empty tuple. + %(uint16_codec)s + %(montage_units)s + %(verbose)s + + Returns + ------- + epochs : instance of Epochs + The epochs. + + See Also + -------- + mne.Epochs : Documentation of attributes and methods. + + + """ + epochs = EpochsCNT( + input_fname=input_fname, + events=events, + event_id=event_id, + eog=eog, + misc=misc, + ecg=ecg, + emg=emg, + data_format=data_format, + date_format=date_format, + header=header, + verbose=verbose, + ) + return epochs + + +def read_evoked_cnt( + input_fname, + data_format, + date_format, + header, + info=None, + comment=None, +): + """Load evoked data from a Neuroscan .avg timelocked structure. + + This function expects to find timelocked data in the structure data_name is + pointing at. + + .. warning:: FieldTrip does not normally store the original information + concerning channel location, orientation, type etc. It is + therefore **highly recommended** to provide the info field. + This can be obtained by reading the original raw data file + with MNE functions (without preload). The returned object + contains the necessary info field. + + Parameters + ---------- + fname : path-like + Path and filename of the ``.mat`` file containing the data. + info : dict or None + The info dict of the raw data file corresponding to the data to import. + If this is set to None, information is extracted from the + Neuroscan structure. + comment : str + Comment on dataset. Can be the condition. + + Returns + ------- + evoked : instance of EvokedArray + An EvokedArray containing the loaded data. + """ + eog, ecg, emg, misc = (), (), (), () + info, cnt_info = _get_cnt_info( + input_fname, + eog, + ecg, + emg, + misc, + data_format, + date_format, + header, + mode="evoked", + ) + + input_fname = cnt_info["input_fname"] + + # number of points + n_pnts = cnt_info["n_pnts"] + n_channels = cnt_info["orig_nchan"] + cals = cnt_info["cals_avg"] + accepted_epochs = int(cnt_info["accepted_epochs"]) + + data = np.empty((n_channels, n_pnts), dtype=float) + UNUSED_HEAD_SIZE = 5 + DATA_POINT_SIZE = 4 + + with open(input_fname, "rb") as f: + # Ensure the file pointer is at the beginning of the EEG data + data_start = 900 + n_channels * 75 + data_end = data_start + (n_channels * (5 + n_pnts * DATA_POINT_SIZE)) + data_step = 5 + n_pnts * DATA_POINT_SIZE + + for chan, i in enumerate(range(data_start, data_end, data_step)): + f.seek(i) + data_points = np.fromfile( + f, dtype=">f", count=n_pnts, offset=UNUSED_HEAD_SIZE + ) + # Scale the data to physical units in Volts + data[chan] = data_points * cals[chan] / accepted_epochs * 1e-6 + + evoked = EvokedArray(data, info, comment=comment) + return evoked + + +def _get_cnt_info( + input_fname, eog, ecg, emg, misc, data_format, date_format, header, mode="raw" +): """Read the cnt header.""" data_offset = 900 # Size of the 'SETUP' header. cnt_info = dict() @@ -294,11 +458,22 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he session_date = "%s %s" % (read_str(fid, 10), read_str(fid, 12)) meas_date = _session_date_2_meas_date(session_date, date_format) + if mode == "epoch": + fid.seek(362) + cnt_info["n_epochs"] = np.fromfile(fid, dtype="= 0] @@ -360,8 +535,23 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he cnt_info["channel_offset"] //= n_bytes else: cnt_info["channel_offset"] = 1 - - ch_names, cals, baselines, chs, pos = (list(), list(), list(), list(), list()) + if mode == "evoked": + ch_names, cals, baselines, chs, pos, cals_avg = ( + list(), + list(), + list(), + list(), + list(), + list(), + ) + else: + ch_names, cals, baselines, chs, pos = ( + list(), + list(), + list(), + list(), + list(), + ) bads = list() _validate_type(header, str, "header") @@ -392,6 +582,8 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he sensitivity = np.fromfile(fid, dtype="f4", count=1).item() fid.seek(data_offset + 75 * ch_idx + 71) cal = np.fromfile(fid, dtype="f4", count=1).item() + if mode == "evoked": + cals_avg.append(cal) cals.append(cal * sensitivity * 1e-6 / 204.8) info = _empty_info(sfreq) @@ -407,6 +599,8 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he "last_name": last_name, } + if mode == "evoked": + cnt_info["cals_avg"] = cals_avg if eog == "auto": eog = _find_channels(ch_names, "EOG") if ecg == "auto": @@ -485,11 +679,7 @@ class RawCNT(BaseRaw): Defaults to ``'auto'``. date_format : ``'mm/dd/yy'`` | ``'dd/mm/yy'`` Format of date in the header. Defaults to ``'mm/dd/yy'``. - header : ``'auto'`` | ``'new'`` | ``'old'`` - Defines the header format. Used to describe how bad channels - are formatted. If auto, reads using old and new header and - if either contain a bad channel make channel bad. - Defaults to ``'auto'``. + %(cnt_header)s %(preload)s stim_channel : bool | None Add a stim channel from the events. Defaults to None to trigger a @@ -620,3 +810,208 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): one[idx] -= baselines[idx][:, None] _mult_cal_one(data[:, sample_start:sample_stop], one, idx, cals, mult) + + +class EpochsCNT(BaseEpochs): + """Epochs from EEGLAB .set file. + + Parameters + ---------- + input_fname : path-like + Path to the ``.eeg`` epochs file. + %(cnt_events)s + %(cnt_event_id)s + %(tmin_epochs)s + %(baseline_epochs)s + %(reject_epochs)s + %(flat)s + %(epochs_reject_tmin_tmax)s + eog : list | tuple | 'auto' + Names or indices of channels that should be designated EOG channels. + If 'auto', the channel names containing ``EOG`` or ``EYE`` are used. + Defaults to empty tuple. + misc : list | tuple + Names or indices of channels that should be designated MISC channels. + Defaults to empty tuple. + ecg : list | tuple | 'auto' + Names or indices of channels that should be designated ECG channels. + If 'auto', the channel names containing ``ECG`` are used. + Defaults to empty tuple. + emg : list | tuple + Names or indices of channels that should be designated EMG channels. + If 'auto', the channel names containing ``EMG`` are used. + Defaults to empty tuple. + data_format : ``'auto'`` | ``'int16'`` | ``'int32'`` + Defines the data format the data is read in. If ``'auto'``, it is + determined from the file header using ``numsamples`` field. + Defaults to ``'auto'``. + date_format : ``'mm/dd/yy'`` | ``'dd/mm/yy'`` + Format of date in the header. Defaults to ``'mm/dd/yy'``. + %(cnt_header)s + %(verbose)s + + See Also + -------- + mne.Epochs : Documentation of attributes and methods. + + Notes + ----- + .. versionadded:: 0.11.0 + """ + + @verbose + def __init__( + self, + input_fname, + events=None, + event_id=None, + tmin=0, + baseline=None, + reject=None, + flat=None, + reject_tmin=None, + reject_tmax=None, + eog=(), + misc=(), + ecg=(), + emg=(), + data_format="auto", + date_format="mm/dd/yy", + *, + header="auto", + verbose=None, + ): + _check_option("date_format", date_format, ["mm/dd/yy", "dd/mm/yy"]) + if date_format == "dd/mm/yy": + _date_format = "%d/%m/%y %H:%M:%S" + else: + _date_format = "%m/%d/%y %H:%M:%S" + + input_fname = str( + _check_fname(fname=input_fname, must_exist=True, overwrite="read") + ) + logger.info("Extracting .eeg Parameters from %s..." % input_fname) + self.info, cnt_info = _get_cnt_info( + input_fname, + eog, + ecg, + emg, + misc, + data_format, + _date_format, + header, + mode="epoch", + ) + + cnt_info.update(input_fname=input_fname) + self._raw_extras = [cnt_info] + self._filenames = [] + epoch_headers, data = self._read_cnt_epochs_data() + events = self._epoch_event_parser(epoch_headers, data) + if event_id is None: # convert to int to make typing-checks happy + event_id = {str(e): int(e) for e in np.unique(events.T[2])} + if not ( + (events is None and event_id is None) + or (events is not None and event_id is not None) + ): + raise ValueError("Both `events` and `event_id` must be " "None or not None") + + assert data.shape == ( + self._raw_extras[0]["n_epochs"], + self.info["nchan"], + self._raw_extras[0]["n_pnts"], + ) + tmax = ((data.shape[2] - 1) / self.info["sfreq"]) + tmin + + super().__init__( + self.info, + data, + events, + event_id, + tmin, + tmax, + baseline, + reject=reject, + flat=flat, + reject_tmin=reject_tmin, + reject_tmax=reject_tmax, + filename=input_fname, + verbose=verbose, + ) + + # data are preloaded but _bad_dropped is not set so we do it here: + self._bad_dropped = True + + logger.info("Ready.") + + def _read_cnt_epochs_data(self): + """Read epochs data from .eeg file.""" + cnt_info = self._raw_extras[0] + info = self.info + + input_fname = cnt_info["input_fname"] + + n_epochs = cnt_info["n_epochs"] + # number of points per epoch + n_pnts = cnt_info["n_pnts"] + n_channels = cnt_info["orig_nchan"] + # calibration factors + cals = [d["cal"] for d in info["chs"]] + + data = np.empty((n_epochs, n_channels, n_pnts), dtype=float) + epoch_headers = [] + SWEEP_HEAD_SIZE = 13 + DATA_POINT_SIZE = 4 + + with open(input_fname, "rb") as f: + # Ensure the file pointer is at the beginning of the EEG data + + data_start = 900 + n_channels * 75 + data_end = data_start + n_epochs * ( + SWEEP_HEAD_SIZE + n_pnts * n_channels * DATA_POINT_SIZE + ) + data_step = SWEEP_HEAD_SIZE + n_pnts * n_channels * DATA_POINT_SIZE + + for epoch, i in enumerate(range(data_start, data_end, data_step)): + # Epoch Header: + # char Accept; /* accept byte */ + # short Ttype; /* trial type */ + # short Correct; /* accuracy */ + # float Rt; /* reaction time */ + # short Response; /* response type */ + # short Reserved; /* not used */ + f.seek(i) + epoch_header = np.fromfile( + f, + dtype=np.dtype("