From d96a2fd7eb4c84f8df10d2539f28c585efd6e4ec Mon Sep 17 00:00:00 2001 From: darkag Date: Fri, 16 Jun 2023 18:20:45 +0200 Subject: [PATCH 01/12] first try parameters --- airflow/providers/vertica/hooks/vertica.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 92a74ea36901c..6f0e5b6b154bd 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -45,6 +45,12 @@ def get_conn(self) -> connect: conn_config["port"] = 5433 else: conn_config["port"] = int(conn.port) - + + if conn.extra_dejson.get("connection_load_balance", False): + conn_config["connection_load_balance"] = bool(conn.extra_dejson["connection_load_balance"]) + + if conn.extra_dejson.get("session_label", False): + conn_config["session_label"] = conn.extra_dejson["session_label"] + conn = connect(**conn_config) return conn From f5cd910cb74275b79699d434d51eeefd2acfb86f Mon Sep 17 00:00:00 2001 From: darkag Date: Tue, 20 Jun 2023 19:19:21 +0200 Subject: [PATCH 02/12] Add binding to all options Add binding from extra to vertica connection parameters --- airflow/providers/vertica/hooks/vertica.py | 39 ++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 6f0e5b6b154bd..df866cc158e6a 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -51,6 +51,45 @@ def get_conn(self) -> connect: if conn.extra_dejson.get("session_label", False): conn_config["session_label"] = conn.extra_dejson["session_label"] + + if conn.extra_dejson.get("backup_server_node", False): + conn_config["backup_server_node"] = conn.extra_dejson["backup_server_node"] + + if conn.extra_dejson.get("binary_transfer", False): + conn_config["binary_transfer"] = bool(conn.extra_dejson["binary_transfer"]) + + if conn.extra_dejson.get("connection_timeout", False): + conn_config["connection_timeout"] = int(conn.extra_dejson["connection_timeout"]) + + if conn.extra_dejson.get("disable_copy_local", False): + conn_config["disable_copy_local"] = bool(conn.extra_dejson["disable_copy_local"]) + + if conn.extra_dejson.get("kerberos_host_name", False): + conn_config["kerberos_host_name"] = conn.extra_dejson["kerberos_host_name"] + + if conn.extra_dejson.get("kerberos_service_name", False): + conn_config["kerberos_service_name"] = conn.extra_dejson["kerberos_service_name"] + + if conn.extra_dejson.get("log_level", False): + conn_config["log_level"] = conn.extra_dejson["log_level"] + if conn.extra_dejson.get("log_path", False): + conn_config["log_path"] = conn.extra_dejson["log_path"] + + if conn.extra_dejson.get("request_complex_types", False): + conn_config["request_complex_types"] = bool(conn.extra_dejson["request_complex_types"]) + + if conn.extra_dejson.get("use_prepared_statements", False): + conn_config["use_prepared_statements"] = bool(conn.extra_dejson["use_prepared_statements"]) + + if conn.extra_dejson.get("unicode_error", False): + conn_config["unicode_error"] = conn.extra_dejson["unicode_error"] + + if conn.extra_dejson.get("workload", False): + conn_config["workload"] = conn.extra_dejson["workload"] + + if conn.extra_dejson.get("ssl", False): + conn_config["ssl"] = conn.extra_dejson["ssl"] + conn = connect(**conn_config) return conn From 60da8c355bf6e8d9cb97fb0728ffb34994b95f9f Mon Sep 17 00:00:00 2001 From: darkag Date: Fri, 23 Jun 2023 21:22:41 +0200 Subject: [PATCH 03/12] Update vertica.py Simplification of code, better handling of log level parameter, correction for "request_complex_types" which is true by default (it would have been impossible to pass false with the previous version) --- airflow/providers/vertica/hooks/vertica.py | 85 ++++++++++++---------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index df866cc158e6a..6821a1148beb0 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -46,50 +46,57 @@ def get_conn(self) -> connect: else: conn_config["port"] = int(conn.port) - if conn.extra_dejson.get("connection_load_balance", False): - conn_config["connection_load_balance"] = bool(conn.extra_dejson["connection_load_balance"]) + conn_extra = conn.extra_dejson + if "connection_load_balance" in conn_extra: + conn_config["connection_load_balance"] = bool(conn_extra["connection_load_balance"]) - if conn.extra_dejson.get("session_label", False): - conn_config["session_label"] = conn.extra_dejson["session_label"] - - if conn.extra_dejson.get("backup_server_node", False): - conn_config["backup_server_node"] = conn.extra_dejson["backup_server_node"] - - if conn.extra_dejson.get("binary_transfer", False): - conn_config["binary_transfer"] = bool(conn.extra_dejson["binary_transfer"]) - - if conn.extra_dejson.get("connection_timeout", False): - conn_config["connection_timeout"] = int(conn.extra_dejson["connection_timeout"]) - - if conn.extra_dejson.get("disable_copy_local", False): - conn_config["disable_copy_local"] = bool(conn.extra_dejson["disable_copy_local"]) - - if conn.extra_dejson.get("kerberos_host_name", False): - conn_config["kerberos_host_name"] = conn.extra_dejson["kerberos_host_name"] - - if conn.extra_dejson.get("kerberos_service_name", False): - conn_config["kerberos_service_name"] = conn.extra_dejson["kerberos_service_name"] - - if conn.extra_dejson.get("log_level", False): - conn_config["log_level"] = conn.extra_dejson["log_level"] + conn_config["session_label"] = conn_extra.get("session_label") + conn_config["backup_server_node"] = conn_extra.get("backup_server_node") + + if "binary_transfer" in conn_extra: + conn_config["binary_transfer"] = bool(conn_extra["binary_transfer"]) + + if "connection_timeout" in conn_extra: + conn_config["connection_timeout"] = int(conn_extra["connection_timeout"]) + + if "disable_copy_local" in conn_extra: + conn_config["disable_copy_local"] = bool(conn_extra["disable_copy_local"]) + + conn_config["kerberos_host_name"] = conn_extra.get("kerberos_host_name") + conn_config["kerberos_service_name"] = conn_extra.get("kerberos_service_name") + + if "log_level" in conn_extra: + import logging + log_lvl = conn_extra["log_level"] + if isinstance(log_lvl, str): + log_lvl = log_lvl.lower() + if log_lvl == "critical": + conn_config["log_level"] = logging.CRITICAL + elif log_lvl == "error": + conn_config["log_level"] = logging.ERROR + elif log_lvl == "warning": + conn_config["log_level"] = logging.WARNING + elif log_lvl == "info": + conn_config["log_level"] = logging.INFO + elif log_lvl == "debug": + conn_config["log_level"] = logging.DEBUG + elif log_lvl == "notset": + conn_config["log_level"] = logging.NOTSET + else: + conn_config["log_level"] = int(conn_extra["log_level"]) - if conn.extra_dejson.get("log_path", False): - conn_config["log_path"] = conn.extra_dejson["log_path"] - - if conn.extra_dejson.get("request_complex_types", False): - conn_config["request_complex_types"] = bool(conn.extra_dejson["request_complex_types"]) - - if conn.extra_dejson.get("use_prepared_statements", False): - conn_config["use_prepared_statements"] = bool(conn.extra_dejson["use_prepared_statements"]) + if "log_path" in conn_extra: + conn_config["log_path"] = conn_extra["log_path"] - if conn.extra_dejson.get("unicode_error", False): - conn_config["unicode_error"] = conn.extra_dejson["unicode_error"] + if "request_complex_types" in conn_extra: + conn_config["request_complex_types"] = bool(conn_extra["request_complex_types"]) - if conn.extra_dejson.get("workload", False): - conn_config["workload"] = conn.extra_dejson["workload"] + if "use_prepared_statements" in conn_extra: + conn_config["use_prepared_statements"] = bool(conn_extra["use_prepared_statements"]) - if conn.extra_dejson.get("ssl", False): - conn_config["ssl"] = conn.extra_dejson["ssl"] + conn_config["unicode_error"] = conn_extra.get("unicode_error") + conn_config["workload"] = conn_extra.get("workload") + conn_config["ssl"] = conn_extra.get("ssl") conn = connect(**conn_config) return conn From 478b4ef1fd9d9002b29a9280baaa58069d252a1f Mon Sep 17 00:00:00 2001 From: darkag Date: Sat, 24 Jun 2023 07:44:12 +0200 Subject: [PATCH 04/12] Fix to pass test Fix useless spaces, pass options only if present in extra, loop over options to reduce code size --- airflow/providers/vertica/hooks/vertica.py | 40 +++++++--------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 6821a1148beb0..522417b52c6e0 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -45,28 +45,25 @@ def get_conn(self) -> connect: conn_config["port"] = 5433 else: conn_config["port"] = int(conn.port) - - conn_extra = conn.extra_dejson - if "connection_load_balance" in conn_extra: - conn_config["connection_load_balance"] = bool(conn_extra["connection_load_balance"]) - - conn_config["session_label"] = conn_extra.get("session_label") - conn_config["backup_server_node"] = conn_extra.get("backup_server_node") - if "binary_transfer" in conn_extra: - conn_config["binary_transfer"] = bool(conn_extra["binary_transfer"]) + boolOptions = ["connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements"] + stdOptions = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] + conn_extra = conn.extra_dejson - if "connection_timeout" in conn_extra: - conn_config["connection_timeout"] = int(conn_extra["connection_timeout"]) + for bo in boolOptions: + if bo in conn_extra: + conn_config[bo] = bool(conn_extra[bo]) - if "disable_copy_local" in conn_extra: - conn_config["disable_copy_local"] = bool(conn_extra["disable_copy_local"]) + for so in stdOptions: + if so in conn_extra: + conn_config[so] = conn_extra[so] - conn_config["kerberos_host_name"] = conn_extra.get("kerberos_host_name") - conn_config["kerberos_service_name"] = conn_extra.get("kerberos_service_name") + if "connection_timeout" in conn_extra: + conn_config["connection_timeout"] = int(conn_extra["connection_timeout"]) if "log_level" in conn_extra: import logging + log_lvl = conn_extra["log_level"] if isinstance(log_lvl, str): log_lvl = log_lvl.lower() @@ -84,19 +81,6 @@ def get_conn(self) -> connect: conn_config["log_level"] = logging.NOTSET else: conn_config["log_level"] = int(conn_extra["log_level"]) - - if "log_path" in conn_extra: - conn_config["log_path"] = conn_extra["log_path"] - - if "request_complex_types" in conn_extra: - conn_config["request_complex_types"] = bool(conn_extra["request_complex_types"]) - - if "use_prepared_statements" in conn_extra: - conn_config["use_prepared_statements"] = bool(conn_extra["use_prepared_statements"]) - conn_config["unicode_error"] = conn_extra.get("unicode_error") - conn_config["workload"] = conn_extra.get("workload") - conn_config["ssl"] = conn_extra.get("ssl") - conn = connect(**conn_config) return conn From 688adcc7137d0bb763d236f58d3008112e9ced29 Mon Sep 17 00:00:00 2001 From: darkag Date: Sun, 25 Jun 2023 07:22:41 +0200 Subject: [PATCH 05/12] fix naming convention --- airflow/providers/vertica/hooks/vertica.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 522417b52c6e0..7de7ac6bcd964 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -46,15 +46,15 @@ def get_conn(self) -> connect: else: conn_config["port"] = int(conn.port) - boolOptions = ["connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements"] - stdOptions = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] + bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements"] + std_options = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] conn_extra = conn.extra_dejson - for bo in boolOptions: + for bo in bool_options: if bo in conn_extra: conn_config[bo] = bool(conn_extra[bo]) - for so in stdOptions: + for so in std_options: if so in conn_extra: conn_config[so] = conn_extra[so] From 2e8a108c106fa8e984298e6f7ef1b796dd53bb96 Mon Sep 17 00:00:00 2001 From: ivascot Date: Sun, 25 Jun 2023 12:59:29 +0200 Subject: [PATCH 06/12] add tests and doc --- airflow/providers/vertica/hooks/vertica.py | 2 +- .../connections/vertica.rst | 85 +++++++++++++++++++ tests/providers/vertica/hooks/test_vertica.py | 49 +++++++++++ 3 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 docs/apache-airflow-providers-vertica/connections/vertica.rst diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 7de7ac6bcd964..d6366bb739ad0 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -59,7 +59,7 @@ def get_conn(self) -> connect: conn_config[so] = conn_extra[so] if "connection_timeout" in conn_extra: - conn_config["connection_timeout"] = int(conn_extra["connection_timeout"]) + conn_config["connection_timeout"] = float(conn_extra["connection_timeout"]) if "log_level" in conn_extra: import logging diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst b/docs/apache-airflow-providers-vertica/connections/vertica.rst new file mode 100644 index 0000000000000..65384e6f0f730 --- /dev/null +++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst @@ -0,0 +1,85 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:vertica: + +Vertica Connection +================ +The Vertica connection type provides connection to a Vertica database. + +Configuring the Connection +-------------------------- +Host (required) + The host to connect to. + +Schema (optional) + Specify the schema name to be used in the database. + +Login (required) + Specify the user name to connect. + +Password (required) + Specify the password to connect. + +Extra (optional) + Specify the extra parameters (as json dictionary) that can be used in Vertica + connection. + + The following extras are supported: + + * ``backup_server_node``: See `Connection Failover `. + * ``binary_transfer``: See `Data Transfer Format `_. + * ``connection_load_balance``: See `Connection Load Balancing `_. + * ``connection_timeout``: The number of seconds (can be a nonnegative floating point number) the client + waits for a socket operation (Establishing a TCP connection or read/write operation). + * ``disable_copy_local``: See `COPY FROM LOCAL `_. + * ``kerberos_host_name``: See `Kerberos Authentication `_. + * ``kerberos_service_name``: See `Kerberos Authentication `_. + * ``log_level``: See `Logging `_. + * ``log_path``: See `Logging `_. + * ``request_complex_types:``: See `SQL Data conversion to Python objects `_. + * ``session_label``: Sets a label for the connection on the server. + * ``ssl``: Support only True or False. See `TLS/SSL `_. + * ``unicode_error``: See `UTF-8 encoding issues `_. + * ``use_prepared_statements``: See `Passing parameters to SQL queries `_. + * ``workload``: Sets the workload name associated with this session. + + See `vertica-python docs `_ for details. + + + Example "extras" field: + + .. code-block:: json + + { + "connection_load_balance": True, + "log_path": "/tmp/vertica.log", + "log_level": "error", + "ssl": True + } + + or + + .. code-block:: json + + { + "session_label": "airflow-session", + "connection_timeout": 30, + "backup_server_node": ["bck_server_1", "bck_server_2"] + } diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index e78c2a0c5c813..8dfb09022f1e2 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -47,6 +47,55 @@ def test_get_conn(self, mock_connect): host="host", port=5433, database="vertica", user="login", password="password" ) + @patch("airflow.providers.vertica.hooks.vertica.connect") + def test_get_conn_extra_parameters_no_cast(self, mock_connect): + extra_dict = self.connection.extra_dejson + bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] + for bo in bool_options: + extra_dict.update({ bo: True}) + extra_dict.update({ "request_complex_types": False}) + + std_options = ["session_label", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] + for so in std_options: + extra_dict.update({ so: so}) + bck_server_node = ["1.2.3.4", "4.3.2.1"] + conn_timeout = 30 + log_lvl = 40 + extra_dict.update({"backup_server_node":bck_server_node}) + extra_dict.update({"connection_timeout":conn_timeout}) + extra_dict.update({"log_level":log_lvl}) + + self.db_hook.get_conn() + assert mock_connect.call_count == 1 + args, kwargs = mock_connect.call_args + assert args == () + for bo in bool_options: + assert kwargs[bo] == True + assert kwargs["request_complex_types"] == False + for so in std_options: + assert kwargs[so] == so + assert bck_server_node[0] in kwargs["backup_server_node"] + assert bck_server_node[1] in kwargs["backup_server_node"] + assert kwargs["connection_timeout"] == conn_timeout + assert kwargs["log_level"] == log_lvl + + @patch("airflow.providers.vertica.hooks.vertica.connect") + def test_get_conn_extra_parameters_cast(self, mock_connect): + import logging + extra_dict = self.connection.extra_dejson + bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] + for bo in bool_options: + extra_dict.update({ bo: "True"}) + extra_dict.update({ "request_complex_types": "False"}) + extra_dict.update({"log_level":"Error"}) + self.db_hook.get_conn() + assert mock_connect.call_count == 1 + args, kwargs = mock_connect.call_args + assert args == () + for bo in bool_options: + assert kwargs[bo] == True + assert kwargs["request_complex_types"] == False + assert kwargs["log_level"] == logging.ERROR class TestVerticaHook: def setup_method(self): From da1a22b10bf2de94840c87389f399a2e4e1235c5 Mon Sep 17 00:00:00 2001 From: ivascot Date: Mon, 26 Jun 2023 09:34:56 +0200 Subject: [PATCH 07/12] Remove log_path from Extra --- airflow/providers/vertica/hooks/vertica.py | 3 ++- docs/apache-airflow-providers-vertica/connections/vertica.rst | 3 +-- tests/providers/vertica/hooks/test_vertica.py | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index d6366bb739ad0..780b03ec33747 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -47,7 +47,7 @@ def get_conn(self) -> connect: conn_config["port"] = int(conn.port) bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements"] - std_options = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] + std_options = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "unicode_error", "workload", "ssl"] conn_extra = conn.extra_dejson for bo in bool_options: @@ -65,6 +65,7 @@ def get_conn(self) -> connect: import logging log_lvl = conn_extra["log_level"] + conn_config["log_path"] = None if isinstance(log_lvl, str): log_lvl = log_lvl.lower() if log_lvl == "critical": diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst b/docs/apache-airflow-providers-vertica/connections/vertica.rst index 65384e6f0f730..81cdb39e9e1c1 100644 --- a/docs/apache-airflow-providers-vertica/connections/vertica.rst +++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst @@ -51,8 +51,7 @@ Extra (optional) * ``disable_copy_local``: See `COPY FROM LOCAL `_. * ``kerberos_host_name``: See `Kerberos Authentication `_. * ``kerberos_service_name``: See `Kerberos Authentication `_. - * ``log_level``: See `Logging `_. - * ``log_path``: See `Logging `_. + * ``log_level``: Enable vertica client logging. Traces will be visible in tasks log. See `Logging `_. * ``request_complex_types:``: See `SQL Data conversion to Python objects `_. * ``session_label``: Sets a label for the connection on the server. * ``ssl``: Support only True or False. See `TLS/SSL `_. diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index 8dfb09022f1e2..2bc2c327d5689 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -55,7 +55,7 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): extra_dict.update({ bo: True}) extra_dict.update({ "request_complex_types": False}) - std_options = ["session_label", "kerberos_host_name", "kerberos_service_name", "log_path", "unicode_error", "workload", "ssl"] + std_options = ["session_label", "kerberos_host_name", "kerberos_service_name", "unicode_error", "workload", "ssl"] for so in std_options: extra_dict.update({ so: so}) bck_server_node = ["1.2.3.4", "4.3.2.1"] @@ -78,6 +78,7 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): assert bck_server_node[1] in kwargs["backup_server_node"] assert kwargs["connection_timeout"] == conn_timeout assert kwargs["log_level"] == log_lvl + assert kwargs["log_path"] is None @patch("airflow.providers.vertica.hooks.vertica.connect") def test_get_conn_extra_parameters_cast(self, mock_connect): @@ -96,6 +97,7 @@ def test_get_conn_extra_parameters_cast(self, mock_connect): assert kwargs[bo] == True assert kwargs["request_complex_types"] == False assert kwargs["log_level"] == logging.ERROR + assert kwargs["log_path"] is None class TestVerticaHook: def setup_method(self): From 78731fad434edce3ec13585526d2925cefabef07 Mon Sep 17 00:00:00 2001 From: ivascot Date: Mon, 26 Jun 2023 13:31:02 +0200 Subject: [PATCH 08/12] remove log_path from example --- docs/apache-airflow-providers-vertica/connections/vertica.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst b/docs/apache-airflow-providers-vertica/connections/vertica.rst index 81cdb39e9e1c1..0007305b3c94c 100644 --- a/docs/apache-airflow-providers-vertica/connections/vertica.rst +++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst @@ -68,7 +68,6 @@ Extra (optional) { "connection_load_balance": True, - "log_path": "/tmp/vertica.log", "log_level": "error", "ssl": True } From 2b756d829022e9c4d454de0b1d3230a65f58edc5 Mon Sep 17 00:00:00 2001 From: ivascot Date: Mon, 26 Jun 2023 18:14:21 +0200 Subject: [PATCH 09/12] add docstring for test_get_conn_extra_parameters_cast --- tests/providers/vertica/hooks/test_vertica.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index 2bc2c327d5689..ea9cad1ab77ca 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -82,6 +82,9 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): @patch("airflow.providers.vertica.hooks.vertica.connect") def test_get_conn_extra_parameters_cast(self, mock_connect): + """Test if parameters that can be passed either as string or int/bool + like log_level are correctly converted when passed as string + (while test_get_conn_extra_parameters_no_cast tests them passed as int/bool)""" import logging extra_dict = self.connection.extra_dejson bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] From ebd4acd4b1abe644a7cd8c0b40d443c0227a260c Mon Sep 17 00:00:00 2001 From: ivascot Date: Mon, 26 Jun 2023 18:29:04 +0200 Subject: [PATCH 10/12] docstring --- tests/providers/vertica/hooks/test_vertica.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index ea9cad1ab77ca..62751552e203e 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -49,6 +49,7 @@ def test_get_conn(self, mock_connect): @patch("airflow.providers.vertica.hooks.vertica.connect") def test_get_conn_extra_parameters_no_cast(self, mock_connect): + """Test if parameters are correctly passed to connection""" extra_dict = self.connection.extra_dejson bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] for bo in bool_options: From 594f5fffdd9cde5ee6ab535ab5073ae138da7653 Mon Sep 17 00:00:00 2001 From: ivascot Date: Wed, 28 Jun 2023 22:17:27 +0200 Subject: [PATCH 11/12] Fix error in tests and typo for static checks --- airflow/providers/vertica/hooks/vertica.py | 18 +++++- .../connections/vertica.rst | 8 +-- tests/providers/vertica/hooks/test_vertica.py | 58 ++++++++++++------- 3 files changed, 58 insertions(+), 26 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 780b03ec33747..ab0355413bd2b 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -46,8 +46,22 @@ def get_conn(self) -> connect: else: conn_config["port"] = int(conn.port) - bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements"] - std_options = ["session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "unicode_error", "workload", "ssl"] + bool_options = [ + "connection_load_balance", + "binary_transfer", + "disable_copy_local", + "request_complex_types", + "use_prepared_statements", + ] + std_options = [ + "session_label", + "backup_server_node", + "kerberos_host_name", + "kerberos_service_name", + "unicode_error", + "workload", + "ssl" + ] conn_extra = conn.extra_dejson for bo in bool_options: diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst b/docs/apache-airflow-providers-vertica/connections/vertica.rst index 0007305b3c94c..2a040d0c2c69e 100644 --- a/docs/apache-airflow-providers-vertica/connections/vertica.rst +++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst @@ -39,14 +39,14 @@ Password (required) Extra (optional) Specify the extra parameters (as json dictionary) that can be used in Vertica - connection. + connection. The following extras are supported: * ``backup_server_node``: See `Connection Failover `. * ``binary_transfer``: See `Data Transfer Format `_. * ``connection_load_balance``: See `Connection Load Balancing `_. - * ``connection_timeout``: The number of seconds (can be a nonnegative floating point number) the client + * ``connection_timeout``: The number of seconds (can be a nonnegative floating point number) the client. waits for a socket operation (Establishing a TCP connection or read/write operation). * ``disable_copy_local``: See `COPY FROM LOCAL `_. * ``kerberos_host_name``: See `Kerberos Authentication `_. @@ -67,9 +67,9 @@ Extra (optional) .. code-block:: json { - "connection_load_balance": True, + "connection_load_balance": true, "log_level": "error", - "ssl": True + "ssl": true } or diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index 62751552e203e..d8b8345984a93 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -19,7 +19,7 @@ from unittest import mock from unittest.mock import patch - +import json from airflow.models import Connection from airflow.providers.vertica.hooks.vertica import VerticaHook @@ -51,28 +51,40 @@ def test_get_conn(self, mock_connect): def test_get_conn_extra_parameters_no_cast(self, mock_connect): """Test if parameters are correctly passed to connection""" extra_dict = self.connection.extra_dejson - bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] + bool_options = [ + "connection_load_balance", + "binary_transfer", + "disable_copy_local", + "use_prepared_statements" + ] for bo in bool_options: - extra_dict.update({ bo: True}) - extra_dict.update({ "request_complex_types": False}) - - std_options = ["session_label", "kerberos_host_name", "kerberos_service_name", "unicode_error", "workload", "ssl"] + extra_dict.update({bo: True}) + extra_dict.update({"request_complex_types": False}) + + std_options = [ + "session_label", + "kerberos_host_name", + "kerberos_service_name", + "unicode_error", + "workload", + "ssl" + ] for so in std_options: - extra_dict.update({ so: so}) + extra_dict.update({so: so}) bck_server_node = ["1.2.3.4", "4.3.2.1"] conn_timeout = 30 log_lvl = 40 - extra_dict.update({"backup_server_node":bck_server_node}) - extra_dict.update({"connection_timeout":conn_timeout}) - extra_dict.update({"log_level":log_lvl}) - + extra_dict.update({"backup_server_node": bck_server_node}) + extra_dict.update({"connection_timeout": conn_timeout}) + extra_dict.update({"log_level": log_lvl}) + self.connection.extra = json.dumps(extra_dict) self.db_hook.get_conn() assert mock_connect.call_count == 1 args, kwargs = mock_connect.call_args assert args == () for bo in bool_options: - assert kwargs[bo] == True - assert kwargs["request_complex_types"] == False + assert kwargs[bo] is True + assert kwargs["request_complex_types"] is False for so in std_options: assert kwargs[so] == so assert bck_server_node[0] in kwargs["backup_server_node"] @@ -84,22 +96,28 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): @patch("airflow.providers.vertica.hooks.vertica.connect") def test_get_conn_extra_parameters_cast(self, mock_connect): """Test if parameters that can be passed either as string or int/bool - like log_level are correctly converted when passed as string - (while test_get_conn_extra_parameters_no_cast tests them passed as int/bool)""" + like log_level are correctly converted when passed as string + (while test_get_conn_extra_parameters_no_cast tests them passed as int/bool)""" import logging extra_dict = self.connection.extra_dejson - bool_options = ["connection_load_balance", "binary_transfer", "disable_copy_local", "use_prepared_statements"] + bool_options = [ + "connection_load_balance", + "binary_transfer", + "disable_copy_local", + "use_prepared_statements" + ] for bo in bool_options: extra_dict.update({ bo: "True"}) - extra_dict.update({ "request_complex_types": "False"}) - extra_dict.update({"log_level":"Error"}) + extra_dict.update({"request_complex_types": "False"}) + extra_dict.update({"log_level": "Error"}) + self.connection.extra = json.dumps(extra_dict) self.db_hook.get_conn() assert mock_connect.call_count == 1 args, kwargs = mock_connect.call_args assert args == () for bo in bool_options: - assert kwargs[bo] == True - assert kwargs["request_complex_types"] == False + assert kwargs[bo] is True + assert kwargs["request_complex_types"] is False assert kwargs["log_level"] == logging.ERROR assert kwargs["log_path"] is None From d588b9434528d01568cc40aaa1f3202cf7bbdf18 Mon Sep 17 00:00:00 2001 From: darkag Date: Thu, 29 Jun 2023 22:48:23 +0200 Subject: [PATCH 12/12] multiples fixes (doc, test, parsing) --- airflow/providers/vertica/hooks/vertica.py | 4 ++-- .../connections/vertica.rst | 10 +++++----- docs/apache-airflow-providers-vertica/index.rst | 7 +++++++ tests/providers/vertica/hooks/test_vertica.py | 13 ++++++++----- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index ab0355413bd2b..06b2e3cf179b7 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -60,13 +60,13 @@ def get_conn(self) -> connect: "kerberos_service_name", "unicode_error", "workload", - "ssl" + "ssl", ] conn_extra = conn.extra_dejson for bo in bool_options: if bo in conn_extra: - conn_config[bo] = bool(conn_extra[bo]) + conn_config[bo] = str(conn_extra[bo]).lower() in ["true", "on"] for so in std_options: if so in conn_extra: diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst b/docs/apache-airflow-providers-vertica/connections/vertica.rst index 2a040d0c2c69e..86f583a548124 100644 --- a/docs/apache-airflow-providers-vertica/connections/vertica.rst +++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst @@ -20,7 +20,7 @@ .. _howto/connection:vertica: Vertica Connection -================ +================== The Vertica connection type provides connection to a Vertica database. Configuring the Connection @@ -43,11 +43,11 @@ Extra (optional) The following extras are supported: - * ``backup_server_node``: See `Connection Failover `. + * ``backup_server_node``: See `Connection Failover `_. * ``binary_transfer``: See `Data Transfer Format `_. * ``connection_load_balance``: See `Connection Load Balancing `_. - * ``connection_timeout``: The number of seconds (can be a nonnegative floating point number) the client. - waits for a socket operation (Establishing a TCP connection or read/write operation). + * ``connection_timeout``: The number of seconds (can be a nonnegative floating point number) the client + waits for a socket operation (Establishing a TCP connection or read/write operation). * ``disable_copy_local``: See `COPY FROM LOCAL `_. * ``kerberos_host_name``: See `Kerberos Authentication `_. * ``kerberos_service_name``: See `Kerberos Authentication `_. @@ -58,7 +58,7 @@ Extra (optional) * ``unicode_error``: See `UTF-8 encoding issues `_. * ``use_prepared_statements``: See `Passing parameters to SQL queries `_. * ``workload``: Sets the workload name associated with this session. - + See `vertica-python docs `_ for details. diff --git a/docs/apache-airflow-providers-vertica/index.rst b/docs/apache-airflow-providers-vertica/index.rst index db09f1924c4d7..ae7c2457b8117 100644 --- a/docs/apache-airflow-providers-vertica/index.rst +++ b/docs/apache-airflow-providers-vertica/index.rst @@ -29,6 +29,13 @@ Changelog Security +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Connection types + .. toctree:: :hidden: :maxdepth: 1 diff --git a/tests/providers/vertica/hooks/test_vertica.py b/tests/providers/vertica/hooks/test_vertica.py index d8b8345984a93..146c3bcd1136b 100644 --- a/tests/providers/vertica/hooks/test_vertica.py +++ b/tests/providers/vertica/hooks/test_vertica.py @@ -17,9 +17,10 @@ # under the License. from __future__ import annotations +import json from unittest import mock from unittest.mock import patch -import json + from airflow.models import Connection from airflow.providers.vertica.hooks.vertica import VerticaHook @@ -55,7 +56,7 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): "connection_load_balance", "binary_transfer", "disable_copy_local", - "use_prepared_statements" + "use_prepared_statements", ] for bo in bool_options: extra_dict.update({bo: True}) @@ -67,7 +68,7 @@ def test_get_conn_extra_parameters_no_cast(self, mock_connect): "kerberos_service_name", "unicode_error", "workload", - "ssl" + "ssl", ] for so in std_options: extra_dict.update({so: so}) @@ -99,15 +100,16 @@ def test_get_conn_extra_parameters_cast(self, mock_connect): like log_level are correctly converted when passed as string (while test_get_conn_extra_parameters_no_cast tests them passed as int/bool)""" import logging + extra_dict = self.connection.extra_dejson bool_options = [ "connection_load_balance", "binary_transfer", "disable_copy_local", - "use_prepared_statements" + "use_prepared_statements", ] for bo in bool_options: - extra_dict.update({ bo: "True"}) + extra_dict.update({bo: "True"}) extra_dict.update({"request_complex_types": "False"}) extra_dict.update({"log_level": "Error"}) self.connection.extra = json.dumps(extra_dict) @@ -121,6 +123,7 @@ def test_get_conn_extra_parameters_cast(self, mock_connect): assert kwargs["log_level"] == logging.ERROR assert kwargs["log_path"] is None + class TestVerticaHook: def setup_method(self): self.cur = mock.MagicMock(rowcount=0)