diff --git a/README.md b/README.md index d78af49..a5da5a6 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,11 @@ Inspect4py currently works **only for Python 3 projects**. ## Background: +`inspect4py` added the functionality to capture [Data Flow Graphs](http://bears.ece.ucsb.edu/research-info/DP/dfg.html) for each function, inspired by GraphCodeBERT: [GitHub](https://github.com/microsoft/CodeBERT) & [Paper](https://arxiv.org/abs/2009.08366). An illustration is given below: +|Source Code|List Output|Networkx Image| +|:-:|:-:|:-:| +|
def max(a, b):
    # Illustration function for the data-flow example: returns the larger of
    # a and b (b when the two compare equal).
    # NOTE: documented with comments instead of a docstring on purpose -- a
    # docstring would add a string token and shift the token positions shown
    # in the data-flow listing that accompanies this snippet.
    x = 0
    if a > b:
        x = a
    else:
        x = b
    return x
|
('a', 3, 'comesFrom', [], [])
('b', 5, 'comesFrom', [], [])
('x', 8, 'computedFrom', ['0'], [10])
('0', 10, 'comesFrom', [], [])
('a', 12, 'comesFrom', ['a'], [3])
('b', 14, 'comesFrom', ['b'], [5])
('x', 16, 'computedFrom', ['a'], [18])
('a', 18, 'comesFrom', ['a'], [3])
('x', 21, 'computedFrom', ['b'], [23])
('b', 23, 'comesFrom', ['b'], [5])
('x', 25, 'comesFrom', ['x'], [16, 21])
|![image](docs/images/data_flow.png)| + `inspect4py` uses [ASTs](https://en.wikipedia.org/wiki/Abstract_syntax_tree), more specifically the [ast](https://docs.python.org/3/library/ast.html) module in Python, generating a tree of objects (per file) whose classes all inherit from [ast.AST](https://docs.python.org/3/library/ast.html#ast.AST). @@ -60,6 +65,12 @@ Please cite our MSR 2022 demo paper: ### Preliminaries +Make sure you have tree-sitter installed (a C compiler is needed; see more [info](https://github.com/tree-sitter/tree-sitter)): + +``` +pip install tree-sitter +``` + Make sure you have graphviz installed: ``` @@ -71,7 +82,7 @@ We have tested `inspect4py` in Python 3.7+. **Our recommended version is Python ### Operative System -We have tested `inspect4py` in Unix and MacOs. +We have tested `inspect4py` in Unix, MacOS and Windows 11 (22621.1265). ### Installation from pypi `inspect4py` is [available in pypi!](https://pypi.org/project/inspect4py/) Just install it like a regular package: @@ -106,6 +117,9 @@ pigar setuptools==54.2.0 json2html configparser +bigcode_astgen +GitPython +tree-sitter ``` If you want to run the evaluations, do not forget to add `pandas` to the previous set. @@ -218,6 +232,8 @@ Options: -rm, --readme extract all readme files in the target repository. -md, --metadata extract metadata of the target repository using Github API. + -df, --data_flow extract data flow graph for every function, BOOL + -st, --symbol_table symbol table file location. STR --help Show this message and exit. 
``` diff --git a/docs/images/data_flow.png b/docs/images/data_flow.png new file mode 100644 index 0000000..4e993a0 Binary files /dev/null and b/docs/images/data_flow.png differ diff --git a/inspect4py/cli.py b/inspect4py/cli.py index 166b275..6ee07bd 100644 --- a/inspect4py/cli.py +++ b/inspect4py/cli.py @@ -1,13 +1,16 @@ +import ast import json import tokenize import types import builtins import click from docstring_parser import parse as doc_parse +from tree_sitter import Language, Parser from inspect4py import __version__ from inspect4py.staticfg import builder from inspect4py.utils import * +# from utils import * """ Code Inspector @@ -26,7 +29,7 @@ class CodeInspection: - def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abstract_syntax_tree, source_code): + def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abstract_syntax_tree, source_code, data_flow, parser): """ init method initializes the Code_Inspection object :param self self: represent the instance of the class :param str path: the file to inspect @@ -41,6 +44,8 @@ def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abs self.out_json_path = out_json_path self.abstract_syntax_tree = abstract_syntax_tree self.source_code = source_code + self.data_flow = data_flow + self.parser = parser self.tree = self.parser_file() if self.tree != "AST_ERROR": self.nodes = self.walk() @@ -58,6 +63,7 @@ def __init__(self, path, out_control_flow_path, out_json_path, control_flow, abs else: self.fileJson = {} + def find_classDef(self): classDef_nodes = [node for node in self.nodes if isinstance(node, ast.ClassDef)] class_init=[] @@ -466,6 +472,13 @@ def file_json(self): json.dump(prune_json(file_dict), outfile) return [file_dict, json_file] + # def get_parser_data_flow(self): + # parser = Parser() + # LANGUAGE = Language(self.symbol_table, "python") + # parser.set_language(LANGUAGE) + # parser = [parser, DFG_python] + # return parser + def 
_f_definitions(self, functions_definitions): """_f_definitions extracts the name, args, docstring returns, raises of a list of functions or a methods. @@ -477,11 +490,15 @@ def _f_definitions(self, functions_definitions): :param list functions_definitions: represent a list with all functions or methods nodes :return dictionary: a dictionary with the all the information at function/method level """ - + # print(functions_definitions) funcs_info = {} for f in functions_definitions: + # for node in ast.walk(f): + # print(node.name) + funcs_info[f.name] = {} ds_f = ast.get_docstring(f) + # print(ds_f) try: docstring = doc_parse(ds_f) funcs_info[f.name]["doc"] = {} @@ -577,7 +594,10 @@ def _f_definitions(self, functions_definitions): funcs_info[f.name]["ast"] = ast_to_json(f) if self.source_code: funcs_info[f.name]["source_code"] = ast_to_source_code(f) - + if self.data_flow: + code_tokens, dfg = extract_dataflow(funcs_info[f.name]["source_code"], self.parser, "python") + funcs_info[f.name]["data_flow"] = dfg + funcs_info[f.name]["code_tokens"] = code_tokens return funcs_info def _skip_dynamic_calls(self, funcs_info, classes_info, check_name, name, var_name): @@ -1204,6 +1224,7 @@ def create_output_dirs(output_dir, control_flow): @click.option('-i', '--input_path', type=str, required=True, help="input path of the file or directory to inspect.") @click.option('-o', '--output_dir', type=str, default="output_dir", help="output directory path to store results. If the directory does not exist, the tool will create it.") +@click.option('-st','--symbol_table', type=str, default="my_language.so", help="symbol table for the target function") @click.option('-ignore_dir', '--ignore_dir_pattern', multiple=True, default=[".", "__pycache__"], help="ignore directories starting with a certain pattern. 
This parameter can be provided multiple times " "to ignore multiple directory patterns.") @@ -1231,16 +1252,28 @@ def create_output_dirs(output_dir, control_flow): help="extract all readme files in the target repository.") @click.option('-md', '--metadata', type=bool, is_flag=True, help="extract metadata of the target repository using Github API. (requires repository to have the .git folder)") +@click.option('-df', '--data_flow', type=bool, is_flag=True, + help="extract data flow graph of every function in the target repository") + def main(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, html_output, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, source_code, license_detection, readme, - metadata): + metadata, data_flow, symbol_table): + if data_flow: + LANGUAGE = Language(symbol_table, "python") + parser = Parser() + parser.set_language(LANGUAGE) + parser = [parser, DFG_python] + else: + parser = [] + + # print(parsers) if (not os.path.isfile(input_path)) and (not os.path.isdir(input_path)): print('The file or directory specified does not exist') sys.exit() if os.path.isfile(input_path): cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) # Generate the call list of a file call_list_data = call_list_file(code_info) @@ -1279,11 +1312,13 @@ def main(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requir for f in files: if ".py" in f and not f.endswith(".pyc"): try: + path = os.path.join(subdir, f) relative_path = Path(subdir).relative_to(Path(input_path).parent) out_dir = str(Path(output_dir) / relative_path) cf_dir, json_dir = create_output_dirs(out_dir, control_flow) - code_info = CodeInspection(path, cf_dir, json_dir, control_flow, 
abstract_syntax_tree, source_code) + code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) + # print(parsers) if code_info.fileJson: if out_dir not in dir_info: dir_info[out_dir] = [code_info.fileJson[0]] diff --git a/inspect4py/parse_setup_files.py b/inspect4py/parse_setup_files.py index 593c99e..126130c 100644 --- a/inspect4py/parse_setup_files.py +++ b/inspect4py/parse_setup_files.py @@ -56,7 +56,9 @@ def parse_setup_py(parent_dir): if single_line: elem = setup_content[console_index] cs = elem.split("=") - cs_string = cs[0].strip().replace('\'', '').split('["')[1] + # print(cs) + # print(cs[1].strip()) + cs_string = cs[1].strip().replace('\'', '').split('["')[1] cs_list.append(normalize(cs_string)) setup_info["installation"] = "pip install " + cs_string setup_info["run"].append(cs_string) diff --git a/inspect4py/resources/my-languages.so b/inspect4py/resources/my-languages.so new file mode 100644 index 0000000..921ee0b Binary files /dev/null and b/inspect4py/resources/my-languages.so differ diff --git a/inspect4py/utils.py b/inspect4py/utils.py index 83115ea..5c6117e 100644 --- a/inspect4py/utils.py +++ b/inspect4py/utils.py @@ -714,7 +714,7 @@ def detect_license(license_text, licenses_path, threshold=0.9): rank_list = [] for licen in os.listdir(licenses_path): - with open(os.path.join(licenses_path, licen), "r") as f: + with open(os.path.join(licenses_path, licen), "r", encoding='UTF-8') as f: parser = pattern.search(f.read()) if parser is None: continue @@ -773,7 +773,6 @@ def get_github_metadata(input_path: str) -> dict: return github_metadata - def find_index_init(depInfo, calls, class_init): index_remove=[] for dep in depInfo: @@ -793,9 +792,261 @@ def update_list_calls(info, index_remove): if i in index_remove: continue updated_calls.append(info["calls"][i]) - ### These lines are for removing duplicate calls + ### These lines are for removing duplicate calls res = [] for i in updated_calls : 
if i not in res: res.append(i) return res + +def tree_to_variable_index(root_node, index_to_code): + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + index = (root_node.start_point, root_node.end_point) + _, code = index_to_code[index] + if root_node.type != code: + return [(root_node.start_point, root_node.end_point)] + else: + return [] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_variable_index(child, index_to_code) + return code_tokens + +def DFG_python(root_node, index_to_code, states): + assignment = ['assignment', 'augmented_assignment', 'for_in_clause'] + if_statement = ['if_statement'] + for_statement = ['for_statement'] + while_statement = ['while_statement'] + do_first_statement = ['for_in_clause'] + def_statement = ['default_parameter'] + states = states.copy() + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + idx, code = index_to_code[(root_node.start_point, root_node.end_point)] + if root_node.type == code: + return [], states + elif code in states: + return [(code, idx, 'comesFrom', [code], states[code].copy())], states + else: + if root_node.type == 'identifier': + states[code] = [idx] + return [(code, idx, 'comesFrom', [], [])], states + elif root_node.type in def_statement: + name = root_node.child_by_field_name('name') + value = root_node.child_by_field_name('value') + DFG = [] + if value is None: + indexs = tree_to_variable_index(name, index_to_code) + for index in indexs: + idx, code = index_to_code[index] + DFG.append((code, idx, 'comesFrom', [], [])) + states[code] = [idx] + return sorted(DFG, key=lambda x: x[1]), states + else: + name_indexs = tree_to_variable_index(name, index_to_code) + value_indexs = tree_to_variable_index(value, index_to_code) + temp, states = DFG_python(value, index_to_code, states) + DFG += temp + for index1 in name_indexs: + idx1, code1 = index_to_code[index1] + for index2 in 
value_indexs: + idx2, code2 = index_to_code[index2] + DFG.append((code1, idx1, 'comesFrom', [code2], [idx2])) + states[code1] = [idx1] + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in assignment: + if root_node.type == 'for_in_clause': + right_nodes = [root_node.children[-1]] + left_nodes = [root_node.child_by_field_name('left')] + else: + if root_node.child_by_field_name('right') is None: + return [], states + left_nodes = [x for x in root_node.child_by_field_name('left').children if x.type != ','] + right_nodes = [x for x in root_node.child_by_field_name('right').children if x.type != ','] + if len(right_nodes) != len(left_nodes): + left_nodes = [root_node.child_by_field_name('left')] + right_nodes = [root_node.child_by_field_name('right')] + if len(left_nodes) == 0: + left_nodes = [root_node.child_by_field_name('left')] + if len(right_nodes) == 0: + right_nodes = [root_node.child_by_field_name('right')] + DFG = [] + for node in right_nodes: + temp, states = DFG_python(node, index_to_code, states) + DFG += temp + + for left_node, right_node in zip(left_nodes, right_nodes): + left_tokens_index = tree_to_variable_index(left_node, index_to_code) + right_tokens_index = tree_to_variable_index(right_node, index_to_code) + temp = [] + for token1_index in left_tokens_index: + idx1, code1 = index_to_code[token1_index] + temp.append((code1, idx1, 'computedFrom', [index_to_code[x][1] for x in right_tokens_index], + [index_to_code[x][0] for x in right_tokens_index])) + states[code1] = [idx1] + DFG += temp + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in if_statement: + DFG = [] + current_states = states.copy() + others_states = [] + tag = False + if 'else' in root_node.type: + tag = True + for child in root_node.children: + if 'else' in child.type: + tag = True + if child.type not in ['elif_clause', 'else_clause']: + temp, current_states = DFG_python(child, index_to_code, current_states) + DFG += temp + else: + temp, 
new_states = DFG_python(child, index_to_code, states) + DFG += temp + others_states.append(new_states) + others_states.append(current_states) + if tag is False: + others_states.append(states) + new_states = {} + for dic in others_states: + for key in dic: + if key not in new_states: + new_states[key] = dic[key].copy() + else: + new_states[key] += dic[key] + for key in new_states: + new_states[key] = sorted(list(set(new_states[key]))) + return sorted(DFG, key=lambda x: x[1]), new_states + elif root_node.type in for_statement: + DFG = [] + for i in range(2): + right_nodes = [x for x in root_node.child_by_field_name('right').children if x.type != ','] + left_nodes = [x for x in root_node.child_by_field_name('left').children if x.type != ','] + if len(right_nodes) != len(left_nodes): + left_nodes = [root_node.child_by_field_name('left')] + right_nodes = [root_node.child_by_field_name('right')] + if len(left_nodes) == 0: + left_nodes = [root_node.child_by_field_name('left')] + if len(right_nodes) == 0: + right_nodes = [root_node.child_by_field_name('right')] + for node in right_nodes: + temp, states = DFG_python(node, index_to_code, states) + DFG += temp + for left_node, right_node in zip(left_nodes, right_nodes): + left_tokens_index = tree_to_variable_index(left_node, index_to_code) + right_tokens_index = tree_to_variable_index(right_node, index_to_code) + temp = [] + for token1_index in left_tokens_index: + idx1, code1 = index_to_code[token1_index] + temp.append((code1, idx1, 'computedFrom', [index_to_code[x][1] for x in right_tokens_index], + [index_to_code[x][0] for x in right_tokens_index])) + states[code1] = [idx1] + DFG += temp + if root_node.children[-1].type == "block": + temp, states = DFG_python(root_node.children[-1], index_to_code, states) + DFG += temp + dic = {} + for x in DFG: + if (x[0], x[1], x[2]) not in dic: + dic[(x[0], x[1], x[2])] = [x[3], x[4]] + else: + dic[(x[0], x[1], x[2])][0] = list(set(dic[(x[0], x[1], x[2])][0] + x[3])) + dic[(x[0], x[1], 
x[2])][1] = sorted(list(set(dic[(x[0], x[1], x[2])][1] + x[4]))) + DFG = [(x[0], x[1], x[2], y[0], y[1]) for x, y in sorted(dic.items(), key=lambda t: t[0][1])] + return sorted(DFG, key=lambda x: x[1]), states + elif root_node.type in while_statement: + DFG = [] + for i in range(2): + for child in root_node.children: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + dic = {} + for x in DFG: + if (x[0], x[1], x[2]) not in dic: + dic[(x[0], x[1], x[2])] = [x[3], x[4]] + else: + dic[(x[0], x[1], x[2])][0] = list(set(dic[(x[0], x[1], x[2])][0] + x[3])) + dic[(x[0], x[1], x[2])][1] = sorted(list(set(dic[(x[0], x[1], x[2])][1] + x[4]))) + DFG = [(x[0], x[1], x[2], y[0], y[1]) for x, y in sorted(dic.items(), key=lambda t: t[0][1])] + return sorted(DFG, key=lambda x: x[1]), states + else: + DFG = [] + for child in root_node.children: + if child.type in do_first_statement: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + for child in root_node.children: + if child.type not in do_first_statement: + temp, states = DFG_python(child, index_to_code, states) + DFG += temp + + return sorted(DFG,key=lambda x:x[1]),states + +def tree_to_variable_index(root_node, index_to_code): + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + index = (root_node.start_point, root_node.end_point) + _, code = index_to_code[index] + if root_node.type != code: + return [(root_node.start_point, root_node.end_point)] + else: + return [] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_variable_index(child, index_to_code) + return code_tokens + + +def index_to_code_token(index, code): + start_point = index[0] + end_point = index[1] + if start_point[0] == end_point[0]: + s = code[start_point[0]][start_point[1]:end_point[1]] + else: + s = "" + s += code[start_point[0]][start_point[1]:] + for i in range(start_point[0] + 1, end_point[0]): + s += code[i] + s += 
code[end_point[0]][:end_point[1]] + return s + +def tree_to_token_index(root_node): + if (len(root_node.children) == 0 or root_node.type == 'string') and root_node.type != 'comment': + return [(root_node.start_point, root_node.end_point)] + else: + code_tokens = [] + for child in root_node.children: + code_tokens += tree_to_token_index(child) + return code_tokens + +def extract_dataflow(code, parser,lang): + #obtain dataflow + if lang=="php": + code="" + try: + tree = parser[0].parse(bytes(code,'utf8')) + root_node = tree.root_node + tokens_index=tree_to_token_index(root_node) + code=code.split('\n') + code_tokens=[index_to_code_token(x,code) for x in tokens_index] + index_to_code={} + for idx,(index,code) in enumerate(zip(tokens_index,code_tokens)): + index_to_code[index]=(idx,code) + try: + DFG,_=parser[1](root_node,index_to_code,{}) + except: + DFG=[] + DFG=sorted(DFG,key=lambda x:x[1]) + indexs=set() + for d in DFG: + if len(d[-1])!=0: + indexs.add(d[1]) + for x in d[-1]: + indexs.add(x) + new_DFG=[] + for d in DFG: + if d[1] in indexs: + new_DFG.append(d) + dfg=new_DFG + except: + dfg=[] + return code_tokens, dfg \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 82d2c73..28e5068 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ json2html configparser bigcode_astgen GitPython +tree-sitter diff --git a/test/test_files/test_data_flow.py b/test/test_files/test_data_flow.py new file mode 100644 index 0000000..8924a86 --- /dev/null +++ b/test/test_files/test_data_flow.py @@ -0,0 +1,8 @@ +def max(a, b): + x = 0 + if a > b: + x = a + else: + x = b + return x + diff --git a/test/test_inspect4py.py b/test/test_inspect4py.py index ce2ae50..5c1283f 100644 --- a/test/test_inspect4py.py +++ b/test/test_inspect4py.py @@ -15,8 +15,10 @@ def test_call_list_super(self): control_flow = False abstract_syntax_tree = False source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, 
control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data["classes"]['Rectangle'] == dictionary['Rectangle']) @@ -35,8 +37,10 @@ def test_call_list_super_test_5(self): control_flow = False abstract_syntax_tree = False source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) @@ -48,11 +52,12 @@ def test_call_list_nested(self): input_path = "./test_files/test_inheritance/nested_call.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -68,11 +73,13 @@ def test_call_list_super_nested(self): input_path = "./test_files/test_inheritance/super_nested_call.py" output_dir = "./output_dir" control_flow = False + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, 
json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -85,11 +92,13 @@ def test_call_list_import(self): input_path = "./test_files/test_inheritance/test_import.py" output_dir = "./output_dir" control_flow = False + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -100,11 +109,12 @@ def test_call_list_external_module(self): input_path = "./test_files/test_random.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) @@ -116,11 +126,12 @@ def test_call_list_argument_call(self): input_path = "./test_files/test_dynamic/argument_call.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, 
abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data['body'] == dictionary['body']) @@ -131,11 +142,12 @@ def test_call_list_dynamic_body(self): input_path = "./test_files/test_dynamic/test_dynamic.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -146,11 +158,12 @@ def test_call_list_dynamic_func(self): input_path = "./test_files/test_dynamic/test_dynamic_func.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -162,11 +175,12 @@ def test_call_list_dynamic_body_import(self): input_path = "./test_files/test_dynamic/test_dynamic_import.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, 
control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -178,11 +192,12 @@ def test_call_list_dynamic_body_from_import(self): input_path = "./test_files/test_dynamic/test_dynamic_from_import.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -194,11 +209,12 @@ def test_call_list_dynamic_import_alias(self): input_path = "./test_files/test_dynamic/test_dynamic_import_alias.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -210,11 +226,12 @@ def test_call_list_dynamic_import_method(self): input_path = "./test_files/test_dynamic/test_dynamic_method.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = 
CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -228,11 +245,12 @@ def test_call_list_dynamic_import_method_variable(self): input_path = "./test_files/test_dynamic/test_dynamic_method_variable.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) @@ -244,19 +262,21 @@ def test_call_list_dynamic_class_import(self): input_path = "./test_files/test_dynamic/test_dynamic_class_import.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) call_list_data = call_list_file(code_info) shutil.rmtree(output_dir) assert (call_list_data == dictionary) def test_service(self): - input_path = "./test_files/Chowlk" + input_path = "D:\\inspect4py-main\\test\\test_files\\Chowlk" output_dir = "./output_dir" - + data_flow = False + symbol_table = "" ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] requirements = False 
@@ -271,14 +291,14 @@ def test_service(self): metadata = False dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "service" def test_package(self): - input_path = "./test_files/somef" - output_dir = "./output_dir" + input_path = "D:\\inspect4py-main\\test\\test_files\\somef" + output_dir = "D:\\inspect4py-main\\test\\output_dir" ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -292,9 +312,11 @@ def test_package(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "package" @@ -315,9 +337,11 @@ def test_library(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "library" @@ -339,9 +363,11 @@ def test_multiple_mains(self): 
license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) imports = dir_info['software_invocation'] shutil.rmtree(output_dir) for i in imports: @@ -353,7 +379,7 @@ def test_multiple_mains(self): def test_script(self): - input_path = "./test_files/BoostingMonocularDepth" + input_path = "D:\\inspect4py-main\\test\\test_files\\BoostingMonocularDepth" output_dir = "./output_dir" ignore_dir_pattern = [".", "__pycache__"] @@ -368,9 +394,11 @@ def test_script(self): license_detection = False readme = False metadata = False + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) current_type = dir_info['software_type'] shutil.rmtree(output_dir) assert current_type[0]["type"] == "script" @@ -402,8 +430,10 @@ def test_ast_function(self): abstract_syntax_tree = True source_code = False + data_flow = False + parser = [] cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -426,15 +456,45 @@ def test_ast_function(self): actual_ast = code_info.fileJson[0]["functions"]["foo"]["ast"] assert expected_ast == actual_ast + def 
test_data_flow(self): + input_path = "D:\\inspect4py-main\\test\\test_files\\test_data_flow.py" + output_dir = "./output_dir" + control_flow = False + abstract_syntax_tree = False + source_code = True + data_flow=True + LANGUAGE = Language("D:\\inspect4py-main\\my-languages.so", "python") + parser = Parser() + parser.set_language(LANGUAGE) + parser = [parser, DFG_python] + cf_dir, json_dir = create_output_dirs(output_dir, control_flow) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) + expected_dfg = [('a', 3, 'comesFrom', [], []), + ('b', 5, 'comesFrom', [], []), + ('x', 8, 'computedFrom', ['0'], [10]), + ('0', 10, 'comesFrom', [], []), + ('a', 12, 'comesFrom', ['a'], [3]), + ('b', 14, 'comesFrom', ['b'], [5]), + ('x', 16, 'computedFrom', ['a'], [18]), + ('a', 18, 'comesFrom', ['a'], [3]), + ('x', 21, 'computedFrom', ['b'], [23]), + ('b', 23, 'comesFrom', ['b'], [5]), + ('x', 25, 'comesFrom', ['x'], [16, 21])] + actual_dfg = code_info.fileJson[0]["functions"]["max"]["data_flow"] + assert actual_dfg == expected_dfg + + + def test_ast_method(self): input_path = "./test_files/test_basic/test_basic_method.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = True source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -467,11 +527,12 @@ def test_ast_body(self): input_path = "./test_files/test_basic/test_basic_body.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = True source_code = False cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = 
CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_ast = [ @@ -490,14 +551,15 @@ def test_ast_body(self): assert expected_ast == actual_ast def test_source_code_function(self): - input_path = "./test_files/test_basic/test_basic_function.py" + input_path = "D:\\inspect4py-main\\test\\test_files\\test_basic\\test_basic_function.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_code = "def foo(arg1, arg2):\n print('Hello %s', arg1)\n return arg2" # Single double quote sensitive @@ -506,14 +568,15 @@ def test_source_code_function(self): def test_source_code_method(self): - input_path = "./test_files/test_basic/test_basic_method.py" + input_path = "D:\\inspect4py-main\\test\\test_files\\test_basic\\test_basic_method.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_code = "def __init__(self, arg):\n self.arg = arg\n print('Hello %s' % self.arg)" @@ -521,14 +584,15 @@ def test_source_code_method(self): assert expected_code == actual_code def 
test_source_code_body(self): - input_path = "./test_files/test_basic/test_basic_body.py" + input_path = "D:\\inspect4py-main\\test\\test_files\\test_basic\\test_basic_body.py" output_dir = "./output_dir" control_flow = False - + data_flow = False + parser = [] abstract_syntax_tree = False source_code = True cf_dir, json_dir = create_output_dirs(output_dir, control_flow) - code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(input_path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) shutil.rmtree(output_dir) expected_code = ["print('Hello world')", "print(var)"] @@ -537,8 +601,8 @@ def test_source_code_body(self): def test_license_detection(self): - input_paths = ["./test_files/Chowlk", "./test_files/pylops", "./test_files/somef"] - output_dir = "./output_dir" + input_paths = ["D:\\inspect4py-main\\test\\test_files\Chowlk", "D:\\inspect4py-main\\test\\test_files\\pylops", "D:\\inspect4py-main\\test\\test_files\\somef"] + output_dir = "D:\\inspect4py-main\\test\\output_dir" fig = False ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -552,7 +616,8 @@ def test_license_detection(self): license_detection = True readme = False metadata = False - + data_flow = False + symbol_table = "" expected_liceses = ['Apache-2.0', 'LGPL-3.0', 'MIT'] first_rank_licenses = [] for input_path in input_paths: @@ -560,7 +625,7 @@ def test_license_detection(self): ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) first_rank_licenses.append(next(iter(dir_info["license"]["detected_type"][0]))) shutil.rmtree(output_dir) @@ -568,8 +633,8 @@ def test_license_detection(self): def test_license_text_extraction(self): license_text = "A random 
license." - input_path = "./test_files/test_license_extraction" - output_dir = "./output_dir" + input_path = "D:\\inspect4py-main\\test\\test_files\\test_license_extraction" + output_dir = "D:\\inspect4py-main\\test\\output_dir" fig = False ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -583,19 +648,20 @@ def test_license_text_extraction(self): license_detection = True readme = False metadata = False - + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table=symbol_table) assert dir_info["license"]["extracted_text"] == license_text def test_readme(self): - input_path = "./test_files/test_readme" - output_dir = "./output_dir" + input_path = "D:\\inspect4py-main\\test\\test_files\\test_readme" + output_dir = "D:\\inspect4py-main\\test\\output_dir" ignore_dir_pattern = [".", "__pycache__"] ignore_file_pattern = [".", "__pycache__"] @@ -609,59 +675,31 @@ def test_readme(self): license_detection = False readme = True metadata = False - + data_flow = False + symbol_table = "" dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata) + source_code, license_detection, readme, metadata, data_flow, symbol_table) expected_readme_files = { - f"{output_dir}/test_readme/README.md": "README.md in root dir\n", - f"{output_dir}/test_readme/subdir/README.txt": "README.txt in subdir\n", - f"{output_dir}/test_readme/subdir/subsubdir/README.rst": "README.rst in subsubdir\n" + f"{output_dir}\\test_readme\\README.md": "README.md in root dir\n", + 
f"{output_dir}\\test_readme\\subdir\\README.txt": "README.txt in subdir\n", + f"{output_dir}\\test_readme\\subdir\\subsubdir\\README.rst": "README.rst in subsubdir\n" } actual_readme_files = dir_info["readme_files"] assert expected_readme_files == actual_readme_files - #def test_metadata(self): - # """ - # Need to execute under test/test_files/: - # `git clone https://github.com/githubtraining/hellogitworld.git` - # to pass this test, as getting metadata requires the local repository - # to have a .git folder. - # """ - # input_path = "./test_files/hellogitworld" - # output_dir = "./output_dir" - # - # ignore_dir_pattern = [".", "__pycache__"] - # ignore_file_pattern = [".", "__pycache__"] - # requirements = False - # call_list = False - # control_flow = False - # directory_tree = False - # software_invocation = False - ## abstract_syntax_tree = False - # source_code = False - # license_detection = False - # readme = False - # metadata = True - # - # dir_info = invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - # call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - # source_code, license_detection, readme, metadata) - # try: - # response = requests.get("https://api.github.com/repos/githubtraining/hellogitworld") - # expected_metadata = response.json() - # except requests.RequestException as e: - # print(f"Error sending requests to Github API: {e}") - # raise e - # actual_metadata = dir_info["metadata"] - # assert expected_metadata == actual_metadata - - -def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, - call_list, control_flow, directory_tree, software_invocation, abstract_syntax_tree, - source_code, license_detection, readme, metadata): +def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pattern, requirements, call_list, + control_flow, directory_tree, software_invocation, abstract_syntax_tree, source_code, 
license_detection, readme, + metadata, data_flow, symbol_table): + if data_flow: + LANGUAGE = Language(symbol_table, "python") + parser = Parser() + parser.set_language(LANGUAGE) + parser = [parser, DFG_python] + else: + parser = [] dir_info = {} # retrieve readme text at the root level (if any) readme = "" @@ -684,7 +722,7 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat relative_path = Path(subdir).relative_to(Path(input_path).parent) out_dir = str(Path(output_dir) / relative_path) cf_dir, json_dir = create_output_dirs(out_dir, control_flow) - code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code) + code_info = CodeInspection(path, cf_dir, json_dir, control_flow, abstract_syntax_tree, source_code, data_flow, parser) if out_dir not in dir_info: dir_info[out_dir] = [code_info.fileJson[0]] else: @@ -700,7 +738,7 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat call_file_html = output_dir + "/call_graph.html" generate_output_html(pruned_call_list_data, call_file_html) call_json_file = output_dir + "/call_graph.json" - with open(call_json_file, 'w') as outfile: + with open(call_json_file, 'rb') as outfile: json.dump(pruned_call_list_data, outfile) # Note:1 for visualising the tree, nothing or 0 for not. if requirements: @@ -729,7 +767,7 @@ def invoke_inspector(input_path, output_dir, ignore_dir_pattern, ignore_file_pat dir_info["software_type"] = rank_software_invocation(soft_invocation_info_list) if license_detection: licenses_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), - "../inspect4py/licenses") + "..\\inspect4py\\licenses") license_text = extract_license(input_path) rank_list = detect_license(license_text, licenses_path) dir_info["license"] = {}