-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify_basis.py
More file actions
66 lines (48 loc) · 1.66 KB
/
verify_basis.py
File metadata and controls
66 lines (48 loc) · 1.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Verify basis_table.csv by re-running every query and comparing actual_output."""
import csv
import sys
from pathlib import Path
# Set JAVA_HOME if needed
import os
os.environ.setdefault("JAVA_HOME", "/opt/homebrew/opt/openjdk@17")
from dialect_mapper.basis.tools import execute_spark_sql
def verify(csv_path: str) -> None:
    """Re-run every query recorded in a basis_table.csv and compare the live
    result against the stored ``actual_output`` column.

    Prints a PASS/FAIL line per row plus a summary, and exits with status 1
    when the file is missing, the CSV is empty or malformed, or any query's
    re-run output differs from the recorded output.

    Args:
        csv_path: Path to a basis_table.csv with columns
            ``id``, ``query``, ``actual_output``.
    """
    path = Path(csv_path)
    if not path.exists():
        print(f"File not found: {csv_path}")
        sys.exit(1)

    # newline="" is required by the csv module so quoted fields containing
    # embedded newlines round-trip correctly.
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    if not rows:
        print("CSV is empty.")
        sys.exit(1)

    # Fail fast with a clear message instead of a KeyError mid-loop.
    required = {"id", "query", "actual_output"}
    missing = required - set(rows[0])
    if missing:
        print(f"CSV is missing required columns: {', '.join(sorted(missing))}")
        sys.exit(1)

    passed = 0
    failed = 0
    print(f"Verifying {len(rows)} basis queries from {csv_path}\n")

    for row in rows:
        query = row["query"]
        expected = row["actual_output"]
        result = execute_spark_sql.invoke({"query": query})
        # Compare stripped text so trailing whitespace/newlines in either
        # the recorded or live output do not count as a mismatch.
        match = result.strip() == expected.strip()
        status = "PASS" if match else "FAIL"
        if match:
            passed += 1
            print(f" {status} | {row['id']} | {query} => {result}")
        else:
            failed += 1
            print(f" {status} | {row['id']} | {query}")
            print(f" expected: {repr(expected)}")
            print(f" got: {repr(result)}")

    print(f"\n{'='*50}")
    print(f"Results: {passed} passed, {failed} failed, {len(rows)} total")
    if failed > 0:
        sys.exit(1)
if __name__ == "__main__":
    # Require exactly one positional argument: the CSV path to verify.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python verify_basis.py <path_to_basis_table.csv>")
        print("Example: python verify_basis.py output/concat_ws/basis_table.csv")
        sys.exit(1)
    verify(cli_args[0])