diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index e5fb6a7a26d91..fbd74e8e47d39 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -37,7 +37,8 @@ class DruidHook(BaseHook): which accepts index jobs :type druid_ingest_conn_id: string :param timeout: The interval between polling - the Druid job for the status of the ingestion job + the Druid job for the status of the ingestion job. + Must be greater than or equal to 1 :type timeout: int :param max_ingestion_time: The maximum ingestion time before assuming the job failed :type max_ingestion_time: int @@ -53,6 +54,9 @@ def __init__( self.max_ingestion_time = max_ingestion_time self.header = {'content-type': 'application/json'} + if self.timeout < 1: + raise ValueError("Druid timeout should be equal or greater than 1") + def get_conn_url(self): conn = self.get_connection(self.druid_ingest_conn_id) host = conn.host diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py index 6fd7b3cc76552..4243343b88227 100644 --- a/tests/hooks/test_druid_hook.py +++ b/tests/hooks/test_druid_hook.py @@ -98,7 +98,7 @@ def test_submit_unknown_response(self, m): @requests_mock.mock() def test_submit_timeout(self, m): - self.db_hook.timeout = 0 + self.db_hook.timeout = 1 self.db_hook.max_ingestion_time = 5 task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', @@ -131,7 +131,7 @@ def test_get_conn_url(self, mock_get_connection): get_conn_value.port = '1' get_conn_value.extra_dejson = {'endpoint': 'ingest'} mock_get_connection.return_value = get_conn_value - hook = DruidHook(timeout=0, max_ingestion_time=5) + hook = DruidHook(timeout=1, max_ingestion_time=5) self.assertEquals(hook.get_conn_url(), 'https://test_host:1/ingest')