diff --git a/pychron/cloud/qr.py b/pychron/cloud/qr.py new file mode 100644 index 000000000..b7e554627 --- /dev/null +++ b/pychron/cloud/qr.py @@ -0,0 +1,118 @@ +# =============================================================================== +# Copyright 2026 Jake Ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +"""QR-code generation for the device-code enrollment flow. + +The device-code grant returns a ``verification_url_complete`` — +the verification URL with the ``user_code`` already encoded as a +query parameter. Encoding that string as a QR lets the admin scan +the workstation's screen with a phone instead of typing the URL + +short code by hand. + +Backed by ``segno`` (pure Python, ~24KB, no PIL dependency for PNG +output). The output path is a regular file the caller is responsible +for cleaning up. +""" + +from __future__ import absolute_import + +import os + +import segno + +from pychron.cloud.paths import pychron_dir + + +_SLUG_SAFE = "-_" + + +def _sanitize_slug(slug): + """Whitelist a caller-supplied slug to ``[A-Za-z0-9_-]``. + + The slug is interpolated into a filename, so any byte that is not + explicitly safe is replaced with an underscore. This blocks path + traversal (``..`` collapses to ``__``, ``/`` to ``_``) and trims + null bytes / control characters that could confuse the OS path + layer. + """ + if not slug: + return "" + return "".join(c if c.isalnum() or c in _SLUG_SAFE else "_" for c in str(slug)) + + +def qr_dir(): + """Return ``~/.pychron/qr/``, creating it 0700 if missing.""" + path = os.path.join(pychron_dir(), "qr") + if not os.path.isdir(path): + os.makedirs(path, mode=0o700) + elif os.name == "posix": + try: + os.chmod(path, 0o700) + except OSError: + pass + return path + + +def make_qr_png(url, out_path, scale=8, border=2): + """Encode ``url`` as a QR code and write it to ``out_path`` as PNG. + + Uses error-correction level M (15% recoverable) which is plenty + for a screen-to-camera scan of an https URL of typical length. + ``scale`` is the pixel-per-module size; ``border`` is the + quiet-zone width in modules. Defaults render to roughly + ~330×330 px for a 33-module symbol — readable from arm's length. + """ + if not url: + raise ValueError("url is empty") + qr = segno.make(url, error="m") + qr.save(out_path, kind="png", scale=scale, border=border) + if os.name == "posix": + try: + os.chmod(out_path, 0o600) + except OSError: + pass + return out_path + + +def make_qr_for_device_code(verification_url_complete, host_slug=""): + """Convenience wrapper: emit ``~/.pychron/qr/device_.png``. + + Returns the absolute path to the generated PNG. Overwrites any + prior file at the same path so a fresh enrollment does not pick + up a stale QR from an earlier attempt. + + ``host_slug`` is sanitized to ``[A-Za-z0-9_-]`` so an + attacker-controlled value (e.g. a malicious server-issued + ``lab_name`` or a hand-edited preference) cannot escape the + scoped ``~/.pychron/qr/`` directory. After path construction the + final ``out_path`` is also asserted to live under + :func:`qr_dir` as defense-in-depth. + """ + if not verification_url_complete: + raise ValueError("verification_url_complete is empty") + safe_slug = _sanitize_slug(host_slug) or "default" + name = "device_{}.png".format(safe_slug) + base = qr_dir() + out_path = os.path.join(base, name) + # Defense-in-depth: even if the slug whitelist regresses, the + # resolved absolute path must remain under qr_dir(). + real_base = os.path.realpath(base) + real_out = os.path.realpath(out_path) + if os.path.commonpath([real_base, real_out]) != real_base: + raise ValueError("refusing to write QR outside the scoped qr/ dir: {0!r}".format(out_path)) + return make_qr_png(verification_url_complete, out_path) + + +# ============= EOF ============================================= diff --git a/pychron/cloud/tasks/preferences.py b/pychron/cloud/tasks/preferences.py index cc3d63a43..e90bc19f9 100644 --- a/pychron/cloud/tasks/preferences.py +++ b/pychron/cloud/tasks/preferences.py @@ -32,8 +32,8 @@ from envisage.ui.tasks.preferences_pane import PreferencesPane from pyface.api import GUI -from traits.api import Bool, Button, Password, Str -from traitsui.api import Color, Group, HGroup, Item, VGroup, View +from traits.api import Bool, Button, File, Password, Str +from traitsui.api import Color, Group, HGroup, ImageEditor, Item, VGroup, View from pychron.cloud.api_client import ( CloudAPIError, @@ -48,6 +48,7 @@ get_token, set_token, ) +from pychron.cloud.qr import make_qr_for_device_code from pychron.cloud.workstation_setup import ( DeviceEnrollmentCancelled, KeyringWriteFailedError, @@ -90,6 +91,11 @@ class CloudPreferences(BasePreferencesHelper): cancel_enrollment_button = Button("Cancel enrollment") _pending_user_code = Str _pending_verification_url = Str + # PNG path for the verification-URL QR. Admin scans it from the + # workstation screen with their phone instead of typing the URL + + # user_code by hand. Empty string until the server returns the + # `verification_url_complete` payload. + _pending_qr_path = File _pending_active = Bool(False) _should_cancel_enrollment = Bool(False) @@ -125,6 +131,7 @@ def _is_preference_trait(self, trait_name): "cancel_enrollment_button", "_pending_user_code", "_pending_verification_url", + "_pending_qr_path", "_pending_active", "_should_cancel_enrollment", "_recovery_token", @@ -207,6 +214,7 @@ def _enroll_via_device_code_button_fired(self): self._should_cancel_enrollment = False self._pending_user_code = "" self._pending_verification_url = "" + self._pending_qr_path = "" self._recovery_token = "" self._recovery_lab = "" self._pending_active = True @@ -224,14 +232,23 @@ def _enroll_via_device_code_button_fired(self): def _on_device_code_user_code( self, user_code, verification_url, verification_url_complete, expires_at ): - """Worker-thread callback: surface the user_code + URL in the pane. + """Worker-thread callback: surface the user_code + URL + QR in the pane. Trait writes from non-UI threads are dispatched to the UI thread by the Pyface event loop, so the operator sees the code as soon - as the server returns it. + as the server returns it. QR generation runs on this thread + (small file, ~hundreds of microseconds for a typical URL); a + failure is non-fatal — the typed code + URL still work. """ self._pending_user_code = user_code self._pending_verification_url = verification_url + try: + self._pending_qr_path = make_qr_for_device_code( + verification_url_complete, host_slug=self.lab_name or "default" + ) + except Exception as exc: + logger.warning("device-code QR generation failed: %s", exc) + self._pending_qr_path = "" self._remote_status = "Show {} to admin at {}".format(user_code, verification_url) self._remote_status_color = normalize_color_name("orange") @@ -317,6 +334,7 @@ def _apply_keyring_recovery(self, lab_name, api_token): def _reset_pending(self): self._pending_user_code = "" self._pending_verification_url = "" + self._pending_qr_path = "" self._pending_active = False self._should_cancel_enrollment = False @@ -475,6 +493,16 @@ def traits_view(self): visible_when="_pending_active", ), ), + HGroup( + Item( + "_pending_qr_path", + show_label=False, + editor=ImageEditor(), + tooltip="Scan with the admin's phone to open the " + "verification page with the user_code pre-filled.", + visible_when="_pending_qr_path != ''", + ), + ), HGroup( Item( "_recovery_token", diff --git a/pyproject.toml b/pyproject.toml index ba9f801fb..b3a3f939f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ dependencies = [ "requests-oauthlib>=2.0.0,<3", "scikit-image>=0.26.0,<1", "scipy>=1.16.2,<2", + "segno>=1.6,<2", "six>=1.17.0,<2", "sqlalchemy>=2.0.44,<3", "statsmodels>=0.14.5,<1", diff --git a/test/cloud/test_qr.py b/test/cloud/test_qr.py new file mode 100644 index 000000000..4b864e133 --- /dev/null +++ b/test/cloud/test_qr.py @@ -0,0 +1,111 @@ +"""Unit tests for pychron.cloud.qr.""" + +import os +import tempfile +import unittest +from unittest.mock import patch + +from pychron.cloud import qr + + +class MakeQrPngTestCase(unittest.TestCase): + URL = "https://api.example/device?user_code=ABCD-EFGH" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self._patcher = patch( + "pychron.cloud.paths.os.path.expanduser", + lambda p: p.replace("~", self.tmp), + ) + self._patcher.start() + self.addCleanup(self._patcher.stop) + self.addCleanup(self._rmtree, self.tmp) + + def _rmtree(self, path): + import shutil + + shutil.rmtree(path, ignore_errors=True) + + def test_writes_png_with_correct_magic(self): + out = os.path.join(self.tmp, "test_qr.png") + path = qr.make_qr_png(self.URL, out) + self.assertEqual(path, out) + self.assertTrue(os.path.isfile(out)) + with open(out, "rb") as f: + self.assertEqual(f.read(8), b"\x89PNG\r\n\x1a\n") + + def test_writes_under_pychron_qr_dir(self): + path = qr.make_qr_for_device_code(self.URL, host_slug="lab-mac-01") + self.assertTrue(path.endswith("device_lab-mac-01.png")) + self.assertTrue(os.path.isfile(path)) + + def test_overwrites_prior_file_for_same_host(self): + """A re-enrollment for the same host must overwrite the earlier + QR rather than accumulating ``device__2.png`` etc.""" + first = qr.make_qr_for_device_code(self.URL, host_slug="lab-mac-01") + size_first = os.path.getsize(first) + # Different URL → different content → confirm overwrite happened. + second = qr.make_qr_for_device_code( + "https://api.example/device?user_code=WXYZ-1234", + host_slug="lab-mac-01", + ) + self.assertEqual(first, second) + self.assertNotEqual(os.path.getsize(second), 0) + # Different content of similar length is plausible — the key + # guarantee is that exactly one file exists for this slug. + listing = os.listdir(os.path.dirname(second)) + self.assertEqual( + [n for n in listing if n.startswith("device_lab-mac-01")], + ["device_lab-mac-01.png"], + ) + + def test_empty_url_rejected(self): + with self.assertRaises(ValueError): + qr.make_qr_png("", os.path.join(self.tmp, "x.png")) + with self.assertRaises(ValueError): + qr.make_qr_for_device_code("", host_slug="x") + + def test_default_host_slug(self): + """Caller may omit host_slug — file is named ``device_default.png``.""" + path = qr.make_qr_for_device_code(self.URL) + self.assertTrue(path.endswith("device_default.png")) + + def test_path_traversal_in_host_slug_sanitized(self): + """Path-traversal payloads in ``host_slug`` (e.g. an attacker- + controlled lab_name preference) must NOT escape the scoped + ``~/.pychron/qr/`` directory. The slug whitelist replaces any + non-``[A-Za-z0-9_-]`` byte with an underscore + an absolute- + path containment check is asserted defensively at the writer.""" + evil_slugs = ( + "../../etc/passwd", + "..", + "../../tmp/owned", + "a/b/c", + "lab.name", + "foo\x00bar", + ) + qr_root = os.path.realpath(qr.qr_dir()) + for slug in evil_slugs: + path = qr.make_qr_for_device_code(self.URL, host_slug=slug) + real = os.path.realpath(path) + self.assertTrue( + real.startswith(qr_root + os.sep) or real == qr_root, + f"{slug!r} escaped: {path}", + ) + # Filename never carries traversal markers post-sanitize. + self.assertNotIn("/", os.path.basename(path)) + self.assertNotIn("..", os.path.basename(path)) + self.assertNotIn("\x00", os.path.basename(path)) + + def test_sanitize_slug_preserves_safe_chars(self): + """Real-world slugs (alnum + ``-`` + ``_``) pass through unchanged.""" + for slug in ("NMGRL", "lab-2024_NM", "abc123", "test-lab_42"): + self.assertEqual(qr._sanitize_slug(slug), slug) + + def test_sanitize_slug_handles_none_and_empty(self): + self.assertEqual(qr._sanitize_slug(None), "") + self.assertEqual(qr._sanitize_slug(""), "") + + +if __name__ == "__main__": + unittest.main() diff --git a/uv.lock b/uv.lock index 239ddb294..1c808ff7f 100644 --- a/uv.lock +++ b/uv.lock @@ -1218,6 +1218,7 @@ dependencies = [ { name = "requests-oauthlib" }, { name = "scikit-image" }, { name = "scipy" }, + { name = "segno" }, { name = "six" }, { name = "sqlalchemy" }, { name = "statsmodels" }, @@ -1298,6 +1299,7 @@ requires-dist = [ { name = "requests-oauthlib", specifier = ">=2.0.0,<3" }, { name = "scikit-image", specifier = ">=0.26.0,<1" }, { name = "scipy", specifier = ">=1.16.2,<2" }, + { name = "segno", specifier = ">=1.6,<2" }, { name = "six", specifier = ">=1.17.0,<2" }, { name = "sqlalchemy", specifier = ">=2.0.44,<3" }, { name = "statsmodels", specifier = ">=0.14.5,<1" }, @@ -1675,6 +1677,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" }, ] +[[package]] +name = "segno" +version = "1.6.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1c/2e/b396f750c53f570055bf5a9fc1ace09bed2dff013c73b7afec5702a581ba/segno-1.6.6.tar.gz", hash = "sha256:e60933afc4b52137d323a4434c8340e0ce1e58cec71439e46680d4db188f11b3", size = 1628586, upload-time = "2025-03-12T22:12:53.324Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d6/02/12c73fd423eb9577b97fc1924966b929eff7074ae6b2e15dd3d30cb9e4ae/segno-1.6.6-py3-none-any.whl", hash = "sha256:28c7d081ed0cf935e0411293a465efd4d500704072cdb039778a2ab8736190c7", size = 76503, upload-time = "2025-03-12T22:12:48.106Z" }, +] + [[package]] name = "setuptools" version = "80.9.0"