-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
1740 lines (1511 loc) Β· 71.4 KB
/
app.py
File metadata and controls
1740 lines (1511 loc) Β· 71.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
import uuid
import os
import json
import subprocess
from datetime import datetime
import shutil
from typing import Optional
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import sqlite3
from contextlib import asynccontextmanager
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Import functions from other modules
from AI_feeds.predict import run_prediction
from pinata_uploader import upload_to_ipfs
# Shared in-process state for real-time tracking.
# connected_clients: objects the broadcast_* coroutines call ``put`` on.
connected_clients = set()
# prediction_stats: headline counters, mirrored in the prediction_stats table.
prediction_stats = dict(
    total_predictions=0,
    approved_predictions=0,
    accuracy=96,
)
# Real-time analytics snapshot - infrastructure and operational focus.
# The whole dict is JSON-serialised for persistence and for SSE broadcasts,
# so key order below is kept stable.
analytics_data = {
    # Consumption figures; zero until a prediction run writes real values.
    "actual_consumption": {
        "monthly_trends": [
            {"month": month_label, "consumption": 0, "year": 2025}
            for month_label in (
                "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
            )
        ],
        "borough_totals": dict.fromkeys(
            ("BRONX", "BROOKLYN", "MANHATTAN", "QUEENS", "STATEN_ISLAND"), 0
        ),
    },
    # System infrastructure metrics.
    "infrastructure": {
        "system_uptime": 99.8,  # percent
        "data_processing_rate": 0,  # files processed per hour
        "database_size_mb": 0,
        "api_response_time_ms": 0,
        "file_upload_success_rate": 100,
        "blockchain_connectivity": True,
        "ipfs_connectivity": True,
        "ai_model_status": "healthy",
    },
    # Data quality and processing metrics (scores are percentages).
    "data_quality": {
        "completeness_score": 95,  # share of non-missing data
        "accuracy_score": 98,  # data validation accuracy
        "timeliness_score": 92,  # how recent the data is
        "consistency_score": 97,  # consistency across sources
        "processed_files_count": 0,
        "failed_uploads_count": 0,
    },
    # Water conservation and efficiency metrics (not prediction-based).
    "conservation": {
        # Percentage reduction, one entry per month.
        "monthly_conservation_rate": [
            2.1, 1.8, 2.5, 3.2, 2.9, 3.1, 2.7, 2.4, 3.0, 2.8, 3.3, 3.5,
        ],
        "conservation_targets_met": 8,  # out of 12 months
        "total_water_saved_mgd": 15.7,  # million gallons per day saved
        "efficiency_improvements": [
            {"category": "Infrastructure Upgrades", "savings_mgd": 5.2},
            {"category": "Leak Detection", "savings_mgd": 3.8},
            {"category": "Smart Meters", "savings_mgd": 4.1},
            {"category": "Public Awareness", "savings_mgd": 2.6},
        ],
    },
    # System operational metrics.
    "operations": {
        "daily_processing_volume": [],  # appended to on every analytics update
        "system_alerts": {"critical": 0, "warning": 2, "info": 5},
        "service_health": {
            "api_server": "healthy",
            "database": "healthy",
            "ai_engine": "healthy",
            "blockchain_oracle": "healthy",
            "file_storage": "healthy",
        },
        "performance_trends": {
            # Last 12 hours, in ms.
            "avg_response_time": [45, 42, 38, 41, 39, 37, 40, 38, 35, 36, 34, 33],
            # Requests per hour.
            "throughput": [250, 265, 280, 275, 290, 305, 295, 310, 325, 315, 340, 335],
        },
    },
    # Real-time consumption monitoring (filled from actual prediction runs).
    "real_time_consumption": [],
    "last_updated": datetime.now().isoformat(),
    "data_sources": ["csv_uploads", "manual_input", "system_monitoring"],
}
# Database initialization
def init_db():
    """Create the dashboard SQLite schema and seed the stats row.

    Opens ``dashboard_stats.db`` (relative to the current working directory),
    creates the tables ``prediction_stats``, ``analytics_events``,
    ``real_time_consumption``, ``analytics_data_persistent`` and
    ``recent_activities`` when they do not exist yet, and inserts a single
    ``prediction_stats`` row if that table is empty.  Calling it again is a
    no-op for an already initialised database.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
            The connection is closed before the error propagates.
    """
    conn = sqlite3.connect('dashboard_stats.db')
    try:
        cursor = conn.cursor()

        # Headline prediction counters (one logical row).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS prediction_stats (
                id INTEGER PRIMARY KEY,
                total_predictions INTEGER DEFAULT 0,
                approved_predictions INTEGER DEFAULT 0,
                accuracy REAL DEFAULT 96,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Individual analytics events.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS analytics_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_type TEXT NOT NULL,
                borough TEXT,
                consumption_value REAL,
                quality_metric TEXT,
                quality_value REAL,
                efficiency_score REAL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Real-time consumption tracking.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS real_time_consumption (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                borough TEXT NOT NULL,
                consumption REAL NOT NULL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # JSON snapshot of the in-memory analytics dict.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS analytics_data_persistent (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                data_json TEXT NOT NULL,
                source TEXT DEFAULT 'simulation',
                has_real_data BOOLEAN DEFAULT FALSE,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Activity feed entries.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS recent_activities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                activity_type TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT,
                metadata JSON,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                user_id TEXT DEFAULT 'system',
                severity TEXT DEFAULT 'info'
            )
        ''')

        # Seed the stats row only once so restarts keep accumulated counters.
        cursor.execute('SELECT COUNT(*) FROM prediction_stats')
        if cursor.fetchone()[0] == 0:
            cursor.execute('''
                INSERT INTO prediction_stats (total_predictions, approved_predictions, accuracy)
                VALUES (0, 0, 96)
            ''')

        conn.commit()
    finally:
        # Release the file handle even when a statement above raised.
        conn.close()
# Background task for keeping SSE connections alive (no data updates)
async def keep_connections_alive():
    """Emit an SSE heartbeat after every 30-second pause, forever.

    Only heartbeats are sent from here; no analytics data is touched.  Any
    ``Exception`` raised during a round is printed and the loop carries on
    with the next round.
    """
    heartbeat_interval_seconds = 30
    while True:
        try:
            await asyncio.sleep(heartbeat_interval_seconds)
            await broadcast_heartbeat()
        except Exception as heartbeat_error:
            print(f"Error in heartbeat task: {heartbeat_error}")
async def broadcast_heartbeat():
    """Push one heartbeat SSE frame to every connected client.

    The frame is ``data: <json>`` followed by a blank line, where the JSON
    object carries ``type="heartbeat"``, the current local ISO timestamp and a
    fixed status message.  Clients whose ``put`` raises are dropped from
    ``connected_clients``.  Does nothing when no client is connected.
    """
    if not connected_clients:
        return

    heartbeat_data = {
        "type": "heartbeat",
        "timestamp": datetime.now().isoformat(),
        "message": "Connection alive - waiting for prediction data"
    }
    # The frame is identical for every client, so serialise it once.
    message = f"data: {json.dumps(heartbeat_data)}\n\n"

    # Iterate over a snapshot: ``await`` yields to other coroutines, which may
    # add or remove clients; iterating the live set would then raise
    # "RuntimeError: Set changed size during iteration".
    for client in list(connected_clients):
        try:
            await client.put(message)
        except Exception as e:
            print(f"Error sending heartbeat: {e}")
            # discard(): the client may already have been removed elsewhere.
            connected_clients.discard(client)
# Lifespan event handler
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: prepare state on startup, stop the heartbeat on shutdown.

    Startup initialises the SQLite schema, loads the persisted prediction
    stats and analytics snapshot, and starts the heartbeat background task.
    Shutdown cancels that task and waits for it to finish.

    Args:
        app: The application instance FastAPI passes in (unused here).
    """
    # Startup
    init_db()
    load_stats_from_db()

    # Load persisted analytics data from previous sessions
    has_persisted_data = load_analytics_from_db()
    if not has_persisted_data:
        print("π No persisted analytics data found - using default values")

    # Start background task for keeping connections alive (heartbeat only)
    task = asyncio.create_task(keep_connections_alive())
    print("✅ Analytics will ONLY update when predictions are generated")

    try:
        yield
    finally:
        # Shutdown - runs even if an exception is thrown in at ``yield``, so
        # the heartbeat task is never left running.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
# Create FastAPI app with lifespan handler
# (startup/shutdown work is delegated to ``lifespan`` above).
app = FastAPI(
    title="BIWMS API",
    description="Blockchain Integrated Water Management System API",
    version="1.0.0",
    lifespan=lifespan
)
# Add CORS middleware
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The FastAPI CORS docs advise against wildcards when credentials are
# enabled - confirm whether credentials are needed and, if so, list the
# allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Ensure directories exist
# (paths are relative to the current working directory; created at import time).
os.makedirs("data_uploads", exist_ok=True)
os.makedirs("AI_feeds/outputs", exist_ok=True)
# Database helper functions
def load_stats_from_db():
    """Copy the newest ``prediction_stats`` row into the in-memory counters.

    Reads the row with the highest ``id`` from ``dashboard_stats.db`` and
    overwrites ``total_predictions``, ``approved_predictions`` and ``accuracy``
    in the module-level ``prediction_stats`` dict.  When the table is empty the
    dict is left untouched.  Any error is printed and swallowed so that startup
    can continue with the current values.
    """
    try:
        conn = sqlite3.connect('dashboard_stats.db')
        try:
            cursor = conn.cursor()
            cursor.execute('SELECT total_predictions, approved_predictions, accuracy FROM prediction_stats ORDER BY id DESC LIMIT 1')
            result = cursor.fetchone()
        finally:
            # Close even when the query raised (e.g. table missing).
            conn.close()

        if result:
            prediction_stats["total_predictions"] = result[0]
            prediction_stats["approved_predictions"] = result[1]
            prediction_stats["accuracy"] = result[2]
            print(f"π Loaded prediction stats from DB: {prediction_stats}")
        else:
            print("β οΈ No prediction stats found in database - using default values")
    except Exception as e:
        print(f"β Error loading stats from DB: {e}")
def save_analytics_to_db():
"""Save current analytics data to database for persistence"""
global analytics_data
try:
conn = sqlite3.connect('dashboard_stats.db')
cursor = conn.cursor()
# Check if we have real prediction data
has_real_data = False
source = "simulation"
if analytics_data.get("real_time_consumption"):
has_real_data = any(item.get("source") == "real_prediction" for item in analytics_data["real_time_consumption"])
if has_real_data:
source = "real_prediction"
# Clear old data and insert new
cursor.execute('DELETE FROM analytics_data_persistent')
cursor.execute('''
INSERT INTO analytics_data_persistent (data_json, source, has_real_data)
VALUES (?, ?, ?)
''', (json.dumps(analytics_data), source, has_real_data))
conn.commit()
conn.close()
print(f"πΎ Analytics data saved to database (source: {source}, real_data: {has_real_data})")
except Exception as e:
print(f"Error saving analytics to DB: {e}")
def load_analytics_from_db():
"""Load persisted analytics data from database"""
global analytics_data
try:
conn = sqlite3.connect('dashboard_stats.db')
cursor = conn.cursor()
cursor.execute('SELECT data_json, source, has_real_data FROM analytics_data_persistent ORDER BY id DESC LIMIT 1')
result = cursor.fetchone()
if result:
data_json, source, has_real_data = result
loaded_data = json.loads(data_json)
analytics_data.update(loaded_data)
print(f"π Loaded persisted analytics data (source: {source}, real_data: {has_real_data})")
return True
conn.close()
return False
except Exception as e:
print(f"Error loading analytics from DB: {e}")
return False
def update_stats_in_db():
"""Update stats in database"""
try:
conn = sqlite3.connect('dashboard_stats.db')
cursor = conn.cursor()
cursor.execute('''
UPDATE prediction_stats
SET total_predictions = ?, approved_predictions = ?, accuracy = ?, last_updated = CURRENT_TIMESTAMP
WHERE id = (SELECT id FROM prediction_stats ORDER BY id DESC LIMIT 1)
''', (prediction_stats["total_predictions"], prediction_stats["approved_predictions"], prediction_stats["accuracy"]))
conn.commit()
conn.close()
except Exception as e:
print(f"Error updating stats in DB: {e}")
def add_activity(activity_type: str, title: str, description: Optional[str] = None, metadata: Optional[dict] = None, severity: str = "info"):
    """Append one entry to the ``recent_activities`` feed table.

    Args:
        activity_type: Category label stored in ``activity_type``.
        title: Short headline for the entry.
        description: Optional longer text.
        metadata: Optional dict, stored as a JSON string; ``None`` or an empty
            dict is stored as NULL.
        severity: Severity label, ``"info"`` by default.

    Returns:
        bool: ``True`` when the row was committed, ``False`` on any error
        (the error is printed).
    """
    try:
        conn = sqlite3.connect('dashboard_stats.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO recent_activities (activity_type, title, description, metadata, severity)
                VALUES (?, ?, ?, ?, ?)
            ''', (activity_type, title, description, json.dumps(metadata) if metadata else None, severity))
            conn.commit()
        finally:
            # Close even when the INSERT raised.
            conn.close()
        print(f"π Activity logged: {title}")
        return True
    except Exception as e:
        print(f"β Error logging activity: {e}")
        return False
