-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
267 lines (226 loc) · 7.67 KB
/
database.py
File metadata and controls
267 lines (226 loc) · 7.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
"""
Database connection and operations for RDS PostgreSQL.
"""
import psycopg2
from psycopg2.extras import RealDictCursor, Json
from typing import Dict, Any, Optional
from fastapi import HTTPException
import os
import random
from contextlib import contextmanager
from dotenv import load_dotenv
# Load environment variables from .env file
# Use override=True to ensure .env values take precedence over system environment
load_dotenv(override=True)
# RDS Connection Configuration
DB_CONFIG = {
'host': os.getenv('RDS_HOST', '52.206.226.18'),
'port': int(os.getenv('RDS_PORT', 5432)),
'database': os.getenv('RDS_DATABASE', 'postgres'),
'user': os.getenv('RDS_USER', 'postgres'),
'password': os.getenv('RDS_PASSWORD'),
'sslmode': 'require' # RDS requires SSL encryption
}
@contextmanager
def get_db_connection():
"""
Context manager for database connections.
Automatically handles connection closing.
Usage:
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
"""
conn = None
try:
conn = psycopg2.connect(**DB_CONFIG)
yield conn
conn.commit()
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
conn.close()
def test_connection():
"""
Test database connection.
Returns True if connection successful, False otherwise.
"""
try:
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
print("✓ Database connection successful!")
return True
except Exception as e:
print(f"✗ Database connection failed: {str(e)}")
return False
def generate_8_digit_id() -> int:
"""
Generate a random 8-digit integer ID.
Range: 10000000 to 99999999
Returns:
8-digit integer
"""
return random.randint(10000000, 99999999)
def save_build(owner_id: int, canvas: Dict[str, Any], cf_template: Optional[Dict[str, Any]] = None) -> int:
"""
Save a new build with canvas and optional CloudFormation template.
Generates an 8-digit unique ID for the build.
Args:
owner_id: User ID of the build owner
canvas: Canvas JSON from frontend
cf_template: Generated CloudFormation template JSON
Returns:
build_id: 8-digit ID of the created build
"""
with get_db_connection() as conn:
cursor = conn.cursor()
# Try up to 10 times to insert with a unique ID
max_attempts = 10
for attempt in range(max_attempts):
build_id = generate_8_digit_id()
try:
cursor.execute(
"""
INSERT INTO build (id, owner_id, canvas, cf_template)
VALUES (%s, %s, %s, %s)
RETURNING id
""",
(build_id, owner_id, Json(canvas), Json(cf_template) if cf_template else None)
)
# If successful, retrieve the ID and return
build_id = cursor.fetchone()[0]
print(f"✓ Build saved with ID: {build_id}")
return build_id
except psycopg2.errors.UniqueViolation:
# ID collision - rollback and try again with a new ID
conn.rollback()
if attempt == max_attempts - 1:
raise Exception(f"Failed to generate unique build ID after {max_attempts} attempts")
continue
raise Exception("Failed to save build: exceeded maximum retry attempts")
def get_build(build_id: int) -> Optional[Dict[str, Any]]:
"""
Get build by ID.
Args:
build_id: ID of the build
Returns:
Build record with canvas and cf_template
"""
with get_db_connection() as conn:
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute(
"""
SELECT id, owner_id, canvas, cf_template, created_at
FROM build
WHERE id = %s
""",
(build_id,)
)
result = cursor.fetchone()
return dict(result) if result else None
def update_build_canvas_and_template(build_id: int, canvas: Dict[str, Any], cf_template: Optional[Dict[str, Any]] = None) -> bool:
"""
Update existing build with new canvas and/or CF template.
Used when user updates their deployment.
Args:
build_id: ID of the build to update
canvas: Updated canvas JSON
cf_template: Updated CloudFormation template JSON
Returns:
True if update successful, False otherwise
"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE build
SET canvas = %s,
cf_template = %s
WHERE id = %s
""",
(Json(canvas), Json(cf_template) if cf_template else None, build_id)
)
rows_affected = cursor.rowcount
if rows_affected > 0:
print(f"Build {build_id} updated successfully")
return True
else:
print(f"Build {build_id} not found")
return False
def get_builds_by_owner(owner_id: int) -> list:
with get_db_connection() as conn:
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute(
"""
SELECT id, owner_id, canvas, cf_template, created_at,project_name,description,status
FROM build
WHERE owner_id = %s
ORDER BY created_at DESC
""",
(owner_id,)
)
results = cursor.fetchall()
return [dict(row) for row in results]
def is_build_deployed(build_id: int) -> bool:
"""
Check if a build has been deployed (has a CF template).
Args:
build_id: ID of the build
Returns:
True if build has a CloudFormation template (deployed)
"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT cf_template IS NOT NULL as deployed
FROM build
WHERE id = %s
""",
(build_id,)
)
result = cursor.fetchone()
return result[0] if result else False
def log_activity(build_id: int, user_id: int, change: str):
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO activity_log (build_id, user_id, change)
VALUES (%s, %s, %s)
""",
(build_id, user_id, change)
)
print(f"✓ Activity logged for build {build_id}")
def get_access_token_for_owner(owner_username: str) -> str:
"""
Retrieves the GitHub access token from the dedicated encrypted column.
"""
token = None
with get_db_connection() as conn:
cursor = conn.cursor()
# Select ONLY the access token
cursor.execute(
"""
SELECT github_access_token
FROM account
WHERE github_login = %s
""",
(owner_username,)
)
result = cursor.fetchone()
if result:
# Result is (token_string,), so we take the first element
token = result[0]
if not token:
raise HTTPException(status_code=404, detail=f"Token not found for user '{owner_username}'.")
return token
if __name__ == "__main__":
# Test connection when running this file directly
print("Testing database connection...")
test_connection()