From 8e3a39ec5e27b9b978245a64f5b75ea2ab63e4b5 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Fri, 29 Aug 2025 13:55:38 +0530 Subject: [PATCH 01/17] FIX: Handle empty data --- mssql_python/pybind/ddbc_bindings.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 8a88688ab..3d68758fb 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1701,6 +1701,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } } else if (dataLen == SQL_NULL_DATA) { row.append(py::none()); + } else if (dataLen == 0) { + // Empty string + row.append(std::string()); } else { assert(dataLen == SQL_NO_TOTAL); LOG("SQLGetData couldn't determine the length of the data. " @@ -2241,7 +2244,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum row.append(py::none()); continue; } - assert(dataLen > 0 && "Must be > 0 since SQL_NULL_DATA & SQL_NO_DATA is already handled"); + assert(dataLen >= 0 && "Data length must be >= 0"); switch (dataType) { case SQL_CHAR: From 08f9a4c3dcf7bab5b778bdfd6315d6eeb95be752 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Fri, 29 Aug 2025 14:02:38 +0530 Subject: [PATCH 02/17] undo some stuff --- mssql_python/pybind/ddbc_bindings.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 3d68758fb..c5e4f949f 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1701,9 +1701,6 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } } else if (dataLen == SQL_NULL_DATA) { row.append(py::none()); - } else if (dataLen == 0) { - // Empty string - row.append(std::string()); } else { assert(dataLen == SQL_NO_TOTAL); LOG("SQLGetData couldn't determine the length of the data. 
" From 38381fe344c2ef15e854fe2c3168b2920feb35a3 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Fri, 29 Aug 2025 15:38:56 +0530 Subject: [PATCH 03/17] 0 length fix and tests --- mssql_python/pybind/ddbc_bindings.cpp | 10 +- tests/test_004_cursor.py | 128 ++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index c5e4f949f..98d244e31 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1674,7 +1674,6 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p // TODO: Handle the return code better ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, dataBuffer.data(), dataBuffer.size(), &dataLen); - if (SQL_SUCCEEDED(ret)) { // TODO: Refactor these if's across other switches to avoid code duplication // columnSize is in chars, dataLen is in bytes @@ -1701,6 +1700,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } } else if (dataLen == SQL_NULL_DATA) { row.append(py::none()); + } else if (dataLen == 0) { + // Empty string + row.append(std::string("")); } else { assert(dataLen == SQL_NO_TOTAL); LOG("SQLGetData couldn't determine the length of the data. " @@ -1757,6 +1759,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } } else if (dataLen == SQL_NULL_DATA) { row.append(py::none()); + } else if (dataLen == 0) { + // Empty string + row.append(py::str("")); } else { assert(dataLen == SQL_NO_TOTAL); LOG("SQLGetData couldn't determine the length of the data. " @@ -1953,6 +1958,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } } else if (dataLen == SQL_NULL_DATA) { row.append(py::none()); + } else if (dataLen == 0) { + // Empty bytes + row.append(py::bytes("")); } else { assert(dataLen == SQL_NO_TOTAL); LOG("SQLGetData couldn't determine the length of the data. 
" diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 22149ea56..b2599c1b3 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -68,6 +68,134 @@ def test_cursor(cursor): """Check if the cursor is created""" assert cursor is not None, "Cursor should not be None" +def test_empty_string_handling(cursor, db_connection): + """Test that empty strings are handled correctly without assertion failures""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_empty_string") + cursor.execute("CREATE TABLE #pytest_empty_string (id INT, text_col NVARCHAR(100))") + db_connection.commit() + + # Insert empty string + cursor.execute("INSERT INTO #pytest_empty_string VALUES (1, '')") + db_connection.commit() + + # Fetch the empty string - this would previously cause assertion failure + cursor.execute("SELECT text_col FROM #pytest_empty_string WHERE id = 1") + row = cursor.fetchone() + assert row is not None, "Should return a row" + assert row[0] == '', "Should return empty string, not None" + + # Test with fetchall to ensure batch fetch works too + cursor.execute("SELECT text_col FROM #pytest_empty_string") + rows = cursor.fetchall() + assert len(rows) == 1, "Should return 1 row" + assert rows[0][0] == '', "fetchall should also return empty string" + + except Exception as e: + pytest.fail(f"Empty string handling test failed: {e}") + finally: + cursor.execute("DROP TABLE #pytest_empty_string") + db_connection.commit() + +def test_empty_binary_handling(cursor, db_connection): + """Test that empty binary data is handled correctly without assertion failures""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_empty_binary") + cursor.execute("CREATE TABLE #pytest_empty_binary (id INT, binary_col VARBINARY(100))") + db_connection.commit() + + # Insert empty binary data + cursor.execute("INSERT INTO #pytest_empty_binary VALUES (1, 0x)") # Empty binary literal + db_connection.commit() + + # Fetch the empty binary - this 
would previously cause assertion failure + cursor.execute("SELECT binary_col FROM #pytest_empty_binary WHERE id = 1") + row = cursor.fetchone() + assert row is not None, "Should return a row" + assert row[0] == b'', "Should return empty bytes, not None" + assert isinstance(row[0], bytes), "Should return bytes type" + assert len(row[0]) == 0, "Should be zero-length bytes" + + except Exception as e: + pytest.fail(f"Empty binary handling test failed: {e}") + finally: + cursor.execute("DROP TABLE #pytest_empty_binary") + db_connection.commit() + +def test_mixed_empty_and_null_values(cursor, db_connection): + """Test that empty strings/binary and NULL values are distinguished correctly""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_empty_vs_null") + cursor.execute(""" + CREATE TABLE #pytest_empty_vs_null ( + id INT, + text_col NVARCHAR(100), + binary_col VARBINARY(100) + ) + """) + db_connection.commit() + + # Insert mix of empty and NULL values + cursor.execute("INSERT INTO #pytest_empty_vs_null VALUES (1, '', 0x)") # Empty string and binary + cursor.execute("INSERT INTO #pytest_empty_vs_null VALUES (2, NULL, NULL)") # NULL values + cursor.execute("INSERT INTO #pytest_empty_vs_null VALUES (3, 'data', 0x1234)") # Non-empty values + db_connection.commit() + + # Fetch all rows + cursor.execute("SELECT id, text_col, binary_col FROM #pytest_empty_vs_null ORDER BY id") + rows = cursor.fetchall() + + # Validate row 1: empty values + assert rows[0][1] == '', "Row 1 should have empty string, not None" + assert rows[0][2] == b'', "Row 1 should have empty bytes, not None" + + # Validate row 2: NULL values + assert rows[1][1] is None, "Row 2 should have NULL (None) for text" + assert rows[1][2] is None, "Row 2 should have NULL (None) for binary" + + # Validate row 3: non-empty values + assert rows[2][1] == 'data', "Row 3 should have non-empty string" + assert rows[2][2] == b'\x12\x34', "Row 3 should have non-empty binary" + + except Exception as e: + 
pytest.fail(f"Empty vs NULL test failed: {e}") + finally: + cursor.execute("DROP TABLE #pytest_empty_vs_null") + db_connection.commit() + +def test_empty_string_edge_cases(cursor, db_connection): + """Test edge cases with empty strings""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_empty_edge") + cursor.execute("CREATE TABLE #pytest_empty_edge (id INT, data NVARCHAR(MAX))") + db_connection.commit() + + # Test various ways to insert empty strings + cursor.execute("INSERT INTO #pytest_empty_edge VALUES (1, '')") + cursor.execute("INSERT INTO #pytest_empty_edge VALUES (2, N'')") + cursor.execute("INSERT INTO #pytest_empty_edge VALUES (3, ?)", ['']) + cursor.execute("INSERT INTO #pytest_empty_edge VALUES (4, ?)", [u'']) + db_connection.commit() + + # Verify all are empty strings + cursor.execute("SELECT id, data, LEN(data) as length FROM #pytest_empty_edge ORDER BY id") + rows = cursor.fetchall() + + for row in rows: + assert row[1] == '', f"Row {row[0]} should have empty string" + assert row[2] == 0, f"Row {row[0]} should have length 0" + assert row[1] is not None, f"Row {row[0]} should not be None" + + except Exception as e: + pytest.fail(f"Empty string edge cases test failed: {e}") + finally: + cursor.execute("DROP TABLE #pytest_empty_edge") + db_connection.commit() + def test_insert_id_column(cursor, db_connection): """Test inserting data into the id column""" try: From 229c9574f5adb1e44255370c0475ff2fe625f8a9 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Fri, 29 Aug 2025 15:50:46 +0530 Subject: [PATCH 04/17] restore condition and cleanup --- mssql_python/pybind/ddbc_bindings.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 98d244e31..a0ed37b96 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1674,6 +1674,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT 
colCount, p // TODO: Handle the return code better ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, dataBuffer.data(), dataBuffer.size(), &dataLen); + if (SQL_SUCCEEDED(ret)) { // TODO: Refactor these if's across other switches to avoid code duplication // columnSize is in chars, dataLen is in bytes @@ -2249,7 +2250,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum row.append(py::none()); continue; } - assert(dataLen >= 0 && "Data length must be >= 0"); + assert(dataLen > 0 && "Must be > 0 since SQL_NULL_DATA & SQL_NO_DATA is already handled"); switch (dataType) { case SQL_CHAR: From 7dc1135a9c7f60a2a174c1f2176e18488fdfe1a8 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Fri, 29 Aug 2025 16:07:54 +0530 Subject: [PATCH 05/17] fixed assert --- mssql_python/pybind/ddbc_bindings.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index a0ed37b96..d1bb2d0e1 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -2250,7 +2250,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum row.append(py::none()); continue; } - assert(dataLen > 0 && "Must be > 0 since SQL_NULL_DATA & SQL_NO_DATA is already handled"); + assert(dataLen >= 0 && "Data length must be >= 0"); switch (dataType) { case SQL_CHAR: From 96e59cc515c02271a386082932c469d4401a379c Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 15:15:35 +0530 Subject: [PATCH 06/17] FIX: Unix handling in Executemany --- mssql_python/cursor.py | 10 +- mssql_python/pybind/ddbc_bindings.cpp | 13 +- tests/test_004_cursor.py | 462 +++++++++++++++++++++++++- 3 files changed, 475 insertions(+), 10 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 88152aa2a..f15b0eff3 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -385,9 +385,9 @@ def _map_sql_type(self, param, 
parameters_list, i): False, ) return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, # Use VARBINARY instead of BINARY ddbc_sql_const.SQL_C_BINARY.value, - len(param), + max(len(param), 1), # Ensure minimum column size of 1 0, False, ) @@ -402,9 +402,9 @@ def _map_sql_type(self, param, parameters_list, i): True, ) return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, # Use VARBINARY instead of BINARY ddbc_sql_const.SQL_C_BINARY.value, - len(param), + max(len(param), 1), # Ensure minimum column size of 1 0, False, ) @@ -845,6 +845,8 @@ def _select_best_sample_value(column): return max(non_nulls, key=lambda s: len(str(s))) if all(isinstance(v, datetime.datetime) for v in non_nulls): return datetime.datetime.now() + if all(isinstance(v, (bytes, bytearray)) for v in non_nulls): + return max(non_nulls, key=lambda b: len(b)) if all(isinstance(v, datetime.date) for v in non_nulls): return datetime.date.today() return non_nulls[0] # fallback diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 2c1c13bfc..e1480cd6a 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1308,7 +1308,13 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::string offending = WideToUTF8(wstr); ThrowStdException("Input string exceeds allowed column size at parameter index " + std::to_string(paramIndex)); } +#if defined(__APPLE__) || defined(__linux__) + auto utf16Buf = WStringToSQLWCHAR(wstr); + size_t copySize = std::min(utf16Buf.size(), static_cast<size_t>(info.columnSize + 1)); + std::memcpy(wcharArray + i * (info.columnSize + 1), utf16Buf.data(), copySize * sizeof(SQLWCHAR)); +#else std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); +#endif strLenOrIndArray[i] = SQL_NTS; } } @@ -1372,7 +1378,12 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::string str = columnValues[i].cast<std::string>(); if (str.size() >
info.columnSize) ThrowStdException("Input exceeds column size at index " + std::to_string(i)); - std::memcpy(charArray + i * (info.columnSize + 1), str.c_str(), str.size()); + // Clear the entire buffer slot first + std::memset(charArray + i * (info.columnSize + 1), 0, info.columnSize + 1); + // Then copy the actual data + if (str.size() > 0) { + std::memcpy(charArray + i * (info.columnSize + 1), str.c_str(), str.size()); + } strLenOrIndArray[i] = static_cast(str.size()); } } diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index b2599c1b3..835283d6f 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -667,20 +667,21 @@ def test_longvarbinary(cursor, db_connection): db_connection.commit() cursor.execute("INSERT INTO #pytest_longvarbinary_test (longvarbinary_column) VALUES (?), (?)", [bytearray("ABCDEFGHI", 'utf-8'), bytes("123!@#", 'utf-8')]) db_connection.commit() - expectedRows = 3 + expectedRows = 2 # Only 2 rows are inserted # fetchone test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = [] for i in range(0, expectedRows): rows.append(cursor.fetchone()) assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" + # Both should return as bytes (database doesn't preserve Python type distinction) + assert tuple(rows[0]) == (bytes("ABCDEFGHI", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchone - row 0" + assert tuple(rows[1]) == (bytes("123!@#", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = cursor.fetchall() - assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed 
for fetchall - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" + assert tuple(rows[0]) == (bytes("ABCDEFGHI", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchall - row 0" + assert tuple(rows[1]) == (bytes("123!@#", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchall - row 1" except Exception as e: pytest.fail(f"SQL_LONGVARBINARY parsing test failed: {e}") finally: @@ -887,6 +888,457 @@ def test_execute_many(cursor, db_connection): count = cursor.fetchone()[0] assert count == 11, "Executemany failed" +def test_executemany_empty_strings(cursor, db_connection): + """Test executemany with empty strings - regression test for Unix UTF-16 conversion issue""" + try: + # Create test table for empty string testing + cursor.execute(""" + CREATE TABLE #pytest_empty_batch ( + id INT, + data NVARCHAR(50) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_empty_batch") + db_connection.commit() + + # Test data with mix of empty strings and regular strings + test_data = [ + (1, ''), + (2, 'non-empty'), + (3, ''), + (4, 'another'), + (5, '') + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_empty_batch VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, data FROM #pytest_empty_batch ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 5, f"Expected 5 rows, got {len(results)}" + + # Check each row individually + expected = [ + (1, ''), + (2, 'non-empty'), + (3, ''), + (4, 'another'), + (5, '') + ] + + for i, (actual, expected_row) in enumerate(zip(results, expected)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch - expected {expected_row[0]}, got {actual[0]}" + assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'" + + finally: + # Cleanup 
+ try: + cursor.execute("DROP TABLE IF EXISTS #pytest_empty_batch") + db_connection.commit() + except: + pass + +def test_executemany_empty_strings_various_types(cursor, db_connection): + """Test executemany with empty strings in different column types""" + try: + # Create test table with different string types + cursor.execute(""" + CREATE TABLE #pytest_string_types ( + id INT, + varchar_col VARCHAR(50), + nvarchar_col NVARCHAR(50), + text_col TEXT, + ntext_col NTEXT + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_string_types") + db_connection.commit() + + # Test data with empty strings for different column types + test_data = [ + (1, '', '', '', ''), + (2, 'varchar', 'nvarchar', 'text', 'ntext'), + (3, '', '', '', ''), + ] + + # Execute the batch insert + cursor.executemany( + "INSERT INTO #pytest_string_types VALUES (?, ?, ?, ?, ?)", + test_data + ) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT * FROM #pytest_string_types ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 3, f"Expected 3 rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + for j, (actual_val, expected_val) in enumerate(zip(actual, expected_row)): + assert actual_val == expected_val, f"Row {i}, Col {j}: expected '{expected_val}', got '{actual_val}'" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_string_types") + db_connection.commit() + except: + pass + +def test_executemany_unicode_and_empty_strings(cursor, db_connection): + """Test executemany with mix of Unicode characters and empty strings""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_unicode_test ( + id INT, + data NVARCHAR(100) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_unicode_test") + db_connection.commit() + + # Test data with 
Unicode and empty strings + test_data = [ + (1, ''), + (2, 'Hello 😄'), + (3, ''), + (4, '中文'), + (5, ''), + (6, 'Ñice tëxt'), + (7, ''), + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_unicode_test VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, data FROM #pytest_unicode_test ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 7, f"Expected 7 rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" + assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_unicode_test") + db_connection.commit() + except: + pass + +def test_executemany_large_batch_with_empty_strings(cursor, db_connection): + """Test executemany with large batch containing empty strings""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_large_batch ( + id INT, + data NVARCHAR(50) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_large_batch") + db_connection.commit() + + # Create large test data with alternating empty and non-empty strings + test_data = [] + for i in range(100): + if i % 3 == 0: + test_data.append((i, '')) # Every 3rd row is empty + else: + test_data.append((i, f'data_{i}')) + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_large_batch VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT COUNT(*) FROM #pytest_large_batch") + count = cursor.fetchone()[0] + assert count == 100, f"Expected 100 rows, got {count}" + + # Check a few specific rows + cursor.execute("SELECT id, data FROM #pytest_large_batch WHERE id
IN (0, 1, 3, 6, 9) ORDER BY id") + results = cursor.fetchall() + + expected_subset = [ + (0, ''), # 0 % 3 == 0, should be empty + (1, 'data_1'), # 1 % 3 != 0, should have data + (3, ''), # 3 % 3 == 0, should be empty + (6, ''), # 6 % 3 == 0, should be empty + (9, ''), # 9 % 3 == 0, should be empty + ] + + for actual, expected in zip(results, expected_subset): + assert actual[0] == expected[0], f"ID mismatch: expected {expected[0]}, got {actual[0]}" + assert actual[1] == expected[1], f"Data mismatch for ID {actual[0]}: expected '{expected[1]}', got '{actual[1]}'" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_large_batch") + db_connection.commit() + except: + pass + +def test_executemany_compare_with_execute(cursor, db_connection): + """Test that executemany produces same results as individual execute calls""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_compare_test ( + id INT, + data NVARCHAR(50) + ) + """) + + # Test data with empty strings + test_data = [ + (1, ''), + (2, 'test'), + (3, ''), + (4, 'another'), + (5, ''), + ] + + # First, insert using individual execute calls + cursor.execute("DELETE FROM #pytest_compare_test") + for row_data in test_data: + cursor.execute("INSERT INTO #pytest_compare_test VALUES (?, ?)", row_data) + db_connection.commit() + + # Get results from individual inserts + cursor.execute("SELECT id, data FROM #pytest_compare_test ORDER BY id") + execute_results = cursor.fetchall() + + # Clear and insert using executemany + cursor.execute("DELETE FROM #pytest_compare_test") + cursor.executemany("INSERT INTO #pytest_compare_test VALUES (?, ?)", test_data) + db_connection.commit() + + # Get results from batch insert + cursor.execute("SELECT id, data FROM #pytest_compare_test ORDER BY id") + executemany_results = cursor.fetchall() + + # Compare results + assert len(execute_results) == len(executemany_results), "Row count mismatch between execute and executemany" + + for i, 
(exec_row, batch_row) in enumerate(zip(execute_results, executemany_results)): + assert exec_row[0] == batch_row[0], f"Row {i}: ID mismatch between execute and executemany" + assert exec_row[1] == batch_row[1], f"Row {i}: Data mismatch between execute and executemany - execute: '{exec_row[1]}', executemany: '{batch_row[1]}'" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_compare_test") + db_connection.commit() + except: + pass + +# def test_executemany_edge_cases_empty_strings(cursor, db_connection): +# """Test executemany edge cases with empty strings and special characters""" +# try: +# # Create test table +# cursor.execute(""" +# CREATE TABLE #pytest_edge_cases ( +# id INT, +# varchar_data VARCHAR(100), +# nvarchar_data NVARCHAR(100) +# ) +# """) + +# # Clear any existing data +# cursor.execute("DELETE FROM #pytest_edge_cases") +# db_connection.commit() + +# # Edge case test data +# test_data = [ +# # All empty strings +# (1, '', ''), +# # One empty, one not +# (2, '', 'not empty'), +# (3, 'not empty', ''), +# # Special whitespace cases +# (4, ' ', ' '), # Single and double space +# (5, '\t', '\n'), # Tab and newline +# # Mixed Unicode and empty +# (6, '', '🚀'), +# (7, 'ASCII', ''), +# # Boundary cases +# (8, '', ''), # Another all empty +# ] + +# # Execute the batch insert +# cursor.executemany( +# "INSERT INTO #pytest_edge_cases VALUES (?, ?, ?)", +# test_data +# ) +# db_connection.commit() + +# # Verify the data was inserted correctly +# cursor.execute("SELECT id, varchar_data, nvarchar_data FROM #pytest_edge_cases ORDER BY id") +# results = cursor.fetchall() + +# # Check that we got the right number of rows +# assert len(results) == len(test_data), f"Expected {len(test_data)} rows, got {len(results)}" + +# # Check each row +# for i, (actual, expected_row) in enumerate(zip(results, test_data)): +# assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" +# assert actual[1] == expected_row[1], f"Row {i}: VARCHAR mismatch
- expected '{repr(expected_row[1])}', got '{repr(actual[1])}'" +# assert actual[2] == expected_row[2], f"Row {i}: NVARCHAR mismatch - expected '{repr(expected_row[2])}', got '{repr(actual[2])}'" + +# finally: +# # Cleanup +# try: +# cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases") +# db_connection.commit() +# except: +# pass + +def test_executemany_null_vs_empty_string(cursor, db_connection): + """Test that executemany correctly distinguishes between NULL and empty string""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_null_vs_empty ( + id INT, + data NVARCHAR(50) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_null_vs_empty") + db_connection.commit() + + # Test data with NULLs and empty strings + test_data = [ + (1, None), # NULL + (2, ''), # Empty string + (3, None), # NULL + (4, 'data'), # Regular string + (5, ''), # Empty string + (6, None), # NULL + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_null_vs_empty VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, data FROM #pytest_null_vs_empty ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row, paying attention to NULL vs empty string + expected_results = [ + (1, None), # NULL should remain NULL + (2, ''), # Empty string should remain empty string + (3, None), # NULL should remain NULL + (4, 'data'), # Regular string + (5, ''), # Empty string should remain empty string + (6, None), # NULL should remain NULL + ] + + for i, (actual, expected) in enumerate(zip(results, expected_results)): + assert actual[0] == expected[0], f"Row {i}: ID mismatch" + if expected[1] is None: + assert actual[1] is None, f"Row {i}: Expected NULL, got '{actual[1]}'" + else: + assert actual[1] == expected[1], f"Row {i}: Expected 
'{expected[1]}', got '{actual[1]}'" + + # Also test with explicit queries for NULL vs empty + cursor.execute("SELECT COUNT(*) FROM #pytest_null_vs_empty WHERE data IS NULL") + null_count = cursor.fetchone()[0] + assert null_count == 3, f"Expected 3 NULL values, got {null_count}" + + cursor.execute("SELECT COUNT(*) FROM #pytest_null_vs_empty WHERE data = ''") + empty_count = cursor.fetchone()[0] + assert empty_count == 2, f"Expected 2 empty strings, got {empty_count}" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty") + db_connection.commit() + except: + pass + +def test_executemany_binary_data_edge_cases(cursor, db_connection): + """Test executemany with binary data and empty byte arrays""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_binary_test ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_binary_test") + db_connection.commit() + + # Test data with binary data and empty bytes + test_data = [ + (1, b''), # Empty bytes + (2, b'hello'), # Regular bytes + (3, b''), # Empty bytes again + (4, b'\x00\x01\x02'), # Binary data with null bytes + (5, b''), # Empty bytes + (6, None), # NULL + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_binary_test VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_binary_test ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" + if expected_row[1] is None: + assert actual[1] is None, f"Row {i}: Expected NULL, got {actual[1]}" + else: + assert actual[1] == expected_row[1], f"Row {i}: Binary data mismatch - 
expected {expected_row[1]}, got {actual[1]}" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_binary_test") + db_connection.commit() + except: + pass + def test_nextset(cursor): """Test nextset""" cursor.execute("SELECT * FROM #pytest_all_data_types WHERE id = 1;") From 18c92263044cf5b4913901e80acbc9e6ee4f2a21 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 15:23:15 +0530 Subject: [PATCH 07/17] tests --- tests/test_004_cursor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 38236679a..8068567db 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -675,13 +675,13 @@ def test_longvarbinary(cursor, db_connection): rows.append(cursor.fetchone()) assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) # Both should return as bytes (database doesn't preserve Python type distinction) - assert tuple(rows[0]) == (bytes("ABCDEFGHI", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchone - row 0" - assert tuple(rows[1]) == (bytes("123!@#", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchone - row 1" + assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" + assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = cursor.fetchall() - assert tuple(rows[0]) == (bytes("ABCDEFGHI", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchall - row 0" - assert tuple(rows[1]) == (bytes("123!@#", 'utf-8'),), "SQL_LONGVARBINARY parsing failed for fetchall - row 1" + assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 0" + assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" 
except Exception as e: pytest.fail(f"SQL_LONGVARBINARY parsing test failed: {e}") finally: From 28d844136e02c494c6bc0da52cd9445f6ec7ad7a Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 16:23:41 +0530 Subject: [PATCH 08/17] Undo binary fixes since its in another branch now --- mssql_python/cursor.py | 10 ++--- mssql_python/pybind/ddbc_bindings.cpp | 7 +--- tests/test_004_cursor.py | 58 ++------------------------- 3 files changed, 8 insertions(+), 67 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index d212766cc..e2c811c9d 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -389,9 +389,9 @@ def _map_sql_type(self, param, parameters_list, i): False, ) return ( - ddbc_sql_const.SQL_VARBINARY.value, # Use VARBINARY instead of BINARY + ddbc_sql_const.SQL_BINARY.value, ddbc_sql_const.SQL_C_BINARY.value, - max(len(param), 1), # Ensure minimum column size of 1 + len(param), 0, False, ) @@ -406,9 +406,9 @@ def _map_sql_type(self, param, parameters_list, i): True, ) return ( - ddbc_sql_const.SQL_VARBINARY.value, # Use VARBINARY instead of BINARY + ddbc_sql_const.SQL_BINARY.value, ddbc_sql_const.SQL_C_BINARY.value, - max(len(param), 1), # Ensure minimum column size of 1 + len(param), 0, False, ) @@ -849,8 +849,6 @@ def _select_best_sample_value(column): return max(non_nulls, key=lambda s: len(str(s))) if all(isinstance(v, datetime.datetime) for v in non_nulls): return datetime.datetime.now() - if all(isinstance(v, (bytes, bytearray)) for v in non_nulls): - return max(non_nulls, key=lambda b: len(b)) if all(isinstance(v, datetime.date) for v in non_nulls): return datetime.date.today() return non_nulls[0] # fallback diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 3d169b28a..6a875aca7 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1426,12 +1426,7 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::string str = 
columnValues[i].cast(); if (str.size() > info.columnSize) ThrowStdException("Input exceeds column size at index " + std::to_string(i)); - // Clear the entire buffer slot first - std::memset(charArray + i * (info.columnSize + 1), 0, info.columnSize + 1); - // Then copy the actual data - if (str.size() > 0) { - std::memcpy(charArray + i * (info.columnSize + 1), str.c_str(), str.size()); - } + std::memcpy(charArray + i * (info.columnSize + 1), str.c_str(), str.size()); strLenOrIndArray[i] = static_cast(str.size()); } } diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 8068567db..88db00a14 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -667,7 +667,7 @@ def test_longvarbinary(cursor, db_connection): db_connection.commit() cursor.execute("INSERT INTO #pytest_longvarbinary_test (longvarbinary_column) VALUES (?), (?)", [bytearray("ABCDEFGHI", 'utf-8'), bytes("123!@#", 'utf-8')]) db_connection.commit() - expectedRows = 2 # Only 2 rows are inserted + expectedRows = 3 # fetchone test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = [] @@ -676,12 +676,12 @@ def test_longvarbinary(cursor, db_connection): assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) # Both should return as bytes (database doesn't preserve Python type distinction) assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" - assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" + assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = cursor.fetchall() assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 0" - assert rows[1] == [bytes("123!@#", 'utf-8')], 
"SQL_LONGVARBINARY parsing failed for fetchall - row 1" + assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" except Exception as e: pytest.fail(f"SQL_LONGVARBINARY parsing test failed: {e}") finally: @@ -1287,58 +1287,6 @@ def test_executemany_null_vs_empty_string(cursor, db_connection): except: pass -def test_executemany_binary_data_edge_cases(cursor, db_connection): - """Test executemany with binary data and empty byte arrays""" - try: - # Create test table - cursor.execute(""" - CREATE TABLE #pytest_binary_test ( - id INT, - binary_data VARBINARY(100) - ) - """) - - # Clear any existing data - cursor.execute("DELETE FROM #pytest_binary_test") - db_connection.commit() - - # Test data with binary data and empty bytes - test_data = [ - (1, b''), # Empty bytes - (2, b'hello'), # Regular bytes - (3, b''), # Empty bytes again - (4, b'\x00\x01\x02'), # Binary data with null bytes - (5, b''), # Empty bytes - (6, None), # NULL - ] - - # Execute the batch insert - cursor.executemany("INSERT INTO #pytest_binary_test VALUES (?, ?)", test_data) - db_connection.commit() - - # Verify the data was inserted correctly - cursor.execute("SELECT id, binary_data FROM #pytest_binary_test ORDER BY id") - results = cursor.fetchall() - - # Check that we got the right number of rows - assert len(results) == 6, f"Expected 6 rows, got {len(results)}" - - # Check each row - for i, (actual, expected_row) in enumerate(zip(results, test_data)): - assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" - if expected_row[1] is None: - assert actual[1] is None, f"Row {i}: Expected NULL, got {actual[1]}" - else: - assert actual[1] == expected_row[1], f"Row {i}: Binary data mismatch - expected {expected_row[1]}, got {actual[1]}" - - finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_binary_test") - db_connection.commit() - except: - pass - def test_nextset(cursor): """Test nextset""" cursor.execute("SELECT * FROM 
#pytest_all_data_types WHERE id = 1;") From 7969a93bb9b3e155b7fd738782e325c37e87a8d8 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 16:28:54 +0530 Subject: [PATCH 09/17] add edgecase test --- tests/test_004_cursor.py | 120 +++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 88db00a14..0ad85eb69 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -1156,66 +1156,66 @@ def test_executemany_compare_with_execute(cursor, db_connection): except: pass -# def test_executemany_edge_cases_empty_strings(cursor, db_connection): -# """Test executemany edge cases with empty strings and special characters""" -# try: -# # Create test table -# cursor.execute(""" -# CREATE TABLE #pytest_edge_cases ( -# id INT, -# varchar_data VARCHAR(100), -# nvarchar_data NVARCHAR(100) -# ) -# """) - -# # Clear any existing data -# cursor.execute("DELETE FROM #pytest_edge_cases") -# db_connection.commit() - -# # Edge case test data -# test_data = [ -# # All empty strings -# (1, '', ''), -# # One empty, one not -# (2, '', 'not empty'), -# (3, 'not empty', ''), -# # Special whitespace cases -# (4, ' ', ' '), # Single and double space -# (5, '\t', '\n'), # Tab and newline -# # Mixed Unicode and empty -# (6, '', '๐Ÿš€'), -# (7, 'ASCII', ''), -# # Boundary cases -# (8, '', ''), # Another all empty -# ] - -# # Execute the batch insert -# cursor.executemany( -# "INSERT INTO #pytest_edge_cases VALUES (?, ?, ?)", -# test_data -# ) -# db_connection.commit() - -# # Verify the data was inserted correctly -# cursor.execute("SELECT id, varchar_data, nvarchar_data FROM #pytest_edge_cases ORDER BY id") -# results = cursor.fetchall() - -# # Check that we got the right number of rows -# assert len(results) == len(test_data), f"Expected {len(test_data)} rows, got {len(results)}" - -# # Check each row -# for i, (actual, expected_row) in enumerate(zip(results, test_data)): 
-# assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" -# assert actual[1] == expected_row[1], f"Row {i}: VARCHAR mismatch - expected '{repr(expected_row[1])}', got '{repr(actual[1])}'" -# assert actual[2] == expected_row[2], f"Row {i}: NVARCHAR mismatch - expected '{repr(expected_row[2])}', got '{repr(actual[2])}'" - -# finally: -# # Cleanup -# try: -# cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases") -# db_connection.commit() -# except: -# pass +def test_executemany_edge_cases_empty_strings(cursor, db_connection): + """Test executemany edge cases with empty strings and special characters""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_edge_cases ( + id INT, + varchar_data VARCHAR(100), + nvarchar_data NVARCHAR(100) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_edge_cases") + db_connection.commit() + + # Edge case test data + test_data = [ + # All empty strings + (1, '', ''), + # One empty, one not + (2, '', 'not empty'), + (3, 'not empty', ''), + # Special whitespace cases + (4, ' ', ' '), # Single and double space + (5, '\t', '\n'), # Tab and newline + # Mixed Unicode and empty + # (6, '', '๐Ÿš€'), #TODO: Uncomment once nvarcharmax, varcharmax and unicode support is implemented for executemany + (7, 'ASCII', ''), + # Boundary cases + (8, '', ''), # Another all empty + ] + + # Execute the batch insert + cursor.executemany( + "INSERT INTO #pytest_edge_cases VALUES (?, ?, ?)", + test_data + ) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, varchar_data, nvarchar_data FROM #pytest_edge_cases ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == len(test_data), f"Expected {len(test_data)} rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" + assert 
actual[1] == expected_row[1], f"Row {i}: VARCHAR mismatch - expected '{repr(expected_row[1])}', got '{repr(actual[1])}'" + assert actual[2] == expected_row[2], f"Row {i}: NVARCHAR mismatch - expected '{repr(expected_row[2])}', got '{repr(actual[2])}'" + + finally: + # Cleanup + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases") + db_connection.commit() + except: + pass def test_executemany_null_vs_empty_string(cursor, db_connection): """Test that executemany correctly distinguishes between NULL and empty string""" From 283a9991f763f4b5ebcbe6f2fc2d85787d2c64d2 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 16:35:12 +0530 Subject: [PATCH 10/17] test cleanup --- tests/test_004_cursor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 0ad85eb69..1ae21ae53 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -674,7 +674,6 @@ def test_longvarbinary(cursor, db_connection): for i in range(0, expectedRows): rows.append(cursor.fetchone()) assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) - # Both should return as bytes (database doesn't preserve Python type distinction) assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test From 4b708b34aed95d27616cd4f5b2683028f06793d7 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Wed, 3 Sep 2025 16:37:44 +0530 Subject: [PATCH 11/17] test cleanup --- tests/test_004_cursor.py | 77 +++++++++++++++------------------------- 1 file changed, 28 insertions(+), 49 deletions(-) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 1ae21ae53..68ae762ef 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -934,14 +934,11 @@ def test_executemany_empty_strings(cursor, 
db_connection): for i, (actual, expected_row) in enumerate(zip(results, expected)): assert actual[0] == expected_row[0], f"Row {i}: ID mismatch - expected {expected_row[0]}, got {actual[0]}" assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'" - + except Exception as e: + pytest.fail(f"Executemany with empty strings failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_empty_batch") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_empty_batch") + db_connection.commit() def test_executemany_empty_strings_various_types(cursor, db_connection): """Test executemany with empty strings in different column types""" @@ -986,14 +983,11 @@ def test_executemany_empty_strings_various_types(cursor, db_connection): for i, (actual, expected_row) in enumerate(zip(results, test_data)): for j, (actual_val, expected_val) in enumerate(zip(actual, expected_row)): assert actual_val == expected_val, f"Row {i}, Col {j}: expected '{expected_val}', got '{actual_val}'" - + except Exception as e: + pytest.fail(f"Executemany with empty strings in various types failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_string_types") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_string_types") + db_connection.commit() def test_executemany_unicode_and_empty_strings(cursor, db_connection): """Test executemany with mix of Unicode characters and empty strings""" @@ -1036,14 +1030,11 @@ def test_executemany_unicode_and_empty_strings(cursor, db_connection): for i, (actual, expected_row) in enumerate(zip(results, test_data)): assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'" - + except Exception as e: + pytest.fail(f"Executemany with Unicode and empty strings failed: {e}") 
finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_unicode_test") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_unicode_test") + db_connection.commit() def test_executemany_large_batch_with_empty_strings(cursor, db_connection): """Test executemany with large batch containing empty strings""" @@ -1092,14 +1083,11 @@ def test_executemany_large_batch_with_empty_strings(cursor, db_connection): for actual, expected in zip(results, expected_subset): assert actual[0] == expected[0], f"ID mismatch: expected {expected[0]}, got {actual[0]}" assert actual[1] == expected[1], f"Data mismatch for ID {actual[0]}: expected '{expected[1]}', got '{actual[1]}'" - + except Exception as e: + pytest.fail(f"Executemany with large batch and empty strings failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_large_batch") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_large_batch") + db_connection.commit() def test_executemany_compare_with_execute(cursor, db_connection): """Test that executemany produces same results as individual execute calls""" @@ -1146,14 +1134,11 @@ def test_executemany_compare_with_execute(cursor, db_connection): for i, (exec_row, batch_row) in enumerate(zip(execute_results, executemany_results)): assert exec_row[0] == batch_row[0], f"Row {i}: ID mismatch between execute and executemany" assert exec_row[1] == batch_row[1], f"Row {i}: Data mismatch between execute and executemany - execute: '{exec_row[1]}', executemany: '{batch_row[1]}'" - + except Exception as e: + pytest.fail(f"Executemany vs execute comparison failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_compare_test") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_compare_test") + db_connection.commit() def test_executemany_edge_cases_empty_strings(cursor, db_connection): """Test 
executemany edge cases with empty strings and special characters""" @@ -1207,14 +1192,11 @@ def test_executemany_edge_cases_empty_strings(cursor, db_connection): assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" assert actual[1] == expected_row[1], f"Row {i}: VARCHAR mismatch - expected '{repr(expected_row[1])}', got '{repr(actual[1])}'" assert actual[2] == expected_row[2], f"Row {i}: NVARCHAR mismatch - expected '{repr(expected_row[2])}', got '{repr(actual[2])}'" - + except Exception as e: + pytest.fail(f"Executemany edge cases with empty strings failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases") + db_connection.commit() def test_executemany_null_vs_empty_string(cursor, db_connection): """Test that executemany correctly distinguishes between NULL and empty string""" @@ -1277,14 +1259,11 @@ def test_executemany_null_vs_empty_string(cursor, db_connection): cursor.execute("SELECT COUNT(*) FROM #pytest_null_vs_empty WHERE data = ''") empty_count = cursor.fetchone()[0] assert empty_count == 2, f"Expected 2 empty strings, got {empty_count}" - + except Exception as e: + pytest.fail(f"Executemany NULL vs empty string test failed: {e}") finally: - # Cleanup - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty") - db_connection.commit() - except: - pass + cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty") + db_connection.commit() def test_nextset(cursor): """Test nextset""" From 2ce421ca2941e7e05e9d1f828721ff0bdc460535 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Thu, 4 Sep 2025 11:07:20 +0530 Subject: [PATCH 12/17] add binary fix --- mssql_python/cursor.py | 20 ++------------ tests/test_004_cursor.py | 56 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index e2c811c9d..521ed482b 
100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -380,16 +380,8 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytes): - if len(param) > 8000: # Assuming VARBINARY(MAX) for long byte arrays - return ( - ddbc_sql_const.SQL_VARBINARY.value, - ddbc_sql_const.SQL_C_BINARY.value, - len(param), - 0, - False, - ) return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, len(param), 0, @@ -397,16 +389,8 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytearray): - if len(param) > 8000: # Assuming VARBINARY(MAX) for long byte arrays - return ( - ddbc_sql_const.SQL_VARBINARY.value, - ddbc_sql_const.SQL_C_BINARY.value, - len(param), - 0, - True, - ) return ( - ddbc_sql_const.SQL_BINARY.value, + ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, len(param), 0, diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 68ae762ef..f527057cc 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -667,7 +667,7 @@ def test_longvarbinary(cursor, db_connection): db_connection.commit() cursor.execute("INSERT INTO #pytest_longvarbinary_test (longvarbinary_column) VALUES (?), (?)", [bytearray("ABCDEFGHI", 'utf-8'), bytes("123!@#", 'utf-8')]) db_connection.commit() - expectedRows = 3 + expectedRows = 2 # Only 2 rows are inserted # fetchone test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = [] @@ -675,12 +675,12 @@ def test_longvarbinary(cursor, db_connection): rows.append(cursor.fetchone()) assert cursor.fetchone() == None, "longvarbinary_column is expected to have only {} rows".format(expectedRows) assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchone - row 1" + assert rows[1] == [bytes("123!@#", 'utf-8')], 
"SQL_LONGVARBINARY parsing failed for fetchone - row 1" # fetchall test cursor.execute("SELECT longvarbinary_column FROM #pytest_longvarbinary_test") rows = cursor.fetchall() assert rows[0] == [bytearray("ABCDEFGHI", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 0" - assert rows[1] == [bytes("123!@#\0\0\0", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" + assert rows[1] == [bytes("123!@#", 'utf-8')], "SQL_LONGVARBINARY parsing failed for fetchall - row 1" except Exception as e: pytest.fail(f"SQL_LONGVARBINARY parsing test failed: {e}") finally: @@ -1265,6 +1265,56 @@ def test_executemany_null_vs_empty_string(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty") db_connection.commit() +def test_executemany_binary_data_edge_cases(cursor, db_connection): + """Test executemany with binary data and empty byte arrays""" + try: + # Create test table + cursor.execute(""" + CREATE TABLE #pytest_binary_test ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Clear any existing data + cursor.execute("DELETE FROM #pytest_binary_test") + db_connection.commit() + + # Test data with binary data and empty bytes + test_data = [ + (1, b''), # Empty bytes + (2, b'hello'), # Regular bytes + (3, b''), # Empty bytes again + (4, b'\x00\x01\x02'), # Binary data with null bytes + (5, b''), # Empty bytes + (6, None), # NULL + ] + + # Execute the batch insert + cursor.executemany("INSERT INTO #pytest_binary_test VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_binary_test ORDER BY id") + results = cursor.fetchall() + + # Check that we got the right number of rows + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row + for i, (actual, expected_row) in enumerate(zip(results, test_data)): + assert actual[0] == expected_row[0], f"Row {i}: ID mismatch" + if expected_row[1] is None: + 
assert actual[1] is None, f"Row {i}: Expected NULL, got {actual[1]}" + else: + assert actual[1] == expected_row[1], f"Row {i}: Binary data mismatch expected {expected_row[1]}, got {actual[1]}" + except Exception as e: + pytest.fail(f"Executemany with binary data edge cases failed: {e}") + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_binary_test") + db_connection.commit() + + def test_nextset(cursor): """Test nextset""" cursor.execute("SELECT * FROM #pytest_all_data_types WHERE id = 1;") From 85d44ac4196d4c1bbc28e39cc3a7a274751f9544 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Thu, 4 Sep 2025 11:32:12 +0530 Subject: [PATCH 13/17] fix --- mssql_python/cursor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 521ed482b..00e19b341 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -833,6 +833,8 @@ def _select_best_sample_value(column): return max(non_nulls, key=lambda s: len(str(s))) if all(isinstance(v, datetime.datetime) for v in non_nulls): return datetime.datetime.now() + if all(isinstance(v, (bytes, bytearray)) for v in non_nulls): + return max(non_nulls, key=lambda b: len(b)) if all(isinstance(v, datetime.date) for v in non_nulls): return datetime.date.today() return non_nulls[0] # fallback From ca3d047849949cfdeaa3194c8619be366e4034f0 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Thu, 4 Sep 2025 12:04:48 +0530 Subject: [PATCH 14/17] comment for clarification --- mssql_python/cursor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 00e19b341..971861db3 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -380,6 +380,8 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytes): + # Use VARBINARY for Python bytes/bytearray since they are variable-length by nature. + # This avoids storage waste from BINARY's zero-padding and matches Python's semantics. 
return ( ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, @@ -389,6 +391,8 @@ def _map_sql_type(self, param, parameters_list, i): ) if isinstance(param, bytearray): + # Use VARBINARY for Python bytes/bytearray since they are variable-length by nature. + # This avoids storage waste from BINARY's zero-padding and matches Python's semantics. return ( ddbc_sql_const.SQL_VARBINARY.value, ddbc_sql_const.SQL_C_BINARY.value, From cf51e0cb346aceee38a23673fcf728886586590f Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Mon, 8 Sep 2025 12:11:32 +0530 Subject: [PATCH 15/17] added more tests --- tests/test_004_cursor.py | 277 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 24c4e09d1..864a42a9d 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -6271,6 +6271,283 @@ def test_executemany_utf16_length_validation(cursor, db_connection): drop_table_if_exists(cursor, "#pytest_utf16_validation") db_connection.commit() +def test_binary_data_over_8000_bytes(cursor, db_connection): + """Test binary data larger than 8000 bytes - document current driver limitations""" + try: + # Create test table with VARBINARY(MAX) to handle large data + drop_table_if_exists(cursor, "#pytest_large_binary") + cursor.execute(""" + CREATE TABLE #pytest_large_binary ( + id INT, + large_binary VARBINARY(MAX) + ) + """) + + # Test the current driver limitations: + # 1. Parameters cannot be > 8192 bytes + # 2. 
Fetch buffer is limited to 4096 bytes + + large_data = b'A' * 10000 # 10,000 bytes - exceeds parameter limit + + # This should fail with the current driver parameter limitation + try: + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (1, large_data)) + pytest.fail("Expected streaming parameter error for data > 8192 bytes") + except RuntimeError as e: + error_msg = str(e) + assert "Streaming parameters is not yet supported" in error_msg, f"Expected streaming parameter error, got: {e}" + assert "8192 bytes" in error_msg, f"Expected 8192 bytes limit mentioned, got: {e}" + + # Test data that fits within both parameter and fetch limits (< 4096 bytes) + medium_data = b'B' * 3000 # 3,000 bytes - under both limits + small_data = b'C' * 1000 # 1,000 bytes - well under limits + + # These should work fine + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (1, medium_data)) + cursor.execute("INSERT INTO #pytest_large_binary VALUES (?, ?)", (2, small_data)) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, large_binary FROM #pytest_large_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 2, f"Expected 2 rows, got {len(results)}" + assert len(results[0][1]) == 3000, f"Expected 3000 bytes, got {len(results[0][1])}" + assert len(results[1][1]) == 1000, f"Expected 1000 bytes, got {len(results[1][1])}" + assert results[0][1] == medium_data, "Medium binary data mismatch" + assert results[1][1] == small_data, "Small binary data mismatch" + + print("Note: Driver currently limits parameters to < 8192 bytes and fetch buffer to 4096 bytes.") + + except Exception as e: + pytest.fail(f"Binary data over 8000 bytes test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_large_binary") + db_connection.commit() + +def test_all_empty_binaries(cursor, db_connection): + """Test table with only empty binary values""" + try: + # Create test table + drop_table_if_exists(cursor, 
"#pytest_all_empty_binary") + cursor.execute(""" + CREATE TABLE #pytest_all_empty_binary ( + id INT, + empty_binary VARBINARY(100) + ) + """) + + # Insert multiple rows with only empty binary data + test_data = [ + (1, b''), + (2, b''), + (3, b''), + (4, b''), + (5, b''), + ] + + cursor.executemany("INSERT INTO #pytest_all_empty_binary VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify all data is empty binary + cursor.execute("SELECT id, empty_binary FROM #pytest_all_empty_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 5, f"Expected 5 rows, got {len(results)}" + for i, row in enumerate(results, 1): + assert row[0] == i, f"ID mismatch for row {i}" + assert row[1] == b'', f"Row {i} should have empty binary, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i} should return bytes type, got {type(row[1])}" + assert len(row[1]) == 0, f"Row {i} should have zero-length binary" + + except Exception as e: + pytest.fail(f"All empty binaries test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_all_empty_binary") + db_connection.commit() + +def test_mixed_bytes_and_bytearray_types(cursor, db_connection): + """Test mixing bytes and bytearray types in same column with executemany""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_mixed_binary_types") + cursor.execute(""" + CREATE TABLE #pytest_mixed_binary_types ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Test data mixing bytes and bytearray for the same column + test_data = [ + (1, b'bytes_data'), # bytes type + (2, bytearray(b'bytearray_1')), # bytearray type + (3, b'more_bytes'), # bytes type + (4, bytearray(b'bytearray_2')), # bytearray type + (5, b''), # empty bytes + (6, bytearray()), # empty bytearray + (7, bytearray(b'\x00\x01\x02\x03')), # bytearray with null bytes + (8, b'\x04\x05\x06\x07'), # bytes with null bytes + ] + + # Execute with mixed types + cursor.executemany("INSERT INTO #pytest_mixed_binary_types VALUES 
(?, ?)", test_data) + db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_mixed_binary_types ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 8, f"Expected 8 rows, got {len(results)}" + + # Check each row - note that SQL Server returns everything as bytes + expected_values = [ + b'bytes_data', + b'bytearray_1', + b'more_bytes', + b'bytearray_2', + b'', + b'', + b'\x00\x01\x02\x03', + b'\x04\x05\x06\x07', + ] + + for i, (row, expected) in enumerate(zip(results, expected_values)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + assert row[1] == expected, f"Row {i+1}: expected {expected}, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i+1} should return bytes type, got {type(row[1])}" + + except Exception as e: + pytest.fail(f"Mixed bytes and bytearray types test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_mixed_binary_types") + db_connection.commit() + +def test_binary_mostly_small_one_large(cursor, db_connection): + """Test binary column with mostly small/empty values but one large value (within driver limits)""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_mixed_size_binary") + cursor.execute(""" + CREATE TABLE #pytest_mixed_size_binary ( + id INT, + binary_data VARBINARY(MAX) + ) + """) + + # Create large binary value within both parameter and fetch limits (< 4096 bytes) + large_binary = b'X' * 3500 # 3,500 bytes - under both limits + + # Test data with mostly small/empty values and one large value + test_data = [ + (1, b''), # Empty + (2, b'small'), # Small value + (3, b''), # Empty again + (4, large_binary), # Large value (3,500 bytes) + (5, b'tiny'), # Small value + (6, b''), # Empty + (7, b'short'), # Small value + (8, b''), # Empty + ] + + # Execute with mixed sizes + cursor.executemany("INSERT INTO #pytest_mixed_size_binary VALUES (?, ?)", test_data) + db_connection.commit() + + # Verify the data was 
inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_mixed_size_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 8, f"Expected 8 rows, got {len(results)}" + + # Check each row + expected_lengths = [0, 5, 0, 3500, 4, 0, 5, 0] + for i, (row, expected_len) in enumerate(zip(results, expected_lengths)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + assert len(row[1]) == expected_len, f"Row {i+1}: expected length {expected_len}, got {len(row[1])}" + + # Special check for the large value + if i == 3: # Row 4 (index 3) has the large value + assert row[1] == large_binary, f"Row 4 should have large binary data" + + # Test that we can query the large value specifically + cursor.execute("SELECT binary_data FROM #pytest_mixed_size_binary WHERE id = 4") + large_result = cursor.fetchone() + assert len(large_result[0]) == 3500, "Large binary should be 3,500 bytes" + assert large_result[0] == large_binary, "Large binary data should match" + + print("Note: Large binary test uses 3,500 bytes due to current driver limits (8192 param, 4096 fetch).") + + except Exception as e: + pytest.fail(f"Binary mostly small one large test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_mixed_size_binary") + db_connection.commit() + +def test_only_null_and_empty_binary(cursor, db_connection): + """Test table with only NULL and empty binary values to ensure fallback doesn't produce size=0""" + try: + # Create test table + drop_table_if_exists(cursor, "#pytest_null_empty_binary") + cursor.execute(""" + CREATE TABLE #pytest_null_empty_binary ( + id INT, + binary_data VARBINARY(100) + ) + """) + + # Test data with only NULL and empty values + test_data = [ + (1, None), # NULL + (2, b''), # Empty bytes + (3, None), # NULL + (4, b''), # Empty bytes + (5, None), # NULL + (6, b''), # Empty bytes + ] + + # Execute with only NULL and empty values + cursor.executemany("INSERT INTO #pytest_null_empty_binary VALUES (?, ?)", test_data) + 
db_connection.commit() + + # Verify the data was inserted correctly + cursor.execute("SELECT id, binary_data FROM #pytest_null_empty_binary ORDER BY id") + results = cursor.fetchall() + + assert len(results) == 6, f"Expected 6 rows, got {len(results)}" + + # Check each row + expected_values = [None, b'', None, b'', None, b''] + for i, (row, expected) in enumerate(zip(results, expected_values)): + assert row[0] == i + 1, f"ID mismatch for row {i+1}" + + if expected is None: + assert row[1] is None, f"Row {i+1} should be NULL, got {row[1]}" + else: + assert row[1] == b'', f"Row {i+1} should be empty bytes, got {row[1]}" + assert isinstance(row[1], bytes), f"Row {i+1} should return bytes type, got {type(row[1])}" + assert len(row[1]) == 0, f"Row {i+1} should have zero length" + + # Test specific queries to ensure NULL vs empty distinction + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE binary_data IS NULL") + null_count = cursor.fetchone()[0] + assert null_count == 3, f"Expected 3 NULL values, got {null_count}" + + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE binary_data IS NOT NULL") + not_null_count = cursor.fetchone()[0] + assert not_null_count == 3, f"Expected 3 non-NULL values, got {not_null_count}" + + # Test that empty binary values have length 0 (not confused with NULL) + cursor.execute("SELECT COUNT(*) FROM #pytest_null_empty_binary WHERE DATALENGTH(binary_data) = 0") + empty_count = cursor.fetchone()[0] + assert empty_count == 3, f"Expected 3 empty binary values, got {empty_count}" + + except Exception as e: + pytest.fail(f"Only NULL and empty binary test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_null_empty_binary") + db_connection.commit() + def test_close(db_connection): """Test closing the cursor""" try: From 94237da45310411394a1e74b1bf425668da52d00 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Mon, 8 Sep 2025 12:28:29 +0530 Subject: [PATCH 16/17] reversing unwanted changes --- 
mssql_python/pybind/ddbc_bindings.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 99fd2a90e..b38854c84 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1369,12 +1369,6 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::string offending = WideToUTF8(wstr); ThrowStdException("Input string exceeds allowed column size at parameter index " + std::to_string(paramIndex)); } -#if defined(__APPLE__) || defined(__linux__) - auto utf16Buf = WStringToSQLWCHAR(wstr); - size_t copySize = std::min(utf16Buf.size(), static_cast(info.columnSize + 1)); - std::memcpy(wcharArray + i * (info.columnSize + 1), utf16Buf.data(), copySize * sizeof(SQLWCHAR)); -#else - std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); #endif strLenOrIndArray[i] = SQL_NTS; } From 282a52a30a021aeba2c77f24ddbb0be1b4d9ef92 Mon Sep 17 00:00:00 2001 From: Gaurav Sharma Date: Mon, 8 Sep 2025 12:29:21 +0530 Subject: [PATCH 17/17] reversing unwanted changes --- mssql_python/pybind/ddbc_bindings.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index b38854c84..bbc3a2f52 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1369,6 +1369,7 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::string offending = WideToUTF8(wstr); ThrowStdException("Input string exceeds allowed column size at parameter index " + std::to_string(paramIndex)); } + std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); #endif strLenOrIndArray[i] = SQL_NTS; }