-
Notifications
You must be signed in to change notification settings - Fork 0
Remove DingTalk and replace Mirage encryption #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| import base64 | ||
| import os | ||
| import re | ||
| from functools import lru_cache | ||
|
|
||
| from cryptography.fernet import Fernet, MultiFernet | ||
| from cryptography.hazmat.backends import default_backend | ||
| from cryptography.hazmat.primitives import padding | ||
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | ||
| from django.conf import settings | ||
| from django.utils.encoding import force_bytes, force_str | ||
|
|
||
| from common.utils.aes_decryptor import Prpcrypt | ||
|
|
||
| ENCRYPTED_VALUE_PREFIX = "enc1:" | ||
| HEX_RE = re.compile(r"^[0-9a-fA-F]+$") | ||
|
|
||
|
|
||
| class DecryptionError(ValueError): | ||
| pass | ||
|
|
||
|
|
||
| def generate_field_encryption_key(): | ||
| return Fernet.generate_key().decode("ascii") | ||
|
|
||
|
|
||
| def is_encrypted_value(value): | ||
| return isinstance(value, str) and value.startswith(ENCRYPTED_VALUE_PREFIX) | ||
|
|
||
|
|
||
| def encrypt_value(value): | ||
| if value is None: | ||
| return None | ||
| if not _load_field_encryption_keys(): | ||
| raise ValueError( | ||
| "FIELD_ENCRYPTION_KEYS must be configured before writing encrypted values." | ||
| ) | ||
| token = get_multi_fernet().encrypt(force_bytes(value)) | ||
| return f"{ENCRYPTED_VALUE_PREFIX}{force_str(token)}" | ||
|
|
||
|
|
||
| def decrypt_value(value): | ||
| if value is None: | ||
| return None | ||
| if not isinstance(value, str): | ||
| return value | ||
| if is_encrypted_value(value): | ||
| token = value[len(ENCRYPTED_VALUE_PREFIX) :] | ||
| return force_str(get_multi_fernet().decrypt(force_bytes(token))) | ||
| return decrypt_legacy_value(value) | ||
|
|
||
|
|
||
| def decrypt_legacy_value(value): | ||
| if value in ("", None): | ||
| return value | ||
|
|
||
| mirage_value = _try_decrypt_mirage(value) | ||
| if mirage_value is not None: | ||
| return mirage_value | ||
|
|
||
| old_value = _try_decrypt_old_hex(value) | ||
| if old_value is not None: | ||
| return old_value | ||
|
|
||
| raise DecryptionError( | ||
| "Unable to decrypt legacy value. " f"marker={value[:16]!r} length={len(value)}" | ||
| ) | ||
|
|
||
|
|
||
| @lru_cache(maxsize=1) | ||
| def get_multi_fernet(): | ||
| keys = _load_field_encryption_keys() | ||
| if not keys: | ||
| raise ValueError( | ||
| "FIELD_ENCRYPTION_KEYS must be configured before writing encrypted values." | ||
| ) | ||
| return MultiFernet([Fernet(key) for key in keys]) | ||
|
|
||
|
|
||
| def _load_field_encryption_keys(): | ||
| raw_keys = getattr(settings, "FIELD_ENCRYPTION_KEYS", "") or os.environ.get( | ||
| "FIELD_ENCRYPTION_KEYS", "" | ||
| ) | ||
| keys = [] | ||
| for raw_key in raw_keys.split(","): | ||
| key = raw_key.strip() | ||
| if not key: | ||
| continue | ||
| try: | ||
| Fernet(key) | ||
| except Exception as exc: | ||
| raise ValueError("Invalid FIELD_ENCRYPTION_KEYS entry.") from exc | ||
| keys.append(key.encode("ascii")) | ||
| return keys | ||
|
|
||
|
|
||
| def _try_decrypt_mirage(value): | ||
| try: | ||
| key = getattr(settings, "MIRAGE_SECRET_KEY", None) or getattr( | ||
| settings, "SECRET_KEY" | ||
| ) | ||
| if not key: | ||
| return None | ||
| cipher_key = base64.urlsafe_b64encode(force_bytes(key))[:32] | ||
| if len(cipher_key) not in (16, 24, 32): | ||
| return None | ||
| cipher_mode = getattr(settings, "MIRAGE_CIPHER_MODE", "ECB") | ||
| iv = force_bytes(getattr(settings, "MIRAGE_CIPHER_IV", "1234567890abcdef")) | ||
| encrypted = base64.urlsafe_b64decode(force_bytes(value)) | ||
| if cipher_mode == "CBC": | ||
| mode = modes.CBC(iv) | ||
| else: | ||
| mode = modes.ECB() | ||
| decryptor = Cipher( | ||
| algorithms.AES(cipher_key), mode, default_backend() | ||
| ).decryptor() | ||
| unpadder = padding.PKCS7(algorithms.AES(cipher_key).block_size).unpadder() | ||
| plaintext = decryptor.update(encrypted) + decryptor.finalize() | ||
|
jruszo marked this conversation as resolved.
Dismissed
|
||
| unpadded = unpadder.update(plaintext) + unpadder.finalize() | ||
| return force_str(unpadded) | ||
| except Exception: | ||
| return None | ||
|
|
||
|
|
||
| def _try_decrypt_old_hex(value): | ||
| if len(value) < 32 or len(value) % 2 != 0 or HEX_RE.fullmatch(value) is None: | ||
| return None | ||
| try: | ||
| return Prpcrypt().decrypt(value) | ||
| except Exception: | ||
| return None | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from common.fields.encrypted import EncryptedCharField, EncryptedTextField | ||
|
|
||
| __all__ = ["EncryptedCharField", "EncryptedTextField"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| from django.core.validators import MaxLengthValidator | ||
| from django.db import models | ||
|
|
||
| from common.encryption import decrypt_value, encrypt_value | ||
|
|
||
|
|
||
| class EncryptedMixin(models.Field): | ||
| def __init__(self, *args, **kwargs): | ||
| self._plaintext_max_length = kwargs.get("max_length") | ||
| super().__init__(*args, **kwargs) | ||
| if self._plaintext_max_length: | ||
| self.validators.append(MaxLengthValidator(self._plaintext_max_length)) | ||
|
|
||
| def get_prep_value(self, value): | ||
| value = super().get_prep_value(value) | ||
| if value is None: | ||
| return None | ||
| return encrypt_value(value) | ||
|
|
||
| def from_db_value(self, value, expression, connection): | ||
| if value is None: | ||
| return None | ||
| if value == "": | ||
| return value | ||
| return decrypt_value(value) | ||
|
|
||
| def to_python(self, value): | ||
| return super().to_python(value) | ||
|
|
||
|
|
||
| class EncryptedTextField(EncryptedMixin, models.TextField): | ||
| pass | ||
|
|
||
|
|
||
| class EncryptedCharField(EncryptedMixin, models.TextField): | ||
| pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.