Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added inlinino/cfg/HBB8005_Cal_new_format_example.mat
Binary file not shown.
9 changes: 7 additions & 2 deletions inlinino/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,14 @@ def act_browse_zsc_file(self):

def act_browse_plaque_file(self):
file_name, selected_filter = QtGui.QFileDialog.getOpenFileName(
caption='Choose plaque calibration file', filter='Plaque File (*.mat)')
caption='Choose plaque calibration file',
filter='Calibration File (*.mat *.hbb_cal);;Legacy MAT File (*.mat);;Current Binary (*.hbb_cal)')
self.le_plaque_file.setText(file_name)

def act_browse_temperature_file(self):
file_name, selected_filter = QtGui.QFileDialog.getOpenFileName(
caption='Choose temperature calibration file', filter='Temperature File (*.mat)')
caption='Choose temperature calibration file',
filter='Temperature Calibration File (*.mat *.hbb_tcal);;Legacy MAT File (*.mat);;Current Binary (*.hbb_tcal)')
self.le_temperature_file.setText(file_name)

def act_browse_px_reg_prt(self):
Expand Down Expand Up @@ -1077,6 +1079,9 @@ def act_save(self):
except FileNotFoundError:
self.notification(f"No such calibration file: {self.cfg['calibration_file']}")
return
elif self.cfg['module'] == 'hyperbb':
self.cfg['manufacturer'] = 'Sequoia'
self.cfg['model'] = 'HyperBB'
# Update global instrument cfg
CFG.read() # Update local cfg if other instance updated cfg
CFG.instruments[self.cfg_uuid] = self.cfg.copy()
Expand Down
224 changes: 188 additions & 36 deletions inlinino/instruments/hyperbb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import numpy as np
from scipy.io import loadmat
from scipy.interpolate import interp2d, splrep, splev # , pchip_interpolate
from scipy.interpolate import RegularGridInterpolator, splrep, splev # , pchip_interpolate

from inlinino.instruments import Instrument

Expand Down Expand Up @@ -49,11 +49,13 @@ def setup(self, cfg):
# Set HyperBB specific attributes
if 'plaque_file' not in cfg.keys():
raise ValueError('Missing calibration plaque file (*.mat)')
if 'temperature_file' not in cfg.keys():
raise ValueError('Missing calibration temperature file (*.mat)')
if 'data_format' not in cfg.keys():
cfg['data_format'] = 'advanced'
self._parser = HyperBBParser(cfg['plaque_file'], cfg['temperature_file'], cfg['data_format'])
if 'cal_format' not in cfg.keys():
cfg['cal_format'] = 'legacy'
temperature_file = cfg.get('temperature_file', None)
self._parser = HyperBBParser(cfg['plaque_file'], temperature_file, cfg['data_format'],
cfg['cal_format'])
self.signal_reconstructed = np.empty(len(self._parser.wavelength)) * np.nan
# Overload cfg with received data
prod_var_names = ['beta_u', 'bb']
Expand Down Expand Up @@ -154,7 +156,7 @@ def update_active_timeseries_variables(self, name, state):
LIGHT_DATA_FORMAT = 2

