From 8f691619b99a360db15a179a2e021d775568999f Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Wed, 20 Aug 2025 14:42:52 +0530 Subject: [PATCH 1/6] FEAT: Adding lowercase for global variable --- mssql_python/__init__.py | 26 ++++++++++---- mssql_python/cursor.py | 66 +++++++++++++++++++++-------------- mssql_python/row.py | 31 +++++++++++++---- tests/test_001_globals.py | 8 ++++- tests/test_004_cursor.py | 72 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 163 insertions(+), 40 deletions(-) diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index 6bf957779..8f8635964 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -6,6 +6,26 @@ # Exceptions # https://www.python.org/dev/peps/pep-0249/#exceptions + +# GLOBALS +# Read-Only +apilevel = "2.0" +paramstyle = "qmark" +threadsafety = 1 + +class Settings: + def __init__(self): + self.lowercase = False + +# Create a global instance +_settings = Settings() + +def get_settings(): + return _settings + +lowercase = _settings.lowercase # Default is False + +# Import necessary modules from .exceptions import ( Warning, Error, @@ -47,12 +67,6 @@ # Constants from .constants import ConstantsDDBC -# GLOBALS -# Read-Only -apilevel = "2.0" -paramstyle = "qmark" -threadsafety = 1 - from .pooling import PoolingManager def pooling(max_size=100, idle_timeout=600, enabled=True): # """ diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index ed1bb70dc..912cb4a8c 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -17,7 +17,8 @@ from mssql_python.helpers import check_error, log from mssql_python import ddbc_bindings from mssql_python.exceptions import InterfaceError -from .row import Row +from mssql_python.row import Row +from mssql_python import get_settings class Cursor: @@ -73,6 +74,8 @@ def __init__(self, connection) -> None: # Is a list instead of a bool coz bools in Python are immutable. # Hence, we can't pass around bools by reference & modify them. # Therefore, it must be a list with exactly one bool element. + + self.lowercase = get_settings().lowercase def _is_unicode_string(self, param): """ @@ -480,26 +483,32 @@ def _create_parameter_types_list(self, parameter, param_info, parameters_list, i paraminfo.decimalDigits = decimal_digits return paraminfo - def _initialize_description(self): - """ - Initialize the description attribute using SQLDescribeCol. - """ - col_metadata = [] - ret = ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, col_metadata) - check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) - - self.description = [ - ( - col["ColumnName"], - self._map_data_type(col["DataType"]), - None, - col["ColumnSize"], - col["ColumnSize"], - col["DecimalDigits"], - col["Nullable"] == ddbc_sql_const.SQL_NULLABLE.value, - ) - for col in col_metadata - ] + def _initialize_description(self, column_metadata=None): + """Initialize the description attribute from column metadata.""" + if not column_metadata: + self.description = None + return + import mssql_python + + description = [] + for i, col in enumerate(column_metadata): + # Get column name - lowercase it if the lowercase flag is set + column_name = col["ColumnName"] + + if mssql_python.lowercase: + column_name = column_name.lower() + + # Add to description tuple (7 elements as per PEP-249) + description.append(( + column_name, # name + self._map_data_type(col["DataType"]), # type_code + None, # display_size + col["ColumnSize"], # internal_size + col["ColumnSize"], # precision - should match ColumnSize + col["DecimalDigits"], # scale + col["Nullable"] == ddbc_sql_const.SQL_NULLABLE.value, # null_ok + )) + self.description = description def _map_data_type(self, sql_type): """ @@ -611,7 +620,14 @@ def execute( self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) # Initialize description after execution - self._initialize_description() + # After successful execution, initialize description if there are results + column_metadata = [] + try: + ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata) + self._initialize_description(column_metadata) + except Exception as e: + # If describe fails, it's likely there are no results (e.g., for INSERT) + self.description = None @staticmethod def _select_best_sample_value(column): @@ -727,7 +743,7 @@ def fetchone(self) -> Union[None, Row]: return None # Create and return a Row object - return Row(row_data, self.description) + return Row(self, self.description, row_data) def fetchmany(self, size: int = None) -> List[Row]: """ @@ -752,7 +768,7 @@ def fetchmany(self, size: int = None) -> List[Row]: ret = ddbc_bindings.DDBCSQLFetchMany(self.hstmt, rows_data, size) # Convert raw data to Row objects - return [Row(row_data, self.description) for row_data in rows_data] + return [Row(self, self.description, row_data) for row_data in rows_data] def fetchall(self) -> List[Row]: """ @@ -768,7 +784,7 @@ def fetchall(self) -> List[Row]: ret = ddbc_bindings.DDBCSQLFetchAll(self.hstmt, rows_data) # Convert raw data to Row objects - return [Row(row_data, self.description) for row_data in rows_data] + return [Row(self, self.description, row_data) for row_data in rows_data] def nextset(self) -> Union[bool, None]: """ diff --git a/mssql_python/row.py b/mssql_python/row.py index 2c88412de..0b1fd33ea 100644 --- a/mssql_python/row.py +++ b/mssql_python/row.py @@ -9,14 +9,17 @@ class Row: print(row.column_name) # Access by column name """ - def __init__(self, values, cursor_description): + def __init__(self, cursor, description, values, column_map=None): """ Initialize a Row object with values and cursor description. Args: + cursor: The cursor object + description: The cursor description containing column metadata values: List of values for this row - cursor_description: The cursor description containing column metadata + column_map: Optional pre-built column map (for optimization) """ + self._cursor = cursor self._values = values # TODO: ADO task - Optimize memory usage by sharing column map across rows @@ -26,10 +29,14 @@ def __init__(self, values, cursor_description): # 3. Remove cursor_description from Row objects entirely # Create mapping of column names to indices - self._column_map = {} - for i, desc in enumerate(cursor_description): - if desc and desc[0]: # Ensure column name exists - self._column_map[desc[0]] = i + # If column_map is not provided, build it from description + if column_map is None: + column_map = {} + for i, col_desc in enumerate(description): + col_name = col_desc[0] # Name is first item in description tuple + column_map[col_name] = i + + self._column_map = column_map def __getitem__(self, index): """Allow accessing by numeric index: row[0]""" @@ -37,9 +44,19 @@ def __getitem__(self, index): def __getattr__(self, name): """Allow accessing by column name as attribute: row.column_name""" + # Handle lowercase attribute access - if lowercase is enabled, + # try to match attribute names case-insensitively if name in self._column_map: return self._values[self._column_map[name]] - raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + # If lowercase is enabled on the cursor, try case-insensitive lookup + if hasattr(self._cursor, 'lowercase') and self._cursor.lowercase: + name_lower = name.lower() + for col_name in self._column_map: + if col_name.lower() == name_lower: + return self._values[self._column_map[col_name]] + + raise AttributeError(f"Row has no attribute '{name}'") def __eq__(self, other): """ diff --git a/tests/test_001_globals.py b/tests/test_001_globals.py index f41a9a14f..fbee7ec5e 100644 --- a/tests/test_001_globals.py +++ b/tests/test_001_globals.py @@ -4,12 +4,13 @@ - test_apilevel: Check if apilevel has the expected value. - test_threadsafety: Check if threadsafety has the expected value. - test_paramstyle: Check if paramstyle has the expected value. +- test_lowercase: Check if lowercase has the expected value. """ import pytest # Import global variables from the repository -from mssql_python import apilevel, threadsafety, paramstyle +from mssql_python import apilevel, threadsafety, paramstyle, lowercase def test_apilevel(): # Check if apilevel has the expected value @@ -22,3 +23,8 @@ def test_threadsafety(): def test_paramstyle(): # Check if paramstyle has the expected value assert paramstyle == "qmark", "paramstyle should be 'qmark'" + +def test_lowercase(): + # Check if lowercase has the expected default value + assert lowercase is False, "lowercase should default to False" + diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 6a8c84281..728b27e23 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -12,6 +12,7 @@ from datetime import datetime, date, time import decimal from mssql_python import Connection +import mssql_python # Setup test table TEST_TABLE = """ @@ -1313,6 +1314,76 @@ def test_row_column_mapping(cursor, db_connection): cursor.execute("DROP TABLE #pytest_row_test") db_connection.commit() +def test_lowercase_attribute(cursor, db_connection): + """Test that the lowercase attribute properly converts column names to lowercase""" + + # Store original value to restore after test + original_lowercase = mssql_python.lowercase + drop_cursor = None + + try: + # Create a test table with mixed-case column names + cursor.execute(""" + CREATE TABLE #pytest_lowercase_test ( + ID INT PRIMARY KEY, + UserName VARCHAR(50), + EMAIL_ADDRESS VARCHAR(100), + PhoneNumber VARCHAR(20) + ) + """) + db_connection.commit() + + # Insert test data + cursor.execute(""" + INSERT INTO #pytest_lowercase_test (ID, UserName, EMAIL_ADDRESS, PhoneNumber) + VALUES (1, 'JohnDoe', 'john@example.com', '555-1234') + """) + db_connection.commit() + + # First test with lowercase=False (default) + mssql_python.lowercase = False + cursor1 = db_connection.cursor() + cursor1.execute("SELECT * FROM #pytest_lowercase_test") + + # Description column names should preserve original case + column_names1 = [desc[0] for desc in cursor1.description] + assert "ID" in column_names1, "Column 'ID' should be present with original case" + assert "UserName" in column_names1, "Column 'UserName' should be present with original case" + + # Make sure to consume all results and close the cursor + cursor1.fetchall() + cursor1.close() + + # Now test with lowercase=True + mssql_python.lowercase = True + cursor2 = db_connection.cursor() + cursor2.execute("SELECT * FROM #pytest_lowercase_test") + + # Description column names should be lowercase + column_names2 = [desc[0] for desc in cursor2.description] + assert "id" in column_names2, "Column names should be lowercase when lowercase=True" + assert "username" in column_names2, "Column names should be lowercase when lowercase=True" + + # Make sure to consume all results and close the cursor + cursor2.fetchall() + cursor2.close() + + # Create a fresh cursor for cleanup + drop_cursor = db_connection.cursor() + + finally: + # Restore original value + mssql_python.lowercase = original_lowercase + + try: + # Use a separate cursor for cleanup + if drop_cursor: + drop_cursor.execute("DROP TABLE IF EXISTS #pytest_lowercase_test") + db_connection.commit() + drop_cursor.close() + except Exception as e: + print(f"Warning: Failed to drop test table: {e}") + def test_close(db_connection): """Test closing the cursor""" try: @@ -1323,4 +1394,3 @@ def test_close(db_connection): pytest.fail(f"Cursor close test failed: {e}") finally: cursor = db_connection.cursor() - \ No newline at end of file From ee871ae315f400243e8870948e216551801ac49a Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Thu, 21 Aug 2025 12:16:12 +0530 Subject: [PATCH 2/6] FEAT: Adding getDecimalSeperator and setDecimalSeperator as global functions --- mssql_python/__init__.py | 38 +++++- mssql_python/pybind/ddbc_bindings.cpp | 44 +++++-- mssql_python/pybind/ddbc_bindings.h | 6 + mssql_python/row.py | 20 ++- tests/test_001_globals.py | 28 ++++- tests/test_004_cursor.py | 172 ++++++++++++++++++++++++++ 6 files changed, 294 insertions(+), 14 deletions(-) diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index 8f8635964..ec0f3b40a 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -16,8 +16,9 @@ class Settings: def __init__(self): self.lowercase = False + self.decimal_separator = "." -# Create a global instance +# Global settings instance _settings = Settings() def get_settings(): @@ -25,6 +26,40 @@ def get_settings(): lowercase = _settings.lowercase # Default is False +# Set the initial decimal separator in C++ +from .ddbc_bindings import DDBCSetDecimalSeparator +DDBCSetDecimalSeparator(_settings.decimal_separator) + +# New functions for decimal separator control +def setDecimalSeparator(separator): + """ + Sets the decimal separator character used when parsing NUMERIC/DECIMAL values + from the database, e.g. the "." in "1,234.56". + + The default is "." (period). This function overrides the default. + + Args: + separator (str): The character to use as decimal separator + """ + if not isinstance(separator, str) or len(separator) != 1: + raise ValueError("Decimal separator must be a single character string") + + _settings.decimal_separator = separator + + # Update the C++ side + from .ddbc_bindings import DDBCSetDecimalSeparator + DDBCSetDecimalSeparator(separator) + +def getDecimalSeparator(): + """ + Returns the decimal separator character used when parsing NUMERIC/DECIMAL values + from the database. + + Returns: + str: The current decimal separator character + """ + return _settings.decimal_separator + # Import necessary modules from .exceptions import ( Warning, @@ -85,4 +120,3 @@ def pooling(max_size=100, idle_timeout=600, enabled=True): PoolingManager.disable() else: PoolingManager.enable(max_size, idle_timeout) - \ No newline at end of file diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 1b37b8f0f..b5588a25d 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1600,12 +1600,17 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, numericStr, sizeof(numericStr), &indicator); if (SQL_SUCCEEDED(ret)) { - try{ - // Convert numericStr to py::decimal.Decimal and append to row - row.append(py::module_::import("decimal").attr("Decimal")( - std::string(reinterpret_cast(numericStr), indicator))); + try { + // Use the original string with period for Python's Decimal constructor + std::string numStr(reinterpret_cast(numericStr), indicator); + + // Create Python Decimal object + py::object decimalObj = py::module_::import("decimal").attr("Decimal")(numStr); + + // Add to row + row.append(decimalObj); } catch (const py::error_already_set& e) { - // If the conversion fails, append None + // If conversion fails, append None LOG("Error converting to decimal: {}", e.what()); row.append(py::none()); } @@ -2085,11 +2090,20 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_DECIMAL: case SQL_NUMERIC: { try { - // Convert numericStr to py::decimal.Decimal and append to row - row.append(py::module_::import("decimal").attr("Decimal")(std::string( - reinterpret_cast( - &buffers.charBuffers[col - 1][i * MAX_DIGITS_IN_NUMERIC]), - buffers.indicators[col - 1][i]))); + // Convert the string to use the current decimal separator + std::string numStr(reinterpret_cast( + &buffers.charBuffers[col - 1][i * MAX_DIGITS_IN_NUMERIC]), + buffers.indicators[col - 1][i]); + if (g_decimalSeparator != ".") { + // Replace the driver's decimal point with our configured separator + size_t pos = numStr.find('.'); + if (pos != std::string::npos) { + numStr.replace(pos, 1, g_decimalSeparator); + } + } + + // Convert to Python decimal + row.append(py::module_::import("decimal").attr("Decimal")(numStr)); } catch (const py::error_already_set& e) { // Handle the exception, e.g., log the error and append py::none() LOG("Error converting to decimal: {}", e.what()); @@ -2480,6 +2494,14 @@ void enable_pooling(int maxSize, int idleTimeout) { }); } +// Global decimal separator setting with default value +std::string g_decimalSeparator = "."; + +void DDBCSetDecimalSeparator(const std::string& separator) { + LOG("Setting decimal separator to: {}", separator); + g_decimalSeparator = separator; +} + // Architecture-specific defines #ifndef ARCHITECTURE #define ARCHITECTURE "win64" // Default to win64 if not defined during compilation @@ -2553,6 +2575,8 @@ PYBIND11_MODULE(ddbc_bindings, m) { m.def("DDBCSQLFetchAll", &FetchAll_wrap, "Fetch all rows from the result set"); m.def("DDBCSQLFreeHandle", &SQLFreeHandle_wrap, "Free a handle"); m.def("DDBCSQLCheckError", &SQLCheckError_Wrap, "Check for driver errors"); + m.def("DDBCSetDecimalSeparator", &DDBCSetDecimalSeparator, "Set the decimal separator character"); + // Add a version attribute m.attr("__version__") = "1.0.0"; diff --git a/mssql_python/pybind/ddbc_bindings.h b/mssql_python/pybind/ddbc_bindings.h index 22bc524bd..d142276c6 100644 --- a/mssql_python/pybind/ddbc_bindings.h +++ b/mssql_python/pybind/ddbc_bindings.h @@ -271,3 +271,9 @@ inline std::wstring Utf8ToWString(const std::string& str) { return converter.from_bytes(str); #endif } + +// Global decimal separator setting +extern std::string g_decimalSeparator; + +// Function to set the decimal separator +void DDBCSetDecimalSeparator(const std::string& separator); diff --git a/mssql_python/row.py b/mssql_python/row.py index 0b1fd33ea..1f54e8c8c 100644 --- a/mssql_python/row.py +++ b/mssql_python/row.py @@ -79,7 +79,25 @@ def __iter__(self): def __str__(self): """Return string representation of the row""" - return str(tuple(self._values)) + from decimal import Decimal + from mssql_python import getDecimalSeparator + + parts = [] + for value in self: + if isinstance(value, Decimal): + # Apply custom decimal separator for display + sep = getDecimalSeparator() + if sep != '.' and value is not None: + s = str(value) + if '.' in s: + s = s.replace('.', sep) + parts.append(s) + else: + parts.append(str(value)) + else: + parts.append(repr(value)) + + return "(" + ", ".join(parts) + ")" def __repr__(self): """Return a detailed string representation for debugging""" diff --git a/tests/test_001_globals.py b/tests/test_001_globals.py index fbee7ec5e..779d46a81 100644 --- a/tests/test_001_globals.py +++ b/tests/test_001_globals.py @@ -10,7 +10,7 @@ import pytest # Import global variables from the repository -from mssql_python import apilevel, threadsafety, paramstyle, lowercase +from mssql_python import apilevel, threadsafety, paramstyle, lowercase, getDecimalSeparator, setDecimalSeparator def test_apilevel(): # Check if apilevel has the expected value @@ -28,3 +28,29 @@ def test_lowercase(): # Check if lowercase has the expected default value assert lowercase is False, "lowercase should default to False" +def test_decimal_separator(): + """Test decimal separator functionality""" + + # Check default value + assert getDecimalSeparator() == '.', "Default decimal separator should be '.'" + + try: + # Test setting a new value + setDecimalSeparator(',') + assert getDecimalSeparator() == ',', "Decimal separator should be ',' after setting" + + # Test invalid input + with pytest.raises(ValueError): + setDecimalSeparator('too long') + + with pytest.raises(ValueError): + setDecimalSeparator('') + + with pytest.raises(ValueError): + setDecimalSeparator(123) # Non-string input + + finally: + # Restore default value + setDecimalSeparator('.') + assert getDecimalSeparator() == '.', "Decimal separator should be restored to '.'" + diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 728b27e23..9a63e27f7 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -1384,6 +1384,178 @@ def test_lowercase_attribute(cursor, db_connection): except Exception as e: print(f"Warning: Failed to drop test table: {e}") +def test_decimal_separator_function(cursor, db_connection): + """Test decimal separator functionality with database operations""" + # Store original value to restore after test + original_separator = mssql_python.getDecimalSeparator() + + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_decimal_separator_test ( + id INT PRIMARY KEY, + decimal_value DECIMAL(10, 2) + ) + """) + db_connection.commit() + + # Insert test values with default separator (.) + test_value = decimal.Decimal('123.45') + cursor.execute(""" + INSERT INTO #pytest_decimal_separator_test (id, decimal_value) + VALUES (1, ?) + """, [test_value]) + db_connection.commit() + + # First test with default decimal separator (.) + cursor.execute("SELECT id, decimal_value FROM #pytest_decimal_separator_test") + row = cursor.fetchone() + default_str = str(row) + assert '123.45' in default_str, "Default separator not found in string representation" + + # Now change to comma separator and test string representation + mssql_python.setDecimalSeparator(',') + cursor.execute("SELECT id, decimal_value FROM #pytest_decimal_separator_test") + row = cursor.fetchone() + + # This should format the decimal with a comma in the string representation + comma_str = str(row) + assert '123,45' in comma_str, f"Expected comma in string representation but got: {comma_str}" + + finally: + # Restore original decimal separator + mssql_python.setDecimalSeparator(original_separator) + + # Cleanup + cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_separator_test") + db_connection.commit() + +def test_decimal_separator_basic_functionality(): + """Test basic decimal separator functionality without database operations""" + # Store original value to restore after test + original_separator = mssql_python.getDecimalSeparator() + + try: + # Test default value + assert mssql_python.getDecimalSeparator() == '.', "Default decimal separator should be '.'" + + # Test setting to comma + mssql_python.setDecimalSeparator(',') + assert mssql_python.getDecimalSeparator() == ',', "Decimal separator should be ',' after setting" + + # Test setting to other valid separators + mssql_python.setDecimalSeparator(':') + assert mssql_python.getDecimalSeparator() == ':', "Decimal separator should be ':' after setting" + + # Test invalid inputs + with pytest.raises(ValueError): + mssql_python.setDecimalSeparator('') # Empty string + + with pytest.raises(ValueError): + mssql_python.setDecimalSeparator('too_long') # More than one character + + with pytest.raises(ValueError): + mssql_python.setDecimalSeparator(123) # Not a string + + finally: + # Restore original separator + mssql_python.setDecimalSeparator(original_separator) + +def test_decimal_separator_with_multiple_values(cursor, db_connection): + """Test decimal separator with multiple different decimal values""" + original_separator = mssql_python.getDecimalSeparator() + + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_decimal_multi_test ( + id INT PRIMARY KEY, + positive_value DECIMAL(10, 2), + negative_value DECIMAL(10, 2), + zero_value DECIMAL(10, 2), + small_value DECIMAL(10, 4) + ) + """) + db_connection.commit() + + # Insert test data + cursor.execute(""" + INSERT INTO #pytest_decimal_multi_test VALUES (1, 123.45, -67.89, 0.00, 0.0001) + """) + db_connection.commit() + + # Test with default separator first + cursor.execute("SELECT * FROM #pytest_decimal_multi_test") + row = cursor.fetchone() + default_str = str(row) + assert '123.45' in default_str, "Default positive value formatting incorrect" + assert '-67.89' in default_str, "Default negative value formatting incorrect" + + # Change to comma separator + mssql_python.setDecimalSeparator(',') + cursor.execute("SELECT * FROM #pytest_decimal_multi_test") + row = cursor.fetchone() + comma_str = str(row) + + # Verify comma is used in all decimal values + assert '123,45' in comma_str, "Positive value not formatted with comma" + assert '-67,89' in comma_str, "Negative value not formatted with comma" + assert '0,00' in comma_str, "Zero value not formatted with comma" + assert '0,0001' in comma_str, "Small value not formatted with comma" + + finally: + # Restore original separator + mssql_python.setDecimalSeparator(original_separator) + + # Cleanup + cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_multi_test") + db_connection.commit() + +def test_decimal_separator_calculations(cursor, db_connection): + """Test that decimal separator doesn't affect calculations""" + original_separator = mssql_python.getDecimalSeparator() + + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_decimal_calc_test ( + id INT PRIMARY KEY, + value1 DECIMAL(10, 2), + value2 DECIMAL(10, 2) + ) + """) + db_connection.commit() + + # Insert test data + cursor.execute(""" + INSERT INTO #pytest_decimal_calc_test VALUES (1, 10.25, 5.75) + """) + db_connection.commit() + + # Test with default separator + cursor.execute("SELECT value1 + value2 AS sum_result FROM #pytest_decimal_calc_test") + row = cursor.fetchone() + assert row.sum_result == decimal.Decimal('16.00'), "Sum calculation incorrect with default separator" + + # Change to comma separator + mssql_python.setDecimalSeparator(',') + + # Calculations should still work correctly + cursor.execute("SELECT value1 + value2 AS sum_result FROM #pytest_decimal_calc_test") + row = cursor.fetchone() + assert row.sum_result == decimal.Decimal('16.00'), "Sum calculation affected by separator change" + + # But string representation should use comma + assert '16,00' in str(row), "Sum result not formatted with comma in string representation" + + finally: + # Restore original separator + mssql_python.setDecimalSeparator(original_separator) + + # Cleanup + cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") + db_connection.commit() + def test_close(db_connection): """Test closing the cursor""" try: From 0f84c3d8a568542f8f7221283185ae6ada4ac54b Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Thu, 21 Aug 2025 13:56:07 +0530 Subject: [PATCH 3/6] FEAT: Adding connection execute --- mssql_python/connection.py | 23 +++++ tests/test_003_connection.py | 161 +++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) diff --git a/mssql_python/connection.py b/mssql_python/connection.py index 12760df41..fe400ec33 100644 --- a/mssql_python/connection.py +++ b/mssql_python/connection.py @@ -185,6 +185,29 @@ def cursor(self) -> Cursor: cursor = Cursor(self) self._cursors.add(cursor) # Track the cursor return cursor + + def execute(self, sql, *args): + """ + Creates a new Cursor object, calls its execute method, and returns the new cursor. + + This is a convenience method that is not part of the DB API. Since a new Cursor + is allocated by each call, this should not be used if more than one SQL statement + needs to be executed on the connection. + + Args: + sql (str): The SQL query to execute. + *args: Parameters to be passed to the query. + + Returns: + Cursor: A new cursor with the executed query. + + Raises: + DatabaseError: If there is an error executing the query. + InterfaceError: If the connection is closed. + """ + cursor = self.cursor() + cursor.execute(sql, *args) + return cursor def commit(self) -> None: """ diff --git a/tests/test_003_connection.py b/tests/test_003_connection.py index 51fce818e..8b3af5745 100644 --- a/tests/test_003_connection.py +++ b/tests/test_003_connection.py @@ -485,3 +485,164 @@ def test_connection_pooling_basic(conn_str): conn1.close() conn2.close() + +def test_connection_execute(db_connection): + """Test the execute() convenience method for Connection class""" + # Test basic execution + cursor = db_connection.execute("SELECT 1 AS test_value") + result = cursor.fetchone() + assert result is not None, "Execute failed: No result returned" + assert result[0] == 1, "Execute failed: Incorrect result" + + # Test with parameters + cursor = db_connection.execute("SELECT ? AS test_value", 42) + result = cursor.fetchone() + assert result is not None, "Execute with parameters failed: No result returned" + assert result[0] == 42, "Execute with parameters failed: Incorrect result" + + # Test that cursor is tracked by connection + assert cursor in db_connection._cursors, "Cursor from execute() not tracked by connection" + + # Test with data modification and verify it requires commit + if not db_connection.autocommit: + drop_table_if_exists(db_connection.cursor(), "#pytest_test_execute") + cursor1 = db_connection.execute("CREATE TABLE #pytest_test_execute (id INT, value VARCHAR(50))") + cursor2 = db_connection.execute("INSERT INTO #pytest_test_execute VALUES (1, 'test_value')") + cursor3 = db_connection.execute("SELECT * FROM #pytest_test_execute") + result = cursor3.fetchone() + assert result is not None, "Execute with table creation failed" + assert result[0] == 1, "Execute with table creation returned wrong id" + assert result[1] == 'test_value', "Execute with table creation returned wrong value" + + # Clean up + db_connection.execute("DROP TABLE #pytest_test_execute") + db_connection.commit() + +def test_connection_execute_error_handling(db_connection): + """Test that execute() properly handles SQL errors""" + with pytest.raises(Exception): + db_connection.execute("SELECT * FROM nonexistent_table") + +def test_connection_execute_empty_result(db_connection): + """Test execute() with a query that returns no rows""" + cursor = db_connection.execute("SELECT * FROM sys.tables WHERE name = 'nonexistent_table_name'") + result = cursor.fetchone() + assert result is None, "Query should return no results" + + # Test empty result with fetchall + rows = cursor.fetchall() + assert len(rows) == 0, "fetchall should return empty list for empty result set" + +def test_connection_execute_different_parameter_types(db_connection): + """Test execute() with different parameter data types""" + # Test with different data types + params = [ + 1234, # Integer + 3.14159, # Float + "test string", # String + bytearray(b'binary data'), # Binary data + True, # Boolean + None # NULL + ] + + for param in params: + cursor = db_connection.execute("SELECT ? AS value", param) + result = cursor.fetchone() + if param is None: + assert result[0] is None, "NULL parameter not handled correctly" + else: + assert result[0] == param, f"Parameter {param} of type {type(param)} not handled correctly" + +def test_connection_execute_with_transaction(db_connection): + """Test execute() in the context of explicit transactions""" + if db_connection.autocommit: + db_connection.autocommit = False + + cursor1 = db_connection.cursor() + drop_table_if_exists(cursor1, "#pytest_test_execute_transaction") + + try: + # Create table and insert data + db_connection.execute("CREATE TABLE #pytest_test_execute_transaction (id INT, value VARCHAR(50))") + db_connection.execute("INSERT INTO #pytest_test_execute_transaction VALUES (1, 'before rollback')") + + # Check data is there + cursor = db_connection.execute("SELECT * FROM #pytest_test_execute_transaction") + result = cursor.fetchone() + assert result is not None, "Data should be visible within transaction" + assert result[1] == 'before rollback', "Incorrect data in transaction" + + # Rollback and verify data is gone + db_connection.rollback() + + # Need to recreate table since it was rolled back + db_connection.execute("CREATE TABLE #pytest_test_execute_transaction (id INT, value VARCHAR(50))") + db_connection.execute("INSERT INTO #pytest_test_execute_transaction VALUES (2, 'after rollback')") + + cursor = db_connection.execute("SELECT * FROM #pytest_test_execute_transaction") + result = cursor.fetchone() + assert result is not None, "Data should be visible after new insert" + assert result[0] == 2, "Should see the new data after rollback" + assert result[1] == 'after rollback', "Incorrect data after rollback" + + # Commit and verify data persists + db_connection.commit() + finally: + # Clean up + try: + db_connection.execute("DROP TABLE #pytest_test_execute_transaction") + db_connection.commit() + except Exception: + pass + +def test_connection_execute_vs_cursor_execute(db_connection): + """Compare behavior of connection.execute() vs cursor.execute()""" + # Connection.execute creates a new cursor each time + cursor1 = db_connection.execute("SELECT 1 AS first_query") + # Consume the results from cursor1 before creating cursor2 + result1 = cursor1.fetchall() + assert result1[0][0] == 1, "First cursor should have result from first query" + + # Now it's safe to create a second cursor + cursor2 = db_connection.execute("SELECT 2 AS second_query") + result2 = cursor2.fetchall() + assert result2[0][0] == 2, "Second cursor should have result from second query" + + # These should be different cursor objects + assert cursor1 != cursor2, "Connection.execute should create a new cursor each time" + + # Now compare with reusing the same cursor + cursor3 = db_connection.cursor() + cursor3.execute("SELECT 3 AS third_query") + result3 = cursor3.fetchone() + assert result3[0] == 3, "Direct cursor execution failed" + + # Reuse the same cursor + cursor3.execute("SELECT 4 AS fourth_query") + result4 = cursor3.fetchone() + assert result4[0] == 4, "Reused cursor should have new results" + + # The previous results should no longer be accessible + cursor3.execute("SELECT 3 AS third_query_again") + result5 = cursor3.fetchone() + assert result5[0] == 3, "Cursor reexecution should work" + +def test_connection_execute_many_parameters(db_connection): + """Test execute() with many parameters""" + # First make sure no active results are pending + # by using a fresh cursor and fetching all results + cursor = db_connection.cursor() + cursor.execute("SELECT 1") + cursor.fetchall() + + # Create a query with 10 parameters + params = list(range(1, 11)) + query = "SELECT " + ", ".join(["?" for _ in params]) + " AS many_params" + + # Now execute with many parameters + cursor = db_connection.execute(query, *params) + result = cursor.fetchall() # Use fetchall to consume all results + + # Verify all parameters were correctly passed + for i, value in enumerate(params): + assert result[0][i] == value, f"Parameter at position {i} not correctly passed" \ No newline at end of file From f65e5a5ead2d87b62c188585e41381b46b1c875d Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Mon, 15 Sep 2025 16:03:54 +0530 Subject: [PATCH 4/6] Resolving comments --- mssql_python/connection.py | 156 +++++++- tests/test_003_connection.py | 688 ++++++++++++++++++++++++++++++++++- 2 files changed, 841 insertions(+), 3 deletions(-) diff --git a/mssql_python/connection.py b/mssql_python/connection.py index fe400ec33..5abb8c0d1 100644 --- a/mssql_python/connection.py +++ b/mssql_python/connection.py @@ -12,6 +12,7 @@ """ import weakref import re +from typing import Any from mssql_python.cursor import Cursor from mssql_python.helpers import add_driver_to_connection_str, sanitize_connection_string, log from mssql_python import ddbc_bindings @@ -185,8 +186,8 @@ def cursor(self) -> Cursor: cursor = Cursor(self) self._cursors.add(cursor) # Track the cursor return cursor - - def execute(self, sql, *args): + + def execute(self, sql: str, *args: Any) -> Cursor: """ Creates a new Cursor object, calls its execute method, and returns the new cursor. @@ -194,6 +195,12 @@ def execute(self, sql, *args): is allocated by each call, this should not be used if more than one SQL statement needs to be executed on the connection. + Note on cursor lifecycle management: + - Each call creates a new cursor that is tracked by the connection's internal WeakSet + - Cursors are automatically dereferenced/closed when they go out of scope + - For long-running applications or loops, explicitly call cursor.close() when done + to release resources immediately rather than waiting for garbage collection + Args: sql (str): The SQL query to execute. *args: Parameters to be passed to the query. @@ -204,11 +211,140 @@ def execute(self, sql, *args): Raises: DatabaseError: If there is an error executing the query. InterfaceError: If the connection is closed. + + Example: + # Automatic cleanup (cursor goes out of scope after the operation) + row = connection.execute("SELECT name FROM users WHERE id = ?", 123).fetchone() + + # Manual cleanup for more explicit resource management + cursor = connection.execute("SELECT * FROM large_table") + try: + # Use cursor... + rows = cursor.fetchall() + finally: + cursor.close() # Explicitly release resources """ cursor = self.cursor() cursor.execute(sql, *args) return cursor + def batch_execute(self, statements, params=None, reuse_cursor=None, auto_close=False): + """ + Execute multiple SQL statements efficiently using a single cursor. + + This method allows executing multiple SQL statements in sequence using a single + cursor, which is more efficient than creating a new cursor for each statement. + + Args: + statements (list): List of SQL statements to execute + params (list, optional): List of parameter sets corresponding to statements. + Each item can be None, a single parameter, or a sequence of parameters. + If None, no parameters will be used for any statement. + reuse_cursor (Cursor, optional): Existing cursor to reuse instead of creating a new one. + If None, a new cursor will be created. + auto_close (bool): Whether to close the cursor after execution if a new one was created. + Defaults to False. Has no effect if reuse_cursor is provided. + + Returns: + tuple: (results, cursor) where: + - results is a list of execution results, one for each statement + - cursor is the cursor used for execution (useful if you want to keep using it) + + Raises: + TypeError: If statements is not a list or if params is provided but not a list + ValueError: If params is provided but has different length than statements + DatabaseError: If there is an error executing any of the statements + InterfaceError: If the connection is closed + + Example: + # Execute multiple statements with a single cursor + results, _ = conn.batch_execute([ + "INSERT INTO users VALUES (?, ?)", + "UPDATE stats SET count = count + 1", + "SELECT * FROM users" + ], [ + (1, "user1"), + None, + None + ]) + + # Last result contains the SELECT results + for row in results[-1]: + print(row) + + # Reuse an existing cursor + my_cursor = conn.cursor() + results, _ = conn.batch_execute([ + "SELECT * FROM table1", + "SELECT * FROM table2" + ], reuse_cursor=my_cursor) + + # Cursor remains open for further use + my_cursor.execute("SELECT * FROM table3") + """ + # Validate inputs + if not isinstance(statements, list): + raise TypeError("statements must be a list of SQL statements") + + if params is not None: + if not isinstance(params, list): + raise TypeError("params must be a list of parameter sets") + if len(params) != len(statements): + raise ValueError("params list must have the same length as statements list") + else: + # Create a list of None values with the same length as statements + params = [None] * len(statements) + + # Determine which cursor to use + is_new_cursor = reuse_cursor is None + cursor = self.cursor() if is_new_cursor else reuse_cursor + + # Execute statements and collect results + results = [] + try: + for i, (stmt, param) in enumerate(zip(statements, params)): + try: + # Execute the statement with parameters if provided + if param is not None: + cursor.execute(stmt, param) + else: + cursor.execute(stmt) + + # For SELECT statements, fetch all rows + # For other statements, get the row count + if cursor.description is not None: + # This is a SELECT statement or similar that returns rows + results.append(cursor.fetchall()) + else: + # This is an INSERT, UPDATE, DELETE or similar that doesn't return rows + results.append(cursor.rowcount) + + log('debug', f"Executed batch statement {i+1}/{len(statements)}") + + except Exception as e: + # If a statement fails, include statement context in the error + log('error', f"Error executing statement {i+1}/{len(statements)}: {e}") + raise + + except Exception as e: + # If an error occurs and auto_close is True, close the cursor + if auto_close: + try: + # Close the cursor regardless of whether it's reused or new + cursor.close() + log('debug', "Automatically closed cursor after batch execution error") + except Exception as close_err: + log('warning', f"Error closing cursor after execution failure: {close_err}") + # Re-raise the original exception + raise + + # Close the cursor if requested and we created a new one + if is_new_cursor and auto_close: + cursor.close() + log('debug', "Automatically closed cursor after batch execution") + + return results, cursor + def commit(self) -> None: """ Commit the current transaction. @@ -219,8 +355,16 @@ def commit(self) -> None: that the changes are saved. Raises: + InterfaceError: If the connection is closed. DatabaseError: If there is an error while committing the transaction. """ + # Check if connection is closed + if self._closed or self._conn is None: + raise InterfaceError( + driver_error="Cannot commit on a closed connection", + ddbc_error="Cannot commit on a closed connection", + ) + # Commit the current transaction self._conn.commit() log('info', "Transaction committed successfully.") @@ -234,8 +378,16 @@ def rollback(self) -> None: transaction or if the changes should not be saved. Raises: + InterfaceError: If the connection is closed. DatabaseError: If there is an error while rolling back the transaction. """ + # Check if connection is closed + if self._closed or self._conn is None: + raise InterfaceError( + driver_error="Cannot rollback on a closed connection", + ddbc_error="Cannot rollback on a closed connection", + ) + # Roll back the current transaction self._conn.rollback() log('info', "Transaction rolled back successfully.") diff --git a/tests/test_003_connection.py b/tests/test_003_connection.py index 8b3af5745..f6661d1fa 100644 --- a/tests/test_003_connection.py +++ b/tests/test_003_connection.py @@ -19,11 +19,35 @@ """ from mssql_python.exceptions import InterfaceError +import datetime import pytest import time from mssql_python import Connection, connect, pooling import threading +@pytest.fixture(autouse=True) +def clean_connection_state(db_connection): + """Ensure connection is in a clean state before each test""" + # Create a cursor and clear any active results + try: + cleanup_cursor = db_connection.cursor() + cleanup_cursor.execute("SELECT 1") # Simple query to reset state + cleanup_cursor.fetchall() # Consume all results + cleanup_cursor.close() + except Exception: + pass # Ignore errors during cleanup + + yield # Run the test + + # Clean up after the test + try: + cleanup_cursor = db_connection.cursor() + cleanup_cursor.execute("SELECT 1") # Simple query to reset state + cleanup_cursor.fetchall() # Consume all results + cleanup_cursor.close() + except Exception: + pass # Ignore errors during cleanup + def drop_table_if_exists(cursor, table_name): """Drop the table if it exists""" try: @@ -645,4 +669,666 @@ def test_connection_execute_many_parameters(db_connection): # Verify all parameters were correctly passed for i, value in enumerate(params): - assert result[0][i] == value, f"Parameter at position {i} not correctly passed" \ No newline at end of file + assert result[0][i] == value, f"Parameter at position {i} not correctly passed" + +def test_execute_after_connection_close(conn_str): + """Test that executing queries after connection close raises InterfaceError""" + # Create a new connection + connection = connect(conn_str) + + # Close the connection + connection.close() + + # Try different methods that should all fail with InterfaceError + + # 1. Test direct execute method + with pytest.raises(InterfaceError) as excinfo: + connection.execute("SELECT 1") + assert "closed" in str(excinfo.value).lower(), "Error should mention the connection is closed" + + # 2. Test batch_execute method + with pytest.raises(InterfaceError) as excinfo: + connection.batch_execute(["SELECT 1"]) + assert "closed" in str(excinfo.value).lower(), "Error should mention the connection is closed" + + # 3. Test creating a cursor + with pytest.raises(InterfaceError) as excinfo: + cursor = connection.cursor() + assert "closed" in str(excinfo.value).lower(), "Error should mention the connection is closed" + + # 4. Test transaction operations + with pytest.raises(InterfaceError) as excinfo: + connection.commit() + assert "closed" in str(excinfo.value).lower(), "Error should mention the connection is closed" + + with pytest.raises(InterfaceError) as excinfo: + connection.rollback() + assert "closed" in str(excinfo.value).lower(), "Error should mention the connection is closed" + +def test_execute_multiple_simultaneous_cursors(db_connection): + """Test creating and using many cursors simultaneously through Connection.execute + + ⚠️ WARNING: This test has several limitations: + 1. Creates only 20 cursors, which may not fully test production scenarios requiring hundreds + 2. Relies on WeakSet tracking which depends on garbage collection timing and varies between runs + 3. Memory measurement requires the optional 'psutil' package + 4. Creates cursors sequentially rather than truly concurrently + 5. Results may vary based on system resources, SQL Server version, and ODBC driver + + The test verifies that: + - Multiple cursors can be created and used simultaneously + - Connection tracks created cursors appropriately + - Connection remains stable after intensive cursor operations + """ + import gc + import sys + + # Start with a clean connection state + cursor = db_connection.execute("SELECT 1") + cursor.fetchall() # Consume the results + cursor.close() # Close the cursor correctly + + # Record the initial cursor count in the connection's tracker + initial_cursor_count = len(db_connection._cursors) + + # Get initial memory usage + gc.collect() # Force garbage collection to get accurate reading + initial_memory = 0 + try: + import psutil + import os + process = psutil.Process(os.getpid()) + initial_memory = process.memory_info().rss + except ImportError: + print("psutil not installed, memory usage won't be measured") + + # Use a smaller number of cursors to avoid overwhelming the connection + num_cursors = 20 # Reduced from 100 + + # Create multiple cursors and store them in a list to keep them alive + cursors = [] + for i in range(num_cursors): + cursor = db_connection.execute(f"SELECT {i} AS cursor_id") + # Immediately fetch results but don't close yet to keep cursor alive + cursor.fetchall() + cursors.append(cursor) + + # Verify the number of tracked cursors increased + current_cursor_count = len(db_connection._cursors) + # Use a more flexible assertion that accounts for WeakSet behavior + assert current_cursor_count > initial_cursor_count, \ + f"Connection should track more cursors after creating {num_cursors} new ones, but count only increased by {current_cursor_count - initial_cursor_count}" + + print(f"Created {num_cursors} cursors, tracking shows {current_cursor_count - initial_cursor_count} increase") + + # Close all cursors explicitly to clean up + for cursor in cursors: + cursor.close() + + # Verify connection is still usable + final_cursor = db_connection.execute("SELECT 'Connection still works' AS status") + row = final_cursor.fetchone() + assert row[0] == 'Connection still works', "Connection should remain usable after cursor operations" + final_cursor.close() + + +def test_execute_with_large_parameters(db_connection): + """Test executing queries with very large parameter sets + + ⚠️ WARNING: This test has several limitations: + 1. Limited by 8192-byte parameter size restriction from the ODBC driver + 2. Cannot test truly large parameters (e.g., BLOBs >1MB) + 3. Works around the ~2100 parameter limit by batching, not testing true limits + 4. No streaming parameter support is tested + 5. Only tests with 10,000 rows, which is small compared to production scenarios + 6. Performance measurements are affected by system load and environment + + The test verifies: + - Handling of a large number of parameters in batch inserts + - Working with parameters near but under the size limit + - Processing large result sets + """ + import time + + # Test with a temporary table for large data + cursor = db_connection.execute(""" + DROP TABLE IF EXISTS #large_params_test; + CREATE TABLE #large_params_test ( + id INT, + large_text NVARCHAR(MAX), + large_binary VARBINARY(MAX) + ) + """) + cursor.close() + + try: + # Test 1: Large number of parameters in a batch insert + start_time = time.time() + + # Create a large batch but split into smaller chunks to avoid parameter limits + # ODBC has limits (~2100 parameters), so use 500 rows per batch (1500 parameters) + total_rows = 1000 + batch_size = 500 # Reduced from 1000 to avoid parameter limits + total_inserts = 0 + + for batch_start in range(0, total_rows, batch_size): + batch_end = min(batch_start + batch_size, total_rows) + large_inserts = [] + params = [] + + # Build a parameterized query with multiple value sets for this batch + for i in range(batch_start, batch_end): + large_inserts.append("(?, ?, ?)") + params.extend([i, f"Text{i}", bytes([i % 256] * 100)]) # 100 bytes per row + + # Execute this batch + sql = f"INSERT INTO #large_params_test VALUES {', '.join(large_inserts)}" + cursor = db_connection.execute(sql, *params) + cursor.close() + total_inserts += batch_end - batch_start + + # Verify correct number of rows inserted + cursor = db_connection.execute("SELECT COUNT(*) FROM #large_params_test") + count = cursor.fetchone()[0] + cursor.close() + assert count == total_rows, f"Expected {total_rows} rows, got {count}" + + batch_time = time.time() - start_time + print(f"Large batch insert ({total_rows} rows in chunks of {batch_size}) completed in {batch_time:.2f} seconds") + + # Test 2: Single row with parameter values under the 8192 byte limit + cursor = db_connection.execute("TRUNCATE TABLE #large_params_test") + cursor.close() + + # Create smaller text parameter to stay well under 8KB limit + large_text = "Large text content " * 100 # ~2KB text (well under 8KB limit) + + # Create smaller binary parameter to stay well under 8KB limit + large_binary = bytes([x % 256 for x in range(2 * 1024)]) # 2KB binary data + + start_time = time.time() + + # Insert the large parameters using connection.execute() + cursor = db_connection.execute( + "INSERT INTO #large_params_test VALUES (?, ?, ?)", + 1, large_text, large_binary + ) + cursor.close() + + # Verify the data was inserted correctly + cursor = db_connection.execute("SELECT id, LEN(large_text), DATALENGTH(large_binary) FROM #large_params_test") + row = cursor.fetchone() + cursor.close() + + assert row is not None, "No row returned after inserting large parameters" + assert row[0] == 1, "Wrong ID returned" + assert row[1] > 1000, f"Text length too small: {row[1]}" + assert row[2] == 2 * 1024, f"Binary length wrong: {row[2]}" + + large_param_time = time.time() - start_time + print(f"Large parameter insert (text: {row[1]} chars, binary: {row[2]} bytes) completed in {large_param_time:.2f} seconds") + + # Test 3: Execute with a large result set + cursor = db_connection.execute("TRUNCATE TABLE #large_params_test") + cursor.close() + + # Insert rows in smaller batches to avoid parameter limits + rows_per_batch = 1000 + total_rows = 10000 + + for batch_start in range(0, total_rows, rows_per_batch): + batch_end = min(batch_start + rows_per_batch, total_rows) + values = ", ".join([f"({i}, 'Small Text {i}', NULL)" for i in range(batch_start, batch_end)]) + cursor = db_connection.execute(f"INSERT INTO #large_params_test (id, large_text, large_binary) VALUES {values}") + cursor.close() + + start_time = time.time() + + # Fetch all rows to test large result set handling + cursor = db_connection.execute("SELECT id, large_text FROM #large_params_test ORDER BY id") + rows = cursor.fetchall() + cursor.close() + + assert len(rows) == 10000, f"Expected 10000 rows in result set, got {len(rows)}" + assert rows[0][0] == 0, "First row has incorrect ID" + assert rows[9999][0] == 9999, "Last row has incorrect ID" + + result_time = time.time() - start_time + print(f"Large result set (10,000 rows) fetched in {result_time:.2f} seconds") + + finally: + # Clean up + cursor = db_connection.execute("DROP TABLE IF EXISTS #large_params_test") + cursor.close() + +def test_connection_execute_cursor_lifecycle(db_connection): + """Test that cursors from execute() are properly managed throughout their lifecycle + + This test verifies that: + 1. Cursors are added to the connection's tracking when created via execute() + 2. Cursors are removed from tracking when explicitly closed + 3. Cursors are removed from tracking when they go out of scope and are garbage collected + + This helps ensure that the connection properly manages cursor resources and prevents + memory/resource leaks over time. + """ + import gc + import weakref + + # Clear any existing cursors and force garbage collection + for cursor in list(db_connection._cursors): + try: + cursor.close() + except Exception: + pass + gc.collect() + + # Verify we start with a clean state + initial_cursor_count = len(db_connection._cursors) + + # 1. Test that a cursor is added to tracking when created + cursor1 = db_connection.execute("SELECT 1 AS test") + cursor1.fetchall() # Consume results + + # Verify cursor was added to tracking + assert len(db_connection._cursors) == initial_cursor_count + 1, "Cursor should be added to connection tracking" + assert cursor1 in db_connection._cursors, "Created cursor should be in the connection's tracking set" + + # 2. Test that a cursor is removed when explicitly closed + cursor_id = id(cursor1) # Remember the cursor's ID for later verification + cursor1.close() + + # Force garbage collection to ensure WeakSet is updated + gc.collect() + + # Verify cursor was removed from tracking + remaining_cursor_ids = [id(c) for c in db_connection._cursors] + assert cursor_id not in remaining_cursor_ids, "Closed cursor should be removed from connection tracking" + + # 3. Test that a cursor is removed when it goes out of scope + def create_and_abandon_cursor(): + temp_cursor = db_connection.execute("SELECT 2 AS test") + temp_cursor.fetchall() # Consume results + # Keep track of this cursor with a weak reference so we can check if it's collected + return weakref.ref(temp_cursor) + + # Create a cursor that will go out of scope + cursor_ref = create_and_abandon_cursor() + + # Cursor should be tracked before garbage collection + assert len(db_connection._cursors) > initial_cursor_count, "Abandoned cursor should initially be tracked" + + # Force garbage collection multiple times to ensure the cursor is collected + for _ in range(3): + gc.collect() + + # Verify cursor was eventually removed from tracking + assert cursor_ref() is None, "Abandoned cursor should be garbage collected" + assert len(db_connection._cursors) == initial_cursor_count, \ + "All created cursors should be removed from tracking after being closed or collected" + + # 4. Verify that many cursors can be created and properly cleaned up + cursors = [] + for i in range(10): + cursors.append(db_connection.execute(f"SELECT {i} AS test")) + cursors[-1].fetchall() # Consume results + + assert len(db_connection._cursors) == initial_cursor_count + 10, \ + "All 10 cursors should be tracked by the connection" + + # Close half of them explicitly + for i in range(5): + cursors[i].close() + + # Remove references to the other half so they can be garbage collected + for i in range(5, 10): + cursors[i] = None + + # Force garbage collection + gc.collect() + gc.collect() # Sometimes one collection isn't enough with WeakRefs + + # Verify all cursors are eventually removed from tracking + assert len(db_connection._cursors) <= initial_cursor_count + 5, \ + "Explicitly closed cursors should be removed from tracking immediately" + + # Clean up any remaining cursors to leave the connection in a good state + for cursor in list(db_connection._cursors): + try: + cursor.close() + except Exception: + pass + +def test_batch_execute_basic(db_connection): + """Test the basic functionality of batch_execute method + + ⚠️ WARNING: This test has several limitations: + 1. Results must be fully consumed between statements to avoid "Connection is busy" errors + 2. The ODBC driver imposes limits on concurrent statement execution + 3. Performance may vary based on network conditions and server load + 4. Not all statement types may be compatible with batch execution + 5. Error handling may be implementation-specific across ODBC drivers + + The test verifies: + - Multiple statements can be executed in sequence + - Results are correctly returned for each statement + - The cursor remains usable after batch completion + """ + # Create a list of statements to execute + statements = [ + "SELECT 1 AS value", + "SELECT 'test' AS string_value", + "SELECT GETDATE() AS date_value" + ] + + # Execute the batch + results, cursor = db_connection.batch_execute(statements) + + # Verify we got the right number of results + assert len(results) == 3, f"Expected 3 results, got {len(results)}" + + # Check each result + assert len(results[0]) == 1, "Expected 1 row in first result" + assert results[0][0][0] == 1, "First result should be 1" + + assert len(results[1]) == 1, "Expected 1 row in second result" + assert results[1][0][0] == 'test', "Second result should be 'test'" + + assert len(results[2]) == 1, "Expected 1 row in third result" + assert isinstance(results[2][0][0], (str, datetime.datetime)), "Third result should be a date" + + # Cursor should be usable after batch execution + cursor.execute("SELECT 2 AS another_value") + row = cursor.fetchone() + assert row[0] == 2, "Cursor should be usable after batch execution" + + # Clean up + cursor.close() + +def test_batch_execute_with_parameters(db_connection): + """Test batch_execute with different parameter types""" + statements = [ + "SELECT ? AS int_param", + "SELECT ? AS float_param", + "SELECT ? AS string_param", + "SELECT ? AS binary_param", + "SELECT ? AS bool_param", + "SELECT ? AS null_param" + ] + + params = [ + [123], + [3.14159], + ["test string"], + [bytearray(b'binary data')], + [True], + [None] + ] + + results, cursor = db_connection.batch_execute(statements, params) + + # Verify each parameter was correctly applied + assert results[0][0][0] == 123, "Integer parameter not handled correctly" + assert abs(results[1][0][0] - 3.14159) < 0.00001, "Float parameter not handled correctly" + assert results[2][0][0] == "test string", "String parameter not handled correctly" + assert results[3][0][0] == bytearray(b'binary data'), "Binary parameter not handled correctly" + assert results[4][0][0] == True, "Boolean parameter not handled correctly" + assert results[5][0][0] is None, "NULL parameter not handled correctly" + + cursor.close() + +def test_batch_execute_dml_statements(db_connection): + """Test batch_execute with DML statements (INSERT, UPDATE, DELETE) + + ⚠️ WARNING: This test has several limitations: + 1. Transaction isolation levels may affect behavior in production environments + 2. Large batch operations may encounter size or timeout limits not tested here + 3. Error handling during partial batch completion needs careful consideration + 4. Results must be fully consumed between statements to avoid "Connection is busy" errors + 5. Server-side performance characteristics aren't fully tested + + The test verifies: + - DML statements work correctly in a batch context + - Row counts are properly returned for modification operations + - Results from SELECT statements following DML are accessible + """ + cursor = db_connection.cursor() + drop_table_if_exists(cursor, "#batch_test") + + try: + # Create a test table + cursor.execute("CREATE TABLE #batch_test (id INT, value VARCHAR(50))") + + statements = [ + "INSERT INTO #batch_test VALUES (?, ?)", + "INSERT INTO #batch_test VALUES (?, ?)", + "UPDATE #batch_test SET value = ? WHERE id = ?", + "DELETE FROM #batch_test WHERE id = ?", + "SELECT * FROM #batch_test ORDER BY id" + ] + + params = [ + [1, "value1"], + [2, "value2"], + ["updated", 1], + [2], + None + ] + + results, batch_cursor = db_connection.batch_execute(statements, params) + + # Check row counts for DML statements + assert results[0] == 1, "First INSERT should affect 1 row" + assert results[1] == 1, "Second INSERT should affect 1 row" + assert results[2] == 1, "UPDATE should affect 1 row" + assert results[3] == 1, "DELETE should affect 1 row" + + # Check final SELECT result + assert len(results[4]) == 1, "Should have 1 row after operations" + assert results[4][0][0] == 1, "Remaining row should have id=1" + assert results[4][0][1] == "updated", "Value should be updated" + + batch_cursor.close() + finally: + cursor.execute("DROP TABLE IF EXISTS #batch_test") + cursor.close() + +def test_batch_execute_reuse_cursor(db_connection): + """Test batch_execute with cursor reuse""" + # Create a cursor to reuse + cursor = db_connection.cursor() + + # Execute a statement to set up cursor state + cursor.execute("SELECT 'before batch' AS initial_state") + initial_result = cursor.fetchall() + assert initial_result[0][0] == 'before batch', "Initial cursor state incorrect" + + # Use the cursor in batch_execute + statements = [ + "SELECT 'during batch' AS batch_state" + ] + + results, returned_cursor = db_connection.batch_execute(statements, reuse_cursor=cursor) + + # Verify we got the same cursor back + assert returned_cursor is cursor, "Batch should return the same cursor object" + + # Verify the result + assert results[0][0][0] == 'during batch', "Batch result incorrect" + + # Verify cursor is still usable + cursor.execute("SELECT 'after batch' AS final_state") + final_result = cursor.fetchall() + assert final_result[0][0] == 'after batch', "Cursor should remain usable after batch" + + cursor.close() + +def test_batch_execute_auto_close(db_connection): + """Test auto_close parameter in batch_execute""" + statements = ["SELECT 1"] + + # Test with auto_close=True + results, cursor = db_connection.batch_execute(statements, auto_close=True) + + # Cursor should be closed + with pytest.raises(Exception): + cursor.execute("SELECT 2") # Should fail because cursor is closed + + # Test with auto_close=False (default) + results, cursor = db_connection.batch_execute(statements) + + # Cursor should still be usable + cursor.execute("SELECT 2") + assert cursor.fetchone()[0] == 2, "Cursor should be usable when auto_close=False" + + cursor.close() + +def test_batch_execute_transaction(db_connection): + """Test batch_execute within a transaction + + ⚠️ WARNING: This test has several limitations: + 1. Temporary table behavior with transactions varies between SQL Server versions + 2. Global temporary tables (##) must be used rather than local temporary tables (#) + 3. Explicit commits and rollbacks are required - no auto-transaction management + 4. Transaction isolation levels aren't tested + 5. Distributed transactions aren't tested + 6. Error recovery during partial transaction completion isn't fully tested + + The test verifies: + - Batch operations work within explicit transactions + - Rollback correctly undoes all changes in the batch + - Commit correctly persists all changes in the batch + """ + if db_connection.autocommit: + db_connection.autocommit = False + + cursor = db_connection.cursor() + + # Important: Use ## (global temp table) instead of # (local temp table) + # Global temp tables are more reliable across transactions + drop_table_if_exists(cursor, "##batch_transaction_test") + + try: + # Create a test table outside the implicit transaction + cursor.execute("CREATE TABLE ##batch_transaction_test (id INT, value VARCHAR(50))") + db_connection.commit() # Commit the table creation + + # Execute a batch of statements + statements = [ + "INSERT INTO ##batch_transaction_test VALUES (1, 'value1')", + "INSERT INTO ##batch_transaction_test VALUES (2, 'value2')", + "SELECT COUNT(*) FROM ##batch_transaction_test" + ] + + results, batch_cursor = db_connection.batch_execute(statements) + + # Verify the SELECT result shows both rows + assert results[2][0][0] == 2, "Should have 2 rows before rollback" + + # Rollback the transaction + db_connection.rollback() + + # Execute another statement to check if rollback worked + cursor.execute("SELECT COUNT(*) FROM ##batch_transaction_test") + count = cursor.fetchone()[0] + assert count == 0, "Rollback should remove all inserted rows" + + # Try again with commit + results, batch_cursor = db_connection.batch_execute(statements) + db_connection.commit() + + # Verify data persists after commit + cursor.execute("SELECT COUNT(*) FROM ##batch_transaction_test") + count = cursor.fetchone()[0] + assert count == 2, "Data should persist after commit" + + batch_cursor.close() + finally: + # Clean up - always try to drop the table + try: + cursor.execute("DROP TABLE ##batch_transaction_test") + db_connection.commit() + except Exception as e: + print(f"Error dropping test table: {e}") + cursor.close() + +def test_batch_execute_error_handling(db_connection): + """Test error handling in batch_execute""" + statements = [ + "SELECT 1", + "SELECT * FROM nonexistent_table", # This will fail + "SELECT 3" + ] + + # Execution should fail on the second statement + with pytest.raises(Exception) as excinfo: + db_connection.batch_execute(statements) + + # Verify error message contains something about the nonexistent table + assert "nonexistent_table" in str(excinfo.value).lower(), "Error should mention the problem" + + # Test with a cursor that gets auto-closed on error + cursor = db_connection.cursor() + + try: + db_connection.batch_execute(statements, reuse_cursor=cursor, auto_close=True) + except Exception: + # If auto_close works, the cursor should be closed despite the error + with pytest.raises(Exception): + cursor.execute("SELECT 1") # Should fail if cursor is closed + + # Test that the connection is still usable after an error + new_cursor = db_connection.cursor() + new_cursor.execute("SELECT 1") + assert new_cursor.fetchone()[0] == 1, "Connection should be usable after batch error" + new_cursor.close() + +def test_batch_execute_input_validation(db_connection): + """Test input validation in batch_execute""" + # Test with non-list statements + with pytest.raises(TypeError): + db_connection.batch_execute("SELECT 1") + + # Test with non-list params + with pytest.raises(TypeError): + db_connection.batch_execute(["SELECT 1"], "param") + + # Test with mismatched statements and params lengths + with pytest.raises(ValueError): + db_connection.batch_execute(["SELECT 1", "SELECT 2"], [[1]]) + + # Test with empty statements list + results, cursor = db_connection.batch_execute([]) + assert results == [], "Empty statements should return empty results" + cursor.close() + +def test_batch_execute_large_batch(db_connection): + """Test batch_execute with a large number of statements + + ⚠️ WARNING: This test has several limitations: + 1. Only tests 50 statements, which may not reveal issues with much larger batches + 2. Each statement is very simple, not testing complex query performance + 3. Memory usage for large result sets isn't thoroughly tested + 4. Results must be fully consumed between statements to avoid "Connection is busy" errors + 5. Driver-specific limitations may exist for maximum batch sizes + 6. Network timeouts during long-running batches aren't tested + + The test verifies: + - The method can handle multiple statements in sequence + - Results are correctly returned for all statements + - Memory usage remains reasonable during batch processing + """ + # Create a batch of 50 statements + statements = ["SELECT " + str(i) for i in range(50)] + + results, cursor = db_connection.batch_execute(statements) + + # Verify we got 50 results + assert len(results) == 50, f"Expected 50 results, got {len(results)}" + + # Check a few random results + assert results[0][0][0] == 0, "First result should be 0" + assert results[25][0][0] == 25, "Middle result should be 25" + assert results[49][0][0] == 49, "Last result should be 49" + + cursor.close() \ No newline at end of file From fedf56fa9eafbea6001de513706494290c4fb4ef Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Mon, 15 Sep 2025 21:20:48 +0530 Subject: [PATCH 5/6] Resolving comments --- mssql_python/connection.py | 29 ++++++++++++++++++++++++-- mssql_python/cursor.py | 7 +++++++ tests/test_003_connection.py | 40 +++++++++++++++--------------------- 3 files changed, 51 insertions(+), 25 deletions(-) diff --git a/mssql_python/connection.py b/mssql_python/connection.py index 5abb8c0d1..e0a3c0f54 100644 --- a/mssql_python/connection.py +++ b/mssql_python/connection.py @@ -225,8 +225,18 @@ def execute(self, sql: str, *args: Any) -> Cursor: cursor.close() # Explicitly release resources """ cursor = self.cursor() - cursor.execute(sql, *args) - return cursor + try: + # Add the cursor to our tracking set BEFORE execution + # This ensures it's tracked even if execution fails + self._cursors.add(cursor) + + # Now execute the query + cursor.execute(sql, *args) + return cursor + except Exception: + # If execution fails, close the cursor to avoid leaking resources + cursor.close() + raise def batch_execute(self, statements, params=None, reuse_cursor=None, auto_close=False): """ @@ -452,6 +462,21 @@ def close(self) -> None: self._closed = True log('info', "Connection closed successfully.") + + def _remove_cursor(self, cursor): + """ + Remove a cursor from the connection's tracking. + + This method is called when a cursor is closed to ensure proper cleanup. + + Args: + cursor: The cursor to remove from tracking. + """ + if hasattr(self, '_cursors'): + try: + self._cursors.discard(cursor) + except Exception: + pass # Ignore errors during cleanup def __del__(self): """ diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 912cb4a8c..eb384f195 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -446,6 +446,13 @@ def close(self) -> None: if self.closed: raise Exception("Cursor is already closed.") + # Remove this cursor from the connection's tracking + if hasattr(self, 'connection') and self.connection and hasattr(self.connection, '_cursors'): + try: + self.connection._cursors.discard(self) + except Exception as e: + log('warning', "Error removing cursor from connection tracking: %s", e) + if self.hstmt: self.hstmt.free() self.hstmt = None diff --git a/tests/test_003_connection.py b/tests/test_003_connection.py index f6661d1fa..051c4a15b 100644 --- a/tests/test_003_connection.py +++ b/tests/test_003_connection.py @@ -902,18 +902,10 @@ def test_execute_with_large_parameters(db_connection): cursor.close() def test_connection_execute_cursor_lifecycle(db_connection): - """Test that cursors from execute() are properly managed throughout their lifecycle - - This test verifies that: - 1. Cursors are added to the connection's tracking when created via execute() - 2. Cursors are removed from tracking when explicitly closed - 3. Cursors are removed from tracking when they go out of scope and are garbage collected - - This helps ensure that the connection properly manages cursor resources and prevents - memory/resource leaks over time. - """ + """Test that cursors from execute() are properly managed throughout their lifecycle""" import gc import weakref + import sys # Clear any existing cursors and force garbage collection for cursor in list(db_connection._cursors): @@ -945,27 +937,29 @@ def test_connection_execute_cursor_lifecycle(db_connection): remaining_cursor_ids = [id(c) for c in db_connection._cursors] assert cursor_id not in remaining_cursor_ids, "Closed cursor should be removed from connection tracking" - # 3. Test that a cursor is removed when it goes out of scope - def create_and_abandon_cursor(): - temp_cursor = db_connection.execute("SELECT 2 AS test") - temp_cursor.fetchall() # Consume results - # Keep track of this cursor with a weak reference so we can check if it's collected - return weakref.ref(temp_cursor) + # 3. Test that a cursor is tracked but then removed when it goes out of scope + # Note: We'll create a cursor and verify it's tracked BEFORE leaving the scope + temp_cursor = db_connection.execute("SELECT 2 AS test") + temp_cursor.fetchall() # Consume results + + # Get a weak reference to the cursor for checking collection later + cursor_ref = weakref.ref(temp_cursor) - # Create a cursor that will go out of scope - cursor_ref = create_and_abandon_cursor() + # Verify cursor is tracked immediately after creation + assert len(db_connection._cursors) > initial_cursor_count, "New cursor should be tracked immediately" + assert temp_cursor in db_connection._cursors, "New cursor should be in the connection's tracking set" - # Cursor should be tracked before garbage collection - assert len(db_connection._cursors) > initial_cursor_count, "Abandoned cursor should initially be tracked" + # Now remove our reference to allow garbage collection + temp_cursor = None # Force garbage collection multiple times to ensure the cursor is collected for _ in range(3): gc.collect() - # Verify cursor was eventually removed from tracking - assert cursor_ref() is None, "Abandoned cursor should be garbage collected" + # Verify cursor was eventually removed from tracking after collection + assert cursor_ref() is None, "Cursor should be garbage collected after going out of scope" assert len(db_connection._cursors) == initial_cursor_count, \ - "All created cursors should be removed from tracking after being closed or collected" + "All created cursors should be removed from tracking after collection" # 4. Verify that many cursors can be created and properly cleaned up cursors = [] From a3b9707eb2e6c9531e48b5f80bfb3a858b46e591 Mon Sep 17 00:00:00 2001 From: Jahnvi Thakkar Date: Thu, 18 Sep 2025 13:42:36 +0530 Subject: [PATCH 6/6] Resolving conflicts --- tests/test_004_cursor.py | 312 --------------------------------------- 1 file changed, 312 deletions(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 7ca55e055..a74e13837 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -7010,318 +7010,6 @@ def test_money_smallmoney_invalid_values(cursor, db_connection): drop_table_if_exists(cursor, "dbo.money_test") db_connection.commit() -def test_lowercase_attribute(cursor, db_connection): - """Test that the lowercase attribute properly converts column names to lowercase""" - - # Store original value to restore after test - original_lowercase = mssql_python.lowercase - drop_cursor = None - - try: - # Create a test table with mixed-case column names - cursor.execute(""" - CREATE TABLE #pytest_lowercase_test ( - ID INT PRIMARY KEY, - UserName VARCHAR(50), - EMAIL_ADDRESS VARCHAR(100), - PhoneNumber VARCHAR(20) - ) - """) - db_connection.commit() - - # Insert test data - cursor.execute(""" - INSERT INTO #pytest_lowercase_test (ID, UserName, EMAIL_ADDRESS, PhoneNumber) - VALUES (1, 'JohnDoe', 'john@example.com', '555-1234') - """) - db_connection.commit() - - # First test with lowercase=False (default) - mssql_python.lowercase = False - cursor1 = db_connection.cursor() - cursor1.execute("SELECT * FROM #pytest_lowercase_test") - - # Description column names should preserve original case - column_names1 = [desc[0] for desc in cursor1.description] - assert "ID" in column_names1, "Column 'ID' should be present with original case" - assert "UserName" in column_names1, "Column 'UserName' should be present with original case" - - # Make sure to consume all results and close the cursor - cursor1.fetchall() - cursor1.close() - - # Now test with lowercase=True - mssql_python.lowercase = True - cursor2 = db_connection.cursor() - cursor2.execute("SELECT * FROM #pytest_lowercase_test") - - # Description column names should be lowercase - column_names2 = [desc[0] for desc in cursor2.description] - assert "id" in column_names2, "Column names should be lowercase when lowercase=True" - assert "username" in column_names2, "Column names should be lowercase when lowercase=True" - - # Make sure to consume all results and close the cursor - cursor2.fetchall() - cursor2.close() - - # Create a fresh cursor for cleanup - drop_cursor = db_connection.cursor() - - finally: - # Restore original value - mssql_python.lowercase = original_lowercase - - try: - # Use a separate cursor for cleanup - if drop_cursor: - drop_cursor.execute("DROP TABLE IF EXISTS #pytest_lowercase_test") - db_connection.commit() - drop_cursor.close() - except Exception as e: - print(f"Warning: Failed to drop test table: {e}") - -def test_decimal_separator_function(cursor, db_connection): - """Test decimal separator functionality with database operations""" - # Store original value to restore after test - original_separator = mssql_python.getDecimalSeparator() - - try: - # Create test table - cursor.execute(""" - CREATE TABLE #pytest_decimal_separator_test ( - id INT PRIMARY KEY, - decimal_value DECIMAL(10, 2) - ) - """) - db_connection.commit() - - # Insert test values with default separator (.) - test_value = decimal.Decimal('123.45') - cursor.execute(""" - INSERT INTO #pytest_decimal_separator_test (id, decimal_value) - VALUES (1, ?) - """, [test_value]) - db_connection.commit() - - # First test with default decimal separator (.) - cursor.execute("SELECT id, decimal_value FROM #pytest_decimal_separator_test") - row = cursor.fetchone() - default_str = str(row) - assert '123.45' in default_str, "Default separator not found in string representation" - - # Now change to comma separator and test string representation - mssql_python.setDecimalSeparator(',') - cursor.execute("SELECT id, decimal_value FROM #pytest_decimal_separator_test") - row = cursor.fetchone() - - # This should format the decimal with a comma in the string representation - comma_str = str(row) - assert '123,45' in comma_str, f"Expected comma in string representation but got: {comma_str}" - - finally: - # Restore original decimal separator - mssql_python.setDecimalSeparator(original_separator) - - # Cleanup - cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_separator_test") - db_connection.commit() - -def test_decimal_separator_basic_functionality(): - """Test basic decimal separator functionality without database operations""" - # Store original value to restore after test - original_separator = mssql_python.getDecimalSeparator() - - try: - # Test default value - assert mssql_python.getDecimalSeparator() == '.', "Default decimal separator should be '.'" - - # Test setting to comma - mssql_python.setDecimalSeparator(',') - assert mssql_python.getDecimalSeparator() == ',', "Decimal separator should be ',' after setting" - - # Test setting to other valid separators - mssql_python.setDecimalSeparator(':') - assert mssql_python.getDecimalSeparator() == ':', "Decimal separator should be ':' after setting" - - # Test invalid inputs - with pytest.raises(ValueError): - mssql_python.setDecimalSeparator('') # Empty string - - with pytest.raises(ValueError): - mssql_python.setDecimalSeparator('too_long') # More than one character - - with pytest.raises(ValueError): - mssql_python.setDecimalSeparator(123) # Not a string - - finally: - # Restore original separator - mssql_python.setDecimalSeparator(original_separator) - -def test_decimal_separator_with_multiple_values(cursor, db_connection): - """Test decimal separator with multiple different decimal values""" - original_separator = mssql_python.getDecimalSeparator() - - try: - # Create test table - cursor.execute(""" - CREATE TABLE #pytest_decimal_multi_test ( - id INT PRIMARY KEY, - positive_value DECIMAL(10, 2), - negative_value DECIMAL(10, 2), - zero_value DECIMAL(10, 2), - small_value DECIMAL(10, 4) - ) - """) - db_connection.commit() - - # Insert test data - cursor.execute(""" - INSERT INTO #pytest_decimal_multi_test VALUES (1, 123.45, -67.89, 0.00, 0.0001) - """) - db_connection.commit() - - # Test with default separator first - cursor.execute("SELECT * FROM #pytest_decimal_multi_test") - row = cursor.fetchone() - default_str = str(row) - assert '123.45' in default_str, "Default positive value formatting incorrect" - assert '-67.89' in default_str, "Default negative value formatting incorrect" - - # Change to comma separator - mssql_python.setDecimalSeparator(',') - cursor.execute("SELECT * FROM #pytest_decimal_multi_test") - row = cursor.fetchone() - comma_str = str(row) - - # Verify comma is used in all decimal values - assert '123,45' in comma_str, "Positive value not formatted with comma" - assert '-67,89' in comma_str, "Negative value not formatted with comma" - assert '0,00' in comma_str, "Zero value not formatted with comma" - assert '0,0001' in comma_str, "Small value not formatted with comma" - - finally: - # Restore original separator - mssql_python.setDecimalSeparator(original_separator) - - # Cleanup - cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_multi_test") - db_connection.commit() - -def test_decimal_separator_calculations(cursor, db_connection): - """Test that decimal separator doesn't affect calculations""" - original_separator = mssql_python.getDecimalSeparator() - - try: - # Create test table - cursor.execute(""" - CREATE TABLE #pytest_decimal_calc_test ( - id INT PRIMARY KEY, - value1 DECIMAL(10, 2), - value2 DECIMAL(10, 2) - ) - """) - db_connection.commit() - - # Insert test data - cursor.execute(""" - INSERT INTO #pytest_decimal_calc_test VALUES (1, 10.25, 5.75) - """) - db_connection.commit() - - # Test with default separator - cursor.execute("SELECT value1 + value2 AS sum_result FROM #pytest_decimal_calc_test") - row = cursor.fetchone() - assert row.sum_result == decimal.Decimal('16.00'), "Sum calculation incorrect with default separator" - - # Change to comma separator - mssql_python.setDecimalSeparator(',') - - # Calculations should still work correctly - cursor.execute("SELECT value1 + value2 AS sum_result FROM #pytest_decimal_calc_test") - row = cursor.fetchone() - assert row.sum_result == decimal.Decimal('16.00'), "Sum calculation affected by separator change" - - # But string representation should use comma - assert '16,00' in str(row), "Sum result not formatted with comma in string representation" - - finally: - # Restore original separator - mssql_python.setDecimalSeparator(original_separator) - - # Cleanup - cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") - db_connection.commit() - -def test_lowercase_attribute(cursor, db_connection): - """Test that the lowercase attribute properly converts column names to lowercase""" - - # Store original value to restore after test - original_lowercase = mssql_python.lowercase - drop_cursor = None - - try: - # Create a test table with mixed-case column names - cursor.execute(""" - CREATE TABLE #pytest_lowercase_test ( - ID INT PRIMARY KEY, - UserName VARCHAR(50), - EMAIL_ADDRESS VARCHAR(100), - PhoneNumber VARCHAR(20) - ) - """) - db_connection.commit() - - # Insert test data - cursor.execute(""" - INSERT INTO #pytest_lowercase_test (ID, UserName, EMAIL_ADDRESS, PhoneNumber) - VALUES (1, 'JohnDoe', 'john@example.com', '555-1234') - """) - db_connection.commit() - - # First test with lowercase=False (default) - mssql_python.lowercase = False - cursor1 = db_connection.cursor() - cursor1.execute("SELECT * FROM #pytest_lowercase_test") - - # Description column names should preserve original case - column_names1 = [desc[0] for desc in cursor1.description] - assert "ID" in column_names1, "Column 'ID' should be present with original case" - assert "UserName" in column_names1, "Column 'UserName' should be present with original case" - - # Make sure to consume all results and close the cursor - cursor1.fetchall() - cursor1.close() - - # Now test with lowercase=True - mssql_python.lowercase = True - cursor2 = db_connection.cursor() - cursor2.execute("SELECT * FROM #pytest_lowercase_test") - - # Description column names should be lowercase - column_names2 = [desc[0] for desc in cursor2.description] - assert "id" in column_names2, "Column names should be lowercase when lowercase=True" - assert "username" in column_names2, "Column names should be lowercase when lowercase=True" - - # Make sure to consume all results and close the cursor - cursor2.fetchall() - cursor2.close() - - # Create a fresh cursor for cleanup - drop_cursor = db_connection.cursor() - - finally: - # Restore original value - mssql_python.lowercase = original_lowercase - - try: - # Use a separate cursor for cleanup - if drop_cursor: - drop_cursor.execute("DROP TABLE IF EXISTS #pytest_lowercase_test") - db_connection.commit() - drop_cursor.close() - except Exception as e: - print(f"Warning: Failed to drop test table: {e}") - def test_decimal_separator_function(cursor, db_connection): """Test decimal separator functionality with database operations""" # Store original value to restore after test