Skip to content
Closed
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register
an extractor for just one user.

## 2.6.0 - 2022-06-14

This will change how clowder sees the extractors. If you have an extractor, and you specify
Expand Down
45 changes: 29 additions & 16 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Connector(object):
registered_clowder = list()

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, max_retry=10):
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -77,6 +77,8 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
else:
self.mounted_paths = mounted_paths
self.clowder_url = clowder_url
self.clowder_email = clowder_email
self.extractor_key = extractor_key
self.max_retry = max_retry

filename = 'notifications.json'
Expand Down Expand Up @@ -399,8 +401,8 @@ def _process_message(self, body):
# register extractor
url = "%sapi/extractors" % source_host
if url not in Connector.registered_clowder:
Connector.registered_clowder.append(url)
self.register_extractor("%s?key=%s" % (url, secret_key))
Connector.registered_clowder.append(url)

# tell everybody we are starting to process the file
self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.")
Expand Down Expand Up @@ -518,18 +520,24 @@ def register_extractor(self, endpoints):

headers = {'Content-Type': 'application/json'}
data = self.extractor_info
if self.extractor_key:
data["unique_key"] = self.extractor_key

for url in endpoints.split(','):
if url not in Connector.registered_clowder:
Connector.registered_clowder.append(url)
try:
result = requests.post(url.strip(), headers=headers,
data=json.dumps(data),
verify=self.ssl_verify)
result.raise_for_status()
logger.debug("Registering extractor with %s : %s", url, result.text)
except Exception as exc: # pylint: disable=broad-except
logger.exception('Error in registering extractor: ' + str(exc))
if "unique_key" in data:
if url.find("?") > -1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this is the first parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the config has specified a registration URL without an API key, registering privately with just an email address won't be accepted. so it doesn't send the request.

url += "&user=%s" % self.clowder_email
else:
logger.info("Unable to register extractor without an API key.")
return
try:
result = requests.post(url.strip(), headers=headers,
data=json.dumps(data),
verify=self.ssl_verify)
result.raise_for_status()
logger.info("Registering extractor as %s : %s", url, result.text)
except Exception as exc: # pylint: disable=broad-except
logger.exception('Error in registering extractor: ' + str(exc))

# pylint: disable=no-self-use
def status_update(self, status, resource, message):
Expand Down Expand Up @@ -636,16 +644,19 @@ class RabbitMQConnector(Connector):
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
heartbeat=5*60, clowder_url=None, max_retry=10):
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry)
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_exchange = rabbitmq_exchange
self.rabbitmq_key = rabbitmq_key
if rabbitmq_queue is None:
self.rabbitmq_queue = extractor_info['name']
else:
self.rabbitmq_queue = rabbitmq_queue
self.extractor_key = extractor_key
if extractor_key:
self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue)
self.channel = None
self.connection = None
self.consumer_tag = None
Expand Down Expand Up @@ -693,7 +704,7 @@ def connect(self):
routing_key="extractors." + self.extractor_name)

# start the extractor announcer
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat)
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat)
self.announcer.start_thread()

def listen(self):
Expand Down Expand Up @@ -797,10 +808,11 @@ def on_message(self, channel, method, header, body):


class RabbitMQBroadcast:
def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat):
def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat):
self.active = True
self.rabbitmq_uri = rabbitmq_uri
self.extractor_info = extractor_info
self.clowder_email = clowder_email
self.rabbitmq_queue = rabbitmq_queue
self.heartbeat = heartbeat
self.id = str(uuid.uuid4())
Expand Down Expand Up @@ -830,6 +842,7 @@ def send_heartbeat(self):
message = {
'id': self.id,
'queue': self.rabbitmq_queue,
'owner': self.clowder_email,
'extractor_info': self.extractor_info
}
next_heartbeat = time.time()
Expand Down
18 changes: 17 additions & 1 deletion pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def __init__(self):
rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "")
clowder_url = os.getenv("CLOWDER_URL", "")
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
extractor_key = os.getenv("EXTRACTOR_KEY", "")
clowder_email = os.getenv("CLOWDER_EMAIL", "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
input_file_path = os.getenv("INPUT_FILE_PATH")
Expand All @@ -91,6 +93,12 @@ def __init__(self):
self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints",
default=registration_endpoints,
help='Clowder registration URL (default=%s)' % registration_endpoints)
self.parser.add_argument('--key', '-k', dest="extractor_key",
default=extractor_key,
help='Unique key to use for extractor queue ID (sets extractor to private)')
self.parser.add_argument('--user', '-u', dest="clowder_email",
default=clowder_email,
help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)')
self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri,
help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%"))
self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename',
Expand Down Expand Up @@ -173,9 +181,17 @@ def start(self):
mounted_paths=json.loads(self.args.mounted_paths),
clowder_url=self.args.clowder_url,
max_retry=self.args.max_retry,
heartbeat=self.args.heartbeat)
heartbeat=self.args.heartbeat,
extractor_key=self.args.extractor_key,
clowder_email=self.args.clowder_email)
connector.connect()
connector.register_extractor(self.args.registration_endpoints)

url = "%sapi/extractors" % self.args.clowder_url
if url not in connector.registered_clowder:
connector.register_extractor("%s" % (url))
connector.registered_clowder.append(url)

threading.Thread(target=connector.listen, name="RabbitMQConnector").start()

elif self.args.connector == "HPC":
Expand Down