diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 968f8da0e..e4ee512ba 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip pytest pytest-timeout + python -m pip install -e .[dev] - name: Test with pytest run: | pytest -v diff --git a/CHANGELOG.md b/CHANGELOG.md index 018ace895..3a10a7c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,6 @@ # **Upcoming release** - -## XXX - -- XXX +## New feature +- #464 Improve autoimport code to use a sqlite3 database, cache all available modules quickly, search for names and produce import statements, sort import statements. # Release 1.0.0 diff --git a/docs/library.rst b/docs/library.rst index 60f6303e6..cd20e393e 100644 --- a/docs/library.rst +++ b/docs/library.rst @@ -836,6 +836,34 @@ returns the list of modules with the given global name. global name that starts with the given prefix. +There are currently two implementations of autoimport in rope, a deprecated +implementation that uses pickle-based storage +(rope.contrib.autoimport.pickle.AutoImport) and a new, experimental one that +uses an sqlite3 database (rope.contrib.autoimport.sqlite.AutoImport). New and +existing integrations should migrate to the sqlite3 storage as the pickle-based +autoimport will be removed in the future. + +By default, the sqlite3-based AutoImport only stores the autoimport cache in an in-memory +sqlite3 database; you can make it write the import cache to persistent storage +by passing memory=False to the AutoImport constructor. + +It must be closed when you are done with it, using the ``AutoImport.close()`` method. + +AutoImport can search for a name among both module names and the names that can be imported from those modules. + +.. 
code-block:: python + + from rope.base.project import Project + from rope.contrib.autoimport.sqlite import AutoImport + + project = Project("/path/to/project") + autoimport = AutoImport(project, memory=False) + autoimport.generate_cache() # Generates a cache of the local modules, from the project you're working on + autoimport.generate_modules_cache() # Generates a cache of external modules + print(autoimport.search("Dict")) + autoimport.close() + + Cross-Project Refactorings -------------------------- diff --git a/rope/contrib/autoimport/__init__.py b/rope/contrib/autoimport/__init__.py new file mode 100644 index 000000000..0a3b59f1c --- /dev/null +++ b/rope/contrib/autoimport/__init__.py @@ -0,0 +1,8 @@ +"""AutoImport module for rope.""" +from .pickle import AutoImport as _PickleAutoImport +from .sqlite import AutoImport as _SqliteAutoImport + + +AutoImport = _PickleAutoImport + +__all__ = ["AutoImport"] diff --git a/rope/contrib/autoimport/defs.py b/rope/contrib/autoimport/defs.py new file mode 100644 index 000000000..51e31df2d --- /dev/null +++ b/rope/contrib/autoimport/defs.py @@ -0,0 +1,109 @@ +"""Definitions of types for the Autoimport program.""" +import pathlib +from enum import Enum +from typing import NamedTuple, Optional + + +class Source(Enum): + """Describes the source of the package, for sorting purposes.""" + + PROJECT = 0 # Obviously any project packages come first + MANUAL = 1 # Placeholder since Autoimport classifies manually added modules + BUILTIN = 2 + STANDARD = 3 # We want to favor standard library items + SITE_PACKAGE = 4 + UNKNOWN = 5 + + # modified_time + + +class ModuleInfo(NamedTuple): + """Descriptor of information to get names from a module.""" + + filepath: Optional[pathlib.Path] + modname: str + underlined: bool + process_imports: bool = False + + +class ModuleFile(ModuleInfo): + """Descriptor of information to get names from a file using ast.""" + + filepath: pathlib.Path + modname: str + underlined: bool + process_imports: bool 
= False + + +class ModuleCompiled(ModuleInfo): + """Descriptor of information to get names using imports.""" + + filepath = None + modname: str + underlined: bool + process_imports: bool = False + + +class PackageType(Enum): + """Describes the type of package, to determine how to get the names from it.""" + + BUILTIN = 0 # No file exists, compiled into python. IE: Sys + STANDARD = 1 # Just a folder + COMPILED = 2 # .so module + SINGLE_FILE = 3 # a .py file + + +class NameType(Enum): + """Describes the type of Name for lsp completions. Taken from python lsp server.""" + + Text = 1 + Method = 2 + Function = 3 + Constructor = 4 + Field = 5 + Variable = 6 + Class = 7 + Interface = 8 + Module = 9 + Property = 10 + Unit = 11 + Value = 12 + Enum = 13 + Keyword = 14 + Snippet = 15 + Color = 16 + File = 17 + Reference = 18 + Folder = 19 + EnumMember = 20 + Constant = 21 + Struct = 22 + Event = 23 + Operator = 24 + TypeParameter = 25 + + +class Package(NamedTuple): + """Attributes of a package.""" + + name: str + source: Source + path: Optional[pathlib.Path] + type: PackageType + + +class Name(NamedTuple): + """A Name to be added to the database.""" + + name: str + modname: str + package: str + source: Source + name_type: NameType + + +class PartialName(NamedTuple): + """Partial information of a Name.""" + + name: str + name_type: NameType diff --git a/rope/contrib/autoimport/parse.py b/rope/contrib/autoimport/parse.py new file mode 100644 index 000000000..63dab52ec --- /dev/null +++ b/rope/contrib/autoimport/parse.py @@ -0,0 +1,137 @@ +""" +Functions to find importable names. + +Can extract names from source code of a python file, .so object, or builtin module. 
+""" + +import ast +import inspect +import logging +import pathlib +from importlib import import_module +from typing import Generator, List + +from .defs import ( + ModuleCompiled, + ModuleFile, + ModuleInfo, + Name, + NameType, + Package, + PartialName, + Source, +) + +logger = logging.getLogger(__name__) + + +def get_type_ast(node: ast.AST) -> NameType: + """Get the lsp type of a node.""" + if isinstance(node, ast.ClassDef): + return NameType.Class + if isinstance(node, ast.FunctionDef): + return NameType.Function + if isinstance(node, ast.Assign): + return NameType.Variable + return NameType.Text # default value + + +def get_names_from_file( + module: pathlib.Path, + underlined: bool = False, +) -> Generator[PartialName, None, None]: + """Get all the names from a given file using ast.""" + + try: + root_node = ast.parse(module.read_bytes()) + except SyntaxError as error: + print(error) + return + for node in ast.iter_child_nodes(root_node): + if isinstance(node, ast.Assign): + for target in node.targets: + try: + assert isinstance(target, ast.Name) + if underlined or not target.id.startswith("_"): + yield PartialName( + target.id, + get_type_ast(node), + ) + except (AttributeError, AssertionError): + # TODO handle tuple assignment + pass + elif isinstance(node, (ast.FunctionDef, ast.ClassDef)): + if underlined or not node.name.startswith("_"): + yield PartialName( + node.name, + get_type_ast(node), + ) + + +def get_type_object(imported_object) -> NameType: + """Determine the type of an object.""" + if inspect.isclass(imported_object): + return NameType.Class + if inspect.isfunction(imported_object) or inspect.isbuiltin(imported_object): + return NameType.Function + return NameType.Constant + + +def get_names(module: ModuleInfo, package: Package) -> List[Name]: + """Get all names from a module and package.""" + if isinstance(module, ModuleCompiled): + return list( + get_names_from_compiled(package.name, package.source, module.underlined) + ) + if 
isinstance(module, ModuleFile): + return [ + combine(package, module, partial_name) + for partial_name in get_names_from_file(module.filepath, module.underlined) + ] + return [] + + +def get_names_from_compiled( + package: str, + source: Source, + underlined: bool = False, +) -> Generator[Name, None, None]: + """ + Get the names from a compiled module. + + Instead of using ast, it imports the module. + Parameters + ---------- + package : str + package to import. Must be in sys.path + underlined : bool + include underlined names + """ + # builtins is banned because you never have to import it + # python_crun is banned because it crashes python + banned = ["builtins", "python_crun"] + if package in banned or (package.startswith("_") and not underlined): + return # Builtins is redundant since you don't have to import it. + if source not in (Source.BUILTIN, Source.STANDARD): + return + try: + module = import_module(str(package)) + except ImportError: + logger.error(f"{package} could not be imported for autoimport analysis") + return + else: + for name, value in inspect.getmembers(module): + if underlined or not name.startswith("_"): + if ( + inspect.isclass(value) + or inspect.isfunction(value) + or inspect.isbuiltin(value) + ): + yield Name( + str(name), package, package, source, get_type_object(value) + ) + + +def combine(package: Package, module: ModuleFile, name: PartialName) -> Name: + """Combine information to form a full name.""" + return Name(name.name, module.modname, package.name, package.source, name.name_type) diff --git a/rope/contrib/autoimport.py b/rope/contrib/autoimport/pickle.py similarity index 94% rename from rope/contrib/autoimport.py rename to rope/contrib/autoimport/pickle.py index 4d6890e2a..bd617be7c 100644 --- a/rope/contrib/autoimport.py +++ b/rope/contrib/autoimport/pickle.py @@ -1,3 +1,15 @@ +""" +IMPORTANT: This is a deprecated implementation of autoimport using pickle-based +storage. 
+ +This pickle-based autoimport is provided only for backwards compatibility +purpose and will be removed and the sqlite backend will be the new default +implementation in the future. + +If you are still using this module, you should migrate to the new and improved +sqlite-based storage backend (rope.contrib.autoimport.sqlite.AutoImport). +""" + import re from rope.base import builtins diff --git a/rope/contrib/autoimport/sqlite.py b/rope/contrib/autoimport/sqlite.py new file mode 100644 index 000000000..66e07520f --- /dev/null +++ b/rope/contrib/autoimport/sqlite.py @@ -0,0 +1,500 @@ +"""AutoImport module for rope.""" +import pathlib +import re +import sqlite3 +import sys +from collections import OrderedDict +from concurrent.futures import Future, ProcessPoolExecutor, as_completed +from itertools import chain +from typing import Generator, Iterable, List, Optional, Tuple + +from rope.base import exceptions, libutils, resourceobserver, taskhandle +from rope.base.project import Project +from rope.base.resources import Resource +from rope.contrib.autoimport.defs import ModuleFile, Name, Package, PackageType, Source +from rope.contrib.autoimport.parse import get_names +from rope.contrib.autoimport.utils import ( + get_files, + get_modname_from_path, + get_package_tuple, + sort_and_deduplicate, + sort_and_deduplicate_tuple, +) +from rope.refactor import importutils + + +def get_future_names( + packages: List[Package], underlined: bool, job_set: taskhandle.JobSet +) -> Generator[Future, None, None]: + """Get all names as futures.""" + with ProcessPoolExecutor() as executor: + for package in packages: + for module in get_files(package, underlined): + job_set.started_job(module.modname) + if not isinstance(job_set, taskhandle.NullJobSet): + job_set.count += 1 + yield executor.submit(get_names, module, package) + + +def filter_packages( + packages: Iterable[Package], underlined: bool, existing: List[str] +) -> Iterable[Package]: + """Filter list of packages to parse.""" + 
if underlined: + + def filter_package(package: Package) -> bool: + return package.name not in existing + + else: + + def filter_package(package: Package) -> bool: + return package.name not in existing and not package.name.startswith("_") + + return filter(filter_package, packages) + + +class AutoImport: + """A class for finding the module that provides a name. + + This class maintains a cache of global names in python modules. + Note that this cache is not accurate and might be out of date. + + """ + + connection: sqlite3.Connection + underlined: bool + project: Project + + def __init__(self, project, observe=True, underlined=False, memory=True): + """Construct an AutoImport object. + + Parameters + ___________ + project : rope.base.project.Project + the project to use for project imports + observe : bool + if true, listen for project changes and update the cache. + underlined : bool + If `underlined` is `True`, underlined names are cached, too. + memory : bool + if true, don't persist to disk + """ + self.project = project + self.underlined = underlined + db_path: str + if memory or project.ropefolder is None: + db_path = ":memory:" + else: + db_path = f"{project.ropefolder.path}/autoimport.db" + self.connection = sqlite3.connect(db_path) + self._setup_db() + self._check_all() + if observe: + observer = resourceobserver.ResourceObserver( + changed=self._changed, moved=self._moved, removed=self._removed + ) + project.add_observer(observer) + + def _setup_db(self): + packages_table = "(package TEXT)" + names_table = ( + "(name TEXT, module TEXT, package TEXT, source INTEGER, type INTEGER)" + ) + self.connection.execute(f"create table if not exists names{names_table}") + self.connection.execute(f"create table if not exists packages{packages_table}") + self.connection.commit() + + def import_assist(self, starting: str): + """ + Find modules that have a global name that starts with `starting`. 
+ + Parameters + __________ + starting : str + what all the names should start with + Return + __________ + Return a list of ``(name, module)`` tuples + """ + results = self.connection.execute( + "select name, module, source from names WHERE name LIKE (?)", + (starting + "%",), + ).fetchall() + for result in results: + if not self._check_import(result[1]): + del results[result] + return sort_and_deduplicate_tuple( + results + ) # Remove duplicates from multiple occurences of the same item + + def search(self, name: str, exact_match: bool = False) -> List[Tuple[str, str]]: + """ + Search both modules and names for an import string. + + Returns list of import statement ,modname pairs + """ + if not exact_match: + name = name + "%" # Makes the query a starts_with query + results: List[Tuple[str, str, int]] = [] + for import_name, module, source in self.connection.execute( + "SELECT name, module, source FROM names WHERE name LIKE (?)", (name,) + ): + results.append((f"from {module} import {import_name}", import_name, source)) + for module, source in self.connection.execute( + "Select module, source FROM names where module LIKE (?)", ("%." + name,) + ): + parts = module.split(".") + import_name = parts[-1] + remaining = parts[0] + for part in parts[1:-1]: + remaining += "." + remaining += part + results.append( + (f"from {remaining} import {import_name}", import_name, source) + ) + for module, source in self.connection.execute( + "Select module, source from names where module LIKE (?)", (name,) + ): + results.append((f"import {module}", module, source)) + return sort_and_deduplicate_tuple(results) + + def lsp_search( + self, name: str, exact_match: bool = False + ) -> Tuple[List[Tuple[str, str, int, int]], List[Tuple[str, str, int, int]]]: + """ + Search both modules and names for an import string. + + Returns the name, import statement, source, split into normal names and modules. 
+ """ + if not exact_match: + name = name + "%" # Makes the query a starts_with query + results_name: List[Tuple[str, str, int, int]] = [] + results_module: List[Tuple[str, str, int, int]] = [] + for import_name, module, source, name_type in self.connection.execute( + "SELECT name, module, source, type FROM names WHERE name LIKE (?)", (name,) + ): + results_name.append( + (f"from {module} import {import_name}", import_name, source, name_type) + ) + for module, source, name_type in self.connection.execute( + "Select module, source, type FROM names where module LIKE (?)", + ("%." + name,), + ): + parts = module.split(".") + import_name = parts[-1] + remaining = parts[0] + for part in parts[1:-1]: + remaining += "." + remaining += part + results_module.append( + ( + f"from {remaining} import {import_name}", + import_name, + source, + name_type, + ) + ) + for module, source, name_type in self.connection.execute( + "Select module, source, type from names where module LIKE (?)", (name,) + ): + results_module.append((f"import {module}", module, source, name_type)) + return results_name, results_module + + def get_modules(self, name) -> List[str]: + """Get the list of modules that have global `name`.""" + results = self.connection.execute( + "SELECT module, source FROM names WHERE name LIKE (?)", (name,) + ).fetchall() + for result in results: + if not self._check_import(result[0]): + del results[result] + return sort_and_deduplicate(results) + + def get_all_names(self) -> List[str]: + """Get the list of all cached global names.""" + self._check_all() + results = self.connection.execute("select name from names").fetchall() + return results + + def _dump_all(self) -> Tuple[List[Name], List[Package]]: + """Dump the entire database.""" + self._check_all() + name_results = self.connection.execute("select * from names").fetchall() + package_results = self.connection.execute("select * from packages").fetchall() + return name_results, package_results + + def generate_cache( + 
self, + resources: List[Resource] = None, + underlined: bool = False, + task_handle=taskhandle.NullTaskHandle(), + ): + """Generate global name cache for project files. + + If `resources` is a list of `rope.base.resource.File`, only + those files are searched; otherwise all python modules in the + project are cached. + """ + if resources is None: + resources = self.project.get_python_files() + job_set = task_handle.create_jobset( + "Generating autoimport cache", len(resources) + ) + # Should be very fast, so doesn't need multithreaded computation + for file in resources: + job_set.started_job(f"Working on {file.path}") + self.update_resource(file, underlined, commit=False) + job_set.finished_job() + self.connection.commit() + + def generate_modules_cache( + self, + modules: List[str] = None, + task_handle=taskhandle.NullTaskHandle(), + single_thread: bool = False, + underlined: bool = False, + ): + """ + Generate global name cache for external modules listed in `modules`. + + If no modules are provided, it will generate a cache for every available module. + This method searches in your sys.path and configured python folders. + Do not use this for generating your own project's internal names, + use generate_cache for that instead. 
+ """ + packages: List[Package] = [] + if self.underlined: + underlined = True + existing = self._get_existing() + if modules is None: + packages = self._get_available_packages() + else: + for modname in modules: + package = self._find_package_path(modname) + if package is None: + continue + packages.append(package) + packages = list(filter_packages(packages, underlined, existing)) + self._add_packages(packages) + job_set = task_handle.create_jobset("Generating autoimport cache", 0) + if single_thread: + for package in packages: + for module in get_files(package, underlined): + job_set.started_job(module.modname) + for name in get_names(module, package): + self._add_name(name) + job_set.finished_job() + else: + for future_name in as_completed( + get_future_names(packages, underlined, job_set) + ): + self._add_names(future_name.result()) + job_set.finished_job() + + self.connection.commit() + + def update_module(self, module: str): + """Update a module in the cache, or add it if it doesn't exist.""" + self._del_if_exist(module) + self.generate_modules_cache([module]) + + def close(self): + """Close the autoimport database.""" + self.connection.commit() + self.connection.close() + + def get_name_locations(self, name): + """Return a list of ``(resource, lineno)`` tuples.""" + result = [] + modules = self.connection.execute( + "select module from names where name like (?)", (name,) + ).fetchall() + for module in modules: + try: + module_name = module[0] + if module_name.startswith(f"{self._project_name}."): + module_name = ".".join(module_name.split(".")) + pymodule = self.project.get_module(module_name) + if name in pymodule: + pyname = pymodule[name] + module, lineno = pyname.get_definition_location() + if module is not None: + resource = module.get_module().get_resource() + if resource is not None and lineno is not None: + result.append((resource, lineno)) + except exceptions.ModuleNotFoundError: + pass + return result + + def clear_cache(self): + """Clear all 
entries in global-name cache. + + It might be a good idea to use this function before + regenerating global names. + + """ + self.connection.execute("drop table names") + self._setup_db() + self.connection.commit() + + def find_insertion_line(self, code): + """Guess at what line the new import should be inserted.""" + match = re.search(r"^(def|class)\s+", code) + if match is not None: + code = code[: match.start()] + try: + pymodule = libutils.get_string_module(self.project, code) + except exceptions.ModuleSyntaxError: + return 1 + testmodname = "__rope_testmodule_rope" + importinfo = importutils.NormalImport(((testmodname, None),)) + module_imports = importutils.get_module_imports(self.project, pymodule) + module_imports.add_import(importinfo) + code = module_imports.get_changed_source() + offset = code.index(testmodname) + lineno = code.count("\n", 0, offset) + 1 + return lineno + + def update_resource( + self, resource: Resource, underlined: bool = False, commit: bool = True + ): + """Update the cache for global names in `resource`.""" + underlined = underlined if underlined else self.underlined + package = get_package_tuple(self._project_path, self.project) + if package is None or package.path is None: + return + resource_path: pathlib.Path = pathlib.Path(resource.real_path) + # The project doesn't need its name added to the path, + # since the standard python file layout accounts for that + # so we set add_package_name to False + resource_modname: str = get_modname_from_path( + resource_path, package.path, add_package_name=False + ) + module = ModuleFile(resource_path, resource_modname, underlined) + self._del_if_exist(module_name=resource_modname, commit=False) + for name in get_names(module, package): + self._add_name(name) + if commit: + self.connection.commit() + + def _changed(self, resource): + if not resource.is_folder(): + self.update_resource(resource) + + def _moved(self, resource: Resource, newresource: Resource): + if not resource.is_folder(): + 
modname = self._modname(resource) + self._del_if_exist(modname) + self.update_resource(newresource) + + def _del_if_exist(self, module_name, commit: bool = True): + self.connection.execute("delete from names where module = ?", (module_name,)) + if commit: + self.connection.commit() + + def _get_python_folders(self) -> List[pathlib.Path]: + folders = self.project.get_python_path_folders() + folder_paths = [ + pathlib.Path(folder.path) for folder in folders if folder.path != "/usr/bin" + ] + return list(OrderedDict.fromkeys(folder_paths)) + + def _get_available_packages(self) -> List[Package]: + packages: List[Package] = [ + Package(module, Source.BUILTIN, None, PackageType.BUILTIN) + for module in sys.builtin_module_names + ] + for folder in self._get_python_folders(): + for package in folder.iterdir(): + package_tuple = get_package_tuple(package, self.project) + if package_tuple is None: + continue + packages.append(package_tuple) + return packages + + def _add_packages(self, packages: List[Package]): + for package in packages: + self.connection.execute("INSERT into packages values(?)", (package.name,)) + + def _get_existing(self) -> List[str]: + existing: List[str] = list( + chain(*self.connection.execute("select * from packages").fetchall()) + ) + existing.append(self._project_name) + return existing + + @property + def _project_name(self): + package_path: pathlib.Path = pathlib.Path(self.project.address) + package_tuple = get_package_tuple(package_path) + if package_tuple is None: + return None + return package_tuple[0] + + @property + def _project_path(self): + return pathlib.Path(self.project.address) + + def _modname(self, resource: Resource): + resource_path: pathlib.Path = pathlib.Path(resource.real_path) + package_path: pathlib.Path = pathlib.Path(self.project.address) + resource_modname: str = get_modname_from_path( + resource_path, package_path, add_package_name=False + ) + return resource_modname + + def _removed(self, resource): + if not 
resource.is_folder(): + modname = self._modname(resource) + self._del_if_exist(modname) + + def _add_future_names(self, names: Future): + self._add_names(names.result()) + + def _add_names(self, names: Iterable[Name]): + for name in names: + self._add_name(name) + + def _add_name(self, name: Name): + self.connection.execute( + "insert into names values (?,?,?,?,?)", + ( + name.name, + name.modname, + name.package, + name.source.value, + name.name_type.value, + ), + ) + + def _check_import(self, module: pathlib.Path) -> bool: + """ + Check the ability to import an external package, removes it if not avalible. + + Parameters + ---------- + module: pathlib.path + The module to check + Returns + ---------- + """ + # Not Implemented Yet, silently will fail + return True + + def _check_all(self): + """Check all modules and removes bad ones.""" + pass + + def _find_package_path(self, target_name: str) -> Optional[Package]: + if target_name in sys.builtin_module_names: + return Package(target_name, Source.BUILTIN, None, PackageType.BUILTIN) + for folder in self._get_python_folders(): + for package in folder.iterdir(): + package_tuple = get_package_tuple(package, self.project) + if package_tuple is None: + continue + name, source, package_path, package_type = package_tuple + if name == target_name: + return package_tuple + + return None diff --git a/rope/contrib/autoimport/utils.py b/rope/contrib/autoimport/utils.py new file mode 100644 index 000000000..73234d83d --- /dev/null +++ b/rope/contrib/autoimport/utils.py @@ -0,0 +1,126 @@ +"""Utility functions for the autoimport code.""" +import pathlib +import sys +from collections import OrderedDict +from typing import Generator, List, Optional, Tuple + +from rope.base.project import Project + +from .defs import ModuleCompiled, ModuleFile, ModuleInfo, Package, PackageType, Source + + +def get_package_tuple( + package_path: pathlib.Path, project: Optional[Project] = None +) -> Optional[Package]: + """ + Get package name and type 
from a path. + + Checks for common issues, such as not being a viable python module + Returns None if not a viable package. + """ + package_name = package_path.name + package_source = get_package_source(package_path, project) + if package_name.startswith(".") or package_name == "__pycache__": + return None + if package_path.is_file(): + if package_name.endswith(".so"): + name = package_name.split(".")[0] + return Package(name, package_source, package_path, PackageType.COMPILED) + if package_name.endswith(".py"): + stripped_name = package_path.stem + return Package( + stripped_name, package_source, package_path, PackageType.SINGLE_FILE + ) + return None + if package_name.endswith((".egg-info", ".dist-info")): + return None + return Package(package_name, package_source, package_path, PackageType.STANDARD) + + +def get_package_source( + package: pathlib.Path, project: Optional[Project] = None +) -> Source: + """Detect the source of a given package. Rudimentary implementation.""" + if project is not None and project.address in str(package): + return Source.PROJECT + if "site-packages" in package.parts: + return Source.SITE_PACKAGE + if package.as_posix().startswith(sys.prefix): + return Source.STANDARD + return Source.UNKNOWN + + +def get_modname_from_path( + modpath: pathlib.Path, package_path: pathlib.Path, add_package_name: bool = True +) -> str: + """Get module name from a path in respect to package.""" + package_name: str = package_path.stem + rel_path_parts = modpath.relative_to(package_path).parts + modname = "" + if len(rel_path_parts) > 0: + for part in rel_path_parts[:-1]: + modname += part + modname += "." + if rel_path_parts[-1] == "__init__": + modname = modname[:-1] + else: + modname = modname + modpath.stem + if add_package_name: + modname = package_name if modname == "" else package_name + "." + modname + else: + assert modname != "." 
+ return modname + + +def sort_and_deduplicate(results: List[Tuple[str, int]]) -> List[str]: + """Sort and deduplicate a list of name, source entries.""" + results = sorted(results, key=lambda y: y[-1]) + results_sorted = [name for name, source in results] + return list(OrderedDict.fromkeys(results_sorted)) + + +def sort_and_deduplicate_tuple( + results: List[Tuple[str, str, int]] +) -> List[Tuple[str, str]]: + """Sort and deduplicate a list of name, module, source entries.""" + results = sorted(results, key=lambda y: y[-1]) + results_sorted = [result[:-1] for result in results] + return list(OrderedDict.fromkeys(results_sorted)) + + +def should_parse(path: pathlib.Path, underlined: bool) -> bool: + if underlined: + return True + for part in path.parts: + if part.startswith("_"): + return False + return True + + +def get_files( + package: Package, underlined: bool = False +) -> Generator[ModuleInfo, None, None]: + """Find all files to parse in a given path using __init__.py.""" + if package.type in (PackageType.COMPILED, PackageType.BUILTIN): + if package.source in (Source.STANDARD, Source.BUILTIN): + yield ModuleCompiled(None, package.name, underlined, process_imports=True) + elif package.type == PackageType.SINGLE_FILE: + assert package.path + assert package.path.suffix == ".py" + yield ModuleFile(package.path, package.path.stem, underlined) + else: + assert package.path + for file in package.path.glob("*.py"): + if file.name == "__init__.py": + yield ModuleFile( + file, + get_modname_from_path(file.parent, package.path), + underlined, + process_imports=True, + ) + elif should_parse(file, underlined): + yield ModuleFile( + file, + get_modname_from_path(file, package.path), + underlined, + ) diff --git a/ropetest/contrib/autoimport/conftest.py b/ropetest/contrib/autoimport/conftest.py new file mode 100644 index 000000000..7e7044032 --- /dev/null +++ b/ropetest/contrib/autoimport/conftest.py @@ -0,0 +1,60 @@ +import pathlib + +import pytest + +from ropetest import 
testutils + + +@pytest.fixture +def project(): + project = testutils.sample_project() + yield project + testutils.remove_project(project) + + +@pytest.fixture +def mod1(project): + mod1 = testutils.create_module(project, "mod1") + yield mod1 + + +@pytest.fixture +def mod1_path(mod1): + yield pathlib.Path(mod1.real_path) + + +@pytest.fixture +def project_path(project): + yield pathlib.Path(project.address) + + +@pytest.fixture +def typing_path(): + import typing + + yield pathlib.Path(typing.__file__) + + + + +@pytest.fixture +def build_env_path(): + from build import env + + yield pathlib.Path(env.__file__) + + +@pytest.fixture +def build_path(): + import build + + # Uses __init__.py so we need the parent + + yield pathlib.Path(build.__file__).parent + + +@pytest.fixture +def zlib_path(): + import zlib + + yield pathlib.Path(zlib.__file__) diff --git a/ropetest/contrib/autoimport/parsetest.py b/ropetest/contrib/autoimport/parsetest.py new file mode 100644 index 000000000..2f59c91d3 --- /dev/null +++ b/ropetest/contrib/autoimport/parsetest.py @@ -0,0 +1,17 @@ +from rope.contrib.autoimport import parse +from rope.contrib.autoimport.defs import Name, NameType, PartialName, Source + + +def test_typing_names(typing_path): + names = list(parse.get_names_from_file(typing_path)) + assert PartialName("Text", NameType.Variable) in names + + +def test_find_sys(): + names = list(parse.get_names_from_compiled("sys", Source.BUILTIN)) + assert Name("exit", "sys", "sys", Source.BUILTIN, NameType.Function) in names + + +def test_find_underlined(): + names = list(parse.get_names_from_compiled("os", Source.BUILTIN, underlined=True)) + assert Name("_exit", "os", "os", Source.BUILTIN, NameType.Function) in names diff --git a/ropetest/contrib/autoimport/utilstest.py b/ropetest/contrib/autoimport/utilstest.py new file mode 100644 index 000000000..423c0e754 --- /dev/null +++ b/ropetest/contrib/autoimport/utilstest.py @@ -0,0 +1,59 @@ +"""Tests for autoimport utility functions, written in 
pytest""" +import pathlib + +from rope.contrib.autoimport import utils +from rope.contrib.autoimport.defs import Package, PackageType, Source + + +def test_get_package_source(mod1_path, project): + assert utils.get_package_source(mod1_path, project) == Source.PROJECT + + +def test_get_package_source_not_project(mod1_path): + assert utils.get_package_source(mod1_path) == Source.UNKNOWN + + +def test_get_package_source_pytest(build_path): + # pytest is not installed as part of the standard library + # but should be installed into site_packages, + # so it should return Source.SITE_PACKAGE + assert utils.get_package_source(build_path) == Source.SITE_PACKAGE + + +def test_get_package_source_typing(typing_path): + + assert utils.get_package_source(typing_path) == Source.STANDARD + + +def test_get_modname_project_no_add(mod1_path, project_path): + + assert utils.get_modname_from_path(mod1_path, project_path, False) == "mod1" + + +def test_get_modname_single_file(typing_path): + + assert utils.get_modname_from_path(typing_path, typing_path) == "typing" + + +def test_get_modname_folder(build_path, build_env_path): + + assert utils.get_modname_from_path(build_env_path, build_path) == "build.env" + + +def test_get_package_tuple_sample(project_path): + assert Package( + "sample_project", Source.UNKNOWN, project_path, PackageType.STANDARD + ) == utils.get_package_tuple(project_path) + + +def test_get_package_tuple_typing(typing_path): + + assert Package( + "typing", Source.STANDARD, typing_path, PackageType.SINGLE_FILE + ) == utils.get_package_tuple(typing_path) + + +def test_get_package_tuple_compiled(zlib_path): + assert Package( + "zlib", Source.STANDARD, zlib_path, PackageType.COMPILED + ) == utils.get_package_tuple(zlib_path) diff --git a/ropetest/contrib/autoimporttest.py b/ropetest/contrib/autoimporttest.py index 995eba6b9..8862bdc5a 100644 --- a/ropetest/contrib/autoimporttest.py +++ b/ropetest/contrib/autoimporttest.py @@ -3,8 +3,8 @@ except ImportError: import 
unittest +from rope.contrib.autoimport import sqlite as autoimport from ropetest import testutils -from rope.contrib import autoimport class AutoImportTest(unittest.TestCase): @@ -28,11 +28,6 @@ def test_update_resource(self): self.importer.update_resource(self.mod1) self.assertEqual([("myvar", "mod1")], self.importer.import_assist("myva")) - def test_update_module(self): - self.mod1.write("myvar = None") - self.importer.update_module("mod1") - self.assertEqual([("myvar", "mod1")], self.importer.import_assist("myva")) - def test_update_non_existent_module(self): self.importer.update_module("does_not_exists_this") self.assertEqual([], self.importer.import_assist("myva")) @@ -118,11 +113,39 @@ def test_name_locations_with_multiple_occurrences(self): def test_handling_builtin_modules(self): self.importer.update_module("sys") - self.assertTrue("sys" in self.importer.get_modules("exit")) - - def test_submodules(self): - self.assertEqual(set([self.mod1]), autoimport.submodules(self.mod1)) - self.assertEqual(set([self.mod2, self.pkg]), autoimport.submodules(self.pkg)) + self.assertIn("sys", self.importer.get_modules("exit")) + + def test_search_submodule(self): + self.importer.update_module("build") + import_statement = ("from build import env", "env") + self.assertIn(import_statement, self.importer.search("env", exact_match=True)) + self.assertIn(import_statement, self.importer.search("en")) + self.assertIn(import_statement, self.importer.search("env")) + + def test_search_module(self): + self.importer.update_module("os") + import_statement = ("import os", "os") + self.assertIn(import_statement, self.importer.search("os", exact_match=True)) + self.assertIn(import_statement, self.importer.search("os")) + self.assertIn(import_statement, self.importer.search("o")) + + def test_search(self): + self.importer.update_module("typing") + import_statement = ("from typing import Dict", "Dict") + self.assertIn(import_statement, self.importer.search("Dict", exact_match=True)) + 
self.assertIn(import_statement, self.importer.search("Dict")) + self.assertIn(import_statement, self.importer.search("Dic")) + self.assertIn(import_statement, self.importer.search("Di")) + self.assertIn(import_statement, self.importer.search("D")) + + def test_generate_full_cache(self): + """The single thread test takes much longer than the multithread test but is easier to debug""" + single_thread = False + self.importer.generate_modules_cache(single_thread=single_thread) + self.assertIn(("from typing import Dict", "Dict"), self.importer.search("Dict")) + self.assertTrue(len(self.importer._dump_all()) > 0) + for table in self.importer._dump_all(): + self.assertTrue(len(table) > 0) class AutoImportObservingTest(unittest.TestCase):