diff --git a/server/mergin/app.py b/server/mergin/app.py index 8910c6d1..d0fd2f3a 100644 --- a/server/mergin/app.py +++ b/server/mergin/app.py @@ -209,7 +209,6 @@ def load_user_from_header(header_val): # pylint: disable=W0613,W0612 app.app.config["SECRET_KEY"], app.app.config["SECURITY_BEARER_SALT"], header_val, - app.app.config["BEARER_TOKEN_EXPIRATION"], ) user = User.query.filter_by( id=data["user_id"], username=data["username"], email=data["email"] diff --git a/server/mergin/auth/bearer.py b/server/mergin/auth/bearer.py index 1c54a054..adadec77 100644 --- a/server/mergin/auth/bearer.py +++ b/server/mergin/auth/bearer.py @@ -3,17 +3,28 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import hashlib +from datetime import datetime, timezone from itsdangerous import URLSafeTimedSerializer +from itsdangerous.exc import SignatureExpired, BadSignature from flask.sessions import TaggedJSONSerializer -def decode_token(secret_key, salt, token, max_age=None): +def decode_token(secret_key, salt, token): serializer = TaggedJSONSerializer() signer_kwargs = {"key_derivation": "hmac", "digest_method": hashlib.sha1} s = URLSafeTimedSerializer( secret_key, salt=salt, serializer=serializer, signer_kwargs=signer_kwargs ) - return s.loads(token, max_age=max_age) + token_data = s.loads(token) + try: + expire = datetime.fromisoformat(token_data.get("expire")) + except (ValueError, TypeError): + raise BadSignature("Invalid token") + + if expire < datetime.now(timezone.utc): + raise SignatureExpired("Token expired") + + return token_data def encode_token(secret_key, salt, data): diff --git a/server/mergin/tests/test_auth.py b/server/mergin/tests/test_auth.py index 3a4c2de4..90777122 100644 --- a/server/mergin/tests/test_auth.py +++ b/server/mergin/tests/test_auth.py @@ -2,14 +2,16 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import time +import itsdangerous import pytest import json from flask import url_for from sqlalchemy import desc from unittest.mock import patch +from ..auth.bearer import decode_token, encode_token from ..auth.forms import ResetPasswordForm from ..auth.app import generate_confirmation_token, confirm_token from ..auth.models import User, UserProfile, LoginHistory @@ -958,3 +960,57 @@ def test_login_without_password(client): headers=json_headers, ) assert resp.status_code == 401 + + +def test_bearer_token_expiration(app): + """Test bearer token expiration in decode function""" + secret = app.config["SECRET_KEY"] + salt = app.config["SECURITY_BEARER_SALT"] + client = app.test_client() + user = User.query.first() + + # valid token with longer expiration then usual + expire = datetime.now(timezone.utc) + timedelta( + seconds=2 * app.config["BEARER_TOKEN_EXPIRATION"] + ) + token_data = { + "user_id": user.id, + "username": user.username, + "email": user.email, + "expire": str(expire), + } + + token = encode_token(secret, salt, token_data) + data = decode_token(secret, salt, token) + assert data["expire"] == str(expire) + + # try to login + resp = client.get( + "/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 200 + client.get(url_for("/.mergin_auth_controller_logout")) + + # expired token + expire = datetime.now(timezone.utc) - timedelta(days=1) + token_data["expire"] = str(expire) + token = encode_token(secret, salt, token_data) + + with pytest.raises(itsdangerous.exc.SignatureExpired): + decode_token(secret, salt, token) + + resp = client.get( + "/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 401 + + # invalid token + token_data["expire"] = 123 + token = encode_token(secret, salt, token_data) + with pytest.raises(itsdangerous.exc.BadSignature): + decode_token(secret, salt, token) + + resp = client.get( + "/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 401