diff --git a/README.md b/README.md index 1309904..de2c026 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,44 @@ # camacq-plugins -Plugins for camacq +Plugins for camacq: -## Usage +- [production](camacqplugins/production/) ## Installation +- Clone and install the package. + + ```sh + # Clone the repo. + git clone https://github.com/CellProfiling/camacq-plugins.git + # Enter directory. + cd camacq-plugins + # Install package. + pip install . + # Test that program is callable and show help. + camacq -h + ``` + ### Requirements -## Compatibility +- Python version 3.6+. +- camacq >= 0.4.0 + +## Usage + +Add configuration for the plugin you want to run. +See the [config_templates](config_templates/) directory for example configuration. + +Then start `camacq`. + +```sh +camacq +``` ## Licence +- Apache-2.0. + ## Authors + +- Martin Hjelmare diff --git a/camacqplugins/production/README.md b/camacqplugins/production/README.md new file mode 100644 index 0000000..3ce9c4c --- /dev/null +++ b/camacqplugins/production/README.md @@ -0,0 +1,55 @@ +# Production + +## Usage + +Add configuration for the `production`, `gain`, `leica` and `rename` plugin, in the `camacq` configuration file. +See the [config_templates](../config_templates/) directory for example configuration. + +```yaml +production: + ... + +gain: + ... + +rename_image: + +leica: + ... +``` + +Then start `camacq`. + +```sh +camacq +``` + +To allow the user to set up the sample state before starting an +experiment, camacq can load the sample state from a file. In the production +configuration section there is an option to specify a path to a csv +file. + +```yaml +production: + sample_state_file: '/sample_state.csv' +``` + +Each row in the csv file should represent at least one state of a sample container, +ie well, field or channel. A plate name must also be included. The csv file should have a +header. See below. + +```csv +plate_name,well_x,well_y,channel_name,gain +00,1,1,blue,600 +``` + +This example will set a plate '00', a well (1, 1), a blue channel +and set the gain of the blue channel to 600. + +```csv +plate_name,well_x,well_y,field_x,field_y +00,1,1,1,1 +``` + +This example will create a plate '00' a well (1, 1) and a field (1, 1) +in the sample state. diff --git a/camacqplugins/production/__init__.py b/camacqplugins/production/__init__.py new file mode 100644 index 0000000..d151b75 --- /dev/null +++ b/camacqplugins/production/__init__.py @@ -0,0 +1,414 @@ +"""Provide a plugin for production standard flow.""" +import logging +import tempfile +from math import ceil +from pathlib import Path + +import voluptuous as vol + +from camacq.const import CAMACQ_START_EVENT, CHANNEL_EVENT, IMAGE_EVENT, WELL_EVENT +from camacq.event import match_event +from camacq.plugins.leica.command import cam_com, del_com, gain_com +from camacq.plugins.sample import ACTION_TO_METHOD, ACTION_SET_PLATE +from camacq.plugins.sample.helper import next_well_xy +from camacq.util import read_csv + +_LOGGER = logging.getLogger(__name__) + +CONF_GAIN_PATTERN_NAME = "gain_pattern_name" +CONF_GAIN_JOB_ID = "gain_job_id" +CONF_GAIN_JOB_CHANNELS = "gain_job_channels" +CONF_EXP_PATTERN_NAME = "exp_pattern_name" +CONF_EXP_JOB_IDS = "exp_job_ids" +CONF_CHANNELS = "channels" +CONF_CHANNEL = "channel" +CONF_JOB_NAME = "job_name" +CONF_DETECTOR_NUM = "detector_num" +CONF_DEFAULT_GAIN = "default_gain" +CONF_MAX_GAIN = "max_gain" +CONF_WELL_LAYOUT = "well_layout" +CONF_X_FIELDS = "x_fields" +CONF_Y_FIELDS = "y_fields" +CONF_PLOT_SAVE_PATH = "plot_save_path" +CONF_SAMPLE_STATE_FILE = "sample_state_file" + +PLATE_NAME = "00" +SAMPLE_PLATE_NAME = "plate_name" +SAMPLE_WELL_X = "well_x" +SAMPLE_WELL_Y = "well_y" + + +@vol.truth +def is_csv(value): + """Return true if value ends with .csv.""" + return value.endswith(".csv") + + +def is_sample_state(value): + """Validate state data. + + At least one sample action must validate per sample data item. + """ + for idx, data in enumerate(value): + valid = False + error = None + for action, settings in ACTION_TO_METHOD.items(): + if action == ACTION_SET_PLATE: + continue + schema = settings["schema"] + try: + data.update(schema(data)) + except vol.Invalid as exc: + error = exc + continue + else: + valid = True + + if not valid: + _LOGGER.error( + "The sample state file contains invalid data at row %s: %s", + idx + 2, + error, + ) + raise error + + return value + + +CONFIG_SCHEMA = vol.Schema( + { + vol.Required(CONF_GAIN_PATTERN_NAME): vol.Coerce(str), + vol.Required(CONF_GAIN_JOB_ID): vol.Coerce(int), + vol.Required(CONF_GAIN_JOB_CHANNELS): vol.Coerce(int), + vol.Required(CONF_EXP_PATTERN_NAME): vol.Coerce(str), + vol.Required(CONF_EXP_JOB_IDS): vol.All( + [vol.Coerce(int)], vol.Length(min=3, max=3) + ), + vol.Required(CONF_CHANNELS): [ + { + vol.Required(CONF_CHANNEL): vol.Coerce(str), + vol.Required(CONF_JOB_NAME): vol.Coerce(str), + vol.Required(CONF_DETECTOR_NUM): vol.Coerce(int), + vol.Required(CONF_DEFAULT_GAIN): vol.Coerce(int), + vol.Required(CONF_MAX_GAIN): vol.Coerce(int), + } + ], + vol.Required(CONF_WELL_LAYOUT): { + vol.Required(CONF_X_FIELDS): vol.Coerce(int), + vol.Required(CONF_Y_FIELDS): vol.Coerce(int), + }, + # pylint: disable=no-value-for-parameter + CONF_PLOT_SAVE_PATH: vol.IsDir(), + CONF_SAMPLE_STATE_FILE: vol.All( + vol.IsFile(), is_csv, read_csv, is_sample_state + ), + }, +) + + +async def setup_module(center, config): + """Set up production plugin.""" + conf = config["production"] + flow = WorkFlow(center, conf) + state_data = conf.get(CONF_SAMPLE_STATE_FILE) + if state_data is None: + x_wells = 12 + y_wells = 8 + state_data = [ + { + SAMPLE_PLATE_NAME: PLATE_NAME, + SAMPLE_WELL_X: well_x, + SAMPLE_WELL_Y: well_y, + } + for well_x in range(x_wells) + for well_y in range(y_wells) + ] + await flow.setup(state_data) + + +class WorkFlow: + """Represent the production workflow.""" + + # pylint: disable=too-many-instance-attributes + + def __init__(self, center, conf): + """Set up instance.""" + self._center = center + self.gain_pattern = conf[CONF_GAIN_PATTERN_NAME] + self.gain_job_id = conf[CONF_GAIN_JOB_ID] + self.gain_job_channels = conf[CONF_GAIN_JOB_CHANNELS] + self.exp_pattern = conf[CONF_EXP_PATTERN_NAME] + self.exp_job_ids = conf[CONF_EXP_JOB_IDS] + self.channels = conf[CONF_CHANNELS] + well_layout = conf[CONF_WELL_LAYOUT] + self.x_fields = well_layout[CONF_X_FIELDS] + self.y_fields = well_layout[CONF_Y_FIELDS] + self.plot_save_path = conf.get(CONF_PLOT_SAVE_PATH) + self._remove_handle_exp_image = None + self.wells_left = set() + + async def setup(self, state_data): + """Set up the flow.""" + await self.load_sample(state_data) + self.image_next_well_on_sample() + self.analyze_gain() + self.set_exp_gain() + self.add_exp_job() + self._remove_handle_exp_image = self.handle_exp_image() + self.stop_exp() + + async def load_sample(self, state_data): + """Load sample state.""" + for data in state_data: + if data[SAMPLE_PLATE_NAME] not in self._center.sample.plates: + await self._center.actions.sample.set_plate(silent=True, **data) + well_coord = data[SAMPLE_WELL_X], data[SAMPLE_WELL_Y] + self.wells_left.add(well_coord) + if ( + well_coord + not in self._center.sample.plates[data[SAMPLE_PLATE_NAME]].wells + ): + await self._center.actions.sample.set_well(silent=True, **data) + await self._center.actions.sample.set_channel(silent=True, **data) + await self._center.actions.sample.set_field(silent=True, **data) + + def image_next_well_on_sample(self): + """Image next well in existing sample.""" + + async def send_cam_job(center, event): + """Run on well event.""" + next_well_x, next_well_y = next_well_xy(center.sample, PLATE_NAME) + + if ( + not match_event(event, event_type=CAMACQ_START_EVENT) + and not match_event( + event, + field_x=self.x_fields - 1, + field_y=self.y_fields - 1, + well_img_ok=True, + ) + or next_well_x is None + or (next_well_x, next_well_y) not in self.wells_left + ): + return + + if center.sample.images: + await center.actions.command.stop_imaging() + await self.send_gain_jobs( + next_well_x, next_well_y, + ) + self.wells_left.remove((next_well_x, next_well_y)) + + removes = [] + removes.append(self._center.bus.register(CAMACQ_START_EVENT, send_cam_job)) + removes.append(self._center.bus.register(WELL_EVENT, send_cam_job)) + + def remove_callback(): + """Remove all registered listeners of this method.""" + for remove in removes: + remove() + removes.clear() + + return remove_callback + + def analyze_gain(self): + """Analyze gain.""" + + async def calc_gain(center, event): + """Calculate correct gain.""" + field_x, field_y = get_last_gain_coords(self.x_fields, self.y_fields) + channel_id = self.gain_job_channels - 1 + if not match_event( + event, + field_x=field_x, + field_y=field_y, + job_id=self.gain_job_id, + channel_id=channel_id, + ): + return + + await center.actions.command.stop_imaging() + + if self.plot_save_path is None: + save_path = Path(tempfile.gettempdir()) / event.plate_name + else: + save_path = Path(self.plot_save_path) + if not save_path.exists(): + await center.add_executor_job(save_path.mkdir) + + # This should be a path to a base file name, not to a dir or file. + save_path = save_path / f"{event.well_x}--{event.well_y}" + + await center.actions.gain.calc_gain( + plate_name=event.plate_name, + well_x=event.well_x, + well_y=event.well_y, + make_plots=True, + save_path=save_path, + ) + + return self._center.bus.register(IMAGE_EVENT, calc_gain) + + def set_exp_gain(self): + """Set experiment gain.""" + + async def set_gain(center, event): + """Set pmt gain.""" + channel = next( + ( + channel + for channel in self.channels + if event.channel_name == channel[CONF_CHANNEL] + ) + ) + exp = channel[CONF_JOB_NAME] + num = channel[CONF_DETECTOR_NUM] + gain = min(event.gain or channel[CONF_DEFAULT_GAIN], channel[CONF_MAX_GAIN]) + + command = gain_com(exp=exp, num=num, value=gain) + + # Set the gain at the microscope. + await center.actions.command.send(command=command) + # Set the gain in the sample state. + await center.actions.sample.set_channel( + plate_name=event.plate_name, + well_x=event.well_x, + well_y=event.well_y, + channel_name=event.channel_name, + gain=gain, + ) + + return self._center.bus.register("gain_calc_event", set_gain) + + def add_exp_job(self): + """Add experiment job.""" + + async def add_cam_job(center, event): + """Add an experiment job to the cam list.""" + last_channel = self.channels[-1] + if not match_event(event, channel_name=last_channel[CONF_CHANNEL]) or len( + event.well.channels + ) != len(self.channels): + return + + commands = [] + for field_x in range(self.x_fields): + for field_y in range(self.y_fields): + cmd = cam_com( + self.exp_pattern, + event.well_x, + event.well_y, + field_x, + field_y, + 0, + 0, + ) + commands.append(cmd) + + await center.actions.command.send(command=del_com()) + await center.actions.command.send_many(commands=commands) + + if self._remove_handle_exp_image is None: + self._remove_handle_exp_image = self.handle_exp_image() + + await center.actions.command.start_imaging() + await center.actions.command.send(command="/cmd:startcamscan") + + return self._center.bus.register(CHANNEL_EVENT, add_cam_job) + + def handle_exp_image(self): + """Handle experiment image.""" + + async def on_exp_image(center, event): + """Run on experiment image event.""" + await self.rename_image(center, event) + await self.set_sample_img_ok(center, event) + + return self._center.bus.register(IMAGE_EVENT, on_exp_image) + + def stop_exp(self): + """Trigger to stop experiment.""" + + async def stop_imaging(center, event): + """Run to stop the experiment.""" + match = match_event( + event, + field_x=self.x_fields - 1, + field_y=self.y_fields - 1, + well_img_ok=True, + ) + + if not match or self.wells_left: + return + + await center.actions.command.stop_imaging() + + _LOGGER.info("Congratulations, experiment is finished!") + + return self._center.bus.register(WELL_EVENT, stop_imaging) + + async def send_gain_jobs(self, well_x, well_y): + """Send gain cam jobs for the center fields of a well.""" + field_x, field_y = get_last_gain_coords(self.x_fields, self.y_fields) + field_x = field_x - 1 # set the start x field coord + + await self._center.actions.command.send(command=del_com()) + + for field_x in range(field_x, field_x + 2): + command = cam_com(self.gain_pattern, well_x, well_y, field_x, field_y, 0, 0) + await self._center.actions.command.send(command=command) + + if self._remove_handle_exp_image is not None: + self._remove_handle_exp_image() + self._remove_handle_exp_image = None + + await self._center.actions.command.start_imaging() + await self._center.actions.command.send(command="/cmd:startcamscan") + + async def rename_image(self, center, event): + """Rename an image.""" + if event.job_id not in self.exp_job_ids or event.channel_id not in (0, 1): + return + + if event.job_id == self.exp_job_ids[0]: + channel_id = 0 + elif event.job_id == self.exp_job_ids[1] and event.channel_id == 0: + channel_id = 1 + elif event.job_id == self.exp_job_ids[1] and event.channel_id == 1: + channel_id = 2 + elif event.job_id == self.exp_job_ids[2]: + channel_id = 3 + + new_name = ( + f"U{event.well_x:02}--V{event.well_y:02}--E{event.job_id:02}--" + f"X{event.field_x:02}--Y{event.field_y:02}--" + f"Z{event.z_slice:02}--C{channel_id:02}.ome.tif" + ) + + await center.actions.rename_image.rename_image( + old_path=event.path, new_name=new_name + ) + + async def set_sample_img_ok(self, center, event): + """Set sample field img ok.""" + if not match_event(event, job_id=self.exp_job_ids[-1]): + return + + await center.actions.sample.set_field( + plate_name=event.plate_name, + well_x=event.well_x, + well_y=event.well_y, + field_x=event.field_x, + field_y=event.field_y, + img_ok=True, + ) + + +def get_last_gain_coords(x_fields, y_fields): + """Return a tuple with last gain coordinates x and y. + + The gain coordinates will be the two most centered fields. + """ + last_x_field = ceil(x_fields / 2) + last_y_field = ceil(y_fields / 2) - 1 + return last_x_field, last_y_field diff --git a/config_templates/production.yml b/config_templates/production.yml new file mode 100644 index 0000000..3a28568 --- /dev/null +++ b/config_templates/production.yml @@ -0,0 +1,62 @@ +production: + #sample_state_file: "/path/to/sample_state.csv" + gain_pattern_name: p10xgain + gain_job_id: 3 + gain_job_channels: 32 + exp_pattern_name: p10xexp + exp_job_ids: + - 3 + - 4 + - 6 + channels: + - channel: green + job_name: green10x + detector_num: 1 + default_gain: 800 + max_gain: 800 + - channel: blue + job_name: blue10x + detector_num: 1 + default_gain: 505 + max_gain: 610 + - channel: yellow + job_name: blue10x + detector_num: 2 + default_gain: 655 + max_gain: 760 + - channel: red + job_name: red10x + detector_num: 2 + default_gain: 630 + max_gain: 735 + well_layout: + x_fields: 2 + y_fields: 3 + #plot_save_path: "/path/to/gains/dir/00/" + +gain: + channels: + - channel: green + init_gain: [450, 495, 540, 585, 630, 675, 720, 765, 810, 855, 900] + - channel: blue + # 63x + #init_gain: [750, 730, 765, 800, 835, 870, 905] + # 10x + init_gain: [700, 735, 770, 805, 840, 875, 910] + - channel: yellow + # 63x + #init_gain: [550, 585, 620, 655, 690, 725, 760] + # 10x + init_gain: [700, 735, 770, 805, 840, 875, 910] + - channel: red + # 63x + #init_gain: [525, 560, 595, 630, 665, 700, 735] + # 10x + init_gain: [600, 635, 670, 705, 740, 775, 810] + #save_dir: "/path/to/gains/dir" + +rename_image: + +leica: + host: localhost + #imaging_dir: '/path/to/imaging_dir' diff --git a/pylintrc b/pylintrc index 5f3bd50..199fca4 100644 --- a/pylintrc +++ b/pylintrc @@ -7,4 +7,4 @@ disable= bad-continuation, duplicate-code, locally-disabled, - unused-argument, \ No newline at end of file + unused-argument, diff --git a/requirements.txt b/requirements.txt index 552706a..438a380 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -camacq==0.4.0 \ No newline at end of file +camacq==0.4.0 diff --git a/requirements_test.txt b/requirements_test.txt new file mode 100644 index 0000000..3bc9366 --- /dev/null +++ b/requirements_test.txt @@ -0,0 +1,6 @@ +asynctest==0.13.0 +pytest==5.3.1 +pytest-asyncio==0.10.0 +pytest-cov==2.8.1 +pytest-mock==1.12.1 +pytest-timeout==1.3.3 diff --git a/scripts/debug.py b/scripts/debug.py new file mode 100644 index 0000000..3a4ff8c --- /dev/null +++ b/scripts/debug.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Start camacq to debug in vscode.""" +from camacq.__main__ import main + + +if __name__ == "__main__": + main(args=["--log-level", "debug"]) diff --git a/setup.cfg b/setup.cfg index eca367a..b3d11de 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,6 @@ [flake8] +ignore = E231, W503 max-line-length = 88 [pydocstyle] -add-ignore = D202 \ No newline at end of file +add-ignore = D202 diff --git a/setup.py b/setup.py index 0ffa448..8aada30 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ packages=setuptools.find_packages(), python_requires=">=3.6", install_requires=REQUIRES, + entry_points={"camacq.plugins": ["production = camacqplugins.production",],}, classifiers=[ "Development Status :: 2 - Pre-Alpha", "Programming Language :: Python", diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3ae48e2 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,12 @@ +"""Provide package level pytest fixtures.""" +import pytest + +from camacq.control import Center + + +@pytest.fixture +def center(event_loop): + """Give access to center via fixture.""" + _center = Center(loop=event_loop) + _center._track_tasks = True # pylint: disable=protected-access + yield _center diff --git a/tests/production/test_production.py b/tests/production/test_production.py new file mode 100644 index 0000000..99e670d --- /dev/null +++ b/tests/production/test_production.py @@ -0,0 +1,158 @@ +"""Test the production plugin.""" +from unittest.mock import call + +import pytest +import voluptuous as vol +from asynctest import CoroutineMock +from ruamel.yaml import YAML + +from camacq import plugins +from camacq.plugins.api import ImageEvent +from camacq.plugins.gain import GainCalcEvent + +# All test coroutines will be treated as marked. +pytestmark = pytest.mark.asyncio # pylint: disable=invalid-name + +CONFIG = """ +production: + gain_pattern_name: p10xgain + gain_job_id: 3 + gain_job_channels: 32 + exp_pattern_name: p10xexp + exp_job_ids: + - 3 + - 4 + - 6 + channels: + - channel: green + job_name: green10x + detector_num: 1 + default_gain: 800 + max_gain: 800 + - channel: blue + job_name: blue10x + detector_num: 1 + default_gain: 800 + max_gain: 800 + - channel: yellow + job_name: blue10x + detector_num: 2 + default_gain: 695 + max_gain: 800 + - channel: red + job_name: red10x + detector_num: 2 + default_gain: 700 + max_gain: 800 + well_layout: + x_fields: 2 + y_fields: 3 +""" + +SAMPLE_STATE = """ +plate_name,well_x,well_y,channel_name,gain,field_x,field_y +00,0,0,,,0,0 +00,0,0,,,0,1 +00,0,1 +00,1,0 +00,1,1 +""".strip() + + +class WorkflowImageEvent(ImageEvent): + """Represent a test image event.""" + + event_type = "workflow_image_event" + + @property + def job_id(self): + """:int: Return job id of the image.""" + return self.data.get("job_id") + + +async def test_image_events(center, tmp_path): + """Test image events.""" + config = YAML(typ="safe").load(CONFIG) + plate_name = "00" + well_x = 0 + well_y = 0 + save_path = tmp_path / plate_name + await center.add_executor_job(save_path.mkdir) + config["production"]["plot_save_path"] = save_path + await plugins.setup_module(center, config) + save_path = save_path / f"{well_x}--{well_y}" + calc_gain = CoroutineMock() + gains = { + "green": 800, + "blue": 700, + "yellow": 600, + "red": 500, + } + + async def fire_gain_event(**kwargs): + """Fire gain event.""" + well_x = kwargs.get("well_x") + well_y = kwargs.get("well_y") + plate_name = kwargs.get("plate_name") + + for channel_name, gain in gains.items(): + event = GainCalcEvent( + { + "plate_name": plate_name, + "well_x": well_x, + "well_y": well_y, + "channel_name": channel_name, + "gain": gain, + } + ) + await center.bus.notify(event) + + calc_gain.side_effect = fire_gain_event + + center.actions.register( + "gain", "calc_gain", calc_gain, vol.Schema({}, extra=vol.ALLOW_EXTRA) + ) + + event = WorkflowImageEvent( + { + "path": "test_path", + "plate_name": plate_name, + "well_x": well_x, + "well_y": well_y, + "field_x": 1, + "field_y": 1, + "job_id": 3, + "channel_id": 31, + } + ) + center.create_task(center.bus.notify(event)) + await center.wait_for() + + assert calc_gain.call_count == 1 + assert calc_gain.call_args == call( + action_id="calc_gain", + plate_name=plate_name, + well_x=well_x, + well_y=well_y, + make_plots=True, + save_path=save_path, + ) + for channel_name, gain in gains.items(): + channel = center.sample.get_channel(plate_name, well_x, well_y, channel_name) + assert channel.gain == gain + + +async def test_load_sample(center, tmp_path): + """Test loading sample state from file.""" + state_file = tmp_path / "state_file.csv" + state_file.write_text(SAMPLE_STATE) + config = YAML(typ="safe").load(CONFIG) + config["production"]["sample_state_file"] = str(state_file) + plate_name = "00" + await plugins.setup_module(center, config) + await center.wait_for() + + plate = center.sample.get_plate(plate_name) + assert len(plate.wells) == 4 + well = center.sample.get_well(plate_name, 0, 0) + assert len(well.fields) == 2 diff --git a/tox.ini b/tox.ini index 0212152..bf47625 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,10 @@ [tox] -envlist=py36, py37 +envlist = py36, py37, py38 +skip_missing_interpreters = True [testenv] -commands=pytest -deps=pytest +commands = + pytest --timeout=30 --cov=camacqplugins --cov-report= {posargs} +deps = + -rrequirements.txt + -rrequirements_test.txt