diff --git a/CHANGELOG.md b/CHANGELOG.md index 9941309..d7d7f76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +### Added +- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register +an extractor for just one user. + ## 2.6.0 - 2022-06-14 This will change how clowder sees the extractors. If you have an extractor, and you specify diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 1da8a22..41dcafb 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -66,7 +66,7 @@ class Connector(object): registered_clowder = list() def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10): + mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -77,6 +77,8 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m else: self.mounted_paths = mounted_paths self.clowder_url = clowder_url + self.clowder_email = clowder_email + self.extractor_key = extractor_key self.max_retry = max_retry filename = 'notifications.json' @@ -399,8 +401,8 @@ def _process_message(self, body): # register extractor url = "%sapi/extractors" % source_host if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) self.register_extractor("%s?key=%s" % (url, secret_key)) + Connector.registered_clowder.append(url) # tell everybody we are starting to process the file self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.") @@ -518,18 +520,24 @@ def register_extractor(self, endpoints): headers = {'Content-Type': 'application/json'} data = self.extractor_info + if self.extractor_key: + data["unique_key"] = self.extractor_key for url in endpoints.split(','): - if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) - try: - result = requests.post(url.strip(), headers=headers, - data=json.dumps(data), - verify=self.ssl_verify) - result.raise_for_status() - logger.debug("Registering extractor with %s : %s", url, result.text) - except Exception as exc: # pylint: disable=broad-except - logger.exception('Error in registering extractor: ' + str(exc)) + if "unique_key" in data: + if url.find("?") > -1: + url += "&user=%s" % self.clowder_email + else: + logger.info("Unable to register extractor without an API key.") + return + try: + result = requests.post(url.strip(), headers=headers, + data=json.dumps(data), + verify=self.ssl_verify) + result.raise_for_status() + logger.info("Registering extractor as %s : %s", url, result.text) + except Exception as exc: # pylint: disable=broad-except + logger.exception('Error in registering extractor: ' + str(exc)) # pylint: disable=no-self-use def status_update(self, status, resource, message): @@ -636,9 +644,9 @@ class RabbitMQConnector(Connector): def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, - heartbeat=5*60, clowder_url=None, max_retry=10): + heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_exchange = rabbitmq_exchange self.rabbitmq_key = rabbitmq_key @@ -646,6 +654,9 @@ def __init__(self, extractor_name, extractor_info, self.rabbitmq_queue = extractor_info['name'] else: self.rabbitmq_queue = rabbitmq_queue + self.extractor_key = extractor_key + if extractor_key: + self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue) self.channel = None self.connection = None self.consumer_tag = None @@ -693,7 +704,7 @@ def connect(self): routing_key="extractors." + self.extractor_name) # start the extractor announcer - self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat) + self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat) self.announcer.start_thread() def listen(self): @@ -797,10 +808,11 @@ def on_message(self, channel, method, header, body): class RabbitMQBroadcast: - def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat): + def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat): self.active = True self.rabbitmq_uri = rabbitmq_uri self.extractor_info = extractor_info + self.clowder_email = clowder_email self.rabbitmq_queue = rabbitmq_queue self.heartbeat = heartbeat self.id = str(uuid.uuid4()) @@ -830,6 +842,7 @@ def send_heartbeat(self): message = { 'id': self.id, 'queue': self.rabbitmq_queue, + 'owner': self.clowder_email, 'extractor_info': self.extractor_info } next_heartbeat = time.time() diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 55fe3a0..275ff4a 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -66,6 +66,8 @@ def __init__(self): rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "") clowder_url = os.getenv("CLOWDER_URL", "") registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "") + extractor_key = os.getenv("EXTRACTOR_KEY", "") + clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") input_file_path = os.getenv("INPUT_FILE_PATH") @@ -91,6 +93,12 @@ def __init__(self): self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints", default=registration_endpoints, help='Clowder registration URL (default=%s)' % registration_endpoints) + self.parser.add_argument('--key', '-k', dest="extractor_key", + default=extractor_key, + help='Unique key to use for extractor queue ID (sets extractor to private)') + self.parser.add_argument('--user', '-u', dest="clowder_email", + default=clowder_email, + help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)') self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri, help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%")) self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename', @@ -173,9 +181,17 @@ def start(self): mounted_paths=json.loads(self.args.mounted_paths), clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, - heartbeat=self.args.heartbeat) + heartbeat=self.args.heartbeat, + extractor_key=self.args.extractor_key, + clowder_email=self.args.clowder_email) connector.connect() connector.register_extractor(self.args.registration_endpoints) + + url = "%sapi/extractors" % self.args.clowder_url + if url not in connector.registered_clowder: + connector.register_extractor("%s" % (url)) + connector.registered_clowder.append(url) + threading.Thread(target=connector.listen, name="RabbitMQConnector").start() elif self.args.connector == "HPC":