From 6cdaa24e4c2a7b3b4051685e6dde63a47a2cbed3 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Thu, 8 Mar 2018 13:17:47 -0800 Subject: [PATCH 01/11] [AIRFLOW-2193] Add ROperator for using R The ROperator allows tasks to be specified using the R programming language for statistical computing. Tasks can be specified either as R operations or using a source file. Optionally, the last line of output can be pushed to an Xcom. ROperator requires that R be installed. By default, it uses the Rscript interpreter, but littler has also been tested. If Rscript is not in PATH, the full path can be specified as argument to `rscript_bin`. Data can be passed to R using either the environment or through templating. --- airflow/contrib/operators/r_operator.py | 133 ++++++++++++++++ tests/contrib/operators/test_r_operator.py | 171 +++++++++++++++++++++ 2 files changed, 304 insertions(+) create mode 100644 airflow/contrib/operators/r_operator.py create mode 100644 tests/contrib/operators/test_r_operator.py diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py new file mode 100644 index 0000000000000..948b8e7ee5c15 --- /dev/null +++ b/airflow/contrib/operators/r_operator.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from builtins import bytes +import os +import signal +from subprocess import Popen, STDOUT, PIPE +from tempfile import gettempdir, NamedTemporaryFile + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.utils.file import TemporaryDirectory + + +class ROperator(BaseOperator): + """ + Execute an R script, command, or set of commands + + :param r_command: The command, set of commands, or a reference to an R + script (must have '.r' extension) to be executed (templated) + :type r_command: string + :param rscript_bin: The command to run to execute an R script (default: + 'Rscript'). If Rscript is not in the PATH, the full path can be + specified. Alternate interpreters can also be used (e.g., littler) by + specifying their executable here. + :type rscript_bin: string + :param env: If env is not None, it must be a mapping that defines the + environment variables for the new process; these are used instead + of inheriting the current process environment, which is the default + behavior. Note that this will remove PATH, which makes it impossible + to find `rscript_bin`. Either give the full path for `rscript_bin`, + or add `'PATH': os.environ['PATH']` to the given env. (templated) + :type env: dict + :param xcom_push: If xcom_push is True (default: False), the last line + written to stdout will also be pushed to an XCom (key 'return_value') + when the R command completes. 
+ :type xcom_push: bool + :param output_encoding: encoding output from R (default: 'utf-8') + :type output_encoding: string + + """ + + template_fields = ('r_command', 'env') + template_ext = ('.r', '.R') + ui_color = '#C8D5E6' + + @apply_defaults + def __init__( + self, + r_command, + rscript_bin='Rscript', + env=None, + xcom_push=False, + output_encoding='utf-8', + *args, **kwargs): + + super(ROperator, self).__init__(*args, **kwargs) + self.r_command = r_command + self.rscript_bin = rscript_bin + self.env = env + self.xcom_push = xcom_push + self.output_encoding = output_encoding + + def execute(self, context): + """ + Execute the R command in a temporary directory + """ + + self.log.info("Tmp dir root location: \n %s", gettempdir()) + + with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: + with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: + + f.write(bytes(self.r_command, 'utf_8')) + f.flush() + fname = f.name + script_location = os.path.abspath(fname) + self.log.info("Temporary script location: %s", script_location) + + def pre_exec(): + # Restore default signal disposition and invoke setsid + for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'): + if hasattr(signal, sig): + signal.signal(getattr(signal, sig), signal.SIG_DFL) + os.setsid() + + self.log.info("Running command(s): %s", self.r_command) + + sp = Popen( + args=[self.rscript_bin, fname], + stdout=PIPE, + stderr=STDOUT, + cwd=tmp_dir, + env=self.env, + preexec_fn=pre_exec, + ) + + self.sp = sp + + self.log.info("Output:") + line = '' + for line in iter(sp.stdout.readline, b''): + line = line.decode(self.output_encoding).rstrip() + self.log.info(line) + + sp.wait() + self.log.info( + "Command exited with return code %s", + sp.returncode + ) + + if sp.returncode: + raise AirflowException('R command failed') + + if self.xcom_push: + self.log.info('Pushing last line of output to Xcom') + return line + + def on_kill(self): + self.log.info('Sending SIGTERM signal to R process group') + 
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) diff --git a/tests/contrib/operators/test_r_operator.py b/tests/contrib/operators/test_r_operator.py new file mode 100644 index 0000000000000..13205d9c1e4d7 --- /dev/null +++ b/tests/contrib/operators/test_r_operator.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function, unicode_literals + +import os +import unittest + +from airflow import configuration, DAG +from airflow.contrib.operators.r_operator import ROperator +from airflow.models import TaskInstance +from airflow.utils import timezone + + +DEFAULT_DATE = timezone.datetime(2016, 1, 1) + + +class ROperatorTest(unittest.TestCase): + """Test the ROperator""" + + def setUp(self): + super(ROperatorTest, self).setUp() + configuration.load_test_config() + self.dag = DAG( + 'test_roperator_dag', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + }, + schedule_interval='@once' + ) + + self.xcom_test_str = 'Hello Airflow' + self.task_xcom = ROperator( + task_id='test_r_xcom', + r_command='cat("Ignored Line\n{}")'.format(self.xcom_test_str), + xcom_push=True, + dag=self.dag + ) + + def test_invalid_rscript_bin(self): + """Fail if invalid rscript_bin supplied""" + + try: + expected_error = FileNotFoundError + except NameError: + # py2 + expected_error = OSError + + task = ROperator( + task_id='test_r_bad_rscript', + r_command='print(Sys.Date())', + 
rscript_bin='somebadrscript', + dag=self.dag + ) + + self.assertIsNotNone(task) + + ti = TaskInstance(task=task, execution_date=timezone.utcnow()) + + with self.assertRaises(expected_error): + ti.run() + + def test_xcom_output(self): + """Test whether Xcom output is produced using last line""" + + self.task_xcom.xcom_push = True + + ti = TaskInstance( + task=self.task_xcom, + execution_date=timezone.utcnow() + ) + + ti.run() + self.assertIsNotNone(ti.duration) + + self.assertEqual( + ti.xcom_pull(task_ids=self.task_xcom.task_id, key='return_value'), + self.xcom_test_str + ) + + def test_xcom_none(self): + """Test whether no Xcom output is produced when push=False""" + + self.task_xcom.xcom_push = False + + ti = TaskInstance( + task=self.task_xcom, + execution_date=timezone.utcnow(), + ) + + ti.run() + self.assertIsNotNone(ti.duration) + self.assertIsNone(ti.xcom_pull(task_ids=self.task_xcom.task_id)) + + def test_env_vars(self): + """Test whether environment is passed properly""" + + test_var = 'TEST_VALUE_X' + test_str = 'Hello Airflow' + + task = ROperator( + task_id='test_env_vars', + r_command='cat(Sys.getenv("{}"))'.format(test_var), + env={test_var: test_str, "PATH": os.environ['PATH']}, + xcom_push=True, + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=timezone.utcnow()) + + ti.run() + self.assertIsNotNone(ti.duration) + + self.assertEqual( + ti.xcom_pull(task_ids=task.task_id, key='return_value'), + test_str + ) + + def test_command_template(self): + """Test whether templating works properly with r_command""" + + task = ROperator( + task_id='test_cmd_template', + r_command='cat("{{ ds }}")', + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.render_templates() + + self.assertEqual( + ti.task.r_command, + 'cat("{}")'.format(DEFAULT_DATE.date().isoformat()) + ) + + def test_env_templates(self): + """Test whether templating works properly with env vars""" + + test_var = 'TEST_CURR_DATE' + + task = ROperator( 
+ task_id='test_env_template', + r_command='cat(Sys.getenv("{}"))'.format(test_var), + env={test_var: "{{ ds }}"}, + xcom_push=True, + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.run() + + self.assertEqual( + ti.xcom_pull(task_ids=task.task_id, key='return_value'), + DEFAULT_DATE.date().isoformat() + ) + + +if __name__ == '__main__': + unittest.main() From 4dca37191921c6d84d90f1774033e8580e8e2f88 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Thu, 8 Mar 2018 13:17:47 -0800 Subject: [PATCH 02/11] [AIRFLOW-2193] Add ROperator for using R The ROperator allows tasks to be specified using the R programming language for statistical computing. Tasks can be specified either as R operations or using a source file. Optionally, the last line of output can be pushed to an Xcom. ROperator requires that R be installed. By default, it uses the Rscript interpreter, but littler has also been tested. If Rscript is not in PATH, the full path can be specified as argument to `rscript_bin`. Data can be passed to R using either the environment or through templating. --- airflow/contrib/operators/r_operator.py | 133 ++++++++++++++++ tests/contrib/operators/test_r_operator.py | 171 +++++++++++++++++++++ 2 files changed, 304 insertions(+) create mode 100644 airflow/contrib/operators/r_operator.py create mode 100644 tests/contrib/operators/test_r_operator.py diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py new file mode 100644 index 0000000000000..948b8e7ee5c15 --- /dev/null +++ b/airflow/contrib/operators/r_operator.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from builtins import bytes +import os +import signal +from subprocess import Popen, STDOUT, PIPE +from tempfile import gettempdir, NamedTemporaryFile + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.utils.file import TemporaryDirectory + + +class ROperator(BaseOperator): + """ + Execute an R script, command, or set of commands + + :param r_command: The command, set of commands, or a reference to an R + script (must have '.r' extension) to be executed (templated) + :type r_command: string + :param rscript_bin: The command to run to execute an R script (default: + 'Rscript'). If Rscript is not in the PATH, the full path can be + specified. Alternate interpreters can also be used (e.g., littler) by + specifying their executable here. + :type rscript_bin: string + :param env: If env is not None, it must be a mapping that defines the + environment variables for the new process; these are used instead + of inheriting the current process environment, which is the default + behavior. Note that this will remove PATH, which makes it impossible + to find `rscript_bin`. Either give the full path for `rscript_bin`, + or add `'PATH': os.environ['PATH']` to the given env. (templated) + :type env: dict + :param xcom_push: If xcom_push is True (default: False), the last line + written to stdout will also be pushed to an XCom (key 'return_value') + when the R command completes. 
+ :type xcom_push: bool + :param output_encoding: encoding output from R (default: 'utf-8') + :type output_encoding: string + + """ + + template_fields = ('r_command', 'env') + template_ext = ('.r', '.R') + ui_color = '#C8D5E6' + + @apply_defaults + def __init__( + self, + r_command, + rscript_bin='Rscript', + env=None, + xcom_push=False, + output_encoding='utf-8', + *args, **kwargs): + + super(ROperator, self).__init__(*args, **kwargs) + self.r_command = r_command + self.rscript_bin = rscript_bin + self.env = env + self.xcom_push = xcom_push + self.output_encoding = output_encoding + + def execute(self, context): + """ + Execute the R command in a temporary directory + """ + + self.log.info("Tmp dir root location: \n %s", gettempdir()) + + with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: + with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: + + f.write(bytes(self.r_command, 'utf_8')) + f.flush() + fname = f.name + script_location = os.path.abspath(fname) + self.log.info("Temporary script location: %s", script_location) + + def pre_exec(): + # Restore default signal disposition and invoke setsid + for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'): + if hasattr(signal, sig): + signal.signal(getattr(signal, sig), signal.SIG_DFL) + os.setsid() + + self.log.info("Running command(s): %s", self.r_command) + + sp = Popen( + args=[self.rscript_bin, fname], + stdout=PIPE, + stderr=STDOUT, + cwd=tmp_dir, + env=self.env, + preexec_fn=pre_exec, + ) + + self.sp = sp + + self.log.info("Output:") + line = '' + for line in iter(sp.stdout.readline, b''): + line = line.decode(self.output_encoding).rstrip() + self.log.info(line) + + sp.wait() + self.log.info( + "Command exited with return code %s", + sp.returncode + ) + + if sp.returncode: + raise AirflowException('R command failed') + + if self.xcom_push: + self.log.info('Pushing last line of output to Xcom') + return line + + def on_kill(self): + self.log.info('Sending SIGTERM signal to R process group') + 
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) diff --git a/tests/contrib/operators/test_r_operator.py b/tests/contrib/operators/test_r_operator.py new file mode 100644 index 0000000000000..13205d9c1e4d7 --- /dev/null +++ b/tests/contrib/operators/test_r_operator.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function, unicode_literals + +import os +import unittest + +from airflow import configuration, DAG +from airflow.contrib.operators.r_operator import ROperator +from airflow.models import TaskInstance +from airflow.utils import timezone + + +DEFAULT_DATE = timezone.datetime(2016, 1, 1) + + +class ROperatorTest(unittest.TestCase): + """Test the ROperator""" + + def setUp(self): + super(ROperatorTest, self).setUp() + configuration.load_test_config() + self.dag = DAG( + 'test_roperator_dag', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + }, + schedule_interval='@once' + ) + + self.xcom_test_str = 'Hello Airflow' + self.task_xcom = ROperator( + task_id='test_r_xcom', + r_command='cat("Ignored Line\n{}")'.format(self.xcom_test_str), + xcom_push=True, + dag=self.dag + ) + + def test_invalid_rscript_bin(self): + """Fail if invalid rscript_bin supplied""" + + try: + expected_error = FileNotFoundError + except NameError: + # py2 + expected_error = OSError + + task = ROperator( + task_id='test_r_bad_rscript', + r_command='print(Sys.Date())', + 
rscript_bin='somebadrscript', + dag=self.dag + ) + + self.assertIsNotNone(task) + + ti = TaskInstance(task=task, execution_date=timezone.utcnow()) + + with self.assertRaises(expected_error): + ti.run() + + def test_xcom_output(self): + """Test whether Xcom output is produced using last line""" + + self.task_xcom.xcom_push = True + + ti = TaskInstance( + task=self.task_xcom, + execution_date=timezone.utcnow() + ) + + ti.run() + self.assertIsNotNone(ti.duration) + + self.assertEqual( + ti.xcom_pull(task_ids=self.task_xcom.task_id, key='return_value'), + self.xcom_test_str + ) + + def test_xcom_none(self): + """Test whether no Xcom output is produced when push=False""" + + self.task_xcom.xcom_push = False + + ti = TaskInstance( + task=self.task_xcom, + execution_date=timezone.utcnow(), + ) + + ti.run() + self.assertIsNotNone(ti.duration) + self.assertIsNone(ti.xcom_pull(task_ids=self.task_xcom.task_id)) + + def test_env_vars(self): + """Test whether environment is passed properly""" + + test_var = 'TEST_VALUE_X' + test_str = 'Hello Airflow' + + task = ROperator( + task_id='test_env_vars', + r_command='cat(Sys.getenv("{}"))'.format(test_var), + env={test_var: test_str, "PATH": os.environ['PATH']}, + xcom_push=True, + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=timezone.utcnow()) + + ti.run() + self.assertIsNotNone(ti.duration) + + self.assertEqual( + ti.xcom_pull(task_ids=task.task_id, key='return_value'), + test_str + ) + + def test_command_template(self): + """Test whether templating works properly with r_command""" + + task = ROperator( + task_id='test_cmd_template', + r_command='cat("{{ ds }}")', + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.render_templates() + + self.assertEqual( + ti.task.r_command, + 'cat("{}")'.format(DEFAULT_DATE.date().isoformat()) + ) + + def test_env_templates(self): + """Test whether templating works properly with env vars""" + + test_var = 'TEST_CURR_DATE' + + task = ROperator( 
+ task_id='test_env_template', + r_command='cat(Sys.getenv("{}"))'.format(test_var), + env={test_var: "{{ ds }}"}, + xcom_push=True, + dag=self.dag + ) + + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.run() + + self.assertEqual( + ti.xcom_pull(task_ids=task.task_id, key='return_value'), + DEFAULT_DATE.date().isoformat() + ) + + +if __name__ == '__main__': + unittest.main() From 284b4974525baed405380d7623535ace0fa1e4b8 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Tue, 8 May 2018 11:15:14 -0700 Subject: [PATCH 03/11] Update ROperator to use rpy2 instead of CLI --- airflow/contrib/operators/r_operator.py | 92 ++++++------------------- 1 file changed, 22 insertions(+), 70 deletions(-) diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py index 948b8e7ee5c15..9974061892db8 100644 --- a/airflow/contrib/operators/r_operator.py +++ b/airflow/contrib/operators/r_operator.py @@ -14,35 +14,23 @@ from builtins import bytes import os -import signal -from subprocess import Popen, STDOUT, PIPE -from tempfile import gettempdir, NamedTemporaryFile +from tempfile import NamedTemporaryFile -from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory +import rpy2.robjects as robjects +from rpy2.rinterface import RRuntimeError + class ROperator(BaseOperator): """ - Execute an R script, command, or set of commands + Execute an R script or command - :param r_command: The command, set of commands, or a reference to an R - script (must have '.r' extension) to be executed (templated) + :param r_command: The command or a reference to an R script (must have + '.r' extension) to be executed (templated) :type r_command: string - :param rscript_bin: The command to run to execute an R script (default: - 'Rscript'). If Rscript is not in the PATH, the full path can be - specified. 
Alternate interpreters can also be used (e.g., littler) by - specifying their executable here. - :type rscript_bin: string - :param env: If env is not None, it must be a mapping that defines the - environment variables for the new process; these are used instead - of inheriting the current process environment, which is the default - behavior. Note that this will remove PATH, which makes it impossible - to find `rscript_bin`. Either give the full path for `rscript_bin`, - or add `'PATH': os.environ['PATH']` to the given env. (templated) - :type env: dict :param xcom_push: If xcom_push is True (default: False), the last line written to stdout will also be pushed to an XCom (key 'return_value') when the R command completes. @@ -52,7 +40,7 @@ class ROperator(BaseOperator): """ - template_fields = ('r_command', 'env') + template_fields = ('r_command',) template_ext = ('.r', '.R') ui_color = '#C8D5E6' @@ -60,26 +48,20 @@ class ROperator(BaseOperator): def __init__( self, r_command, - rscript_bin='Rscript', - env=None, xcom_push=False, output_encoding='utf-8', *args, **kwargs): super(ROperator, self).__init__(*args, **kwargs) self.r_command = r_command - self.rscript_bin = rscript_bin - self.env = env self.xcom_push = xcom_push self.output_encoding = output_encoding def execute(self, context): """ - Execute the R command in a temporary directory + Execute the R command or script in a temporary directory """ - self.log.info("Tmp dir root location: \n %s", gettempdir()) - with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: @@ -87,47 +69,17 @@ def execute(self, context): f.flush() fname = f.name script_location = os.path.abspath(fname) - self.log.info("Temporary script location: %s", script_location) - def pre_exec(): - # Restore default signal disposition and invoke setsid - for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'): - if hasattr(signal, sig): - signal.signal(getattr(signal, sig), signal.SIG_DFL) - 
os.setsid() - - self.log.info("Running command(s): %s", self.r_command) - - sp = Popen( - args=[self.rscript_bin, fname], - stdout=PIPE, - stderr=STDOUT, - cwd=tmp_dir, - env=self.env, - preexec_fn=pre_exec, - ) - - self.sp = sp - - self.log.info("Output:") - line = '' - for line in iter(sp.stdout.readline, b''): - line = line.decode(self.output_encoding).rstrip() - self.log.info(line) - - sp.wait() - self.log.info( - "Command exited with return code %s", - sp.returncode - ) - - if sp.returncode: - raise AirflowException('R command failed') - - if self.xcom_push: - self.log.info('Pushing last line of output to Xcom') - return line - - def on_kill(self): - self.log.info('Sending SIGTERM signal to R process group') - os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) + self.log.info("Temporary script location: %s", script_location) + self.log.info("Running command(s):\n%s", self.r_command) + + try: + res = robjects.r.source(fname, echo=False) + except RRuntimeError as e: + self.log.error("Received R error: %s", e) + res = None + + if self.xcom_push and res: + # This will be a pickled rpy2.robjects.vectors.ListVector + self.log.info('Pushing last line of output to Xcom: \n %s', res) + return res From 181c8a281a1ba2cb762c3a5bc1ac99e749d7683f Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Wed, 13 Feb 2019 09:52:17 -0800 Subject: [PATCH 04/11] Use do_xcom_push from BaseOperator Co-Authored-By: briandconnelly --- airflow/contrib/operators/r_operator.py | 4 +--- tests/contrib/operators/test_r_operator.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py index 9974061892db8..5fc95b5b66880 100644 --- a/airflow/contrib/operators/r_operator.py +++ b/airflow/contrib/operators/r_operator.py @@ -48,13 +48,11 @@ class ROperator(BaseOperator): def __init__( self, r_command, - xcom_push=False, output_encoding='utf-8', *args, **kwargs): super(ROperator, self).__init__(*args, 
**kwargs) self.r_command = r_command - self.xcom_push = xcom_push self.output_encoding = output_encoding def execute(self, context): @@ -79,7 +77,7 @@ def execute(self, context): self.log.error("Received R error: %s", e) res = None - if self.xcom_push and res: + if self.do_xcom_push and res: # This will be a pickled rpy2.robjects.vectors.ListVector self.log.info('Pushing last line of output to Xcom: \n %s', res) return res diff --git a/tests/contrib/operators/test_r_operator.py b/tests/contrib/operators/test_r_operator.py index 13205d9c1e4d7..2cbba85c9dbe2 100644 --- a/tests/contrib/operators/test_r_operator.py +++ b/tests/contrib/operators/test_r_operator.py @@ -75,7 +75,7 @@ def test_invalid_rscript_bin(self): def test_xcom_output(self): """Test whether Xcom output is produced using last line""" - self.task_xcom.xcom_push = True + self.task_xcom.do_xcom_push = True ti = TaskInstance( task=self.task_xcom, @@ -93,7 +93,7 @@ def test_xcom_output(self): def test_xcom_none(self): """Test whether no Xcom output is produced when push=False""" - self.task_xcom.xcom_push = False + self.task_xcom.do_xcom_push = False ti = TaskInstance( task=self.task_xcom, From 8d6bef54bc889838ae4e0e575007e33128c728e7 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Wed, 13 Feb 2019 10:02:26 -0800 Subject: [PATCH 05/11] Remove old ENV var and rscript tests --- tests/contrib/operators/test_r_operator.py | 68 ---------------------- 1 file changed, 68 deletions(-) diff --git a/tests/contrib/operators/test_r_operator.py b/tests/contrib/operators/test_r_operator.py index 2cbba85c9dbe2..f979a8e0c7378 100644 --- a/tests/contrib/operators/test_r_operator.py +++ b/tests/contrib/operators/test_r_operator.py @@ -49,29 +49,6 @@ def setUp(self): dag=self.dag ) - def test_invalid_rscript_bin(self): - """Fail if invalid rscript_bin supplied""" - - try: - expected_error = FileNotFoundError - except NameError: - # py2 - expected_error = OSError - - task = ROperator( - task_id='test_r_bad_rscript', - 
r_command='print(Sys.Date())', - rscript_bin='somebadrscript', - dag=self.dag - ) - - self.assertIsNotNone(task) - - ti = TaskInstance(task=task, execution_date=timezone.utcnow()) - - with self.assertRaises(expected_error): - ti.run() - def test_xcom_output(self): """Test whether Xcom output is produced using last line""" @@ -104,30 +81,6 @@ def test_xcom_none(self): self.assertIsNotNone(ti.duration) self.assertIsNone(ti.xcom_pull(task_ids=self.task_xcom.task_id)) - def test_env_vars(self): - """Test whether environment is passed properly""" - - test_var = 'TEST_VALUE_X' - test_str = 'Hello Airflow' - - task = ROperator( - task_id='test_env_vars', - r_command='cat(Sys.getenv("{}"))'.format(test_var), - env={test_var: test_str, "PATH": os.environ['PATH']}, - xcom_push=True, - dag=self.dag - ) - - ti = TaskInstance(task=task, execution_date=timezone.utcnow()) - - ti.run() - self.assertIsNotNone(ti.duration) - - self.assertEqual( - ti.xcom_pull(task_ids=task.task_id, key='return_value'), - test_str - ) - def test_command_template(self): """Test whether templating works properly with r_command""" @@ -145,27 +98,6 @@ def test_command_template(self): 'cat("{}")'.format(DEFAULT_DATE.date().isoformat()) ) - def test_env_templates(self): - """Test whether templating works properly with env vars""" - - test_var = 'TEST_CURR_DATE' - - task = ROperator( - task_id='test_env_template', - r_command='cat(Sys.getenv("{}"))'.format(test_var), - env={test_var: "{{ ds }}"}, - xcom_push=True, - dag=self.dag - ) - - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) - ti.run() - - self.assertEqual( - ti.xcom_pull(task_ids=task.task_id, key='return_value'), - DEFAULT_DATE.date().isoformat() - ) - if __name__ == '__main__': unittest.main() From 18c4b0d52019cecd36d42024dd111922bb405534 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Wed, 13 Feb 2019 11:15:54 -0800 Subject: [PATCH 06/11] Remove `xcom_push` arg info from docstring --- airflow/contrib/operators/r_operator.py | 4 
---- 1 file changed, 4 deletions(-) diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py index 5fc95b5b66880..de3f8d333aa88 100644 --- a/airflow/contrib/operators/r_operator.py +++ b/airflow/contrib/operators/r_operator.py @@ -31,10 +31,6 @@ class ROperator(BaseOperator): :param r_command: The command or a reference to an R script (must have '.r' extension) to be executed (templated) :type r_command: string - :param xcom_push: If xcom_push is True (default: False), the last line - written to stdout will also be pushed to an XCom (key 'return_value') - when the R command completes. - :type xcom_push: bool :param output_encoding: encoding output from R (default: 'utf-8') :type output_encoding: string From b946e2383795e6d6c2d6b1e057d5a5010b20b02f Mon Sep 17 00:00:00 2001 From: zhongjiajie Date: Sun, 17 Feb 2019 10:09:39 -0800 Subject: [PATCH 07/11] remove `os` package to pass CI test Co-Authored-By: briandconnelly --- tests/contrib/operators/test_r_operator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/contrib/operators/test_r_operator.py b/tests/contrib/operators/test_r_operator.py index f979a8e0c7378..135d1ed764f1b 100644 --- a/tests/contrib/operators/test_r_operator.py +++ b/tests/contrib/operators/test_r_operator.py @@ -14,7 +14,6 @@ from __future__ import print_function, unicode_literals -import os import unittest from airflow import configuration, DAG From 1c3b61663eb73444508b802b88a8b619e23af9c0 Mon Sep 17 00:00:00 2001 From: zhongjiajie Date: Mon, 18 Feb 2019 09:23:56 +0800 Subject: [PATCH 08/11] Add `rpy2` in setup.py to install extras package --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3e0b37615675d..4e0090ae14475 100644 --- a/setup.py +++ b/setup.py @@ -172,6 +172,7 @@ def write_version(filename=os.path.join(*['airflow', pinot = ['pinotdb>=0.1.1'] postgres = ['psycopg2-binary>=2.7.4'] qds = ['qds-sdk>=1.9.6'] +r = ['rpy2>=2.9.5'] rabbitmq = 
['librabbitmq>=1.6.1'] redis = ['redis>=2.10.5'] s3 = ['boto3>=1.7.0'] @@ -209,7 +210,7 @@ def write_version(filename=os.path.join(*['airflow', devel_minreq = devel + kubernetes + mysql + doc + password + s3 + cgroups devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + - docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog + + docker + ssh + kubernetes + celery + azure + r + redis + gcp_api + datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + druid + pinot + snowflake + elasticsearch) @@ -310,6 +311,7 @@ def do_setup(): 'pinot': pinot, 'postgres': postgres, 'qds': qds, + 'r': r, 'rabbitmq': rabbitmq, 'redis': redis, 's3': s3, From 9862147f390f041caf423fb707c692edafa4fcfc Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Mon, 18 Feb 2019 16:52:49 -0800 Subject: [PATCH 09/11] Fix merge issue --- setup.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/setup.py b/setup.py index 0845cd12b56ac..d3219ce07b852 100644 --- a/setup.py +++ b/setup.py @@ -258,18 +258,11 @@ def write_version(filename=os.path.join(*['airflow', devel_minreq = devel + kubernetes + mysql + doc + password + cgroups devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos -<<<<<<< HEAD -devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + - docker + ssh + kubernetes + celery + azure + r + redis + gcp_api + datadog + - zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + - druid + pinot + snowflake + elasticsearch) -======= devel_all = (sendgrid + devel + all_dbs + doc + samba + slack + crypto + oracle + docker + ssh + kubernetes + celery + redis + gcp_api + datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + druid + pinot + segment + snowflake + elasticsearch + - atlas + azure + aws) ->>>>>>> upstream/master + atlas + azure + aws + r) # Snakebite & Google 
Cloud Dataflow are not Python 3 compatible :'( if PY3: From 9f997a20a889fa42a77f0d72184d6dd267d15726 Mon Sep 17 00:00:00 2001 From: Brian Connelly Date: Tue, 19 Feb 2019 11:17:40 -0800 Subject: [PATCH 10/11] Add environment variable support - Environment variables can be exported using the `env` arg - Fixed outdated xcom support. Now just returning the result. --- airflow/contrib/operators/r_operator.py | 30 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py index de3f8d333aa88..7fe05136aca3a 100644 --- a/airflow/contrib/operators/r_operator.py +++ b/airflow/contrib/operators/r_operator.py @@ -19,6 +19,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory +from airflow.utils.operator_helpers import context_to_airflow_vars import rpy2.robjects as robjects from rpy2.rinterface import RRuntimeError @@ -28,15 +29,23 @@ class ROperator(BaseOperator): """ Execute an R script or command + If BaseOperator.do_xcom_push is True, the last line written to stdout + will also be pushed to an XCom when the R command completes + :param r_command: The command or a reference to an R script (must have '.r' extension) to be executed (templated) :type r_command: string + :param env: Optional dict of environment variables and their (string) + values to set (templated). Unlike `BashOperator`, this does not + replace the current environment, although it can be used to override + existing values. Values can be read in R with `Sys.getenv()`. 
+ :type env: dict :param output_encoding: encoding output from R (default: 'utf-8') :type output_encoding: string """ - template_fields = ('r_command',) + template_fields = ('r_command', 'env',) template_ext = ('.r', '.R') ui_color = '#C8D5E6' @@ -44,11 +53,13 @@ class ROperator(BaseOperator): def __init__( self, r_command, + env={}, output_encoding='utf-8', *args, **kwargs): super(ROperator, self).__init__(*args, **kwargs) self.r_command = r_command + self.env = env self.output_encoding = output_encoding def execute(self, context): @@ -56,6 +67,17 @@ def execute(self, context): Execute the R command or script in a temporary directory """ + # Export additional environment variables + os.environ.update(self.env) + + # Export context as environment variables + airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) + self.log.info('Exporting the following env vars:\n' + + '\n'.join(["{}={}".format(k, v) + for k, v in + airflow_context_vars.items()])) + os.environ.update(airflow_context_vars) + with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: @@ -73,7 +95,5 @@ def execute(self, context): self.log.error("Received R error: %s", e) res = None - if self.do_xcom_push and res: - # This will be a pickled rpy2.robjects.vectors.ListVector - self.log.info('Pushing last line of output to Xcom: \n %s', res) - return res + # This will be a pickled rpy2.robjects.vectors.ListVector + return res From 1a980dbcdfddbcd2e7b4605e8b071f1154c587e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Tue, 5 Mar 2019 07:14:30 -0800 Subject: [PATCH 11/11] Use logger formatting instead of format string Co-Authored-By: briandconnelly --- airflow/contrib/operators/r_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/contrib/operators/r_operator.py b/airflow/contrib/operators/r_operator.py index 7fe05136aca3a..2db3a6087274d 100644 --- 
a/airflow/contrib/operators/r_operator.py +++ b/airflow/contrib/operators/r_operator.py @@ -72,7 +72,7 @@ def execute(self, context): # Export context as environment variables airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) - self.log.info('Exporting the following env vars:\n' + + self.log.info('Exporting the following env vars:\n%s', '\n'.join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]))