STREAMFLOW — GỢI Ý CẢI THIỆN CODE
Đánh giá tổng quan: Project có kiến trúc tốt, pipeline rõ ràng. Dưới đây là
các điểm cần chú ý, sắp xếp theo mức độ ưu tiên.
========================================
- BẢO MẬT (CRITICAL)
========================================
[1.1] Secret key hardcode trong config.py
File: api_service/src/config.py, dòng:
secret_key: str = "streamflow-super-secret-key-change-in-production"
Vấn đề: Nếu ai clone repo mà quên đổi biến môi trường, secret JWT sẽ bị lộ.
Gợi ý: Không có giá trị default, bắt buộc phải set từ env:
secret_key: str # không có default — raise error nếu thiếu
Hoặc dùng pydantic validator để báo lỗi rõ ràng khi start.
[1.2] CORS allow_origins = [""]
File: api_service/src/main.py, dòng:
allow_origins=[""], # tighten in production
Vấn đề: Cho phép mọi domain gọi API, nguy hiểm khi deploy thật.
Gợi ý: Đọc từ env:
CORS_ORIGINS=http://localhost:3000,https://yourdomain.com
[1.3] Password MySQL hardcode trong config
File: api_service/src/config.py
db_url: str = "mysql+pymysql://root:stream_flow@mysql:3306/..."
Vấn đề: Nếu .env không được load, app vẫn chạy với password mặc định.
Gợi ý: Tách user/password ra biến riêng không có default, hoặc dùng
secret management (Docker secrets, Vault).
[1.4] Chạy MySQL với user root
Gợi ý: Tạo user MySQL riêng với quyền hạn chế (chỉ SELECT/INSERT/UPDATE
trên database cần thiết), không dùng root cho ứng dụng.
========================================
2. HIỆU NĂNG (HIGH)
[2.1] N+1 query trong list_latest_quotes()
File: api_service/src/services/stock_service.py
Vấn đề: Hàm load toàn bộ trade_rows, quote_rows, foreign_rows rồi build
dict trong Python. Khi có 1500+ mã, mỗi lần gọi GET /stocks sẽ load rất
nhiều dữ liệu vào RAM.
Gợi ý: Dùng một query JOIN duy nhất với subquery lấy max(id) per symbol,
thay vì 3 query riêng + join trong Python:
SELECT t.*, q.bid_price1, q.bid_vol1, ...
FROM data_trade t
LEFT JOIN data_quote q ON q.id = (SELECT MAX(id) FROM data_quote WHERE symbol_id = t.symbol)
WHERE t.id IN (SELECT MAX(id) FROM data_trade GROUP BY symbol)
[2.2] Thiếu cache cho GET /stocks
Vấn đề: Endpoint này được gọi mỗi vài giây bởi frontend, nhưng data thực
tế chỉ thay đổi khi có tick mới từ Kafka.
Gợi ý: Thêm in-memory cache với TTL 1-2 giây (dùng asyncio.Lock + dict),
hoặc dùng Redis nếu scale nhiều instance API.
[2.3] MySQL polling cho candlestick mỗi 10 giây dùng raw pymysql
File: api_service/src/websocket/bridge.py, hàm _poll_candlesticks()
Vấn đề: Mở và đóng connection MySQL mỗi 10s, không có connection pool.
Nếu MySQL chậm hoặc disconnect, exception sẽ bỏ qua và vòng lặp tiếp tục.
Gợi ý:
- Dùng connection pool (aiomysql hoặc tái dùng SQLAlchemy async engine)
- Thêm exponential backoff khi có lỗi liên tiếp
[2.4] broadcast_all() gửi price_update tới MỌI client
File: api_service/src/websocket/bridge.py
Vấn đề: Mỗi tick trade đều broadcast tới tất cả connection qua broadcast_all().
Khi có 500+ clients và 1500+ mã cùng lúc → bottleneck lớn.
Gợi ý: Chỉ broadcast_all() cho index_update. price_update chỉ nên gửi
qua broadcast_to_symbol() cho client đang xem mã đó. Frontend dùng endpoint
GET /stocks để lấy bảng giá toàn thị trường.
========================================
3. ĐỘ TIN CẬY / ROBUSTNESS (MEDIUM)
[3.1] Tạo database/table trong lifespan() không dùng migration tool
File: api_service/src/main.py
Vấn đề: CREATE TABLE IF NOT EXISTS mỗi lần start — không track được schema
version, khó rollback khi thay đổi cấu trúc bảng.
Gợi ý: Dùng Alembic để quản lý migration. Đặc biệt quan trọng khi thêm
cột mới hoặc thay đổi index sau này.
[3.2] kafka_bridge_loop() không có retry logic
File: api_service/src/websocket/bridge.py
Vấn đề: Nếu Kafka ngắt kết nối giữa chừng, loop dừng hoàn toàn và không
tự reconnect. Consumer group sẽ bị mất.
Gợi ý: Wrap vòng while True bên ngoài với retry:
while True:
try:
await _run_consumer()
except Exception as e:
logger.error("Bridge crashed, restarting in 5s: %s", e)
await asyncio.sleep(5)
[3.3] consumer_unified.py chạy nhiều thread nhưng không có health check
File: consumer/consumer_unified.py
Vấn đề: Nếu một thread consumer chết (exception không catch), các thread
khác vẫn chạy bình thường và Docker không biết service đang lỗi.
Gợi ý: Thêm watchdog thread kiểm tra tất cả consumer thread còn sống,
hoặc expose HTTP endpoint /health từ consumer container.
[3.4] Fallback trong get_ohlcv() dùng volume=_f(r.last_vol) (float)
File: api_service/src/services/stock_service.py, cuối hàm get_ohlcv()
Vấn đề: volume nên là int nhưng đang dùng _f() (float). Tuy nhỏ nhưng
có thể gây lỗi khi frontend expect integer.
Gợi ý: Đổi thành _i(r.last_vol).
[3.5] Warrant detection logic dễ sai edge case
File: api_service/src/services/stock_service.py
def _is_warrant(symbol: str) -> bool:
return len(symbol) > 3 and symbol[-4:].isdigit() and symbol[:2] not in ...
Vấn đề: Logic detect warrant dựa trên pattern chuỗi, dễ false positive
với mã mới. Nên có bảng tham chiếu từ SSI hoặc field instrument_type
trong database nếu SSI cung cấp.
[3.6] _disconnect_unsafe() gọi async disconnect nhưng tên có "unsafe"
File: api_service/src/websocket/manager.py
Vấn đề: Tên misleading — hàm này không close WebSocket connection thực
sự (không gọi ws.close()), chỉ xóa khỏi internal dict. Nếu client vẫn
còn kết nối ở transport layer, có thể leak.
Gợi ý: Thêm try: await ws.close() except: pass trước khi xóa khỏi dict.
========================================
4. CODE QUALITY / MAINTAINABILITY
[4.1] Hardcode database name "data" trong bridge.py
File: api_service/src/websocket/bridge.py
database="data",
Gợi ý: Đưa vào Settings hoặc ít nhất là constant ở đầu file.
[4.2] StockService nhận 2 DB nhưng chỉ inject 1
File: api_service/src/services/stock_service.py
Vấn đề: Constructor nhận db: Session nhưng cần cả streaming_db lẫn
warehouse_db. Hiện tại inject 1 session từ get_streaming_db() và tự query
cả 2 database qua cross-db SQL (data.* và warehouse.*).
Gợi ý: Tách ra 2 dependency rõ ràng:
def init(self, streaming_db: Session, warehouse_db: Session)
Điều này làm rõ hàm nào đọc từ DB nào.
[4.3] get_history() tính timestamp bằng toordinal() — dễ nhầm timezone
File: api_service/src/services/stock_service.py
timestamp=(r.trading_date.toordinal() - _EPOCH_ORD) * 86400 * 1000
Vấn đề: trading_date là date (không có timezone). Cách tính này giả định
UTC, nhưng HOSE giao dịch theo giờ Việt Nam (UTC+7).
Gợi ý: Dùng datetime.combine(r.trading_date, time.min).replace(tzinfo=timezone.utc)
hoặc xác nhận frontend xử lý timestamp này là "ngày" không phải "thời điểm".
[4.4] _ETF_PREFIXES là class variable nhưng được dùng như global constant
File: api_service/src/services/stock_service.py
Gợi ý: Đưa ra ngoài class thành module-level constant:
_ETF_PREFIXES: frozenset[str] = frozenset({"VF", "E1", "SSIAM", ...})
[4.5] Không có type annotation đầy đủ ở nhiều chỗ
Ví dụ: các hàm _parse_trade, _parse_quote, _parse_index trong bridge.py
return dict | None nhưng không annotate. Với Python 3.12 nên dùng
TypedDict để define cấu trúc message rõ ràng hơn.
========================================
5. THIẾU / NÊN THÊM
[5.1] Không có rate limiting trên API
Vấn đề: Ai cũng có thể spam GET /stocks hoặc WebSocket connect không giới hạn.
Gợi ý: Thêm slowapi (dùng Redis hoặc memory) cho REST endpoints.
[5.2] Không có log rotation config cho production
Consumer dùng RotatingFileHandler (tốt), nhưng API service chỉ dùng
basicConfig. Nên cấu hình log level từ env và có file handler khi production.
[5.3] Thiếu integration test
Hiện chỉ có test/ với testStream và testML — không có test cho API endpoints
hoặc consumer logic. Gợi ý thêm pytest + httpx cho API tests.
[5.4] Docker image dùng image: apache/kafka:latest
"latest" tag có thể thay đổi bất ngờ. Nên pin version cụ thể:
image: apache/kafka:3.7.0
[5.5] Không có .dockerignore hoặc .gitignore cho frontend/node_modules
frontend/node_modules/ đang bị commit vào repo (thấy trong zip). Nên
thêm vào .gitignore và dùng multi-stage Docker build để cài npm packages
trong build stage, không commit node_modules.
========================================
TÓM TẮT ƯU TIÊN
CRITICAL (fix ngay trước deploy):
- [1.1] Secret key JWT không có default
- [1.2] CORS allow_origins = ["*"]
- [5.5] node_modules commit vào repo
HIGH (nên fix trong sprint tới):
- [2.1] N+1 query trong list_latest_quotes
- [2.4] broadcast_all() cho mọi tick
- [3.2] Kafka bridge không tự reconnect
- [3.3] Consumer thread không có health check
MEDIUM (cải thiện dần):
- [2.2] Cache cho GET /stocks
- [3.1] Dùng Alembic thay CREATE TABLE IF NOT EXISTS
- [3.4] volume dùng float thay vì int
- [5.1] Rate limiting
LOW (nice to have):
- [4.x] Code quality improvements
- [5.2] Log rotation
- [5.3] Integration tests
STREAMFLOW — GỢI Ý CẢI THIỆN CODE
Đánh giá tổng quan: Project có kiến trúc tốt, pipeline rõ ràng. Dưới đây là
các điểm cần chú ý, sắp xếp theo mức độ ưu tiên.
========================================
========================================
[1.1] Secret key hardcode trong config.py
File: api_service/src/config.py, dòng:
secret_key: str = "streamflow-super-secret-key-change-in-production"
Vấn đề: Nếu ai clone repo mà quên đổi biến môi trường, secret JWT sẽ bị lộ.
Gợi ý: Không có giá trị default, bắt buộc phải set từ env:
secret_key: str # không có default — raise error nếu thiếu
Hoặc dùng pydantic validator để báo lỗi rõ ràng khi start.
[1.2] CORS allow_origins = [""]
File: api_service/src/main.py, dòng:
allow_origins=[""], # tighten in production
Vấn đề: Cho phép mọi domain gọi API, nguy hiểm khi deploy thật.
Gợi ý: Đọc từ env:
CORS_ORIGINS=http://localhost:3000,https://yourdomain.com
[1.3] Password MySQL hardcode trong config
File: api_service/src/config.py
db_url: str = "mysql+pymysql://root:stream_flow@mysql:3306/..."
Vấn đề: Nếu .env không được load, app vẫn chạy với password mặc định.
Gợi ý: Tách user/password ra biến riêng không có default, hoặc dùng
secret management (Docker secrets, Vault).
[1.4] Chạy MySQL với user root
Gợi ý: Tạo user MySQL riêng với quyền hạn chế (chỉ SELECT/INSERT/UPDATE
trên database cần thiết), không dùng root cho ứng dụng.
========================================
2. HIỆU NĂNG (HIGH)
[2.1] N+1 query trong list_latest_quotes()
File: api_service/src/services/stock_service.py
Vấn đề: Hàm load toàn bộ trade_rows, quote_rows, foreign_rows rồi build
dict trong Python. Khi có 1500+ mã, mỗi lần gọi GET /stocks sẽ load rất
nhiều dữ liệu vào RAM.
Gợi ý: Dùng một query JOIN duy nhất với subquery lấy max(id) per symbol,
thay vì 3 query riêng + join trong Python:
SELECT t.*, q.bid_price1, q.bid_vol1, ...
FROM data_trade t
LEFT JOIN data_quote q ON q.id = (SELECT MAX(id) FROM data_quote WHERE symbol_id = t.symbol)
WHERE t.id IN (SELECT MAX(id) FROM data_trade GROUP BY symbol)
[2.2] Thiếu cache cho GET /stocks
Vấn đề: Endpoint này được gọi mỗi vài giây bởi frontend, nhưng data thực
tế chỉ thay đổi khi có tick mới từ Kafka.
Gợi ý: Thêm in-memory cache với TTL 1-2 giây (dùng asyncio.Lock + dict),
hoặc dùng Redis nếu scale nhiều instance API.
[2.3] MySQL polling cho candlestick mỗi 10 giây dùng raw pymysql
File: api_service/src/websocket/bridge.py, hàm _poll_candlesticks()
Vấn đề: Mở và đóng connection MySQL mỗi 10s, không có connection pool.
Nếu MySQL chậm hoặc disconnect, exception sẽ bỏ qua và vòng lặp tiếp tục.
Gợi ý:
[2.4] broadcast_all() gửi price_update tới MỌI client
File: api_service/src/websocket/bridge.py
Vấn đề: Mỗi tick trade đều broadcast tới tất cả connection qua broadcast_all().
Khi có 500+ clients và 1500+ mã cùng lúc → bottleneck lớn.
Gợi ý: Chỉ broadcast_all() cho index_update. price_update chỉ nên gửi
qua broadcast_to_symbol() cho client đang xem mã đó. Frontend dùng endpoint
GET /stocks để lấy bảng giá toàn thị trường.
========================================
3. ĐỘ TIN CẬY / ROBUSTNESS (MEDIUM)
[3.1] Tạo database/table trong lifespan() không dùng migration tool
File: api_service/src/main.py
Vấn đề: CREATE TABLE IF NOT EXISTS mỗi lần start — không track được schema
version, khó rollback khi thay đổi cấu trúc bảng.
Gợi ý: Dùng Alembic để quản lý migration. Đặc biệt quan trọng khi thêm
cột mới hoặc thay đổi index sau này.
[3.2] kafka_bridge_loop() không có retry logic
File: api_service/src/websocket/bridge.py
Vấn đề: Nếu Kafka ngắt kết nối giữa chừng, loop dừng hoàn toàn và không
tự reconnect. Consumer group sẽ bị mất.
Gợi ý: Wrap vòng while True bên ngoài với retry:
while True:
try:
await _run_consumer()
except Exception as e:
logger.error("Bridge crashed, restarting in 5s: %s", e)
await asyncio.sleep(5)
[3.3] consumer_unified.py chạy nhiều thread nhưng không có health check
File: consumer/consumer_unified.py
Vấn đề: Nếu một thread consumer chết (exception không catch), các thread
khác vẫn chạy bình thường và Docker không biết service đang lỗi.
Gợi ý: Thêm watchdog thread kiểm tra tất cả consumer thread còn sống,
hoặc expose HTTP endpoint /health từ consumer container.
[3.4] Fallback trong get_ohlcv() dùng
volume=_f(r.last_vol)(float)File: api_service/src/services/stock_service.py, cuối hàm get_ohlcv()
Vấn đề: volume nên là int nhưng đang dùng _f() (float). Tuy nhỏ nhưng
có thể gây lỗi khi frontend expect integer.
Gợi ý: Đổi thành _i(r.last_vol).
[3.5] Warrant detection logic dễ sai edge case
File: api_service/src/services/stock_service.py
def _is_warrant(symbol: str) -> bool:
return len(symbol) > 3 and symbol[-4:].isdigit() and symbol[:2] not in ...
Vấn đề: Logic detect warrant dựa trên pattern chuỗi, dễ false positive
với mã mới. Nên có bảng tham chiếu từ SSI hoặc field
instrument_typetrong database nếu SSI cung cấp.
[3.6] _disconnect_unsafe() gọi async disconnect nhưng tên có "unsafe"
File: api_service/src/websocket/manager.py
Vấn đề: Tên misleading — hàm này không close WebSocket connection thực
sự (không gọi ws.close()), chỉ xóa khỏi internal dict. Nếu client vẫn
còn kết nối ở transport layer, có thể leak.
Gợi ý: Thêm try: await ws.close() except: pass trước khi xóa khỏi dict.
========================================
4. CODE QUALITY / MAINTAINABILITY
[4.1] Hardcode database name "data" trong bridge.py
File: api_service/src/websocket/bridge.py
database="data",
Gợi ý: Đưa vào Settings hoặc ít nhất là constant ở đầu file.
[4.2] StockService nhận 2 DB nhưng chỉ inject 1
File: api_service/src/services/stock_service.py
Vấn đề: Constructor nhận
db: Sessionnhưng cần cả streaming_db lẫnwarehouse_db. Hiện tại inject 1 session từ get_streaming_db() và tự query
cả 2 database qua cross-db SQL (data.* và warehouse.*).
Gợi ý: Tách ra 2 dependency rõ ràng:
def init(self, streaming_db: Session, warehouse_db: Session)
Điều này làm rõ hàm nào đọc từ DB nào.
[4.3] get_history() tính timestamp bằng toordinal() — dễ nhầm timezone
File: api_service/src/services/stock_service.py
timestamp=(r.trading_date.toordinal() - _EPOCH_ORD) * 86400 * 1000
Vấn đề: trading_date là date (không có timezone). Cách tính này giả định
UTC, nhưng HOSE giao dịch theo giờ Việt Nam (UTC+7).
Gợi ý: Dùng datetime.combine(r.trading_date, time.min).replace(tzinfo=timezone.utc)
hoặc xác nhận frontend xử lý timestamp này là "ngày" không phải "thời điểm".
[4.4] _ETF_PREFIXES là class variable nhưng được dùng như global constant
File: api_service/src/services/stock_service.py
Gợi ý: Đưa ra ngoài class thành module-level constant:
_ETF_PREFIXES: frozenset[str] = frozenset({"VF", "E1", "SSIAM", ...})
[4.5] Không có type annotation đầy đủ ở nhiều chỗ
Ví dụ: các hàm _parse_trade, _parse_quote, _parse_index trong bridge.py
return
dict | Nonenhưng không annotate. Với Python 3.12 nên dùngTypedDict để define cấu trúc message rõ ràng hơn.
========================================
5. THIẾU / NÊN THÊM
[5.1] Không có rate limiting trên API
Vấn đề: Ai cũng có thể spam GET /stocks hoặc WebSocket connect không giới hạn.
Gợi ý: Thêm slowapi (dùng Redis hoặc memory) cho REST endpoints.
[5.2] Không có log rotation config cho production
Consumer dùng RotatingFileHandler (tốt), nhưng API service chỉ dùng
basicConfig. Nên cấu hình log level từ env và có file handler khi production.
[5.3] Thiếu integration test
Hiện chỉ có test/ với testStream và testML — không có test cho API endpoints
hoặc consumer logic. Gợi ý thêm pytest + httpx cho API tests.
[5.4] Docker image dùng
image: apache/kafka:latest"latest" tag có thể thay đổi bất ngờ. Nên pin version cụ thể:
image: apache/kafka:3.7.0
[5.5] Không có .dockerignore hoặc .gitignore cho frontend/node_modules
frontend/node_modules/ đang bị commit vào repo (thấy trong zip). Nên
thêm vào .gitignore và dùng multi-stage Docker build để cài npm packages
trong build stage, không commit node_modules.
========================================
TÓM TẮT ƯU TIÊN
CRITICAL (fix ngay trước deploy):
HIGH (nên fix trong sprint tới):
MEDIUM (cải thiện dần):
LOW (nice to have):