Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ REGISTRY=

TAG=latest

WFM_USER=admin
WFM_PASSWORD=mpfadm

# Set this if using "docker-compose.users.yml".
USER_PROPERTIES_PATH=

Expand Down
87 changes: 42 additions & 45 deletions components/component-executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from pathlib import Path
from typing import Any, Dict, NamedTuple, Optional, Tuple

import component_registration

Descriptor = Dict[str, Any]

Expand All @@ -54,12 +53,6 @@ def main():


class EnvConfig(NamedTuple):
wfm_user: str
wfm_password: str
wfm_base_url: str
oidc_issuer_uri: Optional[str]
oidc_client_id: Optional[str]
oidc_client_secret: Optional[str]
activemq_broker_uri: str
component_log_name: Optional[str]
disable_component_registration: bool
Expand All @@ -69,14 +62,6 @@ class EnvConfig(NamedTuple):

@staticmethod
def create():
oidc_issuer_uri = os.getenv('OIDC_JWT_ISSUER_URI', os.getenv('OIDC_ISSUER_URI'))
oidc_client_id = os.getenv('OIDC_CLIENT_ID')
oidc_client_secret = os.getenv('OIDC_CLIENT_SECRET')
if oidc_issuer_uri and (not oidc_client_id or not oidc_client_secret):
raise RuntimeError(
'The OIDC_CLIENT_ID and OIDC_CLIENT_SECRET environment variables must both '
'be set.')

activemq_broker_uri = os.getenv('ACTIVE_MQ_BROKER_URI')
if not activemq_broker_uri:
activemq_host = os.getenv('ACTIVE_MQ_HOST', 'workflow-manager')
Expand All @@ -90,12 +75,6 @@ def create():
else:
log_path = mpf_home / 'share/logs'
return EnvConfig(
os.getenv('WFM_USER', 'admin'),
os.getenv('WFM_PASSWORD', 'mpfadm'),
os.getenv('WFM_BASE_URL', 'http://workflow-manager:8080'),
oidc_issuer_uri,
oidc_client_id,
oidc_client_secret,
activemq_broker_uri,
os.getenv('COMPONENT_LOG_NAME'),
bool(os.getenv('DISABLE_COMPONENT_REGISTRATION')),
Expand All @@ -109,16 +88,9 @@ def init() -> Tuple[subprocess.Popen[str], Optional[subprocess.Popen[bytes]]]:

descriptor_path = find_descriptor(env_config.mpf_home)
print('Loading descriptor from', descriptor_path)
with open(descriptor_path, 'rb') as descriptor_file:
unparsed_descriptor = descriptor_file.read()

if env_config.disable_component_registration:
print('Component registration disabled because the '
'"DISABLE_COMPONENT_REGISTRATION" environment variable was set.')
else:
component_registration.register_component(env_config, unparsed_descriptor)
with open(descriptor_path) as descriptor_file:
descriptor = json.load(descriptor_file)

descriptor = json.loads(unparsed_descriptor)
if env_config.node_name:
node_name = env_config.node_name
else:
Expand All @@ -127,7 +99,11 @@ def init() -> Tuple[subprocess.Popen[str], Optional[subprocess.Popen[bytes]]]:
log_dir = env_config.base_log_path / node_name / 'log'

executor_proc = start_executor(
descriptor, env_config.mpf_home, env_config.activemq_broker_uri, node_name)
env_config.mpf_home,
descriptor,
descriptor_path,
env_config.activemq_broker_uri,
node_name)
tail_proc = tail_log_if_needed(log_dir, env_config.component_log_name, executor_proc.pid)

return executor_proc, tail_proc
Expand All @@ -152,13 +128,17 @@ def find_descriptor(mpf_home: Path) -> Path:
f'descriptors were found: {glob_matches}')


def start_executor(descriptor: Descriptor, mpf_home: Path, activemq_broker_uri: str, node_name: str
) -> subprocess.Popen[str]:
def start_executor(
mpf_home: Path,
descriptor: Descriptor,
descriptor_path: Path,
activemq_broker_uri: str,
node_name: str) -> subprocess.Popen[str]:
algorithm_name = descriptor['algorithm']['name'].upper()
queue_name = f'MPF.DETECTION_{algorithm_name}_REQUEST'
language = descriptor['sourceLanguage'].lower()

executor_env = get_executor_env_vars(mpf_home, descriptor, node_name)
executor_env = get_executor_env_vars(mpf_home, descriptor, descriptor_path, node_name)
if language in ('c++', 'python'):
amq_detection_component_path = str(mpf_home / 'bin/amq_detection_component')
batch_lib = expand_env_vars(descriptor['batchLibrary'], executor_env)
Expand Down Expand Up @@ -187,9 +167,9 @@ def start_executor(descriptor: Descriptor, mpf_home: Path, activemq_broker_uri:
text=True)

# Handle ctrl-c
signal.signal(signal.SIGINT, lambda sig, frame: forward_signal(sig, executor_proc))
signal.signal(signal.SIGINT, lambda sig, frame: handle_sig_int(executor_proc))
# Handle docker stop
signal.signal(signal.SIGTERM, lambda sig, frame: forward_signal(sig, executor_proc))
signal.signal(signal.SIGTERM, lambda sig, frame: handle_sig_term(executor_proc))
return executor_proc


Expand All @@ -213,10 +193,22 @@ def find_java_executor_jar(descriptor: Descriptor, mpf_home: Path) -> Path:
return expanded_executor_path


def forward_signal(sig: int, executor_proc: subprocess.Popen[str]) -> None:
signal_entry = signal.Signals(sig)
print(f'Sending {signal_entry.name}({sig}) to component executor.')
executor_proc.send_signal(sig)
def handle_sig_term(executor_proc: subprocess.Popen[str]):
print(f'Sending SIGTERM({signal.SIGTERM}) to component executor.')
executor_proc.terminate()
try:
executor_proc.wait(1)
except subprocess.TimeoutExpired:
executor_proc.kill()


def handle_sig_int(executor_proc: subprocess.Popen[str]):
print(f'Sending SIGINT({signal.SIGINT}) to component executor.')
executor_proc.send_signal(signal.SIGINT)
try:
executor_proc.wait(1)
except subprocess.TimeoutExpired:
handle_sig_term(executor_proc)


def tail_log_if_needed(log_dir: Path, component_log_name: Optional[str], executor_pid: int
Expand Down Expand Up @@ -246,11 +238,16 @@ def tail_log_if_needed(log_dir: Path, component_log_name: Optional[str], executo



def get_executor_env_vars(mpf_home: Path, descriptor: Descriptor, node_name: str) -> Dict[str, str]:
executor_env = os.environ.copy()
executor_env['THIS_MPF_NODE'] = node_name
executor_env['SERVICE_NAME'] = descriptor['componentName']
executor_env['COMPONENT_NAME'] = descriptor['componentName']
def get_executor_env_vars(
mpf_home: Path,
descriptor: Descriptor,
descriptor_path: Path,
node_name: str) -> Dict[str, str]:
executor_env = {**os.environ,
'THIS_MPF_NODE': node_name,
'SERVICE_NAME': descriptor['componentName'],
'COMPONENT_NAME': descriptor['componentName'],
'DESCRIPTOR_PATH': str(descriptor_path)}

for json_env_var in descriptor.get('environmentVariables', ()):
var_name = json_env_var['name']
Expand Down
Loading