From 9d8343bbab3574b549102f79875e0e57630a7bf2 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 7 Apr 2025 11:12:31 +0200 Subject: [PATCH 1/2] refactor: Ignore cursor specific parameters when instantiating Elasticsearch connection --- .../providers/elasticsearch/hooks/elasticsearch.py | 9 ++++++--- .../tests/unit/elasticsearch/hooks/test_elasticsearch.py | 5 +++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py index f9e50525fd6fb..fde29390a3707 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -18,6 +18,7 @@ from __future__ import annotations from collections.abc import Iterable, Mapping +from copy import deepcopy from functools import cached_property from typing import TYPE_CHECKING, Any from urllib import parse @@ -128,13 +129,15 @@ def __init__( self.user = user self.password = password self.scheme = scheme - self.kwargs = kwargs + self.kwargs = deepcopy(kwargs) + kwargs.pop("fetch_size", None) + kwargs.pop("field_multi_value_leniency", None) netloc = f"{host}:{port}" self.url = parse.urlunparse((scheme, netloc, "/", None, None, None)) if user and password: - self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs) + self.es = Elasticsearch(self.url, http_auth=(user, password), **kwargs) else: - self.es = Elasticsearch(self.url, **self.kwargs) + self.es = Elasticsearch(self.url, **kwargs) def cursor(self) -> ElasticsearchSQLCursor: return ElasticsearchSQLCursor(self.es, **self.kwargs) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py index 953e7dd50ef72..0862ce4833246 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py @@ -204,6 +204,11 @@ def test_execute_sql_query(self, mock_es): assert response == RESPONSE_WITHOUT_CURSOR + def test_connection_ignore_cursor_parameters(self): + assert ESConnection( + host="localhost", port=9200, fetch_size=1000, field_multi_value_leniency=True, + ) + class MockElasticsearch: def __init__(self, data: dict): From 872e212ee07801e794584c02974f649ffc3619b1 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 7 Apr 2025 11:41:47 +0200 Subject: [PATCH 2/2] refactor: Reformatted test_connection_ignore_cursor_parameters --- .../tests/unit/elasticsearch/hooks/test_elasticsearch.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py index 0862ce4833246..b8f8fe25bb6c9 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py @@ -206,7 +206,10 @@ def test_execute_sql_query(self, mock_es): def test_connection_ignore_cursor_parameters(self): assert ESConnection( - host="localhost", port=9200, fetch_size=1000, field_multi_value_leniency=True, + host="localhost", + port=9200, + fetch_size=1000, + field_multi_value_leniency=True, )