diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index b8ad957a9c6d8..b1f25e5444380 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -84,10 +84,10 @@ async def get_job_details(self, job_id: int) -> Any: :param job_id: The ID of an Airbyte Sync Job. """ headers, base_url = await self.get_headers_tenants_from_connection() - url = f"{base_url}/api/{self.api_version}/jobs/get" + url = f"{base_url}/{self.api_version}/jobs/{job_id}" self.log.info("URL for api request: %s", url) async with aiohttp.ClientSession(headers=headers) as session: - async with session.post(url=url, data=json.dumps({"id": job_id})) as response: + async with session.get(url=url) as response: try: response.raise_for_status() return await response.json() @@ -147,8 +147,8 @@ def submit_sync_connection(self, connection_id: str) -> Any: :param connection_id: Required. The ConnectionId of the Airbyte Connection. """ return self.run( - endpoint=f"api/{self.api_version}/connections/sync", - json={"connectionId": connection_id}, + endpoint=f"{self.api_version}/jobs", + json={"connectionId": connection_id, "jobType": "sync"}, headers={"accept": "application/json"}, ) @@ -158,8 +158,9 @@ def get_job(self, job_id: int) -> Any: :param job_id: Required. Id of the Airbyte job """ + self.method = "GET" return self.run( - endpoint=f"api/{self.api_version}/jobs/get", + endpoint=f"{self.api_version}/jobs/{job_id}", json={"id": job_id}, headers={"accept": "application/json"}, ) @@ -170,9 +171,9 @@ def cancel_job(self, job_id: int) -> Any: :param job_id: Required. Id of the Airbyte job """ + self.method = "DELETE" return self.run( - endpoint=f"api/{self.api_version}/jobs/cancel", - json={"id": job_id}, + endpoint=f"{self.api_version}/jobs/{job_id}", headers={"accept": "application/json"}, ) @@ -181,7 +182,7 @@ def test_connection(self): self.method = "GET" try: res = self.run( - endpoint=f"api/{self.api_version}/health", + endpoint=f"health", headers={"accept": "application/json"}, extra_options={"check_response": False}, ) diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py index 84a12dadfa2e5..0cc1396e8590d 100644 --- a/airflow/providers/airbyte/operators/airbyte.py +++ b/airflow/providers/airbyte/operators/airbyte.py @@ -79,8 +79,8 @@ def execute(self, context: Context) -> None: """Create Airbyte Job and wait to finish.""" hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) job_object = hook.submit_sync_connection(connection_id=self.connection_id) - self.job_id = job_object.json()["job"]["id"] - state = job_object.json()["job"]["status"] + self.job_id = job_object.json()["jobId"] + state = job_object.json()["status"] end_time = time.time() + self.timeout self.log.info("Job %s was submitted to Airbyte Server", self.job_id) diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py index 4556d554304e1..7496e79fa0212 100644 --- a/airflow/providers/airbyte/sensors/airbyte.py +++ b/airflow/providers/airbyte/sensors/airbyte.py @@ -81,7 +81,7 @@ def __init__( def poke(self, context: Context) -> bool: hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) job = hook.get_job(job_id=self.airbyte_job_id) - status = job.json()["job"]["status"] + status = job.json()["status"] if status == hook.FAILED: # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 @@ -111,7 +111,7 @@ def execute(self, context: Context) -> Any: else: hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id) job = hook.get_job(job_id=(int(self.airbyte_job_id))) - state = job.json()["job"]["status"] + state = job.json()["status"] end_time = time.time() + self.timeout self.log.info("Airbyte Job Id: Job %s", self.airbyte_job_id) diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py index 400741820b85d..18a9e35b03e61 100644 --- a/tests/providers/airbyte/hooks/test_airbyte.py +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -37,14 +37,14 @@ class TestAirbyteHook: airbyte_conn_id = "airbyte_conn_id_test" connection_id = "conn_test_sync" job_id = 1 - sync_connection_endpoint = "http://test-airbyte:8001/api/v1/connections/sync" - get_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/get" - cancel_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/cancel" + sync_connection_endpoint = "http://test-airbyte:8001/v1/jobs" + get_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" + cancel_job_endpoint = f"http://test-airbyte:8001/v1/jobs/{job_id}" - health_endpoint = "http://test-airbyte:8001/api/v1/health" - _mock_sync_conn_success_response_body = {"job": {"id": 1}} - _mock_job_status_success_response_body = {"job": {"status": "succeeded"}} - _mock_job_cancel_status = "cancelled" + health_endpoint = "http://test-airbyte:8001/health" + _mock_sync_conn_success_response_body = {"jobId": 1, "status":"pending"} + _mock_job_status_success_response_body = {"status": "succeeded"} + _mock_job_cancel_status = {"status":"cancelled"} def setup_method(self): db.merge_conn( @@ -68,7 +68,7 @@ def test_submit_sync_connection(self, requests_mock): assert resp.json() == self._mock_sync_conn_success_response_body def test_get_job_status(self, requests_mock): - requests_mock.post( + requests_mock.get( self.get_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body ) resp = self.hook.get_job(job_id=self.job_id) @@ -76,7 +76,7 @@ def test_get_job_status(self, requests_mock): assert resp.json() == self._mock_job_status_success_response_body def test_cancel_job(self, requests_mock): - requests_mock.post( + requests_mock.delete( self.cancel_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body ) resp = self.hook.cancel_job(job_id=self.job_id)