class HyperBBParser():
def __init__(self, plaque_cal_file, temperature_cal_file, data_format='advanced'):
def __init__(self, plaque_cal_file, temperature_cal_file=None, data_format='advanced', cal_format='legacy'):
# Frame Parser
if data_format.lower() == 'legacy':
self.data_format = LEGACY_DATA_FORMAT
Expand Down Expand Up @@ -220,33 +222,187 @@ def __init__(self, plaque_cal_file, temperature_cal_file, data_format='advanced'
self.saturation_level = 4000
self.theta = 135 # calls theta setter which sets Xp

# Load Temperature calibration file
t = loadmat(temperature_cal_file, simplify_cells=True)
self.wavelength = t['cal_temp']['wl']
self.cal_t_coef = t['cal_temp']['coeff']
# Load calibration files (supports legacy two-file format and current single-file format)
p_cal, t_cal = self._load_calibration(plaque_cal_file, temperature_cal_file, cal_format)

# Load plaque calibration file
p = loadmat(plaque_cal_file, simplify_cells=True)
self.pmt_ref_gain = p['cal']['pmtRefGain']
self.pmt_gamma = p['cal']['pmtGamma']
self.gain12 = p['cal']['gain12']
self.gain23 = p['cal']['gain23']
self.wavelength = t_cal['wl']
self.cal_t_coef = t_cal['coeff']
self.pmt_ref_gain = p_cal['pmtRefGain']
self.pmt_gamma = p_cal['pmtGamma']
self.gain12 = p_cal['gain12']
self.gain23 = p_cal['gain23']

# Check wavelength match in all calibration files
if np.any(p['cal']['darkCalWavelength'] != p['cal']['muWavelengths']) or \
np.any(p['cal']['darkCalWavelength'] != t['cal_temp']['wl']):
if np.any(p_cal['darkCalWavelength'] != p_cal['muWavelengths']) or \
np.any(p_cal['darkCalWavelength'] != t_cal['wl']):
raise ValueError('Wavelength from calibration files don\'t match.')

# Pre-compute temperature correction grid over an extensive temperature range.
# The grid spans 0–50°C, covering the full operational range of the LED.
# RegularGridInterpolator uses fill_value=None which extrapolates beyond the grid
# edges; log a warning if data temperatures fall outside this range.
_led_t_grid = np.arange(0, 50.01, 0.1)
_t_corr_grid = np.empty((len(self.wavelength), len(_led_t_grid)))
for k in range(len(self.wavelength)):
_t_corr_grid[k, :] = np.polyval(self.cal_t_coef[k, :], _led_t_grid)
self._f_t_correction = RegularGridInterpolator(
(self.wavelength.astype(float), _led_t_grid),
_t_corr_grid, method='linear', bounds_error=False, fill_value=None)

# Prepare interpolation tables for dark offsets
self.f_dark_cal_scat_1 = interp2d(p['cal']['darkCalPmtGain'], p['cal']['darkCalWavelength'],
p['cal']['darkCalScat1'], kind='linear')
self.f_dark_cal_scat_2 = interp2d(p['cal']['darkCalPmtGain'], p['cal']['darkCalWavelength'],
p['cal']['darkCalScat2'], kind='linear')
self.f_dark_cal_scat_3 = interp2d(p['cal']['darkCalPmtGain'], p['cal']['darkCalWavelength'],
p['cal']['darkCalScat3'], kind='linear')
_gain_vals = p_cal['darkCalPmtGain'].astype(float)
_wl_vals = p_cal['darkCalWavelength'].astype(float)
self.f_dark_cal_scat_1 = RegularGridInterpolator(
(_wl_vals, _gain_vals), p_cal['darkCalScat1'],
method='linear', bounds_error=False, fill_value=None)
self.f_dark_cal_scat_2 = RegularGridInterpolator(
(_wl_vals, _gain_vals), p_cal['darkCalScat2'],
method='linear', bounds_error=False, fill_value=None)
self.f_dark_cal_scat_3 = RegularGridInterpolator(
(_wl_vals, _gain_vals), p_cal['darkCalScat3'],
method='linear', bounds_error=False, fill_value=None)
# mu calibration corrected for temperature
self.mu = p['cal']['muFactors'] * self.compute_temperature_coefficients(p['cal']['muWavelengths'],
p['cal']['muLedTemp'])
self.mu = p_cal['muFactors'] * self.compute_temperature_coefficients(p_cal['muWavelengths'],
p_cal['muLedTemp'])

@staticmethod
def _load_calibration(plaque_cal_file, temperature_cal_file=None, cal_format='legacy'):
"""Load calibration data from legacy (.mat) or current binary format.

Legacy format: Two separate .mat files — a plaque calibration file (containing a 'cal'
struct) and a temperature calibration file (containing a 'cal_temp' struct).

Current format: Binary plaque calibration file (.hbb_cal) and binary temperature
calibration file (.hbb_tcal) as produced by Hbb_ConvertCalibrations.m (Sequoia, 2024).
Both files are required for live data temperature correction.

:param plaque_cal_file: Path to plaque .mat file (legacy) or .hbb_cal binary file (current).
:param temperature_cal_file: Path to temperature .mat file (legacy) or .hbb_tcal binary
file (current). Required for both calibration formats.
:param cal_format: 'legacy' for MATLAB .mat file pair, 'current' for binary .hbb_cal/.hbb_tcal.
:return: Tuple (p_cal, t_cal) where p_cal and t_cal are dicts of calibration parameters.
:raises ValueError: If required files are missing or the file format is unexpected.
"""
if cal_format == 'legacy':
p_mat = loadmat(plaque_cal_file, simplify_cells=True)
if 'cal' not in p_mat:
raise ValueError(
f"Plaque calibration file '{plaque_cal_file}' does not contain 'cal' struct. "
"Ensure the correct legacy format plaque file is specified.")
p_cal = p_mat['cal']
if temperature_cal_file is None:
raise ValueError(
'Missing temperature calibration file (*.mat). '
'The legacy calibration format requires two separate files.')
t_mat = loadmat(temperature_cal_file, simplify_cells=True)
if 'cal_temp' not in t_mat:
raise ValueError(
f"Temperature calibration file '{temperature_cal_file}' does not contain "
"'cal_temp' struct. Check that the correct file is specified.")
t_cal = t_mat['cal_temp']
elif cal_format == 'current':
if temperature_cal_file is None:
raise ValueError(
'Missing temperature calibration file (*.hbb_tcal). '
'The current calibration format requires both a plaque (.hbb_cal) '
'and a temperature (.hbb_tcal) calibration file.')
p_cal = HyperBBParser._read_binary_plaque_cal(plaque_cal_file)
t_cal = HyperBBParser._read_binary_temp_cal(temperature_cal_file)
else:
raise ValueError(
f"Calibration format '{cal_format}' not recognized. Use 'legacy' or 'current'.")
return p_cal, t_cal

@staticmethod
def _read_binary_plaque_cal(filename):
"""Read a HyperBB binary plaque calibration file (.hbb_cal) as produced by
Hbb_ConvertCalibrations.m (Sequoia Scientific, 2024).

The file may contain multiple calibration records appended sequentially; the last
(most recent) record is returned, consistent with MATLAB's ``cal = cal(end)``.

Returns a dict with the same field names as the legacy .mat 'cal' struct so the rest
of the calibration pipeline requires no changes.
"""
records = []
with open(filename, 'rb') as fid:
fid.seek(0, 2)
eof = fid.tell()
fid.seek(0)
while fid.tell() < eof:
fid.read(24) # ID string (24 chars, e.g. 'Sequoia Hyper-bb Cal')
fid.read(2) # calLengthBytes (uint16) — read but not needed for seeking
fid.read(2) # processingVersion (uint16 / 100)
fid.read(2) # serialNumber (uint16)
fid.read(6) # cal date (6 × uint8: year-1900, month, day, hour, min, sec)
fid.read(6) # tempCalDate
pmt_ref_gain = int(np.frombuffer(fid.read(2), dtype='<u2')[0])
pmt_gamma = float(np.frombuffer(fid.read(4), dtype='<f4')[0])
fid.read(4) # PMTGammaRMSE (float)
gain12 = np.frombuffer(fid.read(2), dtype='<u2')[0] / 1000.0
fid.read(4) # gain1_2_std (float)
gain23 = np.frombuffer(fid.read(2), dtype='<u2')[0] / 1000.0
fid.read(4) # gain2_3_std (float)
fid.read(2) # muFactorPMTGain (uint16)
fid.read(2) # transmitReceiveDistance (uint16 / 100)
fid.read(1) # plaqueReflectivity (uint8 / 100)
num_mu_wl = int(np.frombuffer(fid.read(1), dtype='<u1')[0])
num_dark_wl = int(np.frombuffer(fid.read(1), dtype='<u1')[0])
num_dark_gain = int(np.frombuffer(fid.read(1), dtype='<u1')[0])
mu_wl = np.frombuffer(fid.read(num_mu_wl * 2), dtype='<u2').astype(float) / 10.0
mu_factors = np.frombuffer(fid.read(num_mu_wl * 4), dtype='<f4').copy()
fid.read(num_mu_wl * 4) # muFactorTempCorr — stored but not used in live processing
mu_led_temp = np.frombuffer(fid.read(num_mu_wl * 2), dtype='<u2').astype(float) / 100.0
dark_wl = np.frombuffer(fid.read(num_dark_wl * 2), dtype='<u2').astype(float) / 10.0
dark_gain = np.frombuffer(fid.read(num_dark_gain * 2), dtype='<u2').copy()
n = num_dark_wl * num_dark_gain
# MATLAB writes 2-D arrays column-major; Fortran order matches reshape behavior
dark_scat1 = np.frombuffer(fid.read(n * 4), dtype='<f4').copy().reshape(
(num_dark_wl, num_dark_gain), order='F')
dark_scat2 = np.frombuffer(fid.read(n * 4), dtype='<f4').copy().reshape(
(num_dark_wl, num_dark_gain), order='F')
dark_scat3 = np.frombuffer(fid.read(n * 4), dtype='<f4').copy().reshape(
(num_dark_wl, num_dark_gain), order='F')
records.append({
'pmtRefGain': pmt_ref_gain,
'pmtGamma': pmt_gamma,
'gain12': float(gain12),
'gain23': float(gain23),
'darkCalWavelength': dark_wl,
'darkCalPmtGain': dark_gain,
'darkCalScat1': dark_scat1,
'darkCalScat2': dark_scat2,
'darkCalScat3': dark_scat3,
'muWavelengths': mu_wl,
'muFactors': mu_factors,
'muLedTemp': mu_led_temp,
})
if not records:
raise ValueError(f"No valid calibration records found in '{filename}'.")
return records[-1] # last record, consistent with MATLAB: cal = cal(end)

@staticmethod
def _read_binary_temp_cal(filename):
"""Read a HyperBB binary temperature calibration file (.hbb_tcal) as produced by
Hbb_ConvertCalibrations.m (Sequoia Scientific, 2024).

Returns a dict with 'wl' and 'coeff' keys compatible with the legacy .mat 'cal_temp'
struct so the temperature correction code requires no changes.
"""
with open(filename, 'rb') as fid:
fid.read(24) # ID string (24 chars, e.g. 'Sequoia Hyper-bb T Cal')
fid.read(2) # processingVersion (uint16 / 100)
fid.read(2) # serialNumber (uint16)
fid.read(6) # date (6 × uint8: year-1900, month, day, hour, min, sec)
fid.read(2) # normalizedTemp (uint16 / 100)
polynomial_order = int(np.frombuffer(fid.read(1), dtype='<u1')[0])
num_wl = int(np.frombuffer(fid.read(1), dtype='<u1')[0])
wl = np.frombuffer(fid.read(num_wl * 2), dtype='<u2').astype(float) / 10.0
# Coefficients written column-major by MATLAB (numWLs × (order+1) matrix)
num_coeffs = num_wl * (polynomial_order + 1)
coeffs_raw = np.frombuffer(fid.read(num_coeffs * 4), dtype='<f4').copy()
coeff = coeffs_raw.reshape((num_wl, polynomial_order + 1), order='F')
return {'wl': wl, 'coeff': coeff}

@property
def theta(self) -> float:
Expand All @@ -261,14 +417,9 @@ def theta(self, value) -> None:
self.Xp = float(splev(self.theta, splrep(theta_ref, Xp_ref)))

def compute_temperature_coefficients(self, wl, t):
# Generate temperature correction grid
led_t = np.arange(np.min(t), np.max(t) + 0.1001, 0.1) # TODO optimize by creating grid for extensive range of value once
t_correction = np.empty((len(self.wavelength), len(led_t)))
for k in range(len(self.wavelength)):
t_correction[k, :] = np.polyval(self.cal_t_coef[k, :], led_t)
# mu temperature correction
t_correction = interp2d(led_t, self.wavelength, t_correction, kind='linear')(t, wl)
return np.diag(t_correction) if t_correction.ndim > 1 else t_correction
wl_arr = np.atleast_1d(np.asarray(wl, dtype=float))
t_arr = np.atleast_1d(np.asarray(t, dtype=float))
return self._f_t_correction(np.column_stack([wl_arr, t_arr]))

def parse(self, raw):
tmp = raw.decode().split()
Expand Down Expand Up @@ -334,9 +485,10 @@ def calibrate(self, raw):
gain[raw[:, self.idx_ChSaturated] == 2] = 1
gain[raw[:, self.idx_ChSaturated] == 1] = 0 # All signals saturated
# Subtract dark offset
scat1_dark_removed = scat1 - self.f_dark_cal_scat_1(raw[:, self.idx_PmtGain], wl)
scat2_dark_removed = scat2 - self.f_dark_cal_scat_2(raw[:, self.idx_PmtGain], wl)
scat3_dark_removed = scat3 - self.f_dark_cal_scat_3(raw[:, self.idx_PmtGain], wl)
_pts = np.column_stack([wl, raw[:, self.idx_PmtGain]])
scat1_dark_removed = scat1 - self.f_dark_cal_scat_1(_pts)
scat2_dark_removed = scat2 - self.f_dark_cal_scat_2(_pts)
scat3_dark_removed = scat3 - self.f_dark_cal_scat_3(_pts)
# Apply PMT and front end gain factors
g_pmt = (raw[:, self.idx_PmtGain] / self.pmt_ref_gain) ** self.pmt_gamma
scat1_gain_corrected = scat1_dark_removed * self.gain12 * self.gain23 * g_pmt
Expand Down
Loading