-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPivot
More file actions
135 lines (116 loc) · 5.42 KB
/
Pivot
File metadata and controls
135 lines (116 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import pandas as pd
from openpyxl import load_workbook
def read_large_pivot(file_directory: str,
file_name: str,
sheet_name: str,
chunk_size: int = 1000,
header_row: int = None,
program_filter: str = None,
new_ps_program_filter: str = None,
activity_filter: str = None) -> pd.DataFrame:
"""
Reads a large pivot table's cached data from an Excel sheet and filters rows by specified criteria.
This function opens an Excel file in read-only and data-only mode, targeting the specified sheet.
Since pivot table filters might be at the top of the sheet, you can specify the row number that
contains the actual column headings via the header_row parameter. Data is read in chunks for efficiency,
and optional filters for 'Program', 'New PS Program', and 'Activity' are applied.
Args:
file_directory (str): The path to the directory containing the Excel file. Use an empty string if local.
file_name (str): The name of the Excel file.
sheet_name (str): The name of the sheet that contains the pivot table data.
chunk_size (int): The number of rows to read per chunk. Defaults to 1000.
header_row (int, optional): The row number that contains the actual table headers.
If None, the first non-empty row is used.
program_filter (str, optional): Filter value for the 'Program' column.
new_ps_program_filter (str, optional): Filter value for the 'New PS Program' column.
activity_filter (str, optional): Filter value for the 'Activity' column.
Returns:
pd.DataFrame: A DataFrame containing the filtered data, or an empty DataFrame if the sheet or data is not found.
"""
# Construct the full file path.
full_path = os.path.join(file_directory, file_name) if file_directory else file_name
print(f"Opening file: {full_path}")
# Load the workbook in read-only and data-only mode.
wb = load_workbook(full_path, read_only=True, data_only=True)
# Check if the target sheet exists.
if sheet_name not in wb.sheetnames:
print(f"Error: Sheet '{sheet_name}' not found in the Excel file.")
return pd.DataFrame()
ws = wb[sheet_name]
data_chunks = []
# If header_row is specified, use it; otherwise, auto-detect the first non-empty row.
if header_row is not None:
header_row_values = next(ws.iter_rows(min_row=header_row, max_row=header_row, values_only=True))
header = list(header_row_values)
# Data starts from the row after header_row.
data_iter = ws.iter_rows(min_row=header_row+1, values_only=True)
else:
# Auto-detect header: first non-empty row.
data_iter = ws.iter_rows(values_only=True)
header = None
for row in data_iter:
if any(cell is not None for cell in row):
header = list(row)
break
if header is None:
print("Error: No header row found in the sheet.")
return pd.DataFrame()
print(f"Using header: {header}")
# Process rows in chunks.
current_chunk = []
row_count = 0
for row in data_iter:
# Skip rows that are completely empty.
if not any(cell is not None for cell in row):
continue
current_chunk.append(row)
row_count += 1
if row_count % chunk_size == 0:
data_chunks.append(pd.DataFrame(current_chunk, columns=header))
current_chunk = []
# Add any remaining rows.
if current_chunk:
data_chunks.append(pd.DataFrame(current_chunk, columns=header))
# Concatenate all chunks into a single DataFrame.
df = pd.concat(data_chunks, ignore_index=True) if data_chunks else pd.DataFrame(columns=header)
print(f"Finished reading. Total rows read (excluding header): {row_count}")
# Apply filters if specified.
if program_filter is not None:
if "Program" in df.columns:
df = df[df["Program"] == program_filter]
else:
print("Warning: 'Program' column not found in the data.")
if new_ps_program_filter is not None:
if "New PS Program" in df.columns:
df = df[df["New PS Program"] == new_ps_program_filter]
else:
print("Warning: 'New PS Program' column not found in the data.")
if activity_filter is not None:
if "Activity" in df.columns:
df = df[df["Activity"] == activity_filter]
else:
print("Warning: 'Activity' column not found in the data.")
return df
# Example usage:
if __name__ == "__main__":
# File path settings.
file_directory = r"C:\Users\U155771\OneDrive - L3Harris Technologies Inc\Code\Python\CAM Script"
file_name = r"labor.xlsm"
sheet_name = "Labor Pivot by Activity"
# Specify the header row where actual table headings are found.
# Adjust this number to the row that contains your real column headers.
header_row = 3
# Filter for rows where the 'New PS Program' column equals "XXXX"
new_ps_program_filter = "XXXX"
# Read and filter the pivot table data.
df_filtered = read_large_pivot(
file_directory,
file_name,
sheet_name,
chunk_size=1000,
header_row=header_row,
new_ps_program_filter=new_ps_program_filter
)
# Display the first few rows of the filtered data.
print(df_filtered.head())