def get_recent_activities(limit: int = 20):
    """Return the newest entries of the activity feed, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        list[dict]: One dict per row with the keys ``type``, ``title``,
        ``description``, ``metadata`` (decoded JSON, ``{}`` when NULL),
        ``timestamp``, ``severity``, ``time_ago`` and ``raw_timestamp``.
        An empty list is returned on any error (the error is printed).
    """
    try:
        conn = sqlite3.connect('dashboard_stats.db')
        try:
            cursor = conn.cursor()
            # ``timestamp`` defaults to CURRENT_TIMESTAMP, which has one-second
            # resolution; ``id DESC`` keeps same-second rows in insertion order.
            cursor.execute('''
                SELECT activity_type, title, description, metadata, timestamp, severity
                FROM recent_activities
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
            ''', (limit,))
            rows = cursor.fetchall()
        finally:
            # Close even when the query raised.
            conn.close()

        activities = []
        for activity_type, title, description, metadata, timestamp, severity in rows:
            activities.append({
                "type": activity_type,
                "title": title,
                "description": description,
                "metadata": json.loads(metadata) if metadata else {},
                "timestamp": timestamp,
                "severity": severity,
                "time_ago": get_time_ago(timestamp),
                "raw_timestamp": timestamp  # Send raw timestamp for frontend calculation
            })
        return activities
    except Exception as e:
        print(f"β Error fetching activities: {e}")
        return []
