From dc12b8ac0adb0636c61a325fcb7af42e7b68672e Mon Sep 17 00:00:00 2001 From: robbisg Date: Tue, 13 Feb 2024 16:34:12 +0100 Subject: [PATCH] NF: itab reader --- mne/io/__init__.pyi | 2 + mne/io/itab/__init__.py | 7 + mne/io/itab/constants.py | 34 +++++ mne/io/itab/info.py | 253 +++++++++++++++++++++++++++++++++ mne/io/itab/itab.py | 158 ++++++++++++++++++++ mne/io/itab/mhd.py | 223 +++++++++++++++++++++++++++++ mne/io/itab/tests/__init__.py | 0 mne/io/itab/tests/test_itab.py | 41 ++++++ 8 files changed, 718 insertions(+) create mode 100644 mne/io/itab/__init__.py create mode 100644 mne/io/itab/constants.py create mode 100644 mne/io/itab/info.py create mode 100644 mne/io/itab/itab.py create mode 100644 mne/io/itab/mhd.py create mode 100644 mne/io/itab/tests/__init__.py create mode 100644 mne/io/itab/tests/test_itab.py diff --git a/mne/io/__init__.pyi b/mne/io/__init__.pyi index 3843ca3a384..006762120ac 100644 --- a/mne/io/__init__.pyi +++ b/mne/io/__init__.pyi @@ -35,6 +35,7 @@ __all__ = [ "read_raw_fil", "read_raw_gdf", "read_raw_hitachi", + "read_raw_itab", "read_raw_kit", "read_raw_nedf", "read_raw_neuralynx", @@ -78,6 +79,7 @@ from .fieldtrip import read_epochs_fieldtrip, read_evoked_fieldtrip, read_raw_fi from .fiff import Raw, read_raw_fif from .fil import read_raw_fil from .hitachi import read_raw_hitachi +from .itab import read_raw_itab from .kit import read_epochs_kit, read_raw_kit from .nedf import read_raw_nedf from .neuralynx import read_raw_neuralynx diff --git a/mne/io/itab/__init__.py b/mne/io/itab/__init__.py new file mode 100644 index 00000000000..9204f6a219c --- /dev/null +++ b/mne/io/itab/__init__.py @@ -0,0 +1,7 @@ +"""ITAB module for conversion to FIF""" + +# Author: Vittorio Pizzella +# +# License: BSD (3-clause) + +from .itab import read_raw_itab, RawITAB \ No newline at end of file diff --git a/mne/io/itab/constants.py b/mne/io/itab/constants.py new file mode 100644 index 00000000000..eef36079785 --- /dev/null +++ b/mne/io/itab/constants.py @@ -0,0 +1,34 @@ +"""ITAB constants""" + +# Author: Vittorio Pizzella +# +# License: BSD (3-clause) + +from ...utils import BunchConst + + +ITAB = BunchConst() + + +# Channel types +ITAB.ITABV_EEG_CH = 1 +ITAB.ITABV_MAG_CH = 2 +ITAB.ITABV_REF_EEG_CH = 4 +ITAB.ITABV_REF_MAG_CH = 8 +ITAB.ITABV_AUX_CH = 16 +ITAB.ITABV_PARAM_CH =32 +ITAB.ITABV_DIGIT_CH = 64 +ITAB.ITABV_FLAG_CH = 128 + + +# mhd file pointers +ITAB.MHD_TIME = 656 +ITAB.MHD_NREFCH = 82780 +ITAB.MHD_RAWHDTYPE = 85348 +ITAB.MHD_CHINFO = 85444 +ITAB.MHD_SENSPOS = 295364 +ITAB.MHD_NMARKER = 295440 +ITAB.MHD_MARKERS = 295700 +ITAB.MHD_BESTCHI = 297232 + +ITAB.MHD_CH_SIZE = 328 \ No newline at end of file diff --git a/mne/io/itab/info.py b/mne/io/itab/info.py new file mode 100644 index 00000000000..c4433148981 --- /dev/null +++ b/mne/io/itab/info.py @@ -0,0 +1,253 @@ +"""Build measurement info +""" + +# Author: Vittorio Pizzella +# +# License: BSD (3-clause) + +from datetime import datetime, timezone + +import numpy as np + +from ...utils import logger +from ..._fiff.meas_info import _empty_info +from ..constants import FIFF + +from .constants import ITAB + + +def _convert_time(date_str, time_str): + """Convert date and time strings to float time""" + + + for fmt in ("%d/%m/%Y", "%d-%b-%Y", "%a, %b %d, %Y"): + + try: + date = datetime.strptime(date_str, fmt) + except ValueError: + pass + else: + break + else: + raise RuntimeError( + 'Illegal date: %s.\nIf the language of the date does not ' + 'correspond to your local machine\'s language try to set the ' + 'locale to the language of the date string:\n' + 'locale.setlocale(locale.LC_ALL, "en_US")' % date_str) + + for fmt in ('%H:%M:%S', '%H:%M'): + try: + time = datetime.strptime(time_str, fmt) + except ValueError: + pass + else: + break + else: + raise RuntimeError('Illegal time: %s' % time_str) + # MNE-C uses mktime which uses local time, but here we instead decouple + # conversion location from the process, and instead assume that the + # acquisiton was in GMT. This will be wrong for most sites, but at least + # the value we obtain here won't depend on the geographical location + # that the file was converted. + res = datetime(date.year, date.month, date.day, + time.hour, time.minute, time.second, + tzinfo=timezone.utc + ) + + + + return res + + +def _mhdch_2_chs(mhd_ch): + """Build chs list item from mhd ch list item""" + + ch = dict() + + # Generic channel + loc = np.zeros(12) + ch['ch_name'] = mhd_ch['label'] + ch['coord_frame'] = FIFF.FIFFV_COORD_UNKNOWN + ch['coil_type'] = 0 + ch['range'] = 1.0 + ch['unit'] = FIFF.FIFF_UNIT_NONE + ch['unit_mul'] = FIFF.FIFF_UNITM_NONE + if mhd_ch['calib'] == 0: + ch['cal'] = 1. + else: + ch['cal'] = float(mhd_ch['amvbit'] / mhd_ch['calib']) + ch['loc'] = loc + ch['scanno'] = 0 + ch['kind'] = FIFF.FIFFV_MISC_CH + ch['logno'] = 1 + + # Magnetic channel + if mhd_ch['type'] == ITAB.ITABV_MAG_CH: + loc[0:12] = ( + mhd_ch['pos'][0]['posx'] / 1000, #r0 + mhd_ch['pos'][0]['posy'] / 1000, + mhd_ch['pos'][0]['posz'] / 1000, + mhd_ch['pos'][0]['orix'], #ex + 0, + 0, + 0, #ey + mhd_ch['pos'][0]['oriy'], + 0, + 0, #ez + 0, + mhd_ch['pos'][0]['oriz']) + ch['loc'] = loc + ch['kind'] = FIFF.FIFFV_MEG_CH + ch['coil_type'] = FIFF.FIFFV_COIL_POINT_MAGNETOMETER + ch['logno'] = int(mhd_ch['number']) + if mhd_ch['calib'] == 0: + ch['cal'] = 1. + else: + ch['cal'] = float(mhd_ch['amvbit'] / mhd_ch['calib']) + ch['unit'] = FIFF.FIFF_UNIT_T + if mhd_ch['unit'] == "fT": + ch['unit_mul'] = FIFF.FIFF_UNITM_F + elif mhd_ch['unit'] == "pT": + ch['unit_mul'] = FIFF.FIFF_UNITM_P + + # Electric channel + if mhd_ch['type'] == ITAB.ITABV_EEG_CH: + ch['kind'] = FIFF.FIFFV_BIO_CH + ch['cal'] = float(mhd_ch['amvbit'] / mhd_ch['calib']) + ch['unit'] = FIFF.FIFF_UNIT_V + ch['logno'] = int(mhd_ch['number']) + if mhd_ch['unit'] == "mV": + ch['unit_mul'] = FIFF.FIFF_UNITM_M + elif mhd_ch['unit'] == "uT": + ch['unit_mul'] = FIFF.FIFF_UNITM_MU + + # Other channel type + if (mhd_ch['type'] == ITAB.ITABV_REF_EEG_CH and + mhd_ch['type'] == ITAB.ITABV_REF_MAG_CH and + mhd_ch['type'] == ITAB.ITABV_REF_AUX_CH and + mhd_ch['type'] == ITAB.ITABV_REF_PARAM_CH and + mhd_ch['type'] == ITAB.ITABV_REF_DIGIT_CH and + mhd_ch['type'] == ITAB.ITABV_REF_FLAG_CH ): + + ch['kind'] = FIFF.FIFFV_BIO_CH + ch['cal'] = float(mhd_ch['amvbit'] / mhd_ch['calib']) + ch['unit'] = FIFF.FIFF_UNIT_V + if mhd_ch['unit'] == "mV": + ch['unit_mul'] = FIFF.FIFF_UNITM_M + elif mhd_ch['unit'] == "uT": + ch['unit_mul'] = FIFF.FIFF_UNITM_MU + + return ch + + +def _mhd2info(mhd): + """Create meas info from ITAB mhd data""" + + info = _empty_info(mhd['smpfq']) + + info['meas_date'] = _convert_time(mhd['date'], mhd['time']) + + info['description'] = mhd['notes'] + + si = dict() + #si['id'] = mhd['id'] + si['last_name'] = mhd['last_name'] + si['first_name'] = mhd['first_name'] + si['sex'] = mhd['subinfo']['sex'] + + info['subject_info'] = si + + ch_names = list() + chs = list() + bads = list() + for k in range(mhd['nchan']): + #logger.info(mhd['ch'][k]) + ch_names.append(mhd['ch'][k]['label']) + if (mhd['ch'][k]['flag'] > 0): + bads.append(mhd['ch'][k]['label']) + y = _mhdch_2_chs(mhd['ch'][k]) + chs.append(y) + + info['bads'] = bads + info['chs'] = chs + info['ch_names'] = ch_names + + if mhd['hw_low_fr'] != 0: + info['lowpass'] = mhd['hw_low_fr'] + + if mhd['hw_hig_fr'] != 0: + info['highpass'] = mhd['hw_hig_fr'] + + + # Total number of channels in .raw file + info['nchan'] = mhd['nchan'] + + # Total number of timepoints in .raw file + info['temp'] = dict() + + info['temp']['n_samp'] = mhd['ntpdata'] + + # Only one trial (continous acquisition) + info['temp']['ntrials'] = 1 + + # Data start in .raw file + info['temp']['start_data'] = mhd['start_data'] + + + # Get Polhemus digitization data (in head coordinates) + dig = [] + + point_sequence = [ + FIFF.FIFFV_POINT_NASION, # Nasion + FIFF.FIFFV_POINT_RPA, # Right pre-auricolar + FIFF.FIFFV_POINT_LPA, # Left pre-auricolar + ] + + for i, point in enumerate(mhd['marker']): + point_info = dict() + + point_info['coord_frame'] = FIFF.FIFFV_COORD_HEAD + + # Point kind + if i < 3: # Nasion, Right pre-auricolar, Left pre-auricolar + point_kind = FIFF.FIFFV_POINT_CARDINAL + elif i == 3: # Vertex + point_kind = FIFF.FIFFV_POINT_EXTRA + else: # HPI Coils + point_kind = FIFF.FIFFV_POINT_HPI + + point_info['kind'] = point_kind + + + # Point identity + if i < 3: + point_ident = point_sequence[i] + else: + point_ident = i + 1 + point_info['ident'] = point_ident + + point_info['r'] = (point['posx'], + point['posy'], + point['posz']) + + dig += [point_info] + + # TDB other poosible head points, check on dig points. + + info['dig'] = dig + + # TBD trigger handling + + event = list() + for sample in mhd['sample']: + event.append([ sample['start'], + sample['type'], + sample['quality'] ]) + + info['events'] = event + + info._check_consistency() + + logger.info('Measurement info composed.') + + return info \ No newline at end of file diff --git a/mne/io/itab/itab.py b/mne/io/itab/itab.py new file mode 100644 index 00000000000..06dd27955b5 --- /dev/null +++ b/mne/io/itab/itab.py @@ -0,0 +1,158 @@ +# Author: Vittorio Pizzella +# +# License: BSD (3-clause) + +import numpy as np + +from ...utils import verbose +from ..base import BaseRaw +from ..._fiff.utils import _mult_cal_one +from .mhd import _read_mhd +from .info import _mhd2info + +class RawITAB(BaseRaw): + """Raw object from ITAB directory. + + Parameters + ---------- + fname : str + The raw file to load. Filename should end with *.raw + preload : bool or str (default False) + Preload data into memory for data manipulation and faster indexing. + If True, the data will be preloaded into memory (fast, requires + large amount of memory). If preload is a string, preload is the + file name of a memory-mapped file which is used to store the data + on the hard drive (slower, requires less memory). + verbose : bool, str, int, or None + If not None, override default verbose level (see mne.verbose). + + See Also + -------- + mne.io.Raw : Documentation of attribute and methods. + """ + + @verbose + def __init__(self, fname, preload=False, verbose=True): + + + filenames = list() + filenames.append(fname) + + fname_mhd = fname + ".mhd" + mhd = _read_mhd(fname_mhd) # Read the mhd file + info = _mhd2info(mhd) + + + raw_extras = list() + for fi, _ in enumerate(filenames): + raw_extras.append(dict()) + for k in ['n_samp', 'start_data']: + raw_extras[fi][k] = info['temp'][k] + + raw_extras[fi]['nchan'] = info['nchan'] + raw_extras[fi]['buffer_size_sec'] = info['temp']['n_samp'] / info['sfreq'] + + self.info = info + info._check_consistency() + + first_samps = [0] + last_samps = [info['temp']['n_samp'] - 1] + self.verbose = verbose + + # Remove extras from info + for k in ['start_data', 'ntrials', 'n_samp']: + _ = info['temp'].pop(k) + + super(RawITAB, self).__init__(info, preload, + first_samps=first_samps, + last_samps=last_samps, + raw_extras=raw_extras, + filenames=filenames, + verbose=verbose) + + + def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): + """Read a segment of data from a file. + Only needs to be implemented for readers that support + ``preload=False``. + + Parameters + ---------- + data : ndarray, shape (len(idx), stop - start + 1) + The data array. Should be modified inplace. + idx : ndarray | slice + The requested channel indices. + fi : int + The file index that must be read from. + start : int + The start sample in the given file. + stop : int + The stop sample in the given file (inclusive). + cals : ndarray, shape (len(idx), 1) + Channel calibrations (already sub-indexed). + mult : ndarray, shape (len(idx), len(info['chs']) | None + The compensation + projection + cals matrix, if applicable. + """ + + + # Initial checks + start = int(start) + if stop is None or stop > self._raw_extras[fi]['n_samp']: + stop = self._raw_extras[fi]['n_samp'] + + if start >= stop: + raise ValueError('No data in this range') + + data_offset = self._raw_extras[fi]['start_data'] + + n_samp = stop - start + + with open(self._filenames[fi], 'rb') as fid: + + #position file pointer + fid.seek(data_offset + 4 * start * self._raw_extras[fi]['nchan'], 0) + + # read data + n_read = self._raw_extras[fi]['nchan'] * n_samp + + this_data = np.fromfile(fid, '>i4', count=n_read) + + this_data.shape = (n_samp, self._raw_extras[fi]['nchan']) + + # calibrate data + _mult_cal_one(data, this_data.transpose(), idx, cals, mult) + + + +def read_raw_itab(fname, preload=False, verbose=None): + """Raw object from ITAB directory + + Parameters + ---------- + fname : str + The raw file to load. Filename should end with *.raw + preload : bool or str (default False) + Preload data into memory for data manipulation and faster indexing. + If True, the data will be preloaded into memory (fast, requires + large amount of memory). If preload is a string, preload is the + file name of a memory-mapped file which is used to store the data + on the hard drive (slower, requires less memory). + verbose : bool, str, int, or None + If not None, override default verbose level (see mne.verbose). + + Returns + ------- + raw : instance of RawITAB + The raw data. + + See Also + -------- + mne.io.Raw : Documentation of attribute and methods. + + Notes + ----- + .. versionadded:: 0.01 + """ + + a = RawITAB(fname, preload=preload, verbose=verbose) + return a \ No newline at end of file diff --git a/mne/io/itab/mhd.py b/mne/io/itab/mhd.py new file mode 100644 index 00000000000..c0628b0071a --- /dev/null +++ b/mne/io/itab/mhd.py @@ -0,0 +1,223 @@ +"""Read .mhd file +""" + +# Author: Vittorio Pizzella +# +# License: BSD (3-clause) + +import os.path as op + +import numpy as np + +from ...utils import logger +from .constants import ITAB + + +def _make_itab_name(directory, extra, raise_error=True): + """Helper to make a ITAB name""" + fname = op.join(directory, op.basename(directory)[:-3] + '.' + extra) + if not op.isfile(fname): + if raise_error: + raise IOError('Standard file %s not found' % fname) + else: + return None + return fname + + +def _read_double(fid, n=1): + """Read a double""" + return np.fromfile(fid, 'B', n) + +# TODO: Is it similar to _read_char +def _read_ustring(fid, n_bytes): + """Read unsigned character string""" + return np.fromfile(fid, '>B', n_bytes) + + +def _read_int2(fid): + """Read int from short""" + return np.fromfile(fid, ' +# simplified BSD-3 license +import os.path as op + +from numpy.testing import assert_array_equal, assert_array_almost_equal +from scipy import io as sio + +from mne.io import read_raw_itab +from mne.io.tests.test_raw import _test_raw_reader +from mne.datasets import testing + +# data_path = testing.data_path(download=False) +data_path = "/home/robbis/data/mne_data/MNE-testing-data/" +raw_itab_fname = op.join(data_path, 'itab', 'test_itab.raw') +mat_itab_fname = op.join(data_path, 'itab', 'test_itab.mat') + +@testing.requires_testing_data +def test_itab_raw(): + """Test reading ITAB .raw files.""" + + raw = read_raw_itab(raw_itab_fname, preload=True) + assert 'RawITAB' in repr(raw) + + _test_raw_reader(read_raw_itab, + fname=raw_itab_fname, + test_scaling=False, + ) + + mc = sio.loadmat(mat_itab_fname) + + m_data = mc['dat'] + m_header = mc['hdr'] + + assert raw._data.shape == m_data.shape + assert m_header['Fs'][0, 0][0, 0] == raw.info['sfreq'] + + m_names = [x[0][0] for x in m_header['label'][0, 0]] + assert raw.ch_names == m_names + + assert_array_almost_equal(m_data, raw._data)