Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion server/mergin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ def load_user_from_header(header_val): # pylint: disable=W0613,W0612
app.app.config["SECRET_KEY"],
app.app.config["SECURITY_BEARER_SALT"],
header_val,
app.app.config["BEARER_TOKEN_EXPIRATION"],
)
user = User.query.filter_by(
id=data["user_id"], username=data["username"], email=data["email"]
Expand Down
15 changes: 13 additions & 2 deletions server/mergin/auth/bearer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

import hashlib
from datetime import datetime, timezone
from itsdangerous import URLSafeTimedSerializer
from itsdangerous.exc import SignatureExpired, BadSignature
from flask.sessions import TaggedJSONSerializer


def decode_token(secret_key, salt, token, max_age=None):
def decode_token(secret_key, salt, token):
serializer = TaggedJSONSerializer()
signer_kwargs = {"key_derivation": "hmac", "digest_method": hashlib.sha1}
s = URLSafeTimedSerializer(
secret_key, salt=salt, serializer=serializer, signer_kwargs=signer_kwargs
)
return s.loads(token, max_age=max_age)
token_data = s.loads(token)
try:
expire = datetime.fromisoformat(token_data.get("expire"))
except (ValueError, TypeError):
raise BadSignature("Invalid token")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider this as critical failure and log it so we can find it in logs in case something like this would happen? 🤔


if expire < datetime.now(timezone.utc):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we build the token, we use pytz.utc, here when decoding we use timezone.utc -- I guess that is allright, right @varmar05?

raise SignatureExpired("Token expired")

return token_data


def encode_token(secret_key, salt, data):
Expand Down
58 changes: 57 additions & 1 deletion server/mergin/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import time
import itsdangerous
import pytest
import json
from flask import url_for
from sqlalchemy import desc
from unittest.mock import patch

from ..auth.bearer import decode_token, encode_token
from ..auth.forms import ResetPasswordForm
from ..auth.app import generate_confirmation_token, confirm_token
from ..auth.models import User, UserProfile, LoginHistory
Expand Down Expand Up @@ -958,3 +960,57 @@ def test_login_without_password(client):
headers=json_headers,
)
assert resp.status_code == 401


def test_bearer_token_expiration(app):
"""Test bearer token expiration in decode function"""
secret = app.config["SECRET_KEY"]
salt = app.config["SECURITY_BEARER_SALT"]
client = app.test_client()
user = User.query.first()

# valid token with longer expiration then usual
expire = datetime.now(timezone.utc) + timedelta(
seconds=2 * app.config["BEARER_TOKEN_EXPIRATION"]
)
token_data = {
"user_id": user.id,
"username": user.username,
"email": user.email,
"expire": str(expire),
}

token = encode_token(secret, salt, token_data)
data = decode_token(secret, salt, token)
assert data["expire"] == str(expire)

# try to login
resp = client.get(
"/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"}
)
assert resp.status_code == 200
client.get(url_for("/.mergin_auth_controller_logout"))

# expired token
expire = datetime.now(timezone.utc) - timedelta(days=1)
token_data["expire"] = str(expire)
token = encode_token(secret, salt, token_data)

with pytest.raises(itsdangerous.exc.SignatureExpired):
decode_token(secret, salt, token)

resp = client.get(
"/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"}
)
assert resp.status_code == 401

# invalid token
token_data["expire"] = 123
token = encode_token(secret, salt, token_data)
with pytest.raises(itsdangerous.exc.BadSignature):
decode_token(secret, salt, token)

resp = client.get(
"/v1/user/profile", headers={**json_headers, "Authorization": f"Bearer {token}"}
)
assert resp.status_code == 401
Loading