From f96532ccffffaa1dc7da3d58ba459ccc534d2989 Mon Sep 17 00:00:00 2001 From: Mitch Date: Wed, 7 Feb 2024 18:29:49 +0000 Subject: [PATCH 1/3] updated the airbyte provider to use the recommended Airbyte API, moving away from the unsupported Configuration API --- airflow/providers/airbyte/__init__.py | 2 +- airflow/providers/airbyte/hooks/airbyte.py | 17 +++++++++-------- airflow/providers/airbyte/operators/airbyte.py | 4 ++-- airflow/providers/airbyte/sensors/airbyte.py | 4 ++-- tests/providers/airbyte/hooks/test_airbyte.py | 18 +++++++++--------- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/airflow/providers/airbyte/__init__.py b/airflow/providers/airbyte/__init__.py index cdbc598fd24ff..cfe3d2ed7d8c7 100644 --- a/airflow/providers/airbyte/__init__.py +++ b/airflow/providers/airbyte/__init__.py @@ -27,7 +27,7 @@ __all__ = ["__version__"] -__version__ = "3.6.0" +__version__ = "3.6.1" try: from airflow import __version__ as airflow_version diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index b8ad957a9c6d8..8d9765e969519 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -84,10 +84,10 @@ async def get_job_details(self, job_id: int) -> Any: :param job_id: The ID of an Airbyte Sync Job. """ headers, base_url = await self.get_headers_tenants_from_connection() - url = f"{base_url}/api/{self.api_version}/jobs/get" + url = f"{base_url}/{self.api_version}/jobs/{job_id}" self.log.info("URL for api request: %s", url) async with aiohttp.ClientSession(headers=headers) as session: - async with session.post(url=url, data=json.dumps({"id": job_id})) as response: + async with session.get(url=url) as response: try: response.raise_for_status() return await response.json() @@ -147,8 +147,8 @@ def submit_sync_connection(self, connection_id: str) -> Any: :param connection_id: Required. The ConnectionId of the Airbyte Connection. """ return self.run( - endpoint=f"api/{self.api_version}/connections/sync", - json={"connectionId": connection_id}, + endpoint=f"{self.api_version}/jobs", + json={"connectionId": connection_id, "jobType": "sync"}, headers={"accept": "application/json"}, ) @@ -158,8 +158,9 @@ def get_job(self, job_id: int) -> Any: :param job_id: Required. Id of the Airbyte job """ + self.method = "GET" return self.run( - endpoint=f"api/{self.api_version}/jobs/get", + endpoint=f"{self.api_version}/jobs/{job_id}", json={"id": job_id}, headers={"accept": "application/json"}, ) @@ -170,9 +171,9 @@ def cancel_job(self, job_id: int) -> Any: :param job_id: Required. Id of the Airbyte job """ + self.method = "DELETE" return self.run( - endpoint=f"api/{self.api_version}/jobs/cancel", - json={"id": job_id}, + endpoint=f"{self.api_version}/jobs/{job_id}", headers={"accept": "application/json"}, ) @@ -181,7 +182,7 @@ def test_connection(self): self.method = "GET" try: res = self.run( - endpoint=f"api/{self.api_version}/health", + endpoint=f"{self.api_version}/health", headers={"accept": "application/json"}, extra_options={"check_response": False}, ) diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py index 84a12dadfa2e5..0cc1396e8590d 100644 --- a/airflow/providers/airbyte/operators/airbyte.py +++ b/airflow/providers/airbyte/operators/airbyte.py @@ -79,8 +79,8 @@ def execute(self, context: Context) -> None: """Create Airbyte Job and wait to finish.""" hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) job_object = hook.submit_sync_connection(connection_id=self.connection_id) - self.job_id = job_object.json()["job"]["id"] - state = job_object.json()["job"]["status"] + self.job_id = job_object.json()["jobId"] + state = job_object.json()["status"] end_time = time.time() + self.timeout self.log.info("Job %s was submitted to Airbyte Server", self.job_id) diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py index 4556d554304e1..7496e79fa0212 100644 --- a/airflow/providers/airbyte/sensors/airbyte.py +++ b/airflow/providers/airbyte/sensors/airbyte.py @@ -81,7 +81,7 @@ def __init__( def poke(self, context: Context) -> bool: hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) job = hook.get_job(job_id=self.airbyte_job_id) - status = job.json()["job"]["status"] + status = job.json()["status"] if status == hook.FAILED: # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 @@ -111,7 +111,7 @@ def execute(self, context: Context) -> Any: else: hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id) job = hook.get_job(job_id=(int(self.airbyte_job_id))) - state = job.json()["job"]["status"] + state = job.json()["status"] end_time = time.time() + self.timeout self.log.info("Airbyte Job Id: Job %s", self.airbyte_job_id) diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py index 400741820b85d..da7ab25703e57 100644 --- a/tests/providers/airbyte/hooks/test_airbyte.py +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -37,14 +37,14 @@ class TestAirbyteHook: airbyte_conn_id = "airbyte_conn_id_test" connection_id = "conn_test_sync" job_id = 1 - sync_connection_endpoint = "http://test-airbyte:8001/api/v1/connections/sync" - get_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/get" - cancel_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/cancel" + sync_connection_endpoint = "http://test-airbyte:8001/v1/jobs" + get_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" + cancel_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" - health_endpoint = "http://test-airbyte:8001/api/v1/health" - _mock_sync_conn_success_response_body = {"job": {"id": 1}} - _mock_job_status_success_response_body = {"job": {"status": "succeeded"}} - _mock_job_cancel_status = "cancelled" + health_endpoint = "http://test-airbyte:8001/v1/health" + _mock_sync_conn_success_response_body = {"jobId": 1, "status":"pending"} + _mock_job_status_success_response_body = {"status": "succeeded"} + _mock_job_cancel_status = {"status":"cancelled"} def setup_method(self): db.merge_conn( @@ -68,7 +68,7 @@ def test_submit_sync_connection(self, requests_mock): assert resp.json() == self._mock_sync_conn_success_response_body def test_get_job_status(self, requests_mock): - requests_mock.post( + requests_mock.get( self.get_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body ) resp = self.hook.get_job(job_id=self.job_id) @@ -76,7 +76,7 @@ def test_get_job_status(self, requests_mock): assert resp.json() == self._mock_job_status_success_response_body def test_cancel_job(self, requests_mock): - requests_mock.post( + requests_mock.delete( self.cancel_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body ) resp = self.hook.cancel_job(job_id=self.job_id) From 686b2a486d9e1e08a24365ac0a83a3fc1716d121 Mon Sep 17 00:00:00 2001 From: Mitch Date: Thu, 8 Feb 2024 12:20:21 +0000 Subject: [PATCH 2/3] keep version at 3.6.0 --- airflow/providers/airbyte/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/airbyte/__init__.py b/airflow/providers/airbyte/__init__.py index cfe3d2ed7d8c7..cdbc598fd24ff 100644 --- a/airflow/providers/airbyte/__init__.py +++ b/airflow/providers/airbyte/__init__.py @@ -27,7 +27,7 @@ __all__ = ["__version__"] -__version__ = "3.6.1" +__version__ = "3.6.0" try: from airflow import __version__ as airflow_version From 9d1b10f1a0e68d8f4c74798faf22a42fca135f2b Mon Sep 17 00:00:00 2001 From: Mitch Date: Thu, 8 Feb 2024 15:06:00 +0000 Subject: [PATCH 3/3] health at /health not /v1/health --- airflow/providers/airbyte/hooks/airbyte.py | 2 +- tests/providers/airbyte/hooks/test_airbyte.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index 8d9765e969519..b1f25e5444380 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -182,7 +182,7 @@ def test_connection(self): self.method = "GET" try: res = self.run( - endpoint=f"{self.api_version}/health", + endpoint=f"health", headers={"accept": "application/json"}, extra_options={"check_response": False}, ) diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py index da7ab25703e57..18a9e35b03e61 100644 --- a/tests/providers/airbyte/hooks/test_airbyte.py +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -41,7 +41,7 @@ class TestAirbyteHook: get_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" cancel_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" - health_endpoint = "http://test-airbyte:8001/v1/health" + health_endpoint = "http://test-airbyte:8001/health" _mock_sync_conn_success_response_body = {"jobId": 1, "status":"pending"} _mock_job_status_success_response_body = {"status": "succeeded"} _mock_job_cancel_status = {"status":"cancelled"}