From b6a6490dd5ec62d1ae3dae783a2a72e27d395a71 Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Mon, 16 Jul 2018 20:51:55 -0400 Subject: [PATCH] [AIRFLOW-2758] Add a sensor for MongoDB This PR adds a sensor for MongoDB, which waits for some document that matches the given query to be inserted to the specified collection. --- .travis.yml | 1 + airflow/contrib/sensors/mongo_sensor.py | 58 +++++++++++++++++++ tests/contrib/sensors/test_mongo_sensor.py | 65 ++++++++++++++++++++++ 3 files changed, 124 insertions(+) create mode 100644 airflow/contrib/sensors/mongo_sensor.py create mode 100644 tests/contrib/sensors/test_mongo_sensor.py diff --git a/.travis.yml b/.travis.yml index 01c08d9537f64..81e43fb4b8675 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,6 +23,7 @@ jdk: - oraclejdk8 services: - cassandra + - mongodb - mysql - postgresql - rabbitmq diff --git a/airflow/contrib/sensors/mongo_sensor.py b/airflow/contrib/sensors/mongo_sensor.py new file mode 100644 index 0000000000000..3ed49a6f9d64f --- /dev/null +++ b/airflow/contrib/sensors/mongo_sensor.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from airflow.contrib.hooks.mongo_hook import MongoHook +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class MongoSensor(BaseSensorOperator): + """ + Checks for the existence of a document which + matches the given query in MongoDB. Example: + + >>> mongo_sensor = MongoSensor(collection="coll", + ... query={"key": "value"}, + ... mongo_conn_id="mongo_default", + ... task_id="mongo_sensor") + """ + template_fields = ('collection', 'query') + + @apply_defaults + def __init__(self, collection, query, mongo_conn_id="mongo_default", *args, **kwargs): + """ + Create a new MongoSensor + + :param collection: Target MongoDB collection. + :type collection: string + :param query: The query to find the target document. + :type query: dict + :param mongo_conn_id: The connection ID to use + when connecting to MongoDB. + :type mongo_conn_id: string + """ + super(MongoSensor, self).__init__(*args, **kwargs) + self.mongo_conn_id = mongo_conn_id + self.collection = collection + self.query = query + + def poke(self, context): + self.log.info("Sensor check existence of the document " + "that matches the following query: %s", self.query) + hook = MongoHook(self.mongo_conn_id) + return hook.find(self.collection, self.query, find_one=True) is not None diff --git a/tests/contrib/sensors/test_mongo_sensor.py b/tests/contrib/sensors/test_mongo_sensor.py new file mode 100644 index 0000000000000..876cb99ba4251 --- /dev/null +++ b/tests/contrib/sensors/test_mongo_sensor.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import unittest + +from airflow import DAG +from airflow import configuration +from airflow.contrib.hooks.mongo_hook import MongoHook +from airflow.contrib.sensors.mongo_sensor import MongoSensor +from airflow.models import Connection +from airflow.utils import db, timezone + + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestMongoSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + Connection( + conn_id='mongo_test', conn_type='mongo', + host='localhost', port='27017', schema='test')) + + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + self.dag = DAG('test_dag_id', default_args=args) + + hook = MongoHook('mongo_test') + hook.insert_one('foo', {'bar': 'baz'}) + + self.sensor = MongoSensor( + task_id='test_task', + mongo_conn_id='mongo_test', + dag=self.dag, + collection='foo', + query={'bar': 'baz'} + ) + + def test_poke(self): + self.assertTrue(self.sensor.poke(None)) + + +if __name__ == '__main__': + unittest.main()