diff --git a/.gitignore b/.gitignore index 150e5c1..a66fc2a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ build/* dist/* env/* .vscode/ +.idea/ .ipynb_checkpoints/ config.json __pycache__/ diff --git a/README.md b/README.md index d78af49..a5da5a6 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,11 @@ Inspect4py currently works **only for Python 3 projects**. ## Background: +`inspect4py` added the functionality of capturing [Data Flow Graphs](http://bears.ece.ucsb.edu/research-info/DP/dfg.html) for each function, inspired by GraphCodeBERT: [Github](https://github.com/microsoft/CodeBERT) & [Paper](https://arxiv.org/abs/2009.08366). An illustration is given below: +|Source Code|List Output|Networkx Image| +|:-:|:-:|:-:| +|<pre>
def max(a, b):
x = 0
if a > b:
x = a
else:
x = b
return x
|
('a', 3, 'comesFrom', [], [])
('b', 5, 'comesFrom', [], [])
('x', 8, 'computedFrom', ['0'], [10])
('0', 10, 'comesFrom', [], [])
('a', 12, 'comesFrom', ['a'], [3])
('b', 14, 'comesFrom', ['b'], [5])
('x', 16, 'computedFrom', ['a'], [18])
('a', 18, 'comesFrom', ['a'], [3])
('x', 21, 'computedFrom', ['b'], [23])
('b', 23, 'comesFrom', ['b'], [5])
('x', 25, 'comesFrom', ['x'], [16, 21])
|![image](docs/images/data_flow.png)| + `inspect4py` uses [ASTs](https://en.wikipedia.org/wiki/Abstract_syntax_tree), more specifically the [ast](https://docs.python.org/3/library/ast.html) module in Python, generating a tree of objects (per file) whose classes all inherit from [ast.AST](https://docs.python.org/3/library/ast.html#ast.AST). @@ -60,6 +65,12 @@ Please cite our MSR 2022 demo paper: ### Preliminaries +Make sure you have tree-sitter installed, a C compiler is needed, more [info](https://github.com/tree-sitter/tree-sitter): + +``` +pip install tree-sitter +``` + Make sure you have graphviz installed: ``` @@ -71,7 +82,7 @@ We have tested `inspect4py` in Python 3.7+. **Our recommended version is Python ### Operative System -We have tested `inspect4py` in Unix and MacOs. +We have tested `inspect4py` in Unix, MacOS and Windows 11 (22621.1265). ### Installation from pypi `inspect4py` is [available in pypi!](https://pypi.org/project/inspect4py/) Just install it like a regular package: @@ -106,6 +117,9 @@ pigar setuptools==54.2.0 json2html configparser +bigcode_astgen +GitPython +tree-sitter ``` If you want to run the evaluations, do not forget to add `pandas` to the previous set. @@ -218,6 +232,8 @@ Options: -rm, --readme extract all readme files in the target repository. -md, --metadata extract metadata of the target repository using Github API. + -df, --data_flow extract data flow graph for every function, BOOL + -st, --symbol_table symbol table file location. STR --help Show this message and exit. 
``` diff --git a/docs/images/data_flow.png b/docs/images/data_flow.png new file mode 100644 index 0000000..4e993a0 Binary files /dev/null and b/docs/images/data_flow.png differ diff --git a/inspect4py/__init__.py b/inspect4py/__init__.py index fa9c4ec..2792152 100644 --- a/inspect4py/__init__.py +++ b/inspect4py/__init__.py @@ -1 +1 @@ -__version__ = '0.0.6' +__version__ = '0.0.7' diff --git a/inspect4py/cli.py b/inspect4py/cli.py index 166b275..1890ec4 100644 --- a/inspect4py/cli.py +++ b/inspect4py/cli.py @@ -1,13 +1,16 @@ +import ast import json import tokenize import types import builtins import click from docstring_parser import parse as doc_parse +from tree_sitter import Language, Parser from inspect4py import __version__ from inspect4py.staticfg import builder from inspect4py.utils import * +# from utils import * """ Code Inspector @@ -26,7 +29,7 @@ class CodeInspection: - def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abstract_syntax_tree, source_code): + def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abstract_syntax_tree, source_code, data_flow, parser): """ init method initializes the Code_Inspection object :param self self: represent the instance of the class :param str path: the file to inspect @@ -41,6 +44,8 @@ def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abs self.out_json_path = out_json_path self.abstract_syntax_tree = abstract_syntax_tree self.source_code = source_code + self.data_flow = data_flow + self.parser = parser self.tree = self.parser_file() if self.tree != "AST_ERROR": self.nodes = self.walk() @@ -51,13 +56,14 @@ def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abs self.bodyInfo = self.inspect_body() if control_flow: self.out_control_flow_path = out_control_flow_path - self.controlFlowInfo = self.inspect_controlflow() + self.controlFlowInfo = self.inspect_controlflow("png") else: self.controlFlowInfo = {} self.fileJson = 
self.file_json() else: self.fileJson = {} + def find_classDef(self): classDef_nodes = [node for node in self.nodes if isinstance(node, ast.ClassDef)] class_init=[] @@ -466,6 +472,13 @@ def file_json(self): json.dump(prune_json(file_dict), outfile) return [file_dict, json_file] + # def get_parser_data_flow(self): + # parser = Parser() + # LANGUAGE = Language(self.symbol_table, "python") + # parser.set_language(LANGUAGE) + # parser = [parser, DFG_python] + # return parser + def _f_definitions(self, functions_definitions): """_f_definitions extracts the name, args, docstring returns, raises of a list of functions or a methods. @@ -477,11 +490,15 @@ def _f_definitions(self, functions_definitions): :param list functions_definitions: represent a list with all functions or methods nodes :return dictionary: a dictionary with the all the information at function/method level """ - + # print(functions_definitions) funcs_info = {} for f in functions_definitions: + # for node in ast.walk(f): + # print(node.name) + funcs_info[f.name] = {} ds_f = ast.get_docstring(f) + # print(ds_f) try: docstring = doc_parse(ds_f) funcs_info[f.name]["doc"] = {} @@ -577,7 +594,10 @@ def _f_definitions(self, functions_definitions): funcs_info[f.name]["ast"] = ast_to_json(f) if self.source_code: funcs_info[f.name]["source_code"] = ast_to_source_code(f) - + if self.data_flow: + code_tokens, dfg = extract_dataflow(funcs_info[f.name]["source_code"], self.parser, "python") + funcs_info[f.name]["data_flow"] = dfg + funcs_info[f.name]["code_tokens"] = code_tokens return funcs_info def _skip_dynamic_calls(self, funcs_info, classes_info, check_name, name, var_name): @@ -1204,6 +1224,7 @@ def create_output_dirs(output_dir, control_flow): @click.option('-i', '--input_path', type=str, required=True, help="input path of the file or directory to inspect.") @click.option('-o', '--output_dir', type=str, default="output_dir", help="output directory path to store results. 
If the directory does not exist, the tool will create it.") +@click.option('-st','--symbol_table', type=str, default="my_language.so", help="symbol table for the target function") @click.option('-ignore_dir', '--ignore_dir_pattern', multiple=True, default=[".", "__pycache__"], help="ignore directories starting with a certain pattern. This parameter can be provided multiple times " "to ignore multiple directory patterns.") @@ -1231,16 +1252,35 @@ def create_output_dirs(output_dir, control_flow): help="extract all readme files in the target repository.") @click.option('-md', '--metadata', type=bool, is_flag=True, help="extract metadata of the target repository using Github API. (requires repository to have the .git folder)") +@click.option('-df', '--data_flow', type=bool, is_flag=True, + help="extract data flow graph of every function in the target repository") + def main(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, html_output, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, source_code, license_detection, readme, - metadata): + metadata, data_flow, symbol_table): + if data_flow: + if symbol_table == "my_language.so": # default option + path_to_languages = str(Path(__file__).parent / "resources") + if sys.platform.startswith("win") or sys.platform.startswith("cygwin"): + language = Language(path_to_languages + os.path.sep + "python_win.so", "python") + else: + language = Language(path_to_languages + os.path.sep + "python_unix.so", "python") + else: + language = Language(symbol_table, "python") + parser = Parser() + parser.set_language(language) + parser = [parser, DFG_python] + else: + parser = [] + + # print(parsers) if (not os.path.isfile(input_path)) and (not os.path.isdir(input_path)): print('The file or directory specified does not exist') sys.exit() if os.path.isfile(input_path): cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, 
json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) # Generate the call list of a file call_list_data = call_list_file(code_info) @@ -1279,18 +1319,20 @@ def main(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requir for f in files: if ".py" in f and not f.endswith(".pyc"): try: + path = os.path.join(subdir, f) relative_path = Path(subdir).relative_to(Path(input_path).parent) out_dir = str(Path(output_dir) / relative_path) cf_dir, json_dir = create_output_dirs(out_dir, control_flow) - code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) + # print(parsers) if code_info.fileJson: if out_dir not in dir_info: dir_info[out_dir] = [code_info.fileJson[0]] else: dir_info[out_dir].append(code_info.fileJson[0]) except: - print("Error when processing " + f + ": ", sys.exc_info()[0]) + print("Error when processing " + f + ": ", sys.exc_info()) continue # Generate the call list of the Dir @@ -1332,7 +1374,7 @@ def main(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requir dir_info["software_type"] = "not found" if license_detection: try: - licenses_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "licenses") + licenses_path = str(Path(__file__).parent / "licenses") license_text = extract_license(input_path) rank_list = detect_license(license_text, licenses_path) dir_info["license"] = {} diff --git a/inspect4py/parse_setup_files.py b/inspect4py/parse_setup_files.py index 593c99e..126130c 100644 --- a/inspect4py/parse_setup_files.py +++ b/inspect4py/parse_setup_files.py @@ -56,7 +56,9 @@ def parse_setup_py(parent_dir): if single_line: elem = setup_content[console_index] cs = elem.split("=") - cs_string = cs[0].strip().replace('\'', 
'').split('["')[1] + # print(cs) + # print(cs[1].strip()) + cs_string = cs[1].strip().replace('\'', '').split('["')[1] cs_list.append(normalize(cs_string)) setup_info["installation"] = "pip install " + cs_string setup_info["run"].append(cs_string) diff --git a/inspect4py/resources/python_unix.so b/inspect4py/resources/python_unix.so new file mode 100755 index 0000000..cf3d771 Binary files /dev/null and b/inspect4py/resources/python_unix.so differ diff --git a/inspect4py/resources/python_win.so b/inspect4py/resources/python_win.so new file mode 100644 index 0000000..921ee0b Binary files /dev/null and b/inspect4py/resources/python_win.so differ diff --git a/inspect4py/utils.py b/inspect4py/utils.py index 83115ea..6fbcb1d 100644 --- a/inspect4py/utils.py +++ b/inspect4py/utils.py @@ -55,7 +55,8 @@ def extract_directory_tree(input_path, ignore_dirs, ignore_files, visual=0): ignore_set = tuple(list(ignore_dirs) + list(ignore_files) + ignore_set) if visual: paths = DisplayablePath.make_tree(Path(input_path), criteria=lambda - path: True if path.name not in ignore_set and not os.path.join("../", path.name).endswith(".pyc") else False) + path: True if path.name not in ignore_set and not os.path.join("../", path.name).endswith( + ".pyc") else False) for path in paths: print(path.displayable()) return get_directory_structure(input_path, ignore_set) @@ -75,7 +76,7 @@ def prune_json(json_dict): else: for a, b in json_dict.items(): if a == "ast" and b: - final_dict[a] = b # Avoid pruning AST fields + final_dict[a] = b # Avoid pruning AST fields continue if b or isinstance(b, bool): if isinstance(b, dict): @@ -100,14 +101,13 @@ def extract_requirements(input_path): # Answering yes (echo y), we allow searching for PyPI # for the missing modules and filter some unnecessary modules. 
- - #print(sys.version_info) - if sys.version_info[0] <=3 and sys.version_info[1]<=9: + # print(sys.version_info) + if sys.version_info[0] <= 3 and sys.version_info[1] <= 9: cmd = 'echo y | pigar -P ' + input_path + ' -p ' + file_name else: cmd = ' pigar generate ' + input_path + ' -f ' + file_name + ' --question-answer yes --auto-select' - - #print("-----> cmd: %s" %cmd) + + # print("-----> cmd: %s" %cmd) proc = subprocess.Popen(cmd.encode('utf-8'), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() @@ -118,7 +118,7 @@ def extract_requirements(input_path): for line in lines: try: if line != "\n": - if " == " in line: + if " == " in line: splitLine = line.split(" == ") else: splitLine = line.split("==") @@ -128,8 +128,8 @@ def extract_requirements(input_path): # Note: Pigar requirement file is being deleted # in the future we might want to keep it (just commenting the line bellow) - #os.system('rm ' + file_name) - #print("Exracted requirements :%s" %req_dict) + # os.system('rm ' + file_name) + # print("Exracted requirements :%s" %req_dict) return req_dict except: @@ -175,7 +175,7 @@ def extract_software_invocation(dir_info, dir_tree_info, input_path, call_list, body_only_files = [] flag_service_main = 0 for key in dir_info: # filter (lambda key: key not in "directory_tree", dir_info): - if key!="requirements" and key!="directory_tree": # Note: We need to filter out directory_tree + if key != "requirements" and key != "directory_tree": # Note: We need to filter out directory_tree for elem in dir_info[key]: if elem["main_info"]["main_flag"]: flag_service_main = 0 @@ -188,7 +188,7 @@ def extract_software_invocation(dir_info, dir_tree_info, input_path, call_list, try: # 2. 
Exploration for services in files with "mains" flag_service, software_invocation_info = service_check(elem, software_invocation_info, - server_dependencies, "main", readme) + server_dependencies, "main", readme) except: main_files.append(elem["file"]["path"]) @@ -209,19 +209,19 @@ def extract_software_invocation(dir_info, dir_tree_info, input_path, call_list, # this list (of lists) stores the mains that each main import import_mains = [] - + # this list (of lists) stores the mains that each main is imported by - imported_by = [None]*len(main_files) + imported_by = [None] * len(main_files) # 3. Exploration for main scripts for m in range(0, len(main_files)): m_calls = find_file_calls(main_files[m], call_list) # HERE I STORE WHICH OTHER MAIN FILES CALLS EACH "M" MAIN_FILE m_imports = extract_relations(main_files[m], m_calls, main_files, call_list) - + # storing those m_imports in the import_mains[m] import_mains.append(m_imports) - + for m_i in m_imports: m_secondary[main_files.index(m_i)] = 1 @@ -286,7 +286,6 @@ def extract_software_invocation(dir_info, dir_tree_info, input_path, call_list, return software_invocation_info - def generate_output_html(pruned_json, output_file_html): """ Method to generate a simple HTML view of the obtained JSON. 
@@ -331,9 +330,9 @@ def list_functions_classes_from_module(m, path): type = "internal" except: - - #module = __import__(m) - #functions = dir(module) + + # module = __import__(m) + # functions = dir(module) type = "external" return functions, classes, type @@ -352,22 +351,22 @@ def type_module(m, i, path): return "internal" else: if m: - m = m.replace(".", "/") - file_module = abs_repo_path + "/" + m + ".py" - file_module_path = Path(file_module) - if file_module_path.is_file(): - return "internal" - else: - file_module = abs_repo_path + "/" + m + "/main.py" - file_module_path = Path(file_module) - if file_module_path.is_file(): - return "internal" - else: - return "external" + m = m.replace(".", "/") + file_module = abs_repo_path + "/" + m + ".py" + file_module_path = Path(file_module) + if file_module_path.is_file(): + return "internal" + else: + file_module = abs_repo_path + "/" + m + "/main.py" + file_module_path = Path(file_module) + if file_module_path.is_file(): + return "internal" + else: + return "external" else: dir_module = abs_repo_path + "/" + i if os.path.exists(dir_module): - return "internal" + return "internal" else: return "external" @@ -419,7 +418,8 @@ def call_list_dir(dir_info): call_list[dir][file_path]["body"] = extract_call_functions(file_info, body=1) call_list[dir][file_path]["classes"] = {} for class_n in file_info["classes"]: - call_list[dir][file_path]["classes"][class_n] = extract_call_methods(file_info["classes"][class_n]["methods"]) + call_list[dir][file_path]["classes"][class_n] = extract_call_methods( + file_info["classes"][class_n]["methods"]) return call_list @@ -433,8 +433,8 @@ def find_file_calls(file_name, call_list): def find_module_calls(module, call_list): for dir in call_list: for elem in call_list[dir]: - if "/"+module+"." in elem: - #print("---MODULE %s, elem %s, giving call_list[%s][%s]" %(module, elem, dir, elem)) + if "/" + module + "." 
in elem: + # print("---MODULE %s, elem %s, giving call_list[%s][%s]" %(module, elem, dir, elem)) return call_list[dir][elem] # DFS algorithm - Allowing up to 2 levels of depth. @@ -457,7 +457,7 @@ def file_in_call(base, call, file, m_imports, call_list, orig_base, level): elif orig_base in call: return 0 - elif level < level_depth and call!="": + elif level < level_depth and call != "": m_calls_extern = {} module_base = call.split(".")[0] module_base = module_base + "." @@ -522,7 +522,7 @@ def extract_relations(file_name, m_calls, main_files, call_list): level = 0 flag_found = extract_data(base, m_calls[m_c], file, m_imports, flag_found, call_list, orig_base, level) if flag_found: - #return m_imports + # return m_imports break return m_imports @@ -622,6 +622,7 @@ def rank_software_invocation(soft_invocation_info_list): entry["ranking"] = position return soft_invocation_info_list + def ast_to_json(ast_obj): """ Function to convert the AST object into JSON format. @@ -631,6 +632,7 @@ def ast_to_json(ast_obj): ast_generator.tree = ast_obj return ast_generator.generate_ast() + def ast_to_source_code(ast_obj): """ Function to convert the AST object into source code. @@ -650,8 +652,8 @@ def dice_coefficient(a, b): if len(b) == 1: b = b + u"." 
- a_bigrams = {a[i : i + 2] for i in range(len(a) - 1)} - b_bigrams = {b[i : i + 2] for i in range(len(b) - 1)} + a_bigrams = {a[i: i + 2] for i in range(len(a) - 1)} + b_bigrams = {b[i: i + 2] for i in range(len(b) - 1)} overlap = len(a_bigrams & b_bigrams) dice_coeff = overlap * 2.0 / (len(a_bigrams) + len(b_bigrams)) @@ -714,7 +716,7 @@ def detect_license(license_text, licenses_path, threshold=0.9): rank_list = [] for licen in os.listdir(licenses_path): - with open(os.path.join(licenses_path, licen), "r") as f: + with open(os.path.join(licenses_path, licen), "r", encoding='UTF-8') as f: parser = pattern.search(f.read()) if parser is None: continue @@ -727,6 +729,7 @@ def detect_license(license_text, licenses_path, threshold=0.9): return sorted(rank_list, key=lambda t: t[1], reverse=True) + def extract_readme(input_path: str, output_dir: str) -> dict: """ Function to extract content of all readme file under the input directory. @@ -744,6 +747,7 @@ def extract_readme(input_path: str, output_dir: str) -> dict: return readme_files + def get_github_metadata(input_path: str) -> dict: """ Function to extract metadata from the remote repository using Github api. 
@@ -775,7 +779,7 @@ def get_github_metadata(input_path: str) -> dict: def find_index_init(depInfo, calls, class_init): - index_remove=[] + index_remove = [] for dep in depInfo: if dep["type_element"] == "class": if dep["import"] in calls: @@ -787,15 +791,273 @@ def find_index_init(depInfo, calls, class_init): index_remove.append(calls.index(i)) return index_remove + def update_list_calls(info, index_remove): - updated_calls=[] + updated_calls = [] for i in range(0, len(info["calls"])): if i in index_remove: continue updated_calls.append(info["calls"][i]) - ### These lines are for removing duplicate calls + ### These lines are for removing duplicate calls res = [] - for i in updated_calls : + for i in updated_calls: if i not in res: res.append(i) return res + + +def tree_to_variable_index(root_node, index_to_code): + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + index = (root_node.start_point, root_node.end_point) + _, code = index_to_code[index] + if root_node.type != code: + return [(root_node.start_point, root_node.end_point)] + else: + return [] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_variable_index(child, index_to_code) + return code_tokens + + +def DFG_python(root_node, index_to_code, states): + assignment = ['assignment', 'augmented_assignment', 'for_in_clause'] + if_statement = ['if_statement'] + for_statement = ['for_statement'] + while_statement = ['while_statement'] + do_first_statement = ['for_in_clause'] + def_statement = ['default_parameter'] + states = states.copy() + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + idx, code = index_to_code[(root_node.start_point, root_node.end_point)] + if root_node.type == code: + return [], states + elif code in states: + return [(code, idx, 'comesFrom', [code], states[code].copy())], states + else: + if root_node.type == 'identifier': + states[code] = [idx] + return 
[(code, idx, 'comesFrom', [], [])], states + elif root_node.type in def_statement: + name = root_node.child_by_field_name('name') + value = root_node.child_by_field_name('value') + DFG = [] + if value is None: + indexs = tree_to_variable_index(name, index_to_code) + for index in indexs: + idx, code = index_to_code[index] + DFG.append((code, idx, 'comesFrom', [], [])) + states[code] = [idx] + return sorted(DFG, key=lambda x: x[1]), states + else: + name_indexs = tree_to_variable_index(name, index_to_code) + value_indexs = tree_to_variable_index(value, index_to_code) + temp, states = DFG_python(value, index_to_code, states) + DFG += temp + for index1 in name_indexs: + idx1, code1 = index_to_code[index1] + for index2 in value_indexs: + idx2, code2 = index_to_code[index2] + DFG.append((code1, idx1, 'comesFrom', [code2], [idx2])) + states[code1] = [idx1] + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in assignment: + if root_node.type == 'for_in_clause': + right_nodes = [root_node.children[-1]] + left_nodes = [root_node.child_by_field_name('left')] + else: + if root_node.child_by_field_name('right') is None: + return [], states + left_nodes = [x for x in root_node.child_by_field_name('left').children if x.type != ','] + right_nodes = [x for x in root_node.child_by_field_name('right').children if x.type != ','] + if len(right_nodes) != len(left_nodes): + left_nodes = [root_node.child_by_field_name('left')] + right_nodes = [root_node.child_by_field_name('right')] + if len(left_nodes) == 0: + left_nodes = [root_node.child_by_field_name('left')] + if len(right_nodes) == 0: + right_nodes = [root_node.child_by_field_name('right')] + DFG = [] + for node in right_nodes: + temp, states = DFG_python(node, index_to_code, states) + DFG += temp + + for left_node, right_node in zip(left_nodes, right_nodes): + left_tokens_index = tree_to_variable_index(left_node, index_to_code) + right_tokens_index = tree_to_variable_index(right_node, index_to_code) + temp = [] 
+ for token1_index in left_tokens_index: + idx1, code1 = index_to_code[token1_index] + temp.append((code1, idx1, 'computedFrom', [index_to_code[x][1] for x in right_tokens_index], + [index_to_code[x][0] for x in right_tokens_index])) + states[code1] = [idx1] + DFG += temp + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in if_statement: + DFG = [] + current_states = states.copy() + others_states = [] + tag = False + if 'else' in root_node.type: + tag = True + for child in root_node.children: + if 'else' in child.type: + tag = True + if child.type not in ['elif_clause', 'else_clause']: + temp, current_states = DFG_python(child, index_to_code, current_states) + DFG += temp + else: + temp, new_states = DFG_python(child, index_to_code, states) + DFG += temp + others_states.append(new_states) + others_states.append(current_states) + if tag is False: + others_states.append(states) + new_states = {} + for dic in others_states: + for key in dic: + if key not in new_states: + new_states[key] = dic[key].copy() + else: + new_states[key] += dic[key] + for key in new_states: + new_states[key] = sorted(list(set(new_states[key]))) + return sorted(DFG, key=lambda x: x[1]), new_states + elif root_node.type in for_statement: + DFG = [] + for i in range(2): + right_nodes = [x for x in root_node.child_by_field_name('right').children if x.type != ','] + left_nodes = [x for x in root_node.child_by_field_name('left').children if x.type != ','] + if len(right_nodes) != len(left_nodes): + left_nodes = [root_node.child_by_field_name('left')] + right_nodes = [root_node.child_by_field_name('right')] + if len(left_nodes) == 0: + left_nodes = [root_node.child_by_field_name('left')] + if len(right_nodes) == 0: + right_nodes = [root_node.child_by_field_name('right')] + for node in right_nodes: + temp, states = DFG_python(node, index_to_code, states) + DFG += temp + for left_node, right_node in zip(left_nodes, right_nodes): + left_tokens_index = 
tree_to_variable_index(left_node, index_to_code) + right_tokens_index = tree_to_variable_index(right_node, index_to_code) + temp = [] + for token1_index in left_tokens_index: + idx1, code1 = index_to_code[token1_index] + temp.append((code1, idx1, 'computedFrom', [index_to_code[x][1] for x in right_tokens_index], + [index_to_code[x][0] for x in right_tokens_index])) + states[code1] = [idx1] + DFG += temp + if root_node.children[-1].type == "block": + temp, states = DFG_python(root_node.children[-1], index_to_code, states) + DFG += temp + dic = {} + for x in DFG: + if (x[0], x[1], x[2]) not in dic: + dic[(x[0], x[1], x[2])] = [x[3], x[4]] + else: + dic[(x[0], x[1], x[2])][0] = list(set(dic[(x[0], x[1], x[2])][0] + x[3])) + dic[(x[0], x[1], x[2])][1] = sorted(list(set(dic[(x[0], x[1], x[2])][1] + x[4]))) + DFG = [(x[0], x[1], x[2], y[0], y[1]) for x, y in sorted(dic.items(), key=lambda t: t[0][1])] + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in while_statement: + DFG = [] + for i in range(2): + for child in root_node.children: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + dic = {} + for x in DFG: + if (x[0], x[1], x[2]) not in dic: + dic[(x[0], x[1], x[2])] = [x[3], x[4]] + else: + dic[(x[0], x[1], x[2])][0] = list(set(dic[(x[0], x[1], x[2])][0] + x[3])) + dic[(x[0], x[1], x[2])][1] = sorted(list(set(dic[(x[0], x[1], x[2])][1] + x[4]))) + DFG = [(x[0], x[1], x[2], y[0], y[1]) for x, y in sorted(dic.items(), key=lambda t: t[0][1])] + return sorted(DFG, key=lambda x: x[1]), states + else: + DFG = [] + for child in root_node.children: + if child.type in do_first_statement: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + for child in root_node.children: + if child.type not in do_first_statement: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + + return sorted(DFG, key=lambda x: x[1]), states + + +def tree_to_variable_index(root_node, index_to_code): + if 
(len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + index = (root_node.start_point, root_node.end_point) + _, code = index_to_code[index] + if root_node.type != code: + return [(root_node.start_point, root_node.end_point)] + else: + return [] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_variable_index(child, index_to_code) + return code_tokens + + +def index_to_code_token(index, code): + start_point = index[0] + end_point = index[1] + if start_point[0] == end_point[0]: + s = code[start_point[0]][start_point[1]:end_point[1]] + else: + s = "" + s += code[start_point[0]][start_point[1]:] + for i in range(start_point[0] + 1, end_point[0]): + s += code[i] + s += code[end_point[0]][:end_point[1]] + return s + + +def tree_to_token_index(root_node): + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + return [(root_node.start_point, root_node.end_point)] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_token_index(child) + return code_tokens + + +def extract_dataflow(code, parser, lang): + # obtain dataflow + if lang == "php": + code = "" + try: + tree = parser[0].parse(bytes(code, 'utf8')) + root_node = tree.root_node + tokens_index = tree_to_token_index(root_node) + code = code.split('\n') + code_tokens = [index_to_code_token(x, code) for x in tokens_index] + index_to_code = {} + for idx, (index, code) in enumerate(zip(tokens_index, code_tokens)): + index_to_code[index] = (idx, code) + try: + DFG, _ = parser[1](root_node, index_to_code, {}) + except: + DFG = [] + DFG = sorted(DFG, key=lambda x: x[1]) + indexs = set() + for d in DFG: + if len(d[-1]) != 0: + indexs.add(d[1]) + for x in d[-1]: + indexs.add(x) + new_DFG = [] + for d in DFG: + if d[1] in indexs: + new_DFG.append(d) + dfg = new_DFG + except: + dfg = [] + return code_tokens, dfg diff --git a/pyproject.toml b/pyproject.toml index 
882a3f9..af12a37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,5 +9,6 @@ requires = [ "setuptools==54.2.0", "json2html", "configparser", + "tree-sitter" ] build-backend = "setuptools.build_meta" diff --git a/requirements.txt b/requirements.txt index 82d2c73..28e5068 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ json2html configparser bigcode_astgen GitPython +tree-sitter diff --git a/test/test_files/test_data_flow.py b/test/test_files/test_data_flow.py new file mode 100644 index 0000000..8924a86 --- /dev/null +++ b/test/test_files/test_data_flow.py @@ -0,0 +1,8 @@ +def max(a, b): + x = 0 + if a > b: + x = a + else: + x = b + return x + diff --git a/test/test_inspect4py.py b/test/test_inspect4py.py index ce2ae50..72376f4 100644 --- a/test/test_inspect4py.py +++ b/test/test_inspect4py.py @@ -1,22 +1,27 @@ +import os.path import unittest -import json import shutil -import requests +from tree_sitter import Parser from inspect4py.cli import * -from inspect4py import cli, utils + +test_data_path = str(Path(__file__).parent / "test_files") + os.path.sep +test_out_path = str(Path(__file__).parent) class Test(unittest.TestCase): def test_call_list_super(self): dictionary = {'Rectangle': {}, 'Square': {'__init__': {'local': ['super_test.Rectangle.__init__']}}} - input_path = "./test_files/test_inheritance/super_test.py" - output_dir = "./output_dir" + input_path = test_data_path + "test_inheritance" + os.path.sep + "super_test.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False abstract_syntax_tree = False source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) 
shutil.rmtree(output_dir) assert (call_list_data["classes"]['Rectangle'] == dictionary['Rectangle']) @@ -31,65 +36,80 @@ def test_call_list_super_test_5(self): 'face_area': {'local': ['super_test_5.Rectangle.area']}, 'surface_area': {'local': ['super_test_5.Rectangle.area']}}} input_path = "./test_files/test_inheritance/super_test_5.py" - output_dir = "./output_dir" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False abstract_syntax_tree = False source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) def test_call_list_nested(self): - dictionary = {'functions': {'test': {'local': ['nested_call.MyClass.func']}}, + dictionary = {'functions': {'test': {'local': ['nested_call.MyClass.func']}}, 'body': {'local': ['nested_call.test']}, 'classes': {'MyClass': { - 'func': {'local': ['nested_call.MyClass.func.nested'], 'nested': {'nested': {'local': ['print']}}}}}} - input_path = "./test_files/test_inheritance/nested_call.py" - output_dir = "./output_dir" + 'func': {'local': ['nested_call.MyClass.func.nested'], 'nested': {'nested': {'local': ['print']}}}}}} + input_path = test_data_path + os.path.sep + "test_inheritance" + os.path.sep +"nested_call.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, 
json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def test_call_list_super_nested(self): - dictionary = {'functions': {'func_d': {'local': ['super_nested_call.func_d.func_e'], - 'nested': {'func_e': {'local': ['print']}}}, - 'main': {'local': ['super_nested_call.MyClass.func_a', 'super_nested_call.func_d']}}, + dictionary = {'functions': {'func_d': {'local': ['super_nested_call.func_d.func_e'], + 'nested': {'func_e': {'local': ['print']}}}, + 'main': { + 'local': ['super_nested_call.MyClass.func_a', 'super_nested_call.func_d']}}, 'body': {'local': ['super_nested_call.main']}, - 'classes': {'MyClass': {'func_a': {'local': ['print', 'super_nested_call.MyClass.func_a.func_b'], - 'nested': {'func_b': {'local': ['print', 'super_nested_call.MyClass.func_a.func_b.func_c'], - 'nested': {'func_c': {'local': ['print']}}}}}}}} - input_path = "./test_files/test_inheritance/super_nested_call.py" - output_dir = "./output_dir" + 'classes': {'MyClass': {'func_a': {'local': ['print', 'super_nested_call.MyClass.func_a.func_b'], + 'nested': {'func_b': {'local': ['print', + 'super_nested_call.MyClass.func_a.func_b.func_c'], + 'nested': { + 'func_c': {'local': ['print']}}}}}}}} + input_path = test_data_path + os.path.sep + "test_inheritance" +os.path.sep +"super_nested_call.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] + abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def 
test_call_list_import(self): dictionary = {'functions': {'funct_D': {'local': ['print', 'test_functions.funct_A']}}, 'body': { - 'local': ['test_functions.funct_A', 'test_import.funct_D']}, - 'classes': {'MyClass_D': {'__init__': {'local': ['print', 'test_functions.funct_C', 'test_import.funct_D']}}, - 'MyClass_E': {'__init__': {'local': ['print']}}}} - input_path = "./test_files/test_inheritance/test_import.py" - output_dir = "./output_dir" + 'local': ['test_functions.funct_A', 'test_import.funct_D']}, + 'classes': {'MyClass_D': { + '__init__': {'local': ['print', 'test_functions.funct_C', 'test_import.funct_D']}}, + 'MyClass_E': {'__init__': {'local': ['print']}}}} + input_path = test_data_path + os.path.sep + "test_inheritance" + os.path.sep + "test_import.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] + abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -97,14 +117,16 @@ def test_call_list_import(self): def test_call_list_external_module(self): dictionary = {'body': { 'local': ['random.seed', 'print', 'random.random']}} - input_path = "./test_files/test_random.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_random.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + 
code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) @@ -113,14 +135,16 @@ def test_call_list_argument_call(self): dictionary = {'functions': {'func_1': {'local': ['print', 'argument_call.func_2']}}, 'body': {'local': ['print', 'argument_call.func_1', 'argument_call.MyClass.func_a']}, 'classes': {'MyClass': {'func_a': {'local': ['print', 'argument_call.MyClass.func_b']}}}} - input_path = "./test_files/test_dynamic/argument_call.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep +"argument_call.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) @@ -128,29 +152,34 @@ def test_call_list_argument_call(self): def test_call_list_dynamic_body(self): dictionary = {'functions': {'func_2': {'local': ['test_dynamic.func_1']}}, 'body': {'local': ['test_dynamic.func_2', 'print']}, 'classes': {}} - input_path = "./test_files/test_dynamic/test_dynamic.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep + "test_dynamic.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, 
control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def test_call_list_dynamic_func(self): dictionary = {'functions': {'func_2': {'local': ['test_dynamic_func.func_1']}, - 'main': {'local': ['test_dynamic_func.func_2', 'print']}}, 'body': {'local': ['test_dynamic_func.main']}, 'classes': {}} - input_path = "./test_files/test_dynamic/test_dynamic_func.py" - output_dir = "./output_dir" + 'main': {'local': ['test_dynamic_func.func_2', 'print']}}, + 'body': {'local': ['test_dynamic_func.main']}, 'classes': {}} + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep + "test_dynamic_func.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -159,14 +188,16 @@ def test_call_list_dynamic_body_import(self): dictionary = {'functions': {'func_3': {'local': ['test_dynamic_func.func_1']}}, 'body': {'local': ['test_dynamic_import.func_3', 'print']}, 'classes': {}} - input_path = "./test_files/test_dynamic/test_dynamic_import.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic"+ os.path.sep + "test_dynamic_import.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + 
data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -175,14 +206,16 @@ def test_call_list_dynamic_body_from_import(self): dictionary = {'functions': {'func_3': {'local': ['test_dynamic_func.func_1']}}, 'body': {'local': ['test_dynamic_from_import.func_3', 'print']}, 'classes': {}} - input_path = "./test_files/test_dynamic/test_dynamic_from_import.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep + "test_dynamic_from_import.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -191,14 +224,16 @@ def test_call_list_dynamic_import_alias(self): dictionary = {'functions': {'func_3': {'local': ['test_dynamic_func.td.func_1']}}, 'body': {'local': ['test_dynamic_import_alias.func_3', 'print']}, 'classes': {}} - input_path = "./test_files/test_dynamic/test_dynamic_import_alias.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic"+os.path.sep+"test_dynamic_import_alias.py" + output_dir = test_out_path + os.path.sep + "output_dir" 
control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -207,32 +242,35 @@ def test_call_list_dynamic_import_method(self): dictionary = {'functions': {'func_2': {'local': ['test_dynamic_method.MyClass.func_1']}, 'main': { 'local': ['test_dynamic_method.func_2', 'print']}}, 'body': {'local': ['test_dynamic_method.main']}, 'classes': {'MyClass': {}}} - input_path = "./test_files/test_dynamic/test_dynamic_method.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic"+ os.path.sep+"test_dynamic_method.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def test_call_list_dynamic_import_method_variable(self): - dictionary = {'functions': {'func_2': {'local': ['test_dynamic_method_variable.MyClass.func_1']}, - 'main': {'local': ['test_dynamic_method_variable.func_2', 'print']}}, - 'body': {'local': ['test_dynamic_method_variable.main']}, 'classes': {'MyClass': {}}} + dictionary = {'functions': {'func_2': {'local': ['test_dynamic_method_variable.MyClass.func_1']}, + 
'main': {'local': ['test_dynamic_method_variable.func_2', 'print']}}, + 'body': {'local': ['test_dynamic_method_variable.main']}, 'classes': {'MyClass': {}}} - - input_path = "./test_files/test_dynamic/test_dynamic_method_variable.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep + "test_dynamic_method_variable.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -241,22 +279,25 @@ def test_call_list_dynamic_class_import(self): dictionary = {'functions': {}, 'body': { 'local': ['test_dynamic_class_import.MyClass.func_3']}, 'classes': {'MyClass': {'func_3': {'local': ['test_dynamic_func.func_1']}}}} - input_path = "./test_files/test_dynamic/test_dynamic_class_import.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_dynamic" + os.path.sep + "test_dynamic_class_import.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def test_service(self): - 
input_path = "./test_files/Chowlk" - output_dir = "./output_dir" - + input_path = test_data_path + os.path.sep + "Chowlk" + output_dir = test_out_path + os.path.sep + "output_dir" + data_flow = False + symbol_table = "" ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -270,16 +311,16 @@ def test_service(self): readme = False metadata = False dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "service" def test_package(self): - input_path = "./test_files/somef" - output_dir = "./output_dir" - + input_path = test_data_path + os.path.sep + "somef" + output_dir = test_out_path + os.path.sep + "output_dir" + ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -292,17 +333,19 @@ def test_package(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "package" def test_library(self): - input_path = "./test_files/pylops" - output_dir = "./output_dir" - + 
input_path = test_data_path + os.path.sep + "pylops" + output_dir = test_out_path + os.path.sep + "output_dir" + ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -315,18 +358,19 @@ def test_library(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "library" - def test_multiple_mains(self): - input_path = "./test_files/test_multiple_mains" - output_dir = "./output_dir" - + input_path = test_data_path + os.path.sep + "test_multiple_mains" + output_dir = test_out_path + os.path.sep + "output_dir" + ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -339,23 +383,24 @@ def test_multiple_mains(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) imports = dir_info['software_invocation'] shutil.rmtree(output_dir) + num_imports = 0 for i in imports: if "test.py" in i['run']: - num_imports = len (i['imports']) - break + num_imports = len(i['imports']) + break assert num_imports == 2 - - def test_script(self): - input_path = 
"./test_files/BoostingMonocularDepth" - output_dir = "./output_dir" - + input_path = test_data_path + os.path.sep + "BoostingMonocularDepth" + output_dir = test_out_path + os.path.sep + "output_dir" + ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -368,42 +413,47 @@ def test_script(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "script" -# Test for testing ast trees -# def test_issue_110(): -# output_html_file = "test_issue_110_output.html" -# self.assertEquals(3, 4) -# m = MakeDocco(input_data_file="test_issue_110_input.ttl") -# m.document(destination=output_html_file) -# assert "balance between £1,000 and £1,000,000 GBP" in open(output_html_file).read() - -# def crop_transform68(rimg, landmark, image_size, src): -# -# assert landmark.shape[0] == 68 or landmark.shape[0] == 5 -# assert landmark.shape[1] == 2 -# tform = trans.SimilarityTransform() -# -# tform.estimate(landmark, src) -# M = tform.params[0:2, :] -# img = cv2.warpAffine( -# rimg, M, (image_size[1], image_size[0]), borderValue=0.0) -# return img + # Test for testing ast trees + # def test_issue_110(): + # output_html_file = "test_issue_110_output.html" + # self.assertEquals(3, 4) + # m = MakeDocco(input_data_file="test_issue_110_input.ttl") + # m.document(destination=output_html_file) + # assert "balance between £1,000 and £1,000,000 GBP" in open(output_html_file).read() + + # def crop_transform68(rimg, landmark, image_size, src): + # + # assert landmark.shape[0] == 68 or 
landmark.shape[0] == 5 + # assert landmark.shape[1] == 2 + # tform = trans.SimilarityTransform() + # + # tform.estimate(landmark, src) + # M = tform.params[0:2, :] + # img = cv2.warpAffine( + # rimg, M, (image_size[1], image_size[0]), borderValue=0.0) + # return img def test_ast_function(self): - input_path = "./test_files/test_basic/test_basic_function.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_basic" + os.path.sep +"test_basic_function.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + abstract_syntax_tree = True source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -426,15 +476,49 @@ def test_ast_function(self): actual_ast = code_info.fileJson[0]["functions"]["foo"]["ast"] assert expected_ast == actual_ast + def test_data_flow(self): + input_path = test_data_path + os.path.sep + "test_data_flow.py" + output_dir = test_out_path + os.path.sep + "output_dir" + control_flow = False + abstract_syntax_tree = False + source_code = True + data_flow = True + path_to_languages = str(Path(__file__).parent.parent / "inspect4py" / "resources") + if sys.platform.startswith("win") or sys.platform.startswith("cygwin"): + language = Language(path_to_languages + os.path.sep + "python_win.so", "python") + else: # mac and unix should be compatible + language = Language(path_to_languages + os.path.sep + "python_unix.so", "python") + parser = Parser() + parser.set_language(language) + parser = [parser, DFG_python] + cf_dir, json_dir = create_output_dirs(output_dir, control_flow) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, 
abstract_syntax_tree, source_code, + data_flow, parser) + expected_dfg = [('a', 3, 'comesFrom', [], []), + ('b', 5, 'comesFrom', [], []), + ('x', 8, 'computedFrom', ['0'], [10]), + ('0', 10, 'comesFrom', [], []), + ('a', 12, 'comesFrom', ['a'], [3]), + ('b', 14, 'comesFrom', ['b'], [5]), + ('x', 16, 'computedFrom', ['a'], [18]), + ('a', 18, 'comesFrom', ['a'], [3]), + ('x', 21, 'computedFrom', ['b'], [23]), + ('b', 23, 'comesFrom', ['b'], [5]), + ('x', 25, 'comesFrom', ['x'], [16, 21])] + actual_dfg = code_info.fileJson[0]["functions"]["max"]["data_flow"] + assert actual_dfg == expected_dfg + def test_ast_method(self): - input_path = "./test_files/test_basic/test_basic_method.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_basic" + os.path.sep + "test_basic_method.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = True source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -464,14 +548,16 @@ def test_ast_method(self): assert expected_ast == actual_ast def test_ast_body(self): - input_path = "./test_files/test_basic/test_basic_body.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep +"test_basic" + os.path.sep + "test_basic_body.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = True source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, 
cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -485,35 +571,38 @@ def test_ast_body(self): {"id": 1, "type": "NameLoad", "value": "print"}, {"id": 2, "type": "NameLoad", "value": "var"}, ], - ] + ] actual_ast = code_info.fileJson[0]["body"]["ast"] assert expected_ast == actual_ast def test_source_code_function(self): - input_path = "./test_files/test_basic/test_basic_function.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep +"test_basic" + os.path.sep + "test_basic_function.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) - expected_code = "def foo(arg1, arg2):\n print('Hello %s', arg1)\n return arg2" # Single double quote sensitive + expected_code = "def foo(arg1, arg2):\n print('Hello %s', arg1)\n return arg2" # Single double quote sensitive actual_code = code_info.fileJson[0]["functions"]["foo"]["source_code"] assert expected_code == actual_code - def test_source_code_method(self): - input_path = "./test_files/test_basic/test_basic_method.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_basic" + os.path.sep +"test_basic_method.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = 
CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) expected_code = "def __init__(self, arg):\n self.arg = arg\n print('Hello %s' % self.arg)" @@ -521,24 +610,26 @@ def test_source_code_method(self): assert expected_code == actual_code def test_source_code_body(self): - input_path = "./test_files/test_basic/test_basic_body.py" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_basic" + os.path.sep + "test_basic_body.py" + output_dir = test_out_path + os.path.sep + "output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) shutil.rmtree(output_dir) expected_code = ["print('Hello world')", "print(var)"] actual_code = code_info.fileJson[0]["body"]["source_code"] assert expected_code == actual_code - def test_license_detection(self): - input_paths = ["./test_files/Chowlk", "./test_files/pylops", "./test_files/somef"] - output_dir = "./output_dir" + input_paths = [test_data_path + os.path.sep + "Chowlk", test_data_path + os.path.sep + "pylops", + test_data_path + os.path.sep + "somef"] + output_dir = test_out_path + os.path.sep + "output_dir" fig = False ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -552,7 +643,8 @@ def test_license_detection(self): license_detection = True readme = False metadata = False - + data_flow = False + symbol_table = "" expected_liceses = ['Apache-2.0', 'LGPL-3.0', 'MIT'] first_rank_licenses = [] for input_path in input_paths: @@ -560,7 +652,7 @@ def test_license_detection(self): ignore_file_pattern, requirements, call_list, 
control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) first_rank_licenses.append(next(iter(dir_info["license"]["detected_type"][0]))) shutil.rmtree(output_dir) @@ -568,8 +660,8 @@ def test_license_detection(self): def test_license_text_extraction(self): license_text = "A random license." - input_path = "./test_files/test_license_extraction" - output_dir = "./output_dir" + input_path = test_data_path + os.path.sep + "test_license_extraction" + output_dir = test_out_path + os.path.sep + "output_dir" fig = False ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -583,20 +675,21 @@ def test_license_text_extraction(self): license_detection = True readme = False metadata = False - + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, + symbol_table=symbol_table) assert dir_info["license"]["extracted_text"] == license_text - def test_readme(self): - input_path = "./test_files/test_readme" - output_dir = "./output_dir" - + input_path = test_data_path + os.path.sep + "test_readme" + output_dir = test_out_path + os.path.sep + "output_dir" + ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False @@ -609,59 +702,32 @@ def test_readme(self): license_detection = False readme = True metadata = False - + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, 
license_detection, readme, metadata) + call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, + source_code, license_detection, readme, metadata, data_flow, symbol_table) expected_readme_files = { - f"{output_dir}/test_readme/README.md": "README.md in root dir\n", - f"{output_dir}/test_readme/subdir/README.txt": "README.txt in subdir\n", - f"{output_dir}/test_readme/subdir/subsubdir/README.rst": "README.rst in subsubdir\n" + f"{output_dir}"+os.path.sep+"test_readme"+os.path.sep+"README.md": "README.md in root dir\n", + f"{output_dir}"+os.path.sep+"test_readme"+os.path.sep+"subdir"+os.path.sep+"README.txt": "README.txt in subdir\n", + f"{output_dir}"+os.path.sep+"test_readme"+os.path.sep+"subdir"+os.path.sep+"subsubdir"+os.path.sep+"README.rst": "README.rst in subsubdir\n" } actual_readme_files = dir_info["readme_files"] - assert expected_readme_files == actual_readme_files - - - #def test_metadata(self): - # """ - # Need to execute under test/test_files/: - # `git clone https://github.com/githubtraining/hellogitworld.git` - # to pass this test, as getting metadata requires the local repository - # to have a .git folder. 
- # """ - # input_path = "./test_files/hellogitworld" - # output_dir = "./output_dir" - # - # ignore_dir_pattern = [".", "__pycache__"] - # ignore_file_pattern = [".", "__pycache__"] - # requirements = False - # call_list = False - # control_flow = False - # directory_tree = False - # software_invocation = False - ## abstract_syntax_tree = False - # source_code = False - # license_detection = False - # readme = False - # metadata = True - # - # dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - # call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - # source_code, license_detection, readme, metadata) - # try: - # response = requests.get("https://api.github.com/repos/githubtraining/hellogitworld") - # expected_metadata = response.json() - # except requests.RequestException as e: - # print(f"Error sending requests to Github API: {e}") - # raise e - # actual_metadata = dir_info["metadata"] - # assert expected_metadata == actual_metadata - - -def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata): + assert expected_readme_files == actual_readme_files + + +def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, + control_flow, directory_tree, software_invocation, abstract_syntax_tree, source_code, + license_detection, readme, + metadata, data_flow, symbol_table): + if data_flow: + LANGUAGE = Language(symbol_table, "python") + parser = Parser() + parser.set_language(LANGUAGE) + parser = [parser, DFG_python] + else: + parser = [] dir_info = {} # retrieve readme text at the root level (if any) readme = "" @@ -684,7 +750,8 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat relative_path = 
Path(subdir).relative_to(Path(input_path).parent) out_dir = str(Path(output_dir) / relative_path) cf_dir, json_dir = create_output_dirs(out_dir, control_flow) - code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, + data_flow, parser) if out_dir not in dir_info: dir_info[out_dir] = [code_info.fileJson[0]] else: @@ -700,7 +767,7 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat call_file_html = output_dir + "/call_graph.html" generate_output_html(pruned_call_list_data, call_file_html) call_json_file = output_dir + "/call_graph.json" - with open(call_json_file, 'w') as outfile: + with open(call_json_file, 'w') as outfile: json.dump(pruned_call_list_data, outfile) # Note:1 for visualising the tree, nothing or 0 for not. if requirements: @@ -728,8 +795,7 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat # Extract the first for software type. dir_info["software_type"] = rank_software_invocation(soft_invocation_info_list) if license_detection: - licenses_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), - "../inspect4py/licenses") + licenses_path = str(Path(__file__).parent.parent / "inspect4py" / "licenses") license_text = extract_license(input_path) rank_list = detect_license(license_text, licenses_path) dir_info["license"] = {}