From ca1a9c9b7d4915e41f1a7c3bc5ee1d2a6b13af70 Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Mon, 7 Mar 2022 21:38:17 +0000 Subject: [PATCH 1/6] add DataprocInstantiateInlineWorkflowTemplateOperator example --- .../google/cloud/example_dags/example_dataproc.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py index 87acf3f66b6f2..4db390057e376 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataproc.py +++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py @@ -34,6 +34,7 @@ DataprocDeleteClusterOperator, DataprocGetBatchOperator, DataprocInstantiateWorkflowTemplateOperator, + DataprocInstantiateInlineWorkflowTemplateOperator, DataprocListBatchesOperator, DataprocSubmitJobOperator, DataprocUpdateClusterOperator, @@ -256,6 +257,13 @@ ) # [END how_to_cloud_dataproc_trigger_workflow_template] + # [START how_to_cloud_dataproc_instantiate_inline_workflow_template] + instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator( + task_id='instantiate_inline_workflow_template', + template=WORKFLOW_TEMPLATE, + region=REGION) + # [END how_to_cloud_dataproc_instantiate_inline_workflow_template] + pig_task = DataprocSubmitJobOperator( task_id="pig_task", job=PIG_JOB, region=REGION, project_id=PROJECT_ID ) From 276d606bde99b404831366cea6930ff7a8759a1e Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Mon, 7 Mar 2022 21:38:43 +0000 Subject: [PATCH 2/6] add DataprocInstantiateInlineWorkflowTemplateOperator doc --- .../operators/cloud/dataproc.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst index c4339fd8ff258..e5279420ef870 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst @@ -212,6 +212,15 @@ Once a workflow is created users can trigger it using :start-after: [START how_to_cloud_dataproc_trigger_workflow_template] :end-before: [END how_to_cloud_dataproc_trigger_workflow_template] +The inline operator is an alternative. It creates a workflow, run it, and delete it afterwards: +:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`: + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_dataproc_instantiate_inline_workflow_template] + :end-before: [END how_to_cloud_dataproc_instantiate_inline_workflow_template] + Create a Batch -------------- From 8a31a0f5d4da3887be92edcc7b653c9309ee2721 Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Mon, 7 Mar 2022 23:59:39 +0000 Subject: [PATCH 3/6] fix indentation --- .../google/cloud/example_dags/example_dataproc.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py index 4db390057e376..debb95cffb2ca 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataproc.py +++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py @@ -33,8 +33,8 @@ DataprocDeleteBatchOperator, DataprocDeleteClusterOperator, DataprocGetBatchOperator, - DataprocInstantiateWorkflowTemplateOperator, DataprocInstantiateInlineWorkflowTemplateOperator, + DataprocInstantiateWorkflowTemplateOperator, DataprocListBatchesOperator, DataprocSubmitJobOperator, DataprocUpdateClusterOperator, @@ -259,9 +259,8 @@ # [START how_to_cloud_dataproc_instantiate_inline_workflow_template] instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator( - task_id='instantiate_inline_workflow_template', - template=WORKFLOW_TEMPLATE, - region=REGION) + task_id='instantiate_inline_workflow_template', template=WORKFLOW_TEMPLATE, region=REGION + ) # [END how_to_cloud_dataproc_instantiate_inline_workflow_template] pig_task = DataprocSubmitJobOperator( From b3cd016c1f9e4e1c46aeae3dd901d1fc9b54ac7e Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Tue, 8 Mar 2022 00:44:11 +0000 Subject: [PATCH 4/6] update reference --- airflow/providers/google/cloud/operators/dataproc.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 753ca287132d5..7cf833357838e 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -1673,8 +1673,11 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator): wait until the WorkflowTemplate is finished executing. .. seealso:: - Please refer to: - https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataprocInstantiateInlineWorkflowTemplateOperator` + + For more detail on about instantiate inline have a look at the reference: + https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.workflowTemplates/instantiateInline :param template: The template contents. (templated) :param project_id: The ID of the google cloud project in which From c890654fb859af43463f5352401c216ae8e16e9d Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Tue, 8 Mar 2022 13:03:34 +0000 Subject: [PATCH 5/6] fix test --- tests/always/test_project_structure.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 3200f39ffe94a..9c019f418744f 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -192,7 +192,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): # Please at the examples to those operators at the earliest convenience :) MISSING_EXAMPLES_FOR_OPERATORS = { - 'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator', 'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator', From 4f675e5e706931214f6e798d7f9b7cd672ed0dbe Mon Sep 17 00:00:00 2001 From: Nilo Resende Date: Tue, 8 Mar 2022 15:30:58 +0000 Subject: [PATCH 6/6] fix reference to documentation --- .../apache-airflow-providers-google/operators/cloud/dataproc.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst index e5279420ef870..1909249258dc2 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst @@ -32,6 +32,7 @@ Prerequisite Tasks .. _howto/operator:DataprocCreateClusterOperator: +.. _howto/operator:DataprocInstantiateInlineWorkflowTemplateOperator: Create a Cluster ----------------