diff --git a/README.md b/README.md index 655fcf4c..e7dcaebc 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ Python tools for MODFLOW development and testing. - [Test models](#test-models) - [Example scenarios](#example-scenarios) - [Reusable test case framework](#reusable-test-case-framework) + - [Parametrizing with `Case`](#parametrizing-with-case) + - [Generating cases dynamically](#generating-cases-dynamically) - [Executables container](#executables-container) - [Conditionally skipping tests](#conditionally-skipping-tests) - [Miscellaneous](#miscellaneous) @@ -155,22 +157,57 @@ This package provides a minimal framework for self-describing test cases which c A `Case` requires only a `name`, and has a single default attribute, `xfail=False`, indicating whether the test case is expected to succeed. (Test functions may of course choose to use or ignore this.) -For instance, to generate a set of similar test cases with `pytest-cases`: +#### Parametrizing with `Case` -```python -from pytest_cases import parametrize +`Case` can be used with `@pytest.mark.parametrize()` as usual. For instance: +```python +import pytest from modflow_devtools.case import Case template = Case(name="QA") cases = [ - template.copy_update(name=template.name + "1", question="What's the meaning of life, the universe, and everything?", answer=42), - template.copy_update(name=template.name + "2", question="Is a Case immutable?", answer="No, but it's better not to mutate it.") + template.copy_update(name=template.name + "1", + question="What's the meaning of life, the universe, and everything?", + answer=42), + template.copy_update(name=template.name + "2", + question="Is a Case immutable?", + answer="No, but it's probably best not to mutate it.") ] -@parametrize(data=cases, ids=[c.name for c in cases]) -def case_qa(case): - print(case.name, case.question, case.answer) + +@pytest.mark.parametrize("case", cases) +def test_cases(case): + assert len(cases) == 2 + assert cases[0] != cases[1] +``` + +#### Generating cases dynamically + +One pattern possible with `pytest-cases` is to programmatically generate test cases by parametrizing a function. This can be a convenient way to produce several similar test cases from a template: + +```python +from pytest_cases import parametrize, parametrize_with_cases +from modflow_devtools.case import Case + + +template = Case(name="QA") +gen_cases = [template.copy_update(name=f"{template.name}{i}", question=f"Q{i}", answer=f"A{i}") for i in range(3)] +info = "cases can be modified further in the generator function,"\ + " or the function may construct and return another object" + + +@parametrize(case=gen_cases, ids=[c.name for c in gen_cases]) +def qa_cases(case): + return case.copy_update(info=info) + + +@parametrize_with_cases("case", cases=".", prefix="qa_") +def test_qa(case): + assert "QA" in case.name + assert info == case.info + print(f"{case.name}:", f"{case.question}? {case.answer}") + print(case.info) ``` ### Executables container diff --git a/modflow_devtools/case.py b/modflow_devtools/case.py index cd6e7a2f..abe21757 100644 --- a/modflow_devtools/case.py +++ b/modflow_devtools/case.py @@ -6,7 +6,11 @@ class Case(SimpleNamespace): Minimal container for a reusable test case. """ - def __init__(self, **kwargs): + def __init__(self, case: "Case" = None, **kwargs): + if case is not None: + super().__init__(**case.__dict__.copy()) + return + if "name" not in kwargs: raise ValueError(f"Case name is required") @@ -26,7 +30,7 @@ def copy(self): Copies the test case. """ - return SimpleNamespace(**self.__dict__.copy()) + return Case(**self.__dict__.copy()) def copy_update(self, **kwargs): """ @@ -36,4 +40,4 @@ def copy_update(self, **kwargs): cpy = self.__dict__.copy() cpy.update(kwargs) - return SimpleNamespace(**cpy) + return Case(**cpy) diff --git a/modflow_devtools/download.py b/modflow_devtools/download.py index 1ea1bd15..b8f424b7 100644 --- a/modflow_devtools/download.py +++ b/modflow_devtools/download.py @@ -1,183 +1,17 @@ -# pylint: disable=E1101 - -"""Utility functions to: - -1. download and unzip software releases from the USGS and other organizations - (triangle, MT3DMS). -2. download the latest MODFLOW-based applications and utilities for MacOS, - Linux, and Windows from https://github.com/MODFLOW-USGS/executables -3. determine the latest version (GitHub tag) of a GitHub repository and a - dictionary containing the file name and the link to a asset on - contained in a github repository -4. compress all files in a list, files in a list of directories - -""" +import json import os -import shutil import sys import tarfile import timeit import urllib.request from os import PathLike from pathlib import Path -from typing import Optional -from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo - - -class MFZipFile(ZipFile): - """ZipFile file attributes are not being preserved. This class preserves - file attributes as described on StackOverflow at - https://stackoverflow.com/questions/39296101/python-zipfile-removes-execute-permissions-from-binaries - - """ +from typing import List, Optional +from warnings import warn - def extract(self, member, path=None, pwd=None): - """ - - Parameters - ---------- - member : str - individual file to extract. If member does not exist, all files - are extracted. - path : str - directory path to extract file in a zip file (default is None, - which results in files being extracted in the current directory) - pwd : str - zip file password (default is None) - - Returns - ------- - ret_val : int - return value indicating status of file extraction - - """ - if not isinstance(member, ZipInfo): - member = self.getinfo(member) - - if path is None: - path = os.getcwd() - - ret_val = self._extract_member(member, path, pwd) - attr = member.external_attr >> 16 - if attr != 0: - os.chmod(ret_val, attr) - - return ret_val - - def extractall(self, path=None, members=None, pwd=None): - """Extract all files in the zipfile. - - Parameters - ---------- - path : str - directory path to extract files in a zip file (default is None, - which results in files being extracted in the current directory) - members : str - individual files to extract (default is None, which extracts - all members) - pwd : str - zip file password (default is None) - - Returns - ------- - - """ - if members is None: - members = self.namelist() - - if path is None: - path = os.getcwd() - else: - if hasattr(os, "fspath"): - # introduced in python 3.6 and above - path = os.fspath(path) - - for zipinfo in members: - self.extract(zipinfo, path, pwd) - - @staticmethod - def compressall(path, file_pths=None, dir_pths=None, patterns=None): - """Compress selected files or files in selected directories. - - Parameters - ---------- - path : str - output zip file path - file_pths : str or list of str - file paths to include in the output zip file (default is None) - dir_pths : str or list of str - directory paths to include in the output zip file (default is None) - patterns : str or list of str - file patterns to include in the output zip file (default is None) - - Returns - ------- - success : bool - boolean indicating if the output zip file was created - - """ - - # create an empty list - if file_pths is None: - file_pths = [] - # convert files to a list - else: - if isinstance(file_pths, str): - file_pths = [file_pths] - elif isinstance(file_pths, tuple): - file_pths = list(file_pths) - - # remove directories from the file list - if len(file_pths) > 0: - file_pths = [e for e in file_pths if os.path.isfile(e)] - - # convert dirs to a list if a str (a tuple is allowed) - if dir_pths is None: - dir_pths = [] - else: - if isinstance(dir_pths, str): - dir_pths = [dir_pths] - - # convert find to a list if a str (a tuple is allowed) - if patterns is not None: - if isinstance(patterns, str): - patterns = [patterns] - - # walk through dirs and add files to the list - for dir_pth in dir_pths: - for dirname, subdirs, files in os.walk(dir_pth): - for filename in files: - fpth = os.path.join(dirname, filename) - # add the file if it does not exist in file_pths - if fpth not in file_pths: - file_pths.append(fpth) - - # remove file_paths that do not match the patterns - if patterns is not None: - tlist = [] - for file_pth in file_pths: - if any(p in os.path.basename(file_pth) for p in patterns): - tlist.append(file_pth) - file_pths = tlist - - # write the zipfile - success = True - if len(file_pths) > 0: - zf = ZipFile(path, "w", ZIP_DEFLATED) - - # write files to zip file - for file_pth in file_pths: - arcname = os.path.basename(file_pth) - zf.write(file_pth, arcname=arcname) - - # close the zip file - zf.close() - else: - msg = "No files to add to the zip file" - print(msg) - success = False +from modflow_devtools.zip import MFZipFile - return success +_max_http_tries = 3 def get_request(url, params={}): @@ -186,6 +20,8 @@ def get_request(url, params={}): This bears a GitHub API authentication token if github.com is in the URL and the GITHUB_TOKEN environment variable is set. + + Originally written by Mike Toews (mwtoews@gmail.com) for FloPy. """ if isinstance(params, dict): if len(params) > 0: @@ -202,6 +38,166 @@ def get_request(url, params={}): return urllib.request.Request(url, headers=headers) +def get_releases(repo, per_page=None, quiet=False) -> List[dict]: + """Get list of available releases.""" + req_url = f"https://api.github.com/repos/{repo}/releases" + + params = {} + if per_page is not None: + if per_page < 1 or per_page > 100: + raise ValueError("per_page must be between 1 and 100") + params["per_page"] = per_page + + request = get_request(req_url, params=params) + num_tries = 0 + while True: + num_tries += 1 + try: + with urllib.request.urlopen(request, timeout=10) as resp: + result = resp.read() + break + except urllib.error.HTTPError as err: + if err.code == 401 and os.environ.get("GITHUB_TOKEN"): + raise ValueError("GITHUB_TOKEN env is invalid") from err + elif err.code == 403 and "rate limit exceeded" in err.reason: + raise ValueError( + f"use GITHUB_TOKEN env to bypass rate limit ({err})" + ) from err + elif err.code in (404, 503) and num_tries < _max_http_tries: + # GitHub sometimes returns this error for valid URLs, so retry + print(f"URL request {num_tries} did not work ({err})") + continue + raise RuntimeError(f"cannot retrieve data from {req_url}") from err + + releases = json.loads(result.decode()) + if not quiet: + print(f"Found {len(releases)} releases for {repo}") + + return releases + + +def get_release(repo, tag="latest", quiet=False) -> dict: + """Get info about a particular release.""" + api_url = f"https://api.github.com/repos/{repo}" + req_url = ( + f"{api_url}/releases/latest" + if tag == "latest" + else f"{api_url}/releases/tags/{tag}" + ) + request = get_request(req_url) + releases = None + num_tries = 0 + + while True: + num_tries += 1 + try: + with urllib.request.urlopen(request, timeout=10) as resp: + result = resp.read() + remaining = int(resp.headers["x-ratelimit-remaining"]) + if remaining <= 10: + warn( + f"Only {remaining} GitHub API requests remaining " + "before rate-limiting" + ) + break + except urllib.error.HTTPError as err: + if err.code == 401 and os.environ.get("GITHUB_TOKEN"): + raise ValueError("GITHUB_TOKEN env is invalid") from err + elif err.code == 403 and "rate limit exceeded" in err.reason: + raise ValueError( + f"use GITHUB_TOKEN env to bypass rate limit ({err})" + ) from err + elif err.code == 404: + if releases is None: + releases = get_releases(repo, quiet) + if tag not in releases: + raise ValueError( + f"Release {tag} not found (choose from {', '.join(releases)})" + ) + elif err.code == 503 and num_tries < _max_http_tries: + # GitHub sometimes returns this error for valid URLs, so retry + warn(f"URL request {num_tries} did not work ({err})") + continue + raise RuntimeError(f"cannot retrieve data from {req_url}") from err + + release = json.loads(result.decode()) + tag_name = release["tag_name"] + if not quiet: + print(f"fetched release {tag_name!r} from {repo}") + + return release + + +def get_release_assets(repo, tag="latest", quiet=False) -> List[dict]: + pass + + +def get_artifacts( + repo, name=None, per_page=None, max_pages=None, quiet=False +) -> List[dict]: + """ + List repository artifacts via the GitHub API, optionally filtering by name pattern. + """ + + msg = f"artifact(s) for {repo}" + ( + f" matching name {name}" if name else "" + ) + url = f"https://api.github.com/repos/{repo}/actions/artifacts" + page = 0 + params = {} + + if name is not None: + if not isinstance(name, str) or len(name) == 0: + raise ValueError(f"name must be a non-empty string") + params["name"] = name + + if per_page is not None: + if per_page < 1 or per_page > 100: + raise ValueError("per_page must be between 1 and 100") + params["per_page"] = int(per_page) + + def get_result(): + tries = 0 + params["page"] = page + request = get_request(url, params=params) + while True: + tries += 1 + try: + print(f"Fetching {msg} (page {page}, size {per_page})") + with urllib.request.urlopen(request, timeout=10) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as err: + if err.code == 401 and os.environ.get("GITHUB_TOKEN"): + raise ValueError("GITHUB_TOKEN env is invalid") from err + elif err.code == 403 and "rate limit exceeded" in err.reason: + raise ValueError( + f"use GITHUB_TOKEN env to bypass rate limit ({err})" + ) from err + elif err.code in (404, 503) and tries < _max_http_tries: + # GitHub sometimes returns this error for valid URLs, so retry + print(f"URL request {tries} did not work ({err})") + continue + raise RuntimeError(f"cannot retrieve data from {url}") from err + + artifacts = [] + diff = 1 + max_pages = max_pages if max_pages else sys.maxsize + while diff > 0 and page < max_pages: + page += 1 + result = get_result() + total = result["total_count"] + if page == 1: + print(f"Repo {repo} has {total} {msg}") + + artifacts.extend(result["artifacts"]) + diff = total - len(artifacts) + + if not quiet: + print(f"Found {len(artifacts)} {msg}") + + return artifacts + + def download_and_unzip( url: str, path: Optional[PathLike] = None, @@ -245,6 +241,14 @@ def report(chunk, size, total): # download zip file file_path = path / url.split("/")[-1] + + if "github.com" in url: + github_token = os.environ.get("GITHUB_TOKEN", None) + if github_token: + opener = urllib.request.build_opener() + opener.addheaders = [("Bearer", github_token)] + urllib.request.install_opener(opener) + _, headers = urllib.request.urlretrieve( url, filename=str(file_path), reporthook=report ) @@ -298,516 +302,3 @@ def report(chunk, size, total): print(f"Done downloading and extracting {file_path.name} to {path}") return success - - -def zip_all(path, file_pths=None, dir_pths=None, patterns=None): - """Compress all files in the user-provided list of file paths and directory - paths that match the provided file patterns. - - Parameters - ---------- - path : str - path of the zip file that will be created - file_pths : str or list - file path or list of file paths to be compressed - dir_pths : str or list - directory path or list of directory paths to search for files that - will be compressed - patterns : str or list - file pattern or list of file patterns s to match to when creating a - list of files that will be compressed - - Returns - ------- - - """ - return MFZipFile.compressall( - path, file_pths=file_pths, dir_pths=dir_pths, patterns=patterns - ) - - -def _get_zipname(platform): - """Determine zipfile name for platform. - - Parameters - ---------- - platform : str - Platform that will run the executables. Valid values include mac, - linux, win32 and win64. If platform is None, then routine will - download the latest asset from the github repository. - - Returns - ------- - zipfile : str - Name of zipfile for platform - - """ - if platform is None: - if sys.platform.lower() == "darwin": - platform = "mac" - elif sys.platform.lower().startswith("linux"): - platform = "linux" - elif "win" in sys.platform.lower(): - is_64bits = sys.maxsize > 2**32 - if is_64bits: - platform = "win64" - else: - platform = "win32" - else: - errmsg = ( - f"Could not determine platform. sys.platform is {sys.platform}" - ) - raise Exception(errmsg) - else: - msg = f"unknown platform detected ({platform})" - success = platform in ["mac", "linux", "win32", "win64"] - if not success: - raise ValueError(msg) - return f"{platform}.zip" - - -def _get_default_repo(): - """Return the default repo name. - - Returns - ------- - default_repo : str - default github repository repo name - - """ - return "MODFLOW-USGS/executables" - - -def _get_default_url(): - """Return the default executables url path. - - Returns - ------- - default_url : str - default url for executables repository repo name - - """ - - return ( - f"https://github.com/{_get_default_repo()}/" - + "releases/latest/download/" - ) - - -def _get_default_json(tag_name=None): - """Return a default github api json for the provided release tag_name in a - github repository. - - Parameters - ---------- - tag_name : str - github repository release tag - - Returns - ------- - json_obj : dict - json object (dictionary) with a tag_name and assets including - file names and download links - - """ - # initialize json_obj dictionary - json_obj = {"tag_name": tag_name} - - # create appropriate url - if tag_name is not None: - url = ( - f"https://github.com/{_get_default_repo()}/" - + f"releases/latest/download/{tag_name}/" - ) - else: - url = ( - f"https://github.com/{_get_default_repo()}/" - + "releases/latest/download/" - ) - - # define asset names and paths for assets - names = ["mac.zip", "linux.zip", "win32.zip", "win64.zip"] - paths = [url + p for p in names] - - assets_list = [] - for name, path in zip(names, paths): - assets_list.append({"name": name, "browser_download_url": path}) - json_obj["assets"] = assets_list - - return json_obj - - -def _get_request_json(request_url, verbose=False, verify=True): - """Process a url request and return a json if successful. - - Parameters - ---------- - request_url : str - url for request - verbose : bool - boolean indicating if output will be printed to the terminal - default is false - verify : bool - boolean indicating if the url request should be verified - - Returns - ------- - success : bool - boolean indicating if the requat failed - status_code: integer - request status code - json_obj : dict - json object - - """ - import json - - max_requests = 10 - json_obj = None - success = True - - # open request - req = _request_get( - request_url, max_requests=max_requests, verbose=verbose, verify=verify - ) - - # connection established - retrieve the json - if req.ok: - json_obj = json.loads(req.text or req.content) - else: - success = req.status_code == requests.codes.ok - - return success, req, json_obj - - -def _repo_json( - github_repo, tag_name=None, error_return=False, verbose=False, verify=True -): - """Return the github api json for the latest github release in a github - repository. - - Parameters - ---------- - github_repo : str - Repository name, such as MODFLOW-USGS/modflow6 - tag_name : str - github repository release tag - error_return : bool - boolean indicating if None will be returned if there are GitHub API - issues - verbose : bool - boolean indicating if output will be printed to the terminal - verify : bool - boolean indicating if the url request should be verified - - Returns - ------- - json_obj : dict - json object (dictionary) with a tag_name and assets including - file names and download links - - """ - repo_url = f"https://api.github.com/repos/{github_repo}" - - if tag_name is None: - request_url = f"{repo_url}/releases/latest" - else: - request_url = f"{repo_url}/releases" - success, _, json_cat = _get_request_json( - request_url, verbose=verbose, verify=verify - ) - if success: - request_url = None - for release in json_cat: - if release["tag_name"] == tag_name: - request_url = release["url"] - break - if request_url is None: - msg = ( - f"Could not find tag_name ('{tag_name}') " - + "in release catalog" - ) - if error_return: - print(msg) - return None - else: - raise Exception(msg) - else: - msg = "Could not get release catalog from " + request_url - if error_return: - if verbose: - print(msg) - return None - else: - raise Exception(msg) - - msg = "Requesting asset data " - if tag_name is not None: - msg += f"for tag_name '{tag_name}' " - msg += f"from: {request_url}" - if verbose: - print(msg) - - # process the request - success, req, json_obj = _get_request_json( - request_url, verbose=verbose, verify=verify - ) - - # evaluate request errors - if not success: - if github_repo == _get_default_repo(): - msg = f"will use default values for {github_repo}" - if verbose: - print(msg) - json_obj = _get_default_json(tag_name) - else: - msg = "Could not find json from " + request_url - if verbose: - print(msg) - if error_return: - json_obj = None - else: - req.raise_for_status() - - # return json object - return json_obj - - -def get_repo_assets( - github_repo=None, version=None, error_return=False, verify=True -): - """Return a dictionary containing the file name and the link to the asset - contained in a github repository. - - Parameters - ---------- - github_repo : str - Repository name, such as MODFLOW-USGS/modflow6. If github_repo is - None set to 'MODFLOW-USGS/executables' - version : str - github repository release tag - error_return : bool - boolean indicating if None will be returned if there are GitHub API - issues - verify : bool - boolean indicating if the url request should be verified - - Returns - ------- - result_dict : dict - dictionary of file names and links - - """ - if github_repo is None: - github_repo = _get_default_repo() - - # get json and extract assets - json_obj = _repo_json( - github_repo, tag_name=version, error_return=error_return, verify=verify - ) - if json_obj is None: - result_dict = None - else: - assets = json_obj["assets"] - - # build simple assets dictionary - result_dict = {} - for asset in assets: - k = asset["name"] - if version is None: - value = github_repo + f"/{k}" - else: - value = asset["browser_download_url"] - result_dict[k] = value - - return result_dict - - -def repo_latest_version(github_repo=None, verify=True): - """Return a string of the latest version number (tag) contained in a github - repository release. - - Parameters - ---------- - github_repo : str - Repository name, such as MODFLOW-USGS/modflow6. If github_repo is - None set to 'MODFLOW-USGS/executables' - - Returns - ------- - version : str - string with the latest version/tag number - - """ - if github_repo is None: - github_repo = _get_default_repo() - - # get json - json_obj = _repo_json(github_repo, verify=verify) - - return json_obj["tag_name"] - - -def getmfexes( - pth=".", - version=None, - platform=None, - exes=None, - verbose=False, - verify=True, -): - """Get the latest MODFLOW binary executables from a github site - (https://github.com/MODFLOW-USGS/executables) for the specified operating - system and put them in the specified path. - - Parameters - ---------- - pth : str - Location to put the executables (default is current working directory) - version : str - Version of the MODFLOW-USGS/executables release to use. If version is - None the github repo will be queried for the version number. - platform : str - Platform that will run the executables. Valid values include mac, - linux, win32 and win64. If platform is None, then routine will - download the latest asset from the github repository. - exes : str or list of strings - executable or list of executables to retain - verbose : bool - boolean indicating if output will be printed to the terminal - verify : bool - boolean indicating if the url request should be verified - - """ - # set download directory to path in case a selection of files - download_dir = pth - - # Determine the platform in order to construct the zip file name - zipname = _get_zipname(platform) - - # Evaluate exes keyword - if exes is not None: - download_dir = os.path.join(".", "download_dir") - if isinstance(exes, str): - exes = tuple(exes) - elif isinstance(exes, (int, float)): - msg = "exes keyword must be a string or a list/tuple of strings" - raise TypeError(msg) - - # Determine path for file download and then download and unzip - if version is None: - download_url = _get_default_url() + zipname - else: - assets = get_repo_assets( - github_repo=_get_default_repo(), version=version, verify=verify - ) - download_url = assets[zipname] - download_and_unzip( - download_url, - download_dir, - verbose=verbose, - verify=verify, - ) - - if exes is not None: - # make sure pth exists - if not os.path.exists(pth): - if verbose: - print(f"Creating the directory:\n {pth}") - os.makedirs(pth) - - # move select files to pth - for f in os.listdir(download_dir): - src = os.path.join(download_dir, f) - dst = os.path.join(pth, f) - for exe in exes: - if exe in f: - shutil.move(src, dst) - break - - # remove the download directory - if os.path.isdir(download_dir): - if verbose: - print("Removing folder " + download_dir) - shutil.rmtree(download_dir) - - return - - -def getmfnightly( - pth=".", - platform=None, - exes=None, - verbose=False, - verify=True, -): - """Get the latest MODFLOW 6 binary nightly-build executables from github - (https://github.com/MODFLOW-USGS/modflow6-nightly-build/) for the specified - operating system and put them in the specified path. - - Parameters - ---------- - pth : str - Location to put the executables (default is current working directory) - platform : str - Platform that will run the executables. Valid values include mac, - linux, win32 and win64. If platform is None, then routine will - download the latest asset from the github repository. - exes : str or list of strings - executable or list of executables to retain - verbose : bool - boolean indicating if output will be printed to the terminal - verify : bool - boolean indicating if the url request should be verified - - """ - # set download directory to path in case a selection of files - download_dir = pth - - # Determine the platform in order to construct the zip file name - zipname = _get_zipname(platform) - - # Evaluate exes keyword - if exes is not None: - download_dir = os.path.join(".", "download_dir") - if isinstance(exes, str): - exes = tuple(exes) - elif isinstance(exes, (int, float)): - msg = "exes keyword must be a string or a list/tuple of strings" - raise TypeError(msg) - - # Determine path for file download and then download and unzip - # https://github.com/MODFLOW-USGS/modflow6-nightly-build/releases/latest/download/ - download_url = ( - "https://github.com/MODFLOW-USGS/" - + "modflow6-nightly-build/releases/latest/download/" - + zipname - ) - download_and_unzip( - download_url, - download_dir, - verbose=verbose, - verify=verify, - ) - - if exes is not None: - # make sure pth exists - if not os.path.exists(pth): - if verbose: - print(f"Creating the directory:\n {pth}") - os.makedirs(pth) - - # move select files to pth - for f in os.listdir(download_dir): - src = os.path.join(download_dir, f) - dst = os.path.join(pth, f) - for exe in exes: - if exe in f: - shutil.move(src, dst) - break - - # remove the download directory - if os.path.isdir(download_dir): - if verbose: - print("Removing folder " + download_dir) - shutil.rmtree(download_dir) diff --git a/modflow_devtools/misc.py b/modflow_devtools/misc.py index 151231ae..6731797c 100644 --- a/modflow_devtools/misc.py +++ b/modflow_devtools/misc.py @@ -47,6 +47,17 @@ def __exit__(self, exc_type, exc_value, traceback): pass +def get_ostag() -> str: + """Determine operating system tag from sys.platform.""" + if sys.platform.startswith("linux"): + return "linux" + elif sys.platform.startswith("win"): + return "win" + ("64" if sys.maxsize > 2**32 else "32") + elif sys.platform.startswith("darwin"): + return "mac" + raise ValueError(f"platform {sys.platform!r} not supported") + + def get_suffixes(ostag) -> Tuple[str, str]: """Returns executable and library suffixes for the given OS (as returned by sys.platform)""" @@ -63,7 +74,11 @@ def get_suffixes(ostag) -> Tuple[str, str]: def run_cmd(*args, verbose=False, **kwargs): - """Run any command, return tuple (stdout, stderr, returncode).""" + """ + Run any command, return tuple (stdout, stderr, returncode). + + Originally written by Mike Toews (mwtoews@gmail.com) for FloPy. + """ args = [str(g) for g in args] if verbose: print("running: " + " ".join(args)) @@ -257,12 +272,22 @@ def is_github_rate_limited() -> Optional[bool]: def has_exe(exe): + """ + Determines if the given executable is available on the path. + + Originally written by Mike Toews (mwtoews@gmail.com) for FloPy. + """ if exe not in _has_exe_cache: _has_exe_cache[exe] = bool(which(exe)) return _has_exe_cache[exe] def has_pkg(pkg): + """ + Determines if the given Python package is installed. + + Originally written by Mike Toews (mwtoews@gmail.com) for FloPy. + """ if pkg not in _has_pkg_cache: # for some dependencies, package name and import name are different diff --git a/modflow_devtools/test/test_case.py b/modflow_devtools/test/test_case.py index 9d4c5143..88c27446 100644 --- a/modflow_devtools/test/test_case.py +++ b/modflow_devtools/test/test_case.py @@ -1,5 +1,6 @@ import pytest from modflow_devtools.case import Case +from pytest_cases import parametrize, parametrize_with_cases def test_requires_name(): @@ -31,3 +32,49 @@ def test_copy_update(): assert copy is not copy2 assert copy.foo == "bar" assert copy2.foo == "baz" + + +template = Case(name="QA") +cases = [ + template.copy_update( + name=template.name + "1", + question="What's the meaning of life, the universe, and everything?", + answer=42, + ), + template.copy_update( + name=template.name + "2", + question="Is a Case immutable?", + answer="No, but it's probably best not to mutate it.", + ), +] + + +@pytest.mark.parametrize("case", cases) +def test_cases(case): + assert len(cases) == 2 + assert cases[0] != cases[1] + + +gen_cases = [ + template.copy_update( + name=f"{template.name}{i}", question=f"Q{i}", answer=f"A{i}" + ) + for i in range(3) +] +info = ( + "cases can be modified further in the generator function," + " or the function may construct and return another object" +) + + +@parametrize(case=gen_cases, ids=[c.name for c in gen_cases]) +def qa_cases(case): + return case.copy_update(info=info) + + +@parametrize_with_cases("case", cases=".", prefix="qa_") +def test_qa(case): + assert "QA" in case.name + assert info == case.info + print(f"{case.name}:", f"{case.question}? {case.answer}") + print(case.info) diff --git a/modflow_devtools/test/test_download.py b/modflow_devtools/test/test_download.py index 4a5bf6c8..b3b82c52 100644 --- a/modflow_devtools/test/test_download.py +++ b/modflow_devtools/test/test_download.py @@ -1,8 +1,77 @@ import pytest from flaky import flaky -from modflow_devtools.download import download_and_unzip +from modflow_devtools.download import ( + download_and_unzip, + get_artifacts, + get_release, + get_releases, +) from modflow_devtools.markers import requires_github +_repos = [ + "MODFLOW-USGS/modflow6", + "MODFLOW-USGS/modflow6-nightly-build", + "MODFLOW-USGS/executables", +] + + +@pytest.mark.parametrize("per_page", [-1, 0, 101, 1000]) +def test_get_releases_bad_page_size(per_page): + with pytest.raises(ValueError): + get_releases("executables", per_page=per_page) + + +@flaky +@requires_github +@pytest.mark.parametrize("repo", _repos) +def test_get_releases(repo): + releases = get_releases(repo) + assert any(releases) + assert all("created_at" in r for r in releases) + + assets = [a for aa in [r["assets"] for r in releases] for a in aa] + assert all(repo in a["browser_download_url"] for a in assets) + + # test page size option + if repo == "MODFLOW-USGS/modflow6-nightly-build": + assert len(releases) == 30 # 30-day retention period + + +@flaky +@requires_github +@pytest.mark.parametrize("repo", _repos) +def test_get_release(repo): + tag = "latest" + release = get_release(repo, tag) + assets = release["assets"] + expected_names = ["linux.zip", "mac.zip", "win64.zip"] + actual_names = [asset["name"] for asset in assets] + + if repo == "MODFLOW-USGS/modflow6": + # can remove if modflow6 releases follow asset name conventions followed in executables and nightly build repos + assert set([a.rpartition("_")[2] for a in actual_names]) >= set( + [a for a in expected_names if not a.startswith("win")] + ) + else: + assert set(actual_names) >= set(expected_names) + + +@flaky +@requires_github +@pytest.mark.parametrize("name", [None, "rtd-files", "run-time-comparison"]) +@pytest.mark.parametrize("per_page", [None, 100]) +def test_get_artifacts(tmp_path, name, per_page): + artifacts = get_artifacts( + "MODFLOW-USGS/modflow6", + name=name, + quiet=False, + per_page=per_page, + max_pages=3, + ) + + if any(artifacts) and name: + assert all(name == a["name"] for a in artifacts) + @flaky @requires_github