-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpredict_posture.py
More file actions
157 lines (119 loc) Β· 5.14 KB
/
predict_posture.py
File metadata and controls
157 lines (119 loc) Β· 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
POSTURE PREDICTION - Extremely Simple Usage
This script takes RAW keypoint data and predicts posture clusters.
Only clusters 2 and 3 are considered GOOD postures.
Usage:
python predict_posture.py
Or import and use:
from predict_posture import predict_posture
result = predict_posture(keypoint_data, bounding_box_data)
"""
import pickle
import numpy as np
import pandas as pd
from pathlib import Path
import warnings
# Silence sklearn's "X does not have valid feature names" warning (triggered because
# we pass bare ndarrays to transform()) and other sklearn UserWarnings, for cleaner output.
warnings.filterwarnings("ignore", message="X does not have valid feature names")
warnings.filterwarnings("ignore", category=UserWarning, module="sklearn")
def normalize_keypoints(keypoints_df, bounding_boxes_df):
    """Scale raw keypoint coordinates relative to each sample's bounding box.

    Each x coordinate becomes (x - box_x1) / box_width and each y coordinate
    becomes (y - box_y1) / box_height, so keypoints inside the box land in [0, 1].

    Args:
        keypoints_df: DataFrame whose keypoint columns end in "-x" / "-y".
        bounding_boxes_df: DataFrame with x1, y1, width, height columns,
            optionally keyed by an "image_id" column.

    Returns:
        DataFrame containing only the normalized "-x" and "-y" columns.
    """
    # Re-key the boxes by image_id (when present) so the row-wise join aligns samples.
    if "image_id" in bounding_boxes_df.columns:
        bounding_boxes_df = bounding_boxes_df.set_index("image_id")
    merged = keypoints_df.join(bounding_boxes_df.add_prefix("bounding_box_"))

    # Keypoint coordinate columns; the prefixed box columns never end in "-x"/"-y".
    x_cols = [c for c in merged.columns if c.endswith("-x")]
    y_cols = [c for c in merged.columns if c.endswith("-y")]

    result = merged.copy()
    result[x_cols] = (
        merged[x_cols]
        .sub(merged["bounding_box_x1"], axis=0)
        .div(merged["bounding_box_width"], axis=0)
    )
    result[y_cols] = (
        merged[y_cols]
        .sub(merged["bounding_box_y1"], axis=0)
        .div(merged["bounding_box_height"], axis=0)
    )
    return result[x_cols + y_cols]
def load_pretrained_models(model_dir="trained_models/keypoint_models"):
    """Deserialize the pretrained scaler, PCA, and k-means models.

    Args:
        model_dir: Directory holding scaler.pkl, pca.pkl and kmeans.pkl.

    Returns:
        Tuple of (scaler, pca, kmeans) in that order.
    """
    base = Path(model_dir)
    loaded = []
    # NOTE(review): pickle.load executes arbitrary code — only use trusted model files.
    for stem in ("scaler", "pca", "kmeans"):
        with open(base / f"{stem}.pkl", "rb") as fh:
            loaded.append(pickle.load(fh))
    return tuple(loaded)
def predict_posture(
    keypoints_df, bounding_boxes_df, model_dir="trained_models/keypoint_models"
):
    """
    Predict posture clusters from RAW keypoint data.

    Pipeline: normalize keypoints by bounding box, scale, project with PCA,
    then assign a k-means cluster per sample.

    Args:
        keypoints_df: DataFrame with raw keypoint coordinates ("-x"/"-y" columns)
        bounding_boxes_df: DataFrame with bounding box info (x1, y1, width, height),
            optionally keyed by an 'image_id' column
        model_dir: Path to the directory containing scaler.pkl, pca.pkl, kmeans.pkl

    Returns:
        dict: {
            'clusters': array of cluster predictions,
            'good_posture': boolean array (True for clusters 2 and 3),
            'confidence': per-sample confidence scores in (0, 1]
        }
    """
    # Load models
    scaler, pca, kmeans = load_pretrained_models(model_dir)
    # Normalize keypoints
    normalized_features = normalize_keypoints(keypoints_df, bounding_boxes_df)
    # Apply the same scale -> PCA pipeline the models were fit with
    X_scaled = scaler.transform(normalized_features.values)
    X_pca = pca.transform(X_scaled)
    # Predict clusters
    clusters = kmeans.predict(X_pca)
    # Distance to the nearest centroid, squashed into (0, 1]: closer => more confident
    distances = kmeans.transform(X_pca)
    confidence = 1 / (1 + distances.min(axis=1))
    # Only clusters 2 and 3 count as good posture (was misdocumented as "clusters 1&2";
    # this matches the code below and the module docstring)
    good_posture = np.isin(clusters, [2, 3])
    return {"clusters": clusters, "good_posture": good_posture, "confidence": confidence}
def example_usage():
    """Demo: load sample data, run predict_posture, and print a readable summary.

    Prints an error message (instead of raising) when the sample-data helpers or
    the trained model files are unavailable.
    """
    print("POSTURE PREDICTION EXAMPLE")
    print("=" * 50)
    try:
        # Project-local loaders; only importable inside the full repository.
        from helpers.load_waybetter_db import load_keypoints_db, load_bounding_boxes_db

        print("Loading sample data...")
        keypoints_df = load_keypoints_db().head(10)  # first 10 samples keeps the demo fast
        bounding_boxes_df = load_bounding_boxes_db()
        print(f" - {len(keypoints_df)} keypoint samples")
        print(f" - {len(bounding_boxes_df)} bounding boxes")

        print("\nPredicting postures...")
        results = predict_posture(keypoints_df, bounding_boxes_df)

        print("\nRESULTS:")
        print(f"  Clusters: {results['clusters']}")
        print(f"  Good posture: {results['good_posture']}")
        # BUG FIX: 'confidence' is an array, and formatting an array with ':.3f'
        # raises TypeError for more than one sample. Round element-wise instead.
        print(f"  Confidence: {np.round(results['confidence'], 3)}")

        n_good = results["good_posture"].sum()
        n_total = len(results["clusters"])
        print("\nSUMMARY:")
        print(f"  Good postures: {n_good}/{n_total} ({n_good / n_total * 100:.1f}%)")
        print(f"  Cluster breakdown: {np.bincount(results['clusters'])}")

        print("\nINDIVIDUAL RESULTS:")
        for i, (cluster, good, conf) in enumerate(
            zip(results["clusters"], results["good_posture"], results["confidence"])
        ):
            status = "GOOD" if good else "BAD"
            print(f"  Sample {i + 1}: Cluster {cluster} | {status} | Confidence: {conf:.3f}")
    except Exception as e:
        # Broad catch is deliberate: this is a demo entry point, so report the
        # problem and exit cleanly instead of dumping a traceback.
        print(f"Error: {e}")
        print("Make sure you have trained models in 'trained_models/keypoint_models/'")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    example_usage()