40 changes: 32 additions & 8 deletions airflow/contrib/sensors/qubole_sensor.py
@@ -29,14 +29,6 @@
class QuboleSensor(BaseSensorOperator):
"""
Base class for all Qubole Sensors

:param qubole_conn_id: The qubole connection to run the sensor against
:type qubole_conn_id: str
:param data: a JSON object containing payload, whose presence needs to be checked
:type data: a JSON object

.. note:: Both ``data`` and ``qubole_conn_id`` fields are template-supported. You can
also use ``.txt`` files for template driven use cases.
"""

template_fields = ('data', 'qubole_conn_id')
@@ -75,13 +67,45 @@ def poke(self, context):


class QuboleFileSensor(QuboleSensor):
"""
Wait for a file or folder to be present in cloud storage,
checking for its presence via QDS APIs.

:param qubole_conn_id: Connection id that contains the QDS auth_token
:type qubole_conn_id: str
:param data: a JSON object containing the payload whose presence needs to be checked.
Check this `example <https://github.com/apache/incubator-airflow/blob/master\
/airflow/contrib/example_dags/example_qubole_sensor.py>`_ for sample payload
structure.
:type data: a JSON object

.. note:: Both ``data`` and ``qubole_conn_id`` fields support templating. You can
also use ``.txt`` files for template-driven use cases.
"""

@apply_defaults
def __init__(self, *args, **kwargs):
self.sensor_class = FileSensor
super(QuboleFileSensor, self).__init__(*args, **kwargs)


class QubolePartitionSensor(QuboleSensor):
"""
Wait for a Hive partition to show up in QHS (Qubole Hive Service),
checking for its presence via QDS APIs.

:param qubole_conn_id: Connection id that contains the QDS auth_token
:type qubole_conn_id: str
:param data: a JSON object containing the payload whose presence needs to be checked.
Check this `example <https://github.com/apache/incubator-airflow/blob/master\
/airflow/contrib/example_dags/example_qubole_sensor.py>`_ for sample payload
structure.
:type data: a JSON object

.. note:: Both ``data`` and ``qubole_conn_id`` fields support templating. You can
also use ``.txt`` files for template-driven use cases.
"""

@apply_defaults
def __init__(self, *args, **kwargs):
self.sensor_class = PartitionSensor
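
For readers skimming the diff, here is a minimal, hypothetical usage sketch of the two sensors added here. The connection id, S3 path, schema/table/column names, and the exact ``data`` payload shapes are illustrative assumptions rather than part of this PR; the linked example DAG is the canonical reference for the payload structure.

from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.qubole_sensor import (
    QuboleFileSensor,
    QubolePartitionSensor,
)

dag = DAG(
    dag_id='qubole_sensor_sketch',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
)

# Wait for a file to land in cloud storage, checked through the QDS APIs.
check_s3_file = QuboleFileSensor(
    task_id='check_s3_file',
    qubole_conn_id='qubole_default',   # assumed connection holding the QDS auth_token
    poke_interval=60,
    timeout=600,
    data={
        'files': ['s3://my-bucket/data/{{ ds }}/part-00000'],  # templated, illustrative path
    },
    dag=dag,
)

# Wait for a Hive partition to show up in QHS.
check_hive_partition = QubolePartitionSensor(
    task_id='check_hive_partition',
    qubole_conn_id='qubole_default',
    poke_interval=60,
    timeout=600,
    data={
        'schema': 'default',                 # illustrative schema name
        'table': 'my_partitioned_table',     # illustrative table name
        'columns': [
            {'column': 'ds', 'values': ['{{ ds }}']},
        ],
    },
    dag=dag,
)
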
26 changes: 26 additions & 0 deletions docs/integration.rst
@@ -6,6 +6,7 @@ Integration
- :ref:`AWS`
- :ref:`Databricks`
- :ref:`GCP`
- :ref:`Qubole`

.. _ReverseProxy:

@@ -829,3 +830,28 @@ Google Kubernetes Engine Hook

.. autoclass:: airflow.contrib.hooks.gcp_container_hook.GKEClusterHook
:members:


.. _Qubole:

Qubole
------

Apache Airflow has a native operator, sensors, and hooks to talk to `Qubole <https://qubole.com/>`__,
letting you submit your big data jobs to Qubole directly from Airflow.

QuboleOperator
''''''''''''''

.. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator

QubolePartitionSensor
'''''''''''''''''''''

.. autoclass:: airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor


QuboleFileSensor
''''''''''''''''

.. autoclass:: airflow.contrib.sensors.qubole_sensor.QuboleFileSensor
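
To complement the class references above, here is a similarly hedged sketch of submitting a command through the operator; the command type, query, cluster label, and connection id are placeholder values, not taken from this PR.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator

dag = DAG(
    dag_id='qubole_operator_sketch',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
)

# Submit a Hive command to Qubole and wait for it to finish.
hive_show_tables = QuboleOperator(
    task_id='hive_show_tables',
    command_type='hivecmd',            # assumed: type of QDS command to run
    query='show tables',               # placeholder query
    cluster_label='default',           # assumed: label of the Qubole cluster to use
    qubole_conn_id='qubole_default',   # assumed connection holding the QDS auth_token
    dag=dag,
)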