def get_time_ago(timestamp_str: str) -> str:
    """Render an ISO-8601 timestamp as a coarse "time ago" phrase.

    The timestamp is interpreted as UTC unless it carries its own offset:
    SQLite's ``CURRENT_TIMESTAMP`` (the source of the activity-feed values
    passed in by ``get_recent_activities``) is UTC, and a trailing ``Z`` means
    UTC as well.  The previous implementation compared such values with the
    server's local clock, which skewed the result by the local UTC offset.

    Args:
        timestamp_str: e.g. ``"2025-01-31 12:00:00"`` or
            ``"2025-01-31T12:00:00Z"``.

    Returns:
        ``"N day(s) ago"``, ``"N hour(s) ago"``, ``"N minute(s) ago"``,
        ``"Just now"`` for anything under a minute (including timestamps in
        the future), or ``"Unknown"`` when the input cannot be parsed.
    """
    from datetime import datetime, timezone

    try:
        text = timestamp_str.strip()
        if text.endswith('Z'):
            # fromisoformat() only accepts the "Z" suffix from Python 3.11 on.
            text = text[:-1] + '+00:00'
        moment = datetime.fromisoformat(text)
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not a parseable ISO timestamp.
        return "Unknown"

    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)

    diff = datetime.now(timezone.utc) - moment
    total_seconds = diff.total_seconds()

    if diff.days > 0:
        return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
    if total_seconds >= 3600:
        hours = int(total_seconds // 3600)
        return f"{hours} hour{'s' if hours > 1 else ''} ago"
    if total_seconds >= 60:
        minutes = int(total_seconds // 60)
        return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
    return "Just now"
def _default_analytics_data():
    """Build a fresh analytics dict holding the reset (default) values."""
    return {
        "actual_consumption": {
            "monthly_trends": [
                {"month": month_label, "consumption": 0, "year": 2025}
                for month_label in (
                    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
                )
            ],
            "borough_totals": {
                "BRONX": 0,
                "BROOKLYN": 0,
                "MANHATTAN": 0,
                "QUEENS": 0,
                "STATEN_ISLAND": 0
            }
        },
        # System infrastructure metrics
        "infrastructure": {
            "system_uptime": 99.8,  # Percentage
            "data_processing_rate": 0,  # Files processed per hour
            "database_size_mb": 0,
            "api_response_time_ms": 0,
            "file_upload_success_rate": 100,
            "blockchain_connectivity": True,
            "ipfs_connectivity": True,
            "ai_model_status": "healthy"
        },
        # Data quality and processing metrics
        "data_quality": {
            "completeness_score": 95,  # Percentage of complete data
            "accuracy_score": 98,  # Data validation accuracy
            "timeliness_score": 92,  # How recent the data is
            "consistency_score": 97,  # Data consistency across sources
            "processed_files_count": 0,
            "failed_uploads_count": 0
        },
        # Water conservation and efficiency metrics (not prediction-based)
        "conservation": {
            "monthly_conservation_rate": [2.1, 1.8, 2.5, 3.2, 2.9, 3.1, 2.7, 2.4, 3.0, 2.8, 3.3, 3.5],  # Percentage reduction
            "conservation_targets_met": 8,  # Out of 12 months
            "total_water_saved_mgd": 15.7,  # Million gallons per day saved
            "efficiency_improvements": [
                {"category": "Infrastructure Upgrades", "savings_mgd": 5.2},
                {"category": "Leak Detection", "savings_mgd": 3.8},
                {"category": "Smart Meters", "savings_mgd": 4.1},
                {"category": "Public Awareness", "savings_mgd": 2.6}
            ]
        },
        # System operational metrics
        "operations": {
            "daily_processing_volume": [],  # Daily data processing volumes
            "system_alerts": {
                "critical": 0,
                "warning": 2,
                "info": 5
            },
            "service_health": {
                "api_server": "healthy",
                "database": "healthy",
                "ai_engine": "healthy",
                "blockchain_oracle": "healthy",
                "file_storage": "healthy"
            },
            "performance_trends": {
                "avg_response_time": [45, 42, 38, 41, 39, 37, 40, 38, 35, 36, 34, 33],  # Last 12 hours in ms
                "throughput": [250, 265, 280, 275, 290, 305, 295, 310, 325, 315, 340, 335]  # Requests per hour
            }
        },
        # Real-time consumption monitoring (from actual data uploads)
        "real_time_consumption": [],
        "last_updated": datetime.now().isoformat(),
        "data_sources": ["csv_uploads", "manual_input", "system_monitoring"]
    }


def reset_all_analytics():
    """Reset in-memory analytics, prediction stats and their database tables.

    Rebinds the module-level ``analytics_data`` and ``prediction_stats`` to
    default values, deletes every row from ``analytics_events``,
    ``real_time_consumption``, ``analytics_data_persistent``,
    ``recent_activities`` and ``prediction_stats``, re-inserts a zeroed stats
    row and finally logs a "System Reset" activity.

    Returns:
        bool: ``True`` on success, ``False`` if any step raised (the error is
        printed; in-memory state may already have been reset at that point).
    """
    global analytics_data, prediction_stats
    try:
        # Reset in-memory analytics data to default values
        analytics_data = _default_analytics_data()

        # Reset prediction stats
        # NOTE(review): accuracy resets to 95.0 here while the module default
        # and init_db() seed 96 - confirm which value is intended.
        prediction_stats = {
            "total_predictions": 0,
            "approved_predictions": 0,
            "accuracy": 95.0
        }

        # Clear database tables
        conn = sqlite3.connect('dashboard_stats.db')
        try:
            cursor = conn.cursor()
            # Clear all analytics-related tables
            cursor.execute('DELETE FROM analytics_events')
            cursor.execute('DELETE FROM real_time_consumption')
            cursor.execute('DELETE FROM analytics_data_persistent')
            cursor.execute('DELETE FROM recent_activities')  # Clear activity feed too
            # Reset prediction stats in database
            cursor.execute('DELETE FROM prediction_stats')
            cursor.execute('''
                INSERT INTO prediction_stats (total_predictions, approved_predictions, accuracy)
                VALUES (0, 0, 95.0)
            ''')
            conn.commit()
        finally:
            # Closing without commit rolls back a partially applied reset.
            conn.close()

        # Log the reset activity (after the feed table was cleared, so this is
        # the only entry left in it).
        add_activity("system", "System Reset", "Analytics system was reset to zero",
                     {"reset_type": "full_reset"}, "warning")

        print("π FULL ANALYTICS RESET COMPLETED")
        print("π All analytics data reset to default values")
        print("π Prediction stats reset to zero")
        print("ποΈ All database tables cleared")
        print("β¨ System ready for fresh start")
        return True
    except Exception as e:
        print(f"β Error during analytics reset: {e}")
        return False
def extract_real_data_from_csv(file_path):
    """Derive dashboard metrics from an uploaded consumption CSV.

    Args:
        file_path: Path of the CSV file to read with ``pandas.read_csv``.

    Returns:
        dict: with the keys

        * ``borough_data`` - per-borough sum of the first column whose name
          contains the borough name (case, spaces and underscores ignored);
          ``0`` when no column matches.
        * ``monthly_patterns`` - 12 values.  With 12+ rows, the sum of the
          numeric columns of each of the first 12 rows; with fewer rows, the
          grand total placed in the current month's slot.
        * ``data_quality_metrics``, ``infrastructure_metrics``,
          ``conservation_data`` - heuristic scores derived from the frame and
          the file size.
        * ``file_stats`` - size in MB, row count, column count.

        All numbers are plain Python ``int``/``float`` so the result can be
        passed to ``json.dumps`` (numpy integers cannot).  On any error the
        problem is printed and a zero-filled structure is returned.
    """
    import os
    import pandas as pd

    def _squash(label):
        # Compare column names without case, spaces or underscores, so that
        # "Staten Island" matches the STATEN_ISLAND key.
        return str(label).lower().replace("_", "").replace(" ", "")

    try:
        # Read the CSV file
        df = pd.read_csv(file_path)
        # Get file stats for system metrics
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        row_count = len(df)
        column_count = len(df.columns)
        numeric_cols = df.select_dtypes(include=['number']).columns

        # Extract borough data (looking for common column patterns)
        borough_data = {}
        for borough in ('BRONX', 'BROOKLYN', 'MANHATTAN', 'QUEENS', 'STATEN_ISLAND'):
            wanted = _squash(borough)
            matching_cols = [col for col in df.columns if wanted in _squash(col)]
            if matching_cols:
                # Coerce so a stray text cell counts as missing instead of
                # aborting the whole extraction; an all-missing column sums to 0.
                values = pd.to_numeric(df[matching_cols[0]], errors="coerce")
                borough_data[borough] = float(values.sum())
            else:
                borough_data[borough] = 0

        # Calculate monthly patterns from the data
        monthly_patterns = [0] * 12
        if len(numeric_cols) > 0:
            if row_count >= 12:
                # If we have 12 or more rows, assume they represent months
                row_totals = df[numeric_cols].head(12).sum(axis=1)
                monthly_patterns = [float(total) for total in row_totals]
            else:
                # Fewer than 12 rows: book everything on the current month
                current_month = datetime.now().month - 1
                monthly_patterns[current_month] = float(df[numeric_cols].sum().sum())

        # Calculate data quality metrics
        total_cells = df.shape[0] * df.shape[1]
        missing_cells = int(df.isna().sum().sum())
        duplicate_rows = int(df.duplicated().sum())
        quality_metrics = {
            "completeness_score": ((total_cells - missing_cells) / total_cells * 100) if total_cells > 0 else 0,
            "accuracy_score": 95 + (len(numeric_cols) / column_count * 5) if column_count > 0 else 95,
            "timeliness_score": 100,  # Assume uploaded data is current
            "consistency_score": 90 + (10 * (1 - (duplicate_rows / row_count))) if row_count > 0 else 90,
            "processed_files_count": 1,
            "failed_uploads_count": 0
        }

        # Infrastructure metrics from file processing
        infrastructure_metrics = {
            "data_processing_rate": max(1, row_count / 60),  # Rows per minute as proxy
            "database_size_mb": file_size_mb,
            "api_response_time_ms": 35 + (file_size_mb * 2),  # Estimated based on file size
            "file_upload_success_rate": 100,
            "blockchain_connectivity": True,
            "ipfs_connectivity": True,
            "ai_model_status": "healthy"
        }

        # Conservation metrics (derived from consumption patterns):
        # percentage reduction from average consumption, scaled and capped at 5.
        avg_consumption = sum(monthly_patterns) / 12
        if avg_consumption > 0:
            monthly_rates = [
                max(0, min(5, ((avg_consumption - val) / avg_consumption * 100) * 0.05))
                for val in monthly_patterns
            ]
        else:
            monthly_rates = [0] * 12
        conservation_data = {
            "monthly_conservation_rate": monthly_rates,
            "total_water_saved_mgd": sum(monthly_patterns) * 0.001,  # Convert to million gallons
            # Count months with a positive conservation *rate*; this used to
            # count months with positive consumption by mistake.
            "conservation_targets_met": sum(1 for rate in monthly_rates if rate > 0)
        }

        return {
            "borough_data": borough_data,
            "monthly_patterns": monthly_patterns,
            "data_quality_metrics": quality_metrics,
            "infrastructure_metrics": infrastructure_metrics,
            "conservation_data": conservation_data,
            "file_stats": {
                "size_mb": file_size_mb,
                "row_count": row_count,
                "column_count": column_count
            }
        }
    except Exception as e:
        print(f"Error extracting data from CSV: {e}")
        return {
            "borough_data": {"BRONX": 0, "BROOKLYN": 0, "MANHATTAN": 0, "QUEENS": 0, "STATEN_ISLAND": 0},
            "monthly_patterns": [0] * 12,
            "data_quality_metrics": {"completeness_score": 0, "accuracy_score": 0, "timeliness_score": 0, "consistency_score": 0},
            "infrastructure_metrics": {},
            "conservation_data": {},
            "file_stats": {"size_mb": 0, "row_count": 0, "column_count": 0}
        }
def update_analytics_with_real_prediction(prediction_file_path, uploaded_file_path=None):
    """Fold a prediction result (and optionally its source CSV) into ``analytics_data``.

    Args:
        prediction_file_path: Path of a JSON file with the prediction output.
            The keys read here are ``predicted_allocation`` (mapping borough ->
            dict with ``consumption_hcf``), ``confidence_score``, ``timestamp``
            and ``prediction_id``; all are optional.
        uploaded_file_path: Optional path of the uploaded data file.  It is
            only analysed when it ends in ``.csv``.

    Side effects:
        Mutates the module-level ``analytics_data`` dict and persists it via
        ``save_analytics_to_db``.  If anything raises, the error is printed and
        ``update_analytics_data_fallback`` is applied instead.
    """
    try:
        # Load the actual prediction results
        with open(prediction_file_path, 'r') as f:
            prediction_data = json.load(f)

        # Extract real data from uploaded CSV if available
        real_csv_data = None
        if uploaded_file_path and uploaded_file_path.endswith('.csv'):
            real_csv_data = extract_real_data_from_csv(uploaded_file_path)

        # Update actual consumption with REAL prediction data
        # NOTE(review): keys are only upper-cased; a borough spelled with a
        # space ("Staten Island") would create a new key next to STATEN_ISLAND.
        if "predicted_allocation" in prediction_data:
            for borough, data in prediction_data["predicted_allocation"].items():
                borough_upper = borough.upper()
                # Use actual predicted consumption values
                analytics_data["actual_consumption"]["borough_totals"][borough_upper] = data["consumption_hcf"]

        # Update monthly consumption trends with real data if available
        if real_csv_data and real_csv_data["monthly_patterns"]:
            monthly_trends = analytics_data["actual_consumption"]["monthly_trends"]
            current_year = datetime.now().year
            for i, value in enumerate(real_csv_data["monthly_patterns"]):
                if i < len(monthly_trends):
                    monthly_trends[i]["consumption"] = round(value, 2)
                    monthly_trends[i]["year"] = current_year

        # Update data quality metrics with real data
        if real_csv_data and real_csv_data["data_quality_metrics"]:
            for metric, value in real_csv_data["data_quality_metrics"].items():
                if metric in analytics_data["data_quality"]:
                    analytics_data["data_quality"][metric] = round(value, 2)

        # Update infrastructure metrics with real file processing data
        if real_csv_data and real_csv_data["infrastructure_metrics"]:
            for metric, value in real_csv_data["infrastructure_metrics"].items():
                if metric in analytics_data["infrastructure"]:
                    analytics_data["infrastructure"][metric] = value

        # Update conservation data with real patterns
        if real_csv_data and real_csv_data["conservation_data"]:
            conservation = real_csv_data["conservation_data"]
            if "monthly_conservation_rate" in conservation:
                analytics_data["conservation"]["monthly_conservation_rate"] = conservation["monthly_conservation_rate"][:12]
            if "total_water_saved_mgd" in conservation:
                analytics_data["conservation"]["total_water_saved_mgd"] = conservation["total_water_saved_mgd"]

        # Update system performance based on prediction confidence
        if "confidence_score" in prediction_data:
            confidence = prediction_data["confidence_score"]
            # Update system metrics based on AI performance
            analytics_data["infrastructure"]["system_uptime"] = min(99.9, max(95.0, confidence))
            analytics_data["infrastructure"]["ai_model_status"] = "healthy" if confidence >= 90 else "degraded"
            # Update data quality accuracy based on prediction confidence
            analytics_data["data_quality"]["accuracy_score"] = round(confidence, 2)

        # Update operational metrics
        current_time = datetime.now()
        analytics_data["operations"]["daily_processing_volume"].append({
            "timestamp": current_time.isoformat(),
            "files_processed": 1,
            "data_size_mb": real_csv_data["file_stats"]["size_mb"] if real_csv_data else 0
        })
        # Keep only today's entries: the cutoff is local midnight, not a
        # rolling 24-hour window.
        cutoff_time = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
        analytics_data["operations"]["daily_processing_volume"] = [
            entry for entry in analytics_data["operations"]["daily_processing_volume"]
            if datetime.fromisoformat(entry["timestamp"]) >= cutoff_time
        ]

        # Add real-time consumption data based on actual predictions
        if "predicted_allocation" in prediction_data:
            total_consumption = sum(data["consumption_hcf"] for data in prediction_data["predicted_allocation"].values())
            analytics_data["real_time_consumption"].append({
                "timestamp": prediction_data.get("timestamp", datetime.now().isoformat()),
                "total_consumption": total_consumption,
                "source": "real_prediction",
                "prediction_id": prediction_data.get("prediction_id")
            })
            # Keep only recent consumption data (last 100 entries)
            if len(analytics_data["real_time_consumption"]) > 100:
                analytics_data["real_time_consumption"] = analytics_data["real_time_consumption"][-100:]

        # Update system alerts based on data quality.  The stricter threshold
        # must be tested first; with "< 80" first the critical branch could
        # never be reached.
        if real_csv_data:
            quality_score = real_csv_data["data_quality_metrics"]["completeness_score"]
            if quality_score < 60:
                analytics_data["operations"]["system_alerts"]["critical"] += 1
            elif quality_score < 80:
                analytics_data["operations"]["system_alerts"]["warning"] += 1

        # Update last_updated timestamp
        analytics_data["last_updated"] = datetime.now().isoformat()

        # Store the data in database
        save_analytics_to_db()

        print(f"π― Analytics updated with REAL prediction data from {prediction_file_path}")
        print(f"π Real borough consumption: {[f'{k}: {v:.1f} HCF' for k, v in analytics_data['actual_consumption']['borough_totals'].items()]}")
        print(f"π Analytics data last_updated: {analytics_data['last_updated']}")
        print(f"πΎ Analytics data saved to database successfully")
    except Exception as e:
        print(f"Error updating analytics with real prediction: {e}")
        # Fallback to simulation mode if real data update fails
        update_analytics_data_fallback()
def update_analytics_data_fallback():
"""Fallback method with simulated infrastructure data"""
import random
global analytics_data
# Update infrastructure metrics with realistic variations
analytics_data["infrastructure"]["system_uptime"] = min(99.9, max(95.0,
analytics_data["infrastructure"]["system_uptime"] + random.uniform(-0.1, 0.2)))
analytics_data["infrastructure"]["api_response_time_ms"] = max(25, min(150,
analytics_data["infrastructure"]["api_response_time_ms"] + random.uniform(-5, 10)))
analytics_data["infrastructure"]["data_processing_rate"] = max(0.5, min(20,
analytics_data["infrastructure"]["data_processing_rate"] + random.uniform(-1, 2)))
# Update data quality metrics with small variations
quality_metrics = ["completeness_score", "accuracy_score", "timeliness_score", "consistency_score"]
for metric in quality_metrics:
if metric in analytics_data["data_quality"]:
current = analytics_data["data_quality"][metric]
variation = random.uniform(-1, 2) # Small improvements over time
analytics_data["data_quality"][metric] = max(80, min(100, current + variation))
# Update conservation rates with seasonal patterns
current_month_idx = datetime.now().month - 1
if current_month_idx < len(analytics_data["conservation"]["monthly_conservation_rate"]):
base_rate = analytics_data["conservation"]["monthly_conservation_rate"][current_month_idx]
variation = random.uniform(-0.2, 0.3) # Conservation generally improves
analytics_data["conservation"]["monthly_conservation_rate"][current_month_idx] = max(0, min(5, base_rate + variation))
# Update system alerts occasionally
if random.random() < 0.1: # 10% chance of new alert
alert_type = random.choice(["warning", "info"])
analytics_data["operations"]["system_alerts"][alert_type] += 1
# Update performance trends (last 12 data points)
if len(analytics_data["operations"]["performance_trends"]["avg_response_time"]) >= 12:
analytics_data["operations"]["performance_trends"]["avg_response_time"].pop(0)
analytics_data["operations"]["performance_trends"]["throughput"].pop(0)
# Add new performance data points
last_response = analytics_data["operations"]["performance_trends"]["avg_response_time"][-1] if analytics_data["operations"]["performance_trends"]["avg_response_time"] else 35
last_throughput = analytics_data["operations"]["performance_trends"]["throughput"][-1] if analytics_data["operations"]["performance_trends"]["throughput"] else 300
new_response = max(25, min(60, last_response + random.uniform(-3, 2)))
new_throughput = max(200, min(400, last_throughput + random.uniform(-15, 20)))
analytics_data["operations"]["performance_trends"]["avg_response_time"].append(round(new_response))
analytics_data["operations"]["performance_trends"]["throughput"].append(round(new_throughput))
# Simulate processing volume updates
current_hour = datetime.now().hour
processing_entry = {
"timestamp": datetime.now().isoformat(),
"files_processed": random.randint(0, 3),
"data_size_mb": round(random.uniform(0.1, 5.0), 2)
}
analytics_data["operations"]["daily_processing_volume"].append(processing_entry)
# Keep only last 24 hours of processing data
cutoff_time = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
analytics_data["operations"]["daily_processing_volume"] = [
entry for entry in analytics_data["operations"]["daily_processing_volume"]
if datetime.fromisoformat(entry["timestamp"]) >= cutoff_time
]
analytics_data["last_updated"] = datetime.now().isoformat()
# Keep the old function name for simulation endpoint
def update_analytics_data(prediction_data=None):
    """Refresh the analytics state, choosing the data source by argument.

    Args:
        prediction_data: Prediction payload to fold into the analytics.
            Any falsy value (``None``, empty container, ...) selects the
            simulated fallback update instead.
    """
    if not prediction_data:
        # Nothing real to work with: fall back to the simulated update.
        update_analytics_data_fallback()
        return

    # Real prediction data was supplied, so drive the analytics from it.
    update_analytics_with_real_prediction(prediction_data)
def store_analytics_event(event_type, borough=None, consumption_value=None, quality_metric=None, quality_value=None, efficiency_score=None):
    """Insert one row into the ``analytics_events`` table of ``dashboard_stats.db``.

    This is best-effort bookkeeping: any failure (missing table, locked
    database, ...) is printed and swallowed, so the caller is never
    interrupted by an analytics error.

    Args:
        event_type: Value stored in the ``event_type`` column.
        borough: Value stored in the ``borough`` column, or ``None``.
        consumption_value: Value stored in ``consumption_value``, or ``None``.
        quality_metric: Value stored in ``quality_metric``, or ``None``.
        quality_value: Value stored in ``quality_value``, or ``None``.
        efficiency_score: Value stored in ``efficiency_score``, or ``None``.

    Returns:
        None.
    """
    conn = None
    try:
        conn = sqlite3.connect('dashboard_stats.db')
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO analytics_events (event_type, borough, consumption_value, quality_metric, quality_value, efficiency_score)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (event_type, borough, consumption_value, quality_metric, quality_value, efficiency_score))
        conn.commit()
    except Exception as e:
        print(f"Error storing analytics event: {e}")
    finally:
        # Close on every path; previously a failing execute/commit skipped
        # close() and leaked the connection (and its file handle).
        if conn is not None:
            conn.close()
async def broadcast_stats_update():
    """Send the current prediction stats and analytics to every connected client.

    Builds one ``data: <json>\\n\\n`` frame containing ``prediction_stats``
    and ``analytics_data`` and awaits ``client.put(frame)`` for each entry
    of the module-level ``connected_clients`` set. Clients whose ``put``
    raises are removed from the set afterwards.

    NOTE(review): clients are presumably ``asyncio.Queue``-like objects
    consumed by a streaming endpoint - confirm against where they are
    registered.
    """
    if not connected_clients:
        print("β οΈ No connected clients to broadcast to")
        return

    update_data = {
        "type": "stats_update",
        "stats": prediction_stats,
        "analytics": analytics_data,
        "timestamp": datetime.now().isoformat()
    }
    message = f"data: {json.dumps(update_data)}\n\n"
    disconnected_clients = set()
    print(f"π Broadcasting to {len(connected_clients)} connected clients")
    print(f"π Analytics data preview: Borough allocations: {list(analytics_data['actual_consumption']['borough_totals'].items())[:2]}...")

    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # a client connecting or disconnecting meanwhile would otherwise raise
    # "RuntimeError: Set changed size during iteration".
    for client in list(connected_clients):
        try:
            await client.put(message)
            print("✅ Message sent to client")
        except Exception as e:
            print(f"β Failed to send message to client: {e}")
            disconnected_clients.add(client)

    # Remove disconnected clients
    connected_clients.difference_update(disconnected_clients)
    if disconnected_clients:
        print(f"π§Ή Removed {len(disconnected_clients)} disconnected clients")
async def broadcast_analytics_update():
    """Send the current ``analytics_data`` to every connected client.

    Builds one ``data: <json>\\n\\n`` frame of type ``analytics_update``
    and awaits ``client.put(frame)`` for each entry of the module-level
    ``connected_clients`` collection. Clients whose ``put`` raises are
    dropped afterwards. Does nothing when there are no clients.

    NOTE(review): clients are presumably ``asyncio.Queue``-like objects
    held in a set - confirm against where they are registered.
    """
    if not connected_clients:
        return

    update_data = {
        "type": "analytics_update",
        "analytics": analytics_data,
        "timestamp": datetime.now().isoformat()
    }
    # Serialize once, outside the per-client try block: a payload that
    # cannot be encoded is not a client failure and must not cause every
    # client to be dropped.
    try:
        message = f"data: {json.dumps(update_data)}\n\n"
    except (TypeError, ValueError) as e:
        print(f"Error broadcasting analytics: {e}")
        return

    disconnected = []
    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # clients may be added or removed before this coroutine resumes.
    for client in list(connected_clients):
        try:
            await client.put(message)
            print("π Analytics data broadcasted to client")
        except Exception as e:
            print(f"Error broadcasting analytics: {e}")
            disconnected.append(client)

    # Remove disconnected connections; tolerate clients that something else
    # already removed while we were awaiting.
    connected_clients.difference_update(disconnected)
async def broadcast_activity_update():
    """Send the ten most recent activities to every connected client.

    Fetches ``get_recent_activities(10)``, builds one ``data: <json>\\n\\n``
    frame of type ``activity_update`` and awaits ``client.put(frame)`` for
    each entry of the module-level ``connected_clients`` collection.
    Clients whose ``put`` raises are dropped afterwards. Does nothing when
    there are no clients.

    NOTE(review): clients are presumably ``asyncio.Queue``-like objects
    held in a set - confirm against where they are registered.
    """
    if not connected_clients:
        return

    recent_activities = get_recent_activities(10)  # Get latest 10 activities
    update_data = {
        "type": "activity_update",
        "activities": recent_activities,
        "timestamp": datetime.now().isoformat()
    }
    # Serialize once, outside the per-client try block: a payload that
    # cannot be encoded is not a client failure and must not cause every
    # client to be dropped.
    try:
        message = f"data: {json.dumps(update_data)}\n\n"
    except (TypeError, ValueError) as e:
        print(f"Error broadcasting activity: {e}")
        return

    disconnected = []
    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # clients may be added or removed before this coroutine resumes.
    for client in list(connected_clients):
        try:
            await client.put(message)
            print("π Activity update broadcasted to client")
        except Exception as e:
            print(f"Error broadcasting activity: {e}")
            disconnected.append(client)

    # Remove disconnected connections; tolerate clients that something else
    # already removed while we were awaiting.
    connected_clients.difference_update(disconnected)
def validate_file_extension(filename: str) -> bool:
    """Return True when *filename* ends with an allowed upload extension.

    The allowed extensions are ``.csv`` and ``.json``; the comparison is
    case-sensitive. A missing filename (``None`` or an empty string) is
    rejected rather than raising, since an uploaded file is not guaranteed
    to carry a name.

    Args:
        filename: Name of the uploaded file.

    Returns:
        True if the name ends with ``.csv`` or ``.json``, otherwise False.
    """
    if not filename:
        return False
    allowed_extensions = (".csv", ".json")
    # str.endswith accepts a tuple and checks every suffix in one call.
    return filename.endswith(allowed_extensions)
@app.post("/predict", response_class=JSONResponse)
async def predict(file: UploadFile = File(...), stakeholder_address: str = Form("")):
"""
Upload a file with water consumption data, run predictions,
upload results to IPFS, and submit to the AIPredictionMultisig contract.
Args:
file: CSV or JSON file with water consumption data
stakeholder_address: Ethereum address of the authenticated stakeholder
Returns:
JSON response with prediction details
"""
# Debug logging
print(f"π Received stakeholder_address: '{stakeholder_address}' (length: {len(stakeholder_address)})")
# Validate stakeholder authorization
if not stakeholder_address:
print("β No stakeholder address provided")
raise HTTPException(
status_code=403,
detail="Access denied: Stakeholder wallet address required. Please connect your wallet and verify stakeholder status."
)
# Basic address format validation (40 hex characters after 0x)
if not stakeholder_address.startswith('0x') or len(stakeholder_address) != 42:
raise HTTPException(
status_code=400,
detail="Invalid stakeholder address format. Must be a valid Ethereum address."
)
print(f"π Authorized stakeholder prediction request from: {stakeholder_address}")
# Generate a unique prediction ID
prediction_id = str(uuid.uuid4())
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Validate file extension
if not validate_file_extension(file.filename):
raise HTTPException(status_code=400, detail="Only CSV or JSON files are allowed")
# Create unique filename