-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathvalidation.py
More file actions
388 lines (319 loc) · 15.2 KB
/
validation.py
File metadata and controls
388 lines (319 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
"""
Simple file validation for various file formats.
Can be used before grading for checking if a file is valid.
May also be useful as a trivial grader to give a point or points for submitting correct filetypes.
Enable by using the 'validation' key in the test_config.yaml.
Detailed examples available in the readme.
"""
import ast
import collections
import contextlib
import html5lib
import importlib
import io
import re
import sys
import traceback
from graderutils import GraderUtilsError
from graderutils import remote
from graderutils.graderunittest import result_or_timeout
from graderutils.graderunittest import TEST_MODULE_STDERR_MAX_SIZE
from graderutils.graderunittest import testmethod_timeout
from graderutils.remote import GraderConnClosedError
from graderutils.tracebackformat import strip_irrelevant_traceback_lines
class ValidationError(GraderUtilsError):
    """GraderUtilsError subclass for the validation module; adds no behaviour of its own."""
# One finding of a restricted syntax check: the checked file's name, the line number
# (-1 when the AST node carries no line number), the text of that line, and a message
# (a configured message when blacklisting, the offending node name or word when whitelisting).
RestrictedSyntaxMatch = collections.namedtuple("RestrictedSyntaxMatch", ["filename", "linenumber", "line_content", "message"])
def syntax_matches_to_message(matches):
    """
    Format an iterable of namedtuple-like matches into one human readable string.
    Every match becomes a paragraph with one "field: repr(value)" line per entry of its _fields.
    Paragraphs are separated by an empty line and prefixed with a fixed heading.
    """
    paragraphs = []
    for match in matches:
        field_lines = ["{}: {!r}".format(field, getattr(match, field)) for field in match._fields]
        paragraphs.append("\n".join(field_lines))
    return "Restricted syntax found:\n\n" + "\n\n".join(paragraphs)
def _check_python_restricted_syntax(config, blacklist=True):
    """
    Parse the Python file config["file"] and compare every AST node against the configured restrictions.
    Restrictions are read from the optional config keys "node_names" (node class names),
    "node_dumps" (exact ast.dump strings) and "node_dump_regexp" (regular expressions searched in the dump).
    Each of these maps a restriction to a message.

    If blacklist is True, one RestrictedSyntaxMatch is returned for every restriction a node hits,
    carrying the configured message.
    Else one RestrictedSyntaxMatch is returned for every node that hits no restriction at all,
    carrying the node class name as the message.
    Nodes without a line number get linenumber -1 and an empty line_content.
    May raise OSError when reading the file and SyntaxError when parsing it.
    """
    listed_names = config["node_names"].keys() if "node_names" in config else set()
    listed_dumps = config["node_dumps"].keys() if "node_dumps" in config else set()
    dump_patterns = []
    if "node_dump_regexp" in config:
        for expr, message in config["node_dump_regexp"].items():
            dump_patterns.append((re.compile(expr), message))

    filename = config["file"]
    with open(filename, encoding="utf-8") as submitted_file:
        source = submitted_file.read()
    tree = ast.parse(source)
    source_lines = source.splitlines()

    found = []
    # A single pass over all nodes is enough for both the blacklist and the whitelist mode.
    for node in ast.walk(tree):
        name = node.__class__.__name__
        dump = ast.dump(node)
        lineno = getattr(node, "lineno", -1)
        content = source_lines[lineno - 1] if lineno > 0 else ""

        if not blacklist:
            is_listed = (
                name in listed_names
                or dump in listed_dumps
                or any(pattern.search(dump) for pattern, _ in dump_patterns)
            )
            if not is_listed:
                # Whitelisting: the node matched nothing that was allowed.
                found.append(RestrictedSyntaxMatch(filename, lineno, content, name))
            continue

        # Blacklisting: collect the message of every restriction this node hits,
        # in the order exact dump, node name, dump regular expressions.
        messages = []
        if dump in listed_dumps:
            messages.append(config["node_dumps"][dump])
        if name in listed_names:
            messages.append(config["node_names"][name])
        messages.extend(message for pattern, message in dump_patterns if pattern.search(dump))
        found.extend(RestrictedSyntaxMatch(filename, lineno, content, message) for message in messages)
    return found
def _check_plain_text_restricted_syntax(config, blacklist=True):
    """
    As in _check_python_restricted_syntax but for plain text strings.
    No sophisticated tokenization is done for the source text and it is checked by simple regular expressions.
    The keys of config["strings"] are joined with "|" into one pattern, without escaping.
    If config["ignorecase"] is true, matching is case insensitive.

    If blacklisting, return one RestrictedSyntaxMatch for every occurrence of a configured string,
    with the message configured for that string.
    If whitelisting, return one RestrictedSyntaxMatch for every stripped, non-empty piece of a line that
    is left over after splitting the line at the configured strings, with that piece as the message.
    May raise OSError when reading config["file"].
    """
    def re_split_no_keep(pattern, string):
        """Return an iterator over `string` which yields substrings that do not match `pattern`."""
        for word in re.split(pattern, string):
            word = word.strip()
            # re.split also returns the captured separators, those are dropped here.
            if word and not re.match(pattern, word):
                yield word

    matches = []
    string_messages = config["strings"]
    ignorecase = config.get("ignorecase", False)
    filename = config["file"]
    with open(filename, encoding="utf-8") as submitted_file:
        source = submitted_file.readlines()
    pattern_string = "(" + "|".join(string_messages) + ")"
    pattern = re.compile(pattern_string, re.IGNORECASE if ignorecase else 0)
    if ignorecase:
        # Matched text is lowercased before the message lookup, so the lookup table must be keyed
        # by lowercased strings too. Otherwise a configured string containing uppercase letters
        # could never be found and the lookup raised KeyError.
        messages_by_key = {key.lower(): message for key, message in string_messages.items()}
    else:
        messages_by_key = string_messages

    for line_number, line in enumerate(source, start=1):
        if blacklist:
            for line_match in pattern.findall(line):
                key = line_match.lower() if ignorecase else line_match
                matches.append(RestrictedSyntaxMatch(
                    filename, line_number,
                    line, messages_by_key[key]))
        else:
            # Split at matches and do not keep split strings
            for line_miss in re_split_no_keep(pattern, line):
                matches.append(RestrictedSyntaxMatch(
                    filename, line_number,
                    line, line_miss))
    return matches
def _get_python_blacklist_matches(blacklist):
    """Run the Python restricted syntax check in blacklist mode with `blacklist` as the config."""
    return _check_python_restricted_syntax(config=blacklist, blacklist=True)
def _get_python_whitelist_misses(whitelist):
    """Run the Python restricted syntax check in whitelist mode with `whitelist` as the config."""
    return _check_python_restricted_syntax(config=whitelist, blacklist=False)
def _get_plain_text_blacklist_matches(blacklist):
    """Run the plain text restricted syntax check in blacklist mode with `blacklist` as the config."""
    return _check_plain_text_restricted_syntax(config=blacklist, blacklist=True)
def _get_plain_text_whitelist_misses(whitelist):
    """Run the plain text restricted syntax check in whitelist mode with `whitelist` as the config."""
    return _check_plain_text_restricted_syntax(config=whitelist, blacklist=False)
def get_restricted_syntax_matches(config, get_matches):
    """
    Call get_matches(config) and wrap the result into an error dict.
    Return {"message": <formatted matches>} if get_matches returned anything truthy, else None.
    """
    found = get_matches(config)
    return {"message": syntax_matches_to_message(found)} if found else None
def ast_dump(source):
    """
    Parse source and return the ast.dump of every node found by ast.walk, one node per line.
    Handy for finding out which node names or dumps to put into a blacklist or whitelist.
    More sophisticated utilities are described at https://greentreesnakes.readthedocs.io/en/latest/.
    """
    tree = ast.parse(source)
    return "\n".join(ast.dump(node) for node in ast.walk(tree))
def _import_module_from_python_file(filename):
    """
    Import the module named by `filename` and return it.
    The module name is the part of `filename` before the first ".py".
    The import is run through result_or_timeout with testmethod_timeout as the timeout.
    While importing, stdout is redirected to None and stderr is captured into a buffer.
    In all cases, at most TEST_MODULE_STDERR_MAX_SIZE characters of the captured stderr
    are written to sys.stderr before returning or raising.

    Raises TimeoutError if the import did not produce a module within the timeout,
    GraderConnClosedError if the remote connection had already been closed before this call,
    and GraderUtilsError for SystemExit, KeyboardInterrupt and for a broken remote connection.
    Other exceptions raised by the import propagate unchanged.
    """
    err = io.StringIO()
    module = None
    try:
        with contextlib.redirect_stderr(err):
            # Module output must be suppressed during import, since grading json is printed to stdout as well
            with contextlib.redirect_stdout(None):
                if remote.conn:
                    # Update rpyc timeout so that it doesn't timeout before result_or_timeout
                    remote.conn._config.update({"sync_request_timeout": testmethod_timeout})
                    # Check if remote connection was closed earlier
                    # (only assigned, and only read below, when remote.conn is truthy)
                    conn_closed_earlier = remote.conn.closed
                # Everything before the first ".py" in the filename is used as the module name
                args = [filename.split(".py")[0]]
                # SystemExit and KeyboardInterrupt kill grader if not caught
                try:
                    # NOTE(review): presumably result_or_timeout returns (running time, result) and
                    # leaves the result as None on timeout - confirm in graderutils.graderunittest
                    running_time, module = result_or_timeout(importlib.import_module, args, timeout=testmethod_timeout)
                    if running_time == testmethod_timeout and module is None:
                        if remote.conn and not remote.conn.closed:
                            # Close remote connection, student process is stuck in an infinite loop or it runs too slowly.
                            # Rest of the python_import validation tasks run after this will fail with GraderConnClosedError.
                            remote.conn.close()
                        raise TimeoutError(
                            "Validation task timed out after {} seconds. Your code may be "
                            "stuck in an infinite loop or it runs very slowly.".format(testmethod_timeout)
                        )
                except EOFError as e: # Rpyc raises an EOFError when connection to the remote server does not work
                    if remote.conn and conn_closed_earlier:
                        raise GraderConnClosedError(
                            "Grader cannot complete this validation task because connection to the child process was "
                            "closed earlier. Your code may have got stuck in an infinite loop, it runs very slowly "
                            "or KeyboardInterrupt was raised."
                        ) from None
                    elif remote.conn and str(e) in ["[Errno 32] Broken pipe", "stream has been closed"]:
                        # Student code most likely raised KeyboardInterrupt.
                        # str(e) is "[Errno 32] Broken pipe" if it was raised inside a function.
                        # str(e) is "stream has been closed" if it was raised on module level.
                        # Close remote connection if it is still open (in case of broken pipe).
                        # Rest of the tests run after this will fail with GraderConnClosedError.
                        remote.conn.close()
                        raise GraderUtilsError("Grader does not support raising KeyboardInterrupt.") from None
                    # Raise the EOFError if it was not caused by remote connection being closed
                    raise
                except SystemExit as e:
                    raise GraderUtilsError("Grader does not support the usage of sys.exit(), exit() or quit().") from e
                except KeyboardInterrupt as e: # Non-rpyc KeyboardInterrupt
                    raise GraderUtilsError("Grader does not support raising KeyboardInterrupt.") from e
    finally:
        # Limit maximum size of the stderr output
        # This runs after the redirect has ended, so the text goes to the real sys.stderr
        sys.stderr.write(err.getvalue()[:TEST_MODULE_STDERR_MAX_SIZE])
    return module
def get_python_import_errors(filename):
    """
    Try to import the module in `filename`.
    Return an empty dict on success, else {"message": <stripped traceback of the failure>}.
    """
    try:
        _import_module_from_python_file(filename)
    except Exception:
        formatted_traceback = traceback.format_exc()
        return {"message": strip_irrelevant_traceback_lines(formatted_traceback, strip_exercise_tb=True)}
    return {}
def _hasattr_path(obj, attr_path):
"""
Return True if obj has some attr path.
>>> # object().__class__
>>> _hasattr_path(object(), "__class__")
True
>>> # object().__class__.__class__
>>> _hasattr_path(object(), "__class__.__class__")
True
>>> # object().x.y
>>> _hasattr_path(object(), "x.y")
False
"""
for attr in attr_path.split("."):
obj = getattr(obj, attr, None)
if obj is None:
return False
return True
def get_python_missing_attr_errors(filename, expected_attributes):
    """
    Import the module in `filename` and check it for every attribute path in expected_attributes.
    expected_attributes maps attribute paths to messages.
    Return {"missing_attrs": [(path, message), ...]} for the paths that were not found,
    or an empty dict if nothing is missing.
    Exceptions raised by the import are not caught here.
    """
    module = _import_module_from_python_file(filename)
    missing = []
    for path, msg in expected_attributes.items():
        if not _hasattr_path(module, path):
            missing.append((path, msg))
    return {"missing_attrs": missing} if missing else {}
def get_python_syntax_errors(filename):
    """
    Read `filename` as utf-8 and try to parse it as Python source.
    Return an empty dict if parsing succeeds, else {"message": ...} with the file name,
    the line number and the offending line text of the SyntaxError.
    Only SyntaxError is handled, other exceptions (e.g. OSError) propagate.
    """
    try:
        with open(filename, encoding="utf-8") as submitted_file:
            ast.parse(submitted_file.read())
    except SyntaxError as exc:
        template = "Syntax error in {!r} at line {}:\n{}"
        return {"message": template.format(filename, exc.lineno, exc.text)}
    return {}
def get_labview_errors(filename):
    """
    Compare the first 16 bytes of `filename` with the byte signature this check expects.
    Return an empty dict if they are equal, else a dict with an error message.
    """
    expected_header = b'RSRC\r\n\x00\x03LVINLBVW'
    with open(filename, "rb") as f:
        header = f.read(16)
    if header == expected_header:
        return {}
    return {"message": "The file wasn't a proper labVIEW-file"}
def get_xlsm_errors(filename):
    """
    Compare the first 14 bytes of `filename` with the byte signature this check expects.
    Return an empty dict if they are equal, else a dict with an error message.
    """
    expected_header = b'PK\x03\x04\x14\x00\x06\x00\x08\x00\x00\x00!\x00'
    with open(filename, "rb") as f:
        header = f.read(14)
    if header == expected_header:
        return {}
    return {"message": "The file wasn't a proper Excel-file with macros!"}
def get_html_errors(filename):
    """
    Parse `filename` with a strict html5lib parser.
    Return an empty dict if parsing succeeds, else {"message": ...}.
    The message has one "Line <line>: <error code>: <error data>" row for every error the parser recorded.
    If parsing failed without the parser recording any error, the message is the exception itself,
    so that such a file is not silently accepted as valid.
    Errors from opening the file propagate.
    """
    errors = {}
    with open(filename, "r") as f:
        parser = html5lib.HTMLParser(tree=html5lib.getTreeBuilder("dom"), strict=True)
        try:
            parser.parse(f)
        # Exception instead of a bare except, SystemExit and KeyboardInterrupt must not be swallowed here
        except Exception as parse_failure:
            err = "".join(
                "Line {0}: {1}: {2} \n".format(e[0][0], e[1], e[2])
                for e in parser.errors
            )
            if not err:
                # Not a recorded parse error (e.g. the file could not be decoded)
                err = "{}: {}".format(type(parse_failure).__name__, parse_failure)
            errors["message"] = err
    return errors
def _get_validation_error(validation, filename, config):
error = None
if validation == "python_import":
# import matplotlib
# matplotlib.use(MATPLOTLIB_RENDERER_BACKEND)
error = get_python_import_errors(filename)
if not error and "attrs" in config:
# Import succeeded, now check that module has all required attributes.
error = get_python_missing_attr_errors(filename, config["attrs"])
elif validation == "python_syntax":
error = get_python_syntax_errors(filename)
elif validation == "python_blacklist":
get_matches = _get_python_blacklist_matches
error = get_restricted_syntax_matches(config, get_matches)
elif validation == "python_whitelist":
get_matches = _get_python_whitelist_misses
error = get_restricted_syntax_matches(config, get_matches)
elif validation == "plain_text_blacklist":
get_matches = _get_plain_text_blacklist_matches
error = get_restricted_syntax_matches(config, get_matches)
elif validation == "plain_text_whitelist":
get_matches = _get_plain_text_whitelist_misses
error = get_restricted_syntax_matches(config, get_matches)
elif validation == "labview":
error = get_labview_errors(filename)
elif validation == "xlsm":
error = get_xlsm_errors(filename)
elif validation == "html":
error = get_html_errors(filename)
return error
def run_validation_tasks(tasks):
    """
    Generator that runs every validation task of `tasks` in order and yields an error dict for each failed task.
    Every task config needs the keys "type" and "file".
    An exception raised by a check is turned into an error dict with the stripped traceback as the message.
    Yielded dicts get the task "type" and "file", plus "description" and "display_name" if the task has them.
    After a failed task, the remaining tasks are skipped unless the task sets "break_on_fail" to a false value.
    """
    optional_keys = ("description", "display_name")
    for task in tasks:
        validation_type, filename = task["type"], task["file"]
        try:
            error = _get_validation_error(validation_type, filename, task)
        except Exception:
            error = {"message": strip_irrelevant_traceback_lines(traceback.format_exc(), strip_exercise_tb=True)}
        if not error:
            # Task passed, nothing to report.
            continue
        error["type"] = validation_type
        error["file"] = filename
        for key in optional_keys:
            if key in task:
                error[key] = task[key]
        yield error
        if task.get("break_on_fail", True):
            return