diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py index f9e50525fd6fb..fde29390a3707 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -18,6 +18,7 @@ from __future__ import annotations from collections.abc import Iterable, Mapping +from copy import deepcopy from functools import cached_property from typing import TYPE_CHECKING, Any from urllib import parse @@ -128,13 +129,15 @@ def __init__( self.user = user self.password = password self.scheme = scheme - self.kwargs = kwargs + self.kwargs = deepcopy(kwargs) + kwargs.pop("fetch_size", None) + kwargs.pop("field_multi_value_leniency", None) netloc = f"{host}:{port}" self.url = parse.urlunparse((scheme, netloc, "/", None, None, None)) if user and password: - self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs) + self.es = Elasticsearch(self.url, http_auth=(user, password), **kwargs) else: - self.es = Elasticsearch(self.url, **self.kwargs) + self.es = Elasticsearch(self.url, **kwargs) def cursor(self) -> ElasticsearchSQLCursor: return ElasticsearchSQLCursor(self.es, **self.kwargs) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py index 953e7dd50ef72..b8f8fe25bb6c9 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py @@ -204,6 +204,14 @@ def test_execute_sql_query(self, mock_es): assert response == RESPONSE_WITHOUT_CURSOR + def test_connection_ignore_cursor_parameters(self): + assert ESConnection( + host="localhost", + port=9200, + fetch_size=1000, + field_multi_value_leniency=True, + ) + class MockElasticsearch: def __init__(self, data: dict):