From 4252064a073e6770bfcc2e34eb8f09bb213277df Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Sun, 21 Apr 2024 17:17:55 +0200 Subject: [PATCH 1/7] Include cgi.FieldStorage in paste.util --- docs/do-it-yourself-framework.txt | 3 +- paste/request.py | 37 +- paste/util/field_storage.py | 647 ++++++++++++++++++++++++++ paste/util/multidict.py | 6 +- tests/cgiapp_data/form.cgi | 92 ++-- tests/test_multidict.py | 6 +- tests/test_util/test_field_storage.py | 538 +++++++++++++++++++++ tests/test_wsgiwrappers.py | 6 +- 8 files changed, 1244 insertions(+), 91 deletions(-) create mode 100644 paste/util/field_storage.py create mode 100644 tests/test_util/test_field_storage.py diff --git a/docs/do-it-yourself-framework.txt b/docs/do-it-yourself-framework.txt index 23b85ae3..95b869b9 100644 --- a/docs/do-it-yourself-framework.txt +++ b/docs/do-it-yourself-framework.txt @@ -128,8 +128,7 @@ fields:: 'name="name">'] The ``parse_formvars`` function just takes the WSGI environment and -calls the `cgi `_ -module (the ``FieldStorage`` class) and turns that into a MultiDict. +turns that into a MultiDict which may contain ``FieldStorage`` instances. Now For a Framework =================== diff --git a/paste/request.py b/paste/request.py index bb0d9eec..612e648a 100644 --- a/paste/request.py +++ b/paste/request.py @@ -17,12 +17,12 @@ * resolve_relative_url(url, environ) """ -import cgi from collections.abc import MutableMapping as DictMixin from urllib import parse as urlparse from urllib.parse import quote, parse_qsl from http.cookies import SimpleCookie, CookieError +from paste.util.field_storage import FieldStorage from paste.util.multidict import MultiDict __all__ = ['get_cookies', 'get_cookie_dict', 'parse_querystring', @@ -151,16 +151,16 @@ def parse_formvars(environ, include_get_vars=True, encoding=None, errors=None): # fake_out_cgi requests formvars = MultiDict() ct = environ.get('CONTENT_TYPE', '').partition(';')[0].lower() - use_cgi = ct in ('', 'application/x-www-form-urlencoded', - 'multipart/form-data') - # FieldStorage assumes a default CONTENT_LENGTH of -1, but a - # default of 0 is better: + use_cgi = ct in ( + '', 'application/x-www-form-urlencoded', 'multipart/form-data') + # FieldStorage assumes a default CONTENT_LENGTH of -1, + # but a default of 0 is better: if not environ.get('CONTENT_LENGTH'): environ['CONTENT_LENGTH'] = '0' if use_cgi: - # Prevent FieldStorage from parsing QUERY_STRING during GET/HEAD - # requests - old_query_string = environ.get('QUERY_STRING','') + # Prevent FieldStorage from parsing QUERY_STRING + # during GET/HEAD # requests + old_query_string = environ.get('QUERY_STRING', '') environ['QUERY_STRING'] = '' inp = environ['wsgi.input'] kwparms = {} @@ -168,10 +168,9 @@ def parse_formvars(environ, include_get_vars=True, encoding=None, errors=None): kwparms['encoding'] = encoding if errors: kwparms['errors'] = errors - fs = cgi.FieldStorage(fp=inp, - environ=environ, - keep_blank_values=True, - **kwparms) + fs = FieldStorage( + fp=inp, environ=environ, keep_blank_values=True, + **kwparms) environ['QUERY_STRING'] = old_query_string if isinstance(fs.value, list): for name in fs.keys(): @@ -384,20 +383,6 @@ def keys(self): def __contains__(self, item): return self._trans_name(item) in self.environ -def _cgi_FieldStorage__repr__patch(self): - """ monkey patch for FieldStorage.__repr__ - - Unbelievely, the default __repr__ on FieldStorage reads - the entire file content instead of being sane about it. - This is a simple replacement that doesn't do that - """ - if self.file: - return "FieldStorage(%r, %r)" % ( - self.name, self.filename) - return "FieldStorage(%r, %r, %r)" % ( - self.name, self.filename, self.value) - -cgi.FieldStorage.__repr__ = _cgi_FieldStorage__repr__patch if __name__ == '__main__': import doctest diff --git a/paste/util/field_storage.py b/paste/util/field_storage.py new file mode 100644 index 00000000..13ef9d2f --- /dev/null +++ b/paste/util/field_storage.py @@ -0,0 +1,647 @@ +"""FieldStorage class from the deprecated cgi library.""" + +import locale +import os +import sys +import urllib.parse +import tempfile + +from collections.abc import Mapping +from email.message import Message +from email.parser import FeedParser +from io import StringIO, BytesIO, TextIOWrapper + +__all__ = ['FieldStorage', 'parse_header'] + +# Maximum input we will accept when REQUEST_METHOD is POST +# 0 ==> unlimited input +maxlen = 0 + + +def _parseparam(s): + while s[:1] == ';': + s = s[1:] + end = s.find(';') + while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2: + end = s.find(';', end + 1) + if end < 0: + end = len(s) + f = s[:end] + yield f.strip() + s = s[end:] + + +def parse_header(line): + """Parse a Content-type like header. + + Return the main content-type and a dictionary of options. + """ + parts = _parseparam(';' + line) + key = parts.__next__() + pdict = {} + for p in parts: + i = p.find('=') + if i >= 0: + name = p[:i].strip().lower() + value = p[i+1:].strip() + if len(value) >= 2 and value[0] == value[-1] == '"': + value = value[1:-1] + value = value.replace('\\\\', '\\').replace('\\"', '"') + pdict[name] = value + return key, pdict + + +def valid_boundary(s): + import re + if isinstance(s, bytes): + _vb_pattern = b'^[ -~]{0,200}[!-~]$' + else: + _vb_pattern = '^[ -~]{0,200}[!-~]$' + return re.match(_vb_pattern, s) + + +class MiniFieldStorage: + """Like FieldStorage, for use when no file uploads are possible.""" + + # Dummy attributes + filename = None + list = None + type = None + file = None + type_options = {} + disposition = None + disposition_options = {} + headers = {} + + def __init__(self, name, value): + """Constructor from field name and value.""" + self.name = name + self.value = value + # self.file = StringIO(value) + + def __repr__(self): + """Return printable representation.""" + return f'MiniFieldStorage({self.name!r}, {self.value!r})' + + +class FieldStorage: + """Store a sequence of fields, reading multipart/form-data. + + This class provides naming, typing, files stored on disk, and + more. At the top level, it is accessible like a dictionary, whose + keys are the field names. (Note: None can occur as a field name.) + The items are either a Python list (if there's multiple values) or + another FieldStorage or MiniFieldStorage object. If it's a single + object, it has the following attributes: + + name: the field name, if specified; otherwise None + + filename: the filename, if specified; otherwise None; this is the + client side filename, *not* the file name on which it is + stored (that's a temporary file you don't deal with) + + value: the value as a *string*; for file uploads, this + transparently reads the file every time you request the value + and returns *bytes* + + file: the file(-like) object from which you can read the data *as + bytes* ; None if the data is stored a simple string + + type: the content-type, or None if not specified + + type_options: dictionary of options specified on the content-type + line + + disposition: content-disposition, or None if not specified + + disposition_options: dictionary of corresponding options + + headers: a dictionary(-like) object (sometimes email.message.Message or a + subclass thereof) containing *all* headers + + The class is subclassable, mostly for the purpose of overriding + the make_file() method, which is called internally to come up with + a file open for reading and writing. This makes it possible to + override the default choice of storing all files in a temporary + directory and unlinking them as soon as they have been opened. + """ + def __init__(self, fp=None, headers=None, outerboundary=b'', + environ=os.environ, + keep_blank_values=False, strict_parsing=False, + limit=None, encoding='utf-8', errors='replace', + max_num_fields=None, separator='&'): + """Constructor. Read multipart/* until last part. + + Arguments, all optional: + + fp : file pointer; default: sys.stdin.buffer + (not used when the request method is GET) + Can be : + 1. a TextIOWrapper object + 2. an object whose read() and readline() methods return bytes + + headers : header dictionary-like object; default: + taken from environ as per CGI spec + + outerboundary : terminating multipart boundary + (for internal use only) + + environ : environment dictionary; default: os.environ + + keep_blank_values: flag indicating whether blank values in + percent-encoded forms should be treated as blank strings. + A true value indicates that blanks should be retained as + blank strings. The default false value indicates that + blank values are to be ignored and treated as if they were + not included. + + strict_parsing: flag indicating what to do with parsing errors. + If false (the default), errors are silently ignored. + If true, errors raise a ValueError exception. + + limit : used internally to read parts of multipart/form-data forms, + to exit from the reading loop when reached. It is the difference + between the form content-length and the number of bytes already + read + + encoding, errors : the encoding and error handler used to decode the + binary stream to strings. Must be the same as the charset defined + for the page sending the form (content-type : meta http-equiv or + header) + + max_num_fields: int. If set, then __init__ throws a ValueError + if there are more than n fields read by parse_qsl(). + """ + method = 'GET' + self.keep_blank_values = keep_blank_values + self.strict_parsing = strict_parsing + self.max_num_fields = max_num_fields + self.separator = separator + if 'REQUEST_METHOD' in environ: + method = environ['REQUEST_METHOD'].upper() + self.qs_on_post = None + if method == 'GET' or method == 'HEAD': + if 'QUERY_STRING' in environ: + qs = environ['QUERY_STRING'] + elif sys.argv[1:]: + qs = sys.argv[1] + else: + qs = '' + qs = qs.encode(locale.getpreferredencoding(), 'surrogateescape') + fp = BytesIO(qs) + if headers is None: + headers = {'content-type': + 'application/x-www-form-urlencoded'} + if headers is None: + headers = {} + if method == 'POST': + # Set default content-type for POST to what's traditional + headers['content-type'] = 'application/x-www-form-urlencoded' + if 'CONTENT_TYPE' in environ: + headers['content-type'] = environ['CONTENT_TYPE'] + if 'QUERY_STRING' in environ: + self.qs_on_post = environ['QUERY_STRING'] + if 'CONTENT_LENGTH' in environ: + headers['content-length'] = environ['CONTENT_LENGTH'] + else: + if not (isinstance(headers, (Mapping, Message))): + raise TypeError('headers must be mapping or an instance' + ' of email.message.Message') + self.headers = headers + if fp is None: + self.fp = sys.stdin.buffer + # self.fp.read() must return bytes + elif isinstance(fp, TextIOWrapper): + self.fp = fp.buffer + else: + if not (hasattr(fp, 'read') and hasattr(fp, 'readline')): + raise TypeError('fp must be file pointer') + self.fp = fp + + self.encoding = encoding + self.errors = errors + + if not isinstance(outerboundary, bytes): + raise TypeError('outerboundary must be bytes,' + f' not {type(outerboundary).__name__}') + self.outerboundary = outerboundary + + self.bytes_read = 0 + self.limit = limit + + # Process content-disposition header + cdisp, pdict = '', {} + if 'content-disposition' in self.headers: + cdisp, pdict = parse_header(self.headers['content-disposition']) + self.disposition = cdisp + self.disposition_options = pdict + self.name = None + if 'name' in pdict: + self.name = pdict['name'] + self.filename = None + if 'filename' in pdict: + self.filename = pdict['filename'] + self._binary_file = self.filename is not None + + # Process content-type header + # + # Honor any existing content-type header. But if there is no + # content-type header, use some sensible defaults. Assume + # outerboundary is '' at the outer level, but something non-false + # inside a multi-part. The default for an inner part is text/plain, + # but for an outer part it should be urlencoded. This should catch + # bogus clients which erroneously forget to include a content-type + # header. + # + # See below for what we do if there does exist a content-type header, + # but it happens to be something we don't understand. + if 'content-type' in self.headers: + ctype, pdict = parse_header(self.headers['content-type']) + elif self.outerboundary or method != 'POST': + ctype, pdict = 'text/plain', {} + else: + ctype, pdict = 'application/x-www-form-urlencoded', {} + self.type = ctype + self.type_options = pdict + if 'boundary' in pdict: + self.innerboundary = pdict['boundary'].encode(self.encoding, + self.errors) + else: + self.innerboundary = b'' + + clen = -1 + if 'content-length' in self.headers: + try: + clen = int(self.headers['content-length']) + except ValueError: + pass + if maxlen and clen > maxlen: + raise ValueError('Maximum content length exceeded') + self.length = clen + if self.limit is None and clen >= 0: + self.limit = clen + + self.list = self.file = None + self.done = 0 + if ctype == 'application/x-www-form-urlencoded': + self.read_urlencoded() + elif ctype[:10] == 'multipart/': + self.read_multi(environ, keep_blank_values, strict_parsing) + else: + self.read_single() + + def __del__(self): + try: + self.file.close() + except AttributeError: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.file.close() + + def __repr__(self): + """Return a printable representation.""" + value = '...' if self.file else self.value # avoid reading the file + return f'FieldStorage({self.name!r}, {self.filename!r}, {value!r})' + + def __iter__(self): + return iter(self.keys()) + + def __getattr__(self, name): + if name != 'value': + raise AttributeError(name) + if self.file: + self.file.seek(0) + value = self.file.read() + self.file.seek(0) + elif self.list is not None: + value = self.list + else: + value = None + return value + + def __getitem__(self, key): + """Dictionary style indexing.""" + if self.list is None: + raise TypeError('not indexable') + found = [] + for item in self.list: + if item.name == key: + found.append(item) + if not found: + raise KeyError(key) + if len(found) == 1: + return found[0] + else: + return found + + def getvalue(self, key, default=None): + """Dictionary style get() method, including 'value' lookup.""" + if key in self: + value = self[key] + if isinstance(value, list): + return [x.value for x in value] + else: + return value.value + else: + return default + + def getfirst(self, key, default=None): + """ Return the first value received.""" + if key in self: + value = self[key] + if isinstance(value, list): + return value[0].value + else: + return value.value + else: + return default + + def getlist(self, key): + """ Return list of received values.""" + if key in self: + value = self[key] + if isinstance(value, list): + return [x.value for x in value] + else: + return [value.value] + else: + return [] + + def keys(self): + """Dictionary style keys() method.""" + if self.list is None: + raise TypeError('not indexable') + return list(set(item.name for item in self.list)) + + def __contains__(self, key): + """Dictionary style __contains__ method.""" + if self.list is None: + raise TypeError('not indexable') + return any(item.name == key for item in self.list) + + def __len__(self): + """Dictionary style len(x) support.""" + return len(self.keys()) + + def __bool__(self): + if self.list is None: + raise TypeError('Cannot be converted to bool.') + return bool(self.list) + + def read_urlencoded(self): + """Internal: read data in query string format.""" + qs = self.fp.read(self.length) + if not isinstance(qs, bytes): + raise ValueError( + f'{self.fp} should return bytes, got {type(qs).__name__}') + qs = qs.decode(self.encoding, self.errors) + if self.qs_on_post: + qs += '&' + self.qs_on_post + query = urllib.parse.parse_qsl( + qs, self.keep_blank_values, self.strict_parsing, + encoding=self.encoding, errors=self.errors, + max_num_fields=self.max_num_fields, separator=self.separator) + self.list = [MiniFieldStorage(key, value) for key, value in query] + self.skip_lines() + + FieldStorageClass = None + + def read_multi(self, environ, keep_blank_values, strict_parsing): + """Internal: read a part that is itself multipart.""" + ib = self.innerboundary + if not valid_boundary(ib): + raise ValueError(f'Invalid boundary in multipart form: {ib!r}') + self.list = [] + if self.qs_on_post: + query = urllib.parse.parse_qsl( + self.qs_on_post, self.keep_blank_values, self.strict_parsing, + encoding=self.encoding, errors=self.errors, + max_num_fields=self.max_num_fields, separator=self.separator) + self.list.extend(MiniFieldStorage(key, value) for key, value in query) + + klass = self.FieldStorageClass or self.__class__ + first_line = self.fp.readline() # bytes + if not isinstance(first_line, bytes): + raise ValueError( + f'{self.fp} should return bytes,' + f' got {type(first_line).__name__}') + self.bytes_read += len(first_line) + + # Ensure that we consume the file until we've hit our inner boundary + while (first_line.strip() != (b'--' + self.innerboundary) and + first_line): + first_line = self.fp.readline() + self.bytes_read += len(first_line) + + # Propagate max_num_fields into the subclass appropriately + max_num_fields = self.max_num_fields + if max_num_fields is not None: + max_num_fields -= len(self.list) + + while True: + parser = FeedParser() + hdr_text = b'' + while True: + data = self.fp.readline() + hdr_text += data + if not data.strip(): + break + if not hdr_text: + break + # parser takes strings, not bytes + self.bytes_read += len(hdr_text) + parser.feed(hdr_text.decode(self.encoding, self.errors)) + headers = parser.close() + + # Some clients add Content-Length for part headers, ignore them + if 'content-length' in headers: + del headers['content-length'] + + limit = (None if self.limit is None + else self.limit - self.bytes_read) + part = klass(self.fp, headers, ib, environ, keep_blank_values, + strict_parsing, limit, + self.encoding, self.errors, max_num_fields, self.separator) + + if max_num_fields is not None: + max_num_fields -= 1 + if part.list: + max_num_fields -= len(part.list) + if max_num_fields < 0: + raise ValueError('Max number of fields exceeded') + + self.bytes_read += part.bytes_read + self.list.append(part) + if part.done or self.bytes_read >= self.length > 0: + break + self.skip_lines() + + def read_single(self): + """Internal: read an atomic part.""" + if self.length >= 0: + self.read_binary() + self.skip_lines() + else: + self.read_lines() + self.file.seek(0) + + bufsize = 8*1024 # I/O buffering size for copy to file + + def read_binary(self): + """Internal: read binary data.""" + self.file = self.make_file() + todo = self.length + if todo >= 0: + while todo > 0: + data = self.fp.read(min(todo, self.bufsize)) # bytes + if not isinstance(data, bytes): + raise ValueError( + f'{self.fp} should return bytes,' + f' got {type(data).__name__}') + self.bytes_read += len(data) + if not data: + self.done = -1 + break + self.file.write(data) + todo = todo - len(data) + + def read_lines(self): + """Internal: read lines until EOF or outerboundary.""" + if self._binary_file: + self.file = self.__file = BytesIO() # store data as bytes for files + else: + self.file = self.__file = StringIO() # as strings for other fields + if self.outerboundary: + self.read_lines_to_outerboundary() + else: + self.read_lines_to_eof() + + def __write(self, line): + """line is always bytes, not string""" + if self.__file is not None: + if self.__file.tell() + len(line) > 1000: + self.file = self.make_file() + data = self.__file.getvalue() + self.file.write(data) + self.__file = None + if self._binary_file: + # keep bytes + self.file.write(line) + else: + # decode to string + self.file.write(line.decode(self.encoding, self.errors)) + + def read_lines_to_eof(self): + """Internal: read lines until EOF.""" + while 1: + line = self.fp.readline(1 << 16) # bytes + self.bytes_read += len(line) + if not line: + self.done = -1 + break + self.__write(line) + + def read_lines_to_outerboundary(self): + """Internal: read lines until outerboundary. + Data is read as bytes: boundaries and line ends must be converted + to bytes for comparisons. + """ + next_boundary = b'--' + self.outerboundary + last_boundary = next_boundary + b'--' + delim = b'' + last_line_lfend = True + _read = 0 + while 1: + + if self.limit is not None and 0 <= self.limit <= _read: + break + line = self.fp.readline(1 << 16) # bytes + self.bytes_read += len(line) + _read += len(line) + if not line: + self.done = -1 + break + if delim == b'\r': + line = delim + line + delim = b'' + if line.startswith(b'--') and last_line_lfend: + strippedline = line.rstrip() + if strippedline == next_boundary: + break + if strippedline == last_boundary: + self.done = 1 + break + odelim = delim + if line.endswith(b'\r\n'): + delim = b'\r\n' + line = line[:-2] + last_line_lfend = True + elif line.endswith(b'\n'): + delim = b'\n' + line = line[:-1] + last_line_lfend = True + elif line.endswith(b'\r'): + # We may interrupt \r\n sequences if they span the 2**16 + # byte boundary + delim = b'\r' + line = line[:-1] + last_line_lfend = False + else: + delim = b'' + last_line_lfend = False + self.__write(odelim + line) + + def skip_lines(self): + """Internal: skip lines until outer boundary if defined.""" + if not self.outerboundary or self.done: + return + next_boundary = b'--' + self.outerboundary + last_boundary = next_boundary + b'--' + last_line_lfend = True + while True: + line = self.fp.readline(1 << 16) + self.bytes_read += len(line) + if not line: + self.done = -1 + break + if line.endswith(b'--') and last_line_lfend: + strippedline = line.strip() + if strippedline == next_boundary: + break + if strippedline == last_boundary: + self.done = 1 + break + last_line_lfend = line.endswith(b'\n') + + def make_file(self): + """Overridable: return a readable & writable file. + + The file will be used as follows: + - data is written to it + - seek(0) + - data is read from it + + The file is opened in binary mode for files, in text mode + for other fields + + This version opens a temporary file for reading and writing, + and immediately deletes (unlinks) it. The trick (on Unix!) is + that the file can still be used, but it can't be opened by + another process, and it will automatically be deleted when it + is closed or when the current process terminates. + + If you want a more permanent file, you derive a class which + overrides this method. If you want a visible temporary file + that is nevertheless automatically deleted when the script + terminates, try defining a __del__ method in a derived class + which unlinks the temporary files you have created. + """ + if self._binary_file: + return tempfile.TemporaryFile('wb+') + else: + return tempfile.TemporaryFile( + 'w+', encoding=self.encoding, newline='\n') diff --git a/paste/util/multidict.py b/paste/util/multidict.py index 2db51254..ce9d57ef 100644 --- a/paste/util/multidict.py +++ b/paste/util/multidict.py @@ -1,10 +1,12 @@ # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -import cgi import copy import sys from collections.abc import MutableMapping as DictMixin +from .field_storage import FieldStorage + + class MultiDict(DictMixin): """ @@ -256,7 +258,7 @@ def _decode_value(self, value): ``FieldStorage`` objects are specially handled. """ - if isinstance(value, cgi.FieldStorage): + if isinstance(value, FieldStorage): # decode FieldStorage's field name and filename decode_name = self.decode_keys and isinstance(value.name, bytes) if decode_name: diff --git a/tests/cgiapp_data/form.cgi b/tests/cgiapp_data/form.cgi index 2e87f6c4..c224d361 100755 --- a/tests/cgiapp_data/form.cgi +++ b/tests/cgiapp_data/form.cgi @@ -1,74 +1,56 @@ #!/usr/bin/env python -import sys - -# Quiet warnings in this CGI so that it does not upset tests. -if not sys.warnoptions: - import warnings - warnings.simplefilter("ignore") - -# TODO: cgi is deprecated and will go away in Python 3.13. -import cgi +from paste.util.field_storage import FieldStorage print('Content-type: text/plain') print('') -if sys.version_info.major >= 3: - # Python 3: cgi.FieldStorage keeps some field names as unicode and some as - # the repr() of byte strings, duh. - class FieldStorage(cgi.FieldStorage): +class FormFieldStorage(FieldStorage): - def _key_candidates(self, key): - yield key + def _key_candidates(self, key): + yield key + try: + # assume bytes, coerce to str try: - # assume bytes, coerce to str - try: - yield key.decode(self.encoding) - except UnicodeDecodeError: - pass - except AttributeError: - # assume str, coerce to bytes - try: - yield key.encode(self.encoding) - except UnicodeEncodeError: - pass - - def __getitem__(self, key): - - superobj = super(FieldStorage, self) - - error = None - - for candidate in self._key_candidates(key): - if isinstance(candidate, bytes): - # ouch - candidate = repr(candidate) - try: - return superobj.__getitem__(candidate) - except KeyError as e: - if error is None: - error = e + yield key.decode(self.encoding) + except UnicodeDecodeError: + pass + except AttributeError: + # assume str, coerce to bytes + try: + yield key.encode(self.encoding) + except UnicodeEncodeError: + pass - # fall through, re-raise the first KeyError - raise error + def __getitem__(self, key): + error = None - def __contains__(self, key): - superobj = super(FieldStorage, self) + for candidate in self._key_candidates(key): + if isinstance(candidate, bytes): + # ouch + candidate = repr(candidate) + try: + return super().__getitem__(candidate) + except KeyError as e: + if error is None: + error = e - for candidate in self._key_candidates(key): - if superobj.__contains__(candidate): - return True - return False + # fall through, re-raise the first KeyError + raise error -else: # PY2 + def __contains__(self, key): + superobj = super(FieldStorage, self) - FieldStorage = cgi.FieldStorage + for candidate in self._key_candidates(key): + if superobj.__contains__(candidate): + return True + return False form = FieldStorage() -print('Filename: %s' % form['up'].filename) -print('Name: %s' % form['name'].value) -print('Content: %s' % form['up'].file.read()) +print('Filename:', form['up'].filename) +print('Name:', form['name'].value) +print('Content:', form['up'].file.read()) diff --git a/tests/test_multidict.py b/tests/test_multidict.py index ed6c920c..938aebf1 100644 --- a/tests/test_multidict.py +++ b/tests/test_multidict.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- # (c) 2007 Ian Bicking and Philip Jenvey; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -import cgi import gc import io import pytest +from paste.util.field_storage import FieldStorage from paste.util.multidict import MultiDict, UnicodeMultiDict def test_dict(): @@ -146,13 +146,13 @@ def assert_unicode_item(obj): assert isinstance(item[0], tuple) assert isinstance(item[1], tuple) - fs = cgi.FieldStorage() + fs = FieldStorage() fs.name = 'thefile' fs.filename = 'hello.txt' fs.file = io.BytesIO(b'hello') d[k('f')] = fs ufs = d[k('f')] - assert isinstance(ufs, cgi.FieldStorage) + assert isinstance(ufs, FieldStorage) assert ufs.name == fs.name assert isinstance(ufs.name, str) assert ufs.filename == fs.filename diff --git a/tests/test_util/test_field_storage.py b/tests/test_util/test_field_storage.py new file mode 100644 index 00000000..b54b832d --- /dev/null +++ b/tests/test_util/test_field_storage.py @@ -0,0 +1,538 @@ +import sys +import tempfile +from collections import namedtuple +from io import BytesIO + +import pytest + +from paste.util import field_storage +from paste.util.field_storage import FieldStorage, parse_header + + +class HackedSysModule: + # The regression test will have real values in sys.argv, which + # will completely confuse the test of the field_storage module + argv = [] + stdin = sys.stdin + + +field_storage.sys = HackedSysModule() + + +parse_strict_test_cases = [ + ("", {}), + ("&", ValueError("bad query field: ''")), + ("&&", ValueError("bad query field: ''")), + # Should the next few really be valid? + ("=", {}), + ("=&=", {}), + # This rest seem to make sense + ("=a", {'': ['a']}), + ("&=a", ValueError("bad query field: ''")), + ("=a&", ValueError("bad query field: ''")), + ("=&a", ValueError("bad query field: 'a'")), + ("b=a", {'b': ['a']}), + ("b+=a", {'b ': ['a']}), + ("a=b=a", {'a': ['b=a']}), + ("a=+b=a", {'a': [' b=a']}), + ("&b=a", ValueError("bad query field: ''")), + ("b&=a", ValueError("bad query field: 'b'")), + ("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}), + ("a=a+b&a=b+a", {'a': ['a b', 'b a']}), + ("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), + ("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r" + "&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b" + "&expire=964546263&lobale=en-US&kid=130003.300038&ss=env", + {'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'], + 'cuyer': ['r'], + 'expire': ['964546263'], + 'kid': ['130003.300038'], + 'lobale': ['en-US'], + 'order_id': ['0bb2e248638833d48cb7fed300000f1b'], + 'ss': ['env'], + 'view': ['bustomer'], + }), + + ("group_id=5470&set=custom&_assigned_to=31392&_status=1" + "&_category=100&SUBMIT=Browse", + {'SUBMIT': ['Browse'], + '_assigned_to': ['31392'], + '_category': ['100'], + '_status': ['1'], + 'group_id': ['5470'], + 'set': ['custom'], + }) + ] + + +def gen_result(data, environ): + encoding = 'latin-1' + fake_stdin = BytesIO(data.encode(encoding)) + fake_stdin.seek(0) + form = FieldStorage(fp=fake_stdin, environ=environ, encoding=encoding) + return {k: form.getlist(k) if isinstance(v, list) else v.value + for k, v in dict(form).items()} + + +def test_fieldstorage_properties(): + fs = FieldStorage() + assert not fs + assert "FieldStorage" in repr(fs) + assert list(fs) == list(fs.keys()) + fs.list.append(namedtuple('MockFieldStorage', 'name')('fieldvalue')) + assert fs + + +def test_fieldstorage_invalid(): + with pytest.raises(TypeError): + FieldStorage("not-a-file-obj", environ={"REQUEST_METHOD": "PUT"}) + with pytest.raises(TypeError): + FieldStorage("foo", "bar") + fs = FieldStorage(headers={'content-type': 'text/plain'}) + with pytest.raises(TypeError): + bool(fs) + + +def test_strict(): + for orig, expect in parse_strict_test_cases: + env = {'QUERY_STRING': orig} + fs = FieldStorage(environ=env) + if isinstance(expect, dict): + # test dict interface + assert len(expect) == len(fs) + assert len(expect.keys()) == len(fs.keys()) + assert fs.getvalue("nonexistent field", "default") == "default" + # test individual fields + for key in expect.keys(): + expect_val = expect[key] + assert key in fs + if len(expect_val) > 1: + assert fs.getvalue(key) == expect_val + else: + assert fs.getvalue(key) == expect_val[0] + + +def test_separator(): + parse_semicolon = [ + ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}), + ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), + (";", ValueError("bad query field: ''")), + (";;", ValueError("bad query field: ''")), + ("=;a", ValueError("bad query field: 'a'")), + (";b=a", ValueError("bad query field: ''")), + ("b;=a", ValueError("bad query field: 'b'")), + ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}), + ("a=a+b;a=b+a", {'a': ['a b', 'b a']}), + ] + for orig, expect in parse_semicolon: + env = {'QUERY_STRING': orig} + fs = FieldStorage(separator=';', environ=env) + if isinstance(expect, dict): + for key in expect.keys(): + expect_val = expect[key] + assert key in fs + if len(expect_val) > 1: + assert fs.getvalue(key) == expect_val + else: + assert fs.getvalue(key) == expect_val[0] + + +def test_fieldstorage_readline(): + # FieldStorage uses readline, which has the capacity to read all + # contents of the input file into memory; we use readline's size argument + # to prevent that for files that do not contain any newlines in + # non-GET/HEAD requests + class TestReadlineFile: + def __init__(self, file): + self.file = file + self.numcalls = 0 + + def readline(self, size=None): + self.numcalls += 1 + if size: + return self.file.readline(size) + else: + return self.file.readline() + + def __getattr__(self, name): + file = self.__dict__['file'] + a = getattr(file, name) + if not isinstance(a, int): + setattr(self, name, a) + return a + + f = TestReadlineFile(tempfile.TemporaryFile("wb+")) + try: + f.write(b'x' * 256 * 1024) + f.seek(0) + env = {'REQUEST_METHOD': 'PUT'} + fs = FieldStorage(fp=f, environ=env) + try: + # if we're not chunking properly, readline is only called twice + # (by read_binary); if we are chunking properly, it will be called + # 5 times as long as the chunk size is 1 << 16. + assert f.numcalls >= 2 + finally: + fs.file.close() + finally: + f.close() + + +def test_fieldstorage_multipart(): + # Test basic FieldStorage multipart parsing + env = { + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}', + 'CONTENT_LENGTH': '558'} + fp = BytesIO(POSTDATA.encode('latin-1')) + fs = FieldStorage(fp, environ=env, encoding="latin-1") + assert len(fs.list) == 4 + expect = [{'name': 'id', 'filename': None, 'value': '1234'}, + {'name': 'title', 'filename': None, 'value': ''}, + {'name': 'file', 'filename': 'test.txt', + 'value': b'Testing 123.\n'}, + {'name': 'submit', 'filename': None, 'value': ' Add '}] + for x in range(len(fs.list)): + for k, exp in expect[x].items(): + got = getattr(fs.list[x], k) + assert got == exp + + +def test_fieldstorage_multipart_leading_whitespace(): + env = { + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}', + 'CONTENT_LENGTH': '560'} + # Add some leading whitespace to our post data that will cause the + # first line to not be the inner boundary. + fp = BytesIO(b"\r\n" + POSTDATA.encode('latin-1')) + fs = FieldStorage(fp, environ=env, encoding="latin-1") + assert len(fs.list) == 4 + expect = [{'name': 'id', 'filename': None, 'value': '1234'}, + {'name': 'title', 'filename': None, 'value': ''}, + {'name': 'file', 'filename': 'test.txt', + 'value': b'Testing 123.\n'}, + {'name': 'submit', 'filename': None, 'value': ' Add '}] + for x in range(len(fs.list)): + for k, exp in expect[x].items(): + got = getattr(fs.list[x], k) + assert got == exp + + +def test_fieldstorage_multipart_non_ascii(): + # Test basic FieldStorage multipart parsing + env = {'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}', + 'CONTENT_LENGTH': '558'} + for encoding in ['iso-8859-1', 'utf-8']: + fp = BytesIO(POSTDATA_NON_ASCII.encode(encoding)) + fs = FieldStorage(fp, environ=env, encoding=encoding) + assert len(fs.list) == 1 + expect = [{'name': 'id', 'filename': None, 'value': '\xe7\xf1\x80'}] + for x in range(len(fs.list)): + for k, exp in expect[x].items(): + got = getattr(fs.list[x], k) + assert got == exp + + +def test_fieldstorage_multipart_maxline(): + # Issue #18167 + maxline = 1 << 16 - 1 + + def check(content): + data = """---123 +Content-Disposition: form-data; name="upload"; filename="fake.txt" +Content-Type: text/plain + +{} +---123-- +""".replace('\n', '\r\n').format(content) + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'multipart/form-data; boundary=-123', + 'REQUEST_METHOD': 'POST', + } + + assert gen_result(data, environ) == { + 'upload': content.encode('latin1')} + check('x' * maxline) + check('x' * maxline + '\r') + check('x' * maxline + '\r' + 'y' * maxline) + + +def test_fieldstorage_multipart_w3c(): + # Test basic FieldStorage multipart parsing (W3C sample) + env = { + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY_W3}', + 'CONTENT_LENGTH': str(len(POSTDATA_W3))} + fp = BytesIO(POSTDATA_W3.encode('latin-1')) + fs = FieldStorage(fp, environ=env, encoding="latin-1") + assert len(fs.list) == 2 + assert fs.list[0].name == 'submit-name' + assert fs.list[0].value == 'Larry' + assert fs.list[1].name == 'files' + files = fs.list[1].value + assert len(files) == 2 + expect = [{'name': None, 'filename': 'file1.txt', + 'value': b'... contents of file1.txt ...'}, + {'name': None, 'filename': 'file2.gif', + 'value': b'...contents of file2.gif...'}] + for x in range(len(files)): + for k, exp in expect[x].items(): + got = getattr(files[x], k) + assert got == exp + + +def test_fieldstorage_part_content_length(): + boundary = "JfISa01" + postdata = """--JfISa01 +Content-Disposition: form-data; name="submit-name" +Content-Length: 5 + +Larry +--JfISa01""" + env = { + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': f'multipart/form-data; boundary={boundary}', + 'CONTENT_LENGTH': str(len(postdata))} + fp = BytesIO(postdata.encode('latin-1')) + fs = FieldStorage(fp, environ=env, encoding="latin-1") + assert len(fs.list) == 1 + assert fs.list[0].name == 'submit-name' + assert fs.list[0].value == 'Larry' + + +def test_field_storage_multipart_no_content_length(): + fp = BytesIO(b"""--MyBoundary +Content-Disposition: form-data; name="my-arg"; filename="foo" + +Test + +--MyBoundary-- +""") + env = { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=MyBoundary", + "wsgi.input": fp, + } + fields = FieldStorage(fp, environ=env) + + assert len(fields["my-arg"].file.read()) == 5 + + +def test_fieldstorage_as_context_manager(): + fp = BytesIO(b'x' * 10) + env = {'REQUEST_METHOD': 'PUT'} + with FieldStorage(fp=fp, environ=env) as fs: + content = fs.file.read() + assert fs.file.closed is False + assert fs.file.closed is True + assert content == 'x' * 10 + with pytest.raises(ValueError, match='I/O operation on closed file'): + fs.file.read() + + +_qs_result = { + 'key1': 'value1', + 'key2': ['value2x', 'value2y'], + 'key3': 'value3', + 'key4': 'value4' +} + + +def test_qs_and_url_encode(): + data = "key2=value2x&key3=value3&key4=value4" + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'QUERY_STRING': 'key1=value1&key2=value2y', + 'REQUEST_METHOD': 'POST', + } + assert gen_result(data, environ) == _qs_result + + +def test_max_num_fields(): + # For application/x-www-form-urlencoded + data = '&'.join(['a=a']*11) + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'REQUEST_METHOD': 'POST', + } + + with pytest.raises(ValueError): + FieldStorage( + fp=BytesIO(data.encode()), + environ=environ, + max_num_fields=10, + ) + + # For multipart/form-data + data = """---123 +Content-Disposition: form-data; name="a" + +3 +---123 +Content-Type: application/x-www-form-urlencoded + +a=4 +---123 +Content-Type: application/x-www-form-urlencoded + +a=5 +---123-- +""" + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'multipart/form-data; boundary=-123', + 'QUERY_STRING': 'a=1&a=2', + 'REQUEST_METHOD': 'POST', + } + + # 2 GET entities + # 1 top level POST entities + # 1 entity within the second POST entity + # 1 entity within the third POST entity + with pytest.raises(ValueError): + FieldStorage( + fp=BytesIO(data.encode()), + environ=environ, + max_num_fields=4, + ) + FieldStorage( + fp=BytesIO(data.encode()), + environ=environ, + max_num_fields=5, + ) + + +def test_qs_and_form_data(): + data = """---123 +Content-Disposition: form-data; name="key2" + +value2y +---123 +Content-Disposition: form-data; name="key3" + +value3 +---123 +Content-Disposition: form-data; name="key4" + +value4 +---123-- +""" + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'multipart/form-data; boundary=-123', + 'QUERY_STRING': 'key1=value1&key2=value2x', + 'REQUEST_METHOD': 'POST', + } + assert gen_result(data, environ) == _qs_result + + +def test_qs_and_form_data_file(): + data = """---123 +Content-Disposition: form-data; name="key2" + +value2y +---123 +Content-Disposition: form-data; name="key3" + +value3 +---123 +Content-Disposition: form-data; name="key4" + +value4 +---123 +Content-Disposition: form-data; name="upload"; filename="fake.txt" +Content-Type: text/plain + +this is the content of the fake file + +---123-- +""" + environ = { + 'CONTENT_LENGTH': str(len(data)), + 'CONTENT_TYPE': 'multipart/form-data; boundary=-123', + 'QUERY_STRING': 'key1=value1&key2=value2x', + 'REQUEST_METHOD': 'POST', + } + result = {**_qs_result, + 'upload': b'this is the content of the fake file\n'} + assert gen_result(data, environ) == result + + +def test_parse_header(): + assert parse_header("text/plain") == ("text/plain", {}) + assert parse_header("text/vnd.just.made.this.up ; ") == ( + "text/vnd.just.made.this.up", {}) + assert parse_header("text/plain;charset=us-ascii") == ( + "text/plain", {"charset": "us-ascii"}) + assert parse_header('text/plain ; charset="us-ascii"') == ( + "text/plain", {"charset": "us-ascii"}) + assert parse_header('text/plain ; charset="us-ascii"; another=opt') == ( + "text/plain", {"charset": "us-ascii", "another": "opt"}) + assert parse_header('attachment; filename="silly.txt"') == ( + "attachment", {"filename": "silly.txt"}) + assert parse_header('attachment; filename="strange;name"') == ( + "attachment", {"filename": "strange;name"}) + assert parse_header('attachment; filename="strange;name";size=123;') == ( + "attachment", {"filename": "strange;name", "size": "123"}) + assert parse_header('form-data; name="files"; filename="fo\\"o;bar"') == ( + "form-data", {"name": "files", "filename": 'fo"o;bar'}) + + +BOUNDARY = "---------------------------721837373350705526688164684" +POSTDATA = """-----------------------------721837373350705526688164684 +Content-Disposition: form-data; name="id" + +1234 +-----------------------------721837373350705526688164684 +Content-Disposition: form-data; name="title" + + +-----------------------------721837373350705526688164684 +Content-Disposition: form-data; name="file"; filename="test.txt" +Content-Type: text/plain + +Testing 123. + +-----------------------------721837373350705526688164684 +Content-Disposition: form-data; name="submit" + + Add\x20 +-----------------------------721837373350705526688164684-- +""" + +POSTDATA_NON_ASCII = """-----------------------------721837373350705526688164684 +Content-Disposition: form-data; name="id" + +\xe7\xf1\x80 +-----------------------------721837373350705526688164684 +""" + +# http://www.w3.org/TR/html401/interact/forms.html#h-17.13.4 +BOUNDARY_W3 = "AaB03x" +POSTDATA_W3 = """--AaB03x +Content-Disposition: form-data; name="submit-name" + +Larry +--AaB03x +Content-Disposition: form-data; name="files" +Content-Type: multipart/mixed; boundary=BbC04y + +--BbC04y +Content-Disposition: file; filename="file1.txt" +Content-Type: text/plain + +... contents of file1.txt ... +--BbC04y +Content-Disposition: file; filename="file2.gif" +Content-Type: image/gif +Content-Transfer-Encoding: binary + +...contents of file2.gif... +--BbC04y-- +--AaB03x-- +""" diff --git a/tests/test_wsgiwrappers.py b/tests/test_wsgiwrappers.py index 1bdf1b0f..ba09cecd 100644 --- a/tests/test_wsgiwrappers.py +++ b/tests/test_wsgiwrappers.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- # (c) 2007 Philip Jenvey; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -import cgi import io from paste.fixture import TestApp from paste.wsgiwrappers import WSGIRequest, WSGIResponse +from paste.util.field_storage import FieldStorage class AssertApp(object): def __init__(self, assertfunc): @@ -76,7 +76,7 @@ def handle_fileupload(environ, start_response): assert len(request.POST) == 1 assert isinstance(request.POST.keys()[0], str) fs = request.POST['thefile'] - assert isinstance(fs, cgi.FieldStorage) + assert isinstance(fs, FieldStorage) assert isinstance(fs.filename, str) assert fs.filename == '寿司.txt' assert fs.value == b'Sushi' @@ -85,7 +85,7 @@ def handle_fileupload(environ, start_response): assert len(request.POST) == 1 assert isinstance(request.POST.keys()[0], str) fs = request.POST['thefile'] - assert isinstance(fs, cgi.FieldStorage) + assert isinstance(fs, FieldStorage) assert isinstance(fs.filename, str) assert fs.filename == u'寿司.txt' assert fs.value == b'Sushi' From 0cdd8bf139c521dec30f4e58472db8f777a81d2b Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Sun, 21 Apr 2024 17:59:49 +0200 Subject: [PATCH 2/7] Fix issues with tests running under Linux --- .github/workflows/tests.yaml | 4 +- paste/__init__.py | 2 +- paste/wsgilib.py | 5 +- tests/cgiapp_data/form.cgi | 13 +++-- tests/test_cgiapp.py | 97 ++++++++++++++++++------------------ 5 files changed, 63 insertions(+), 58 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 9e533f6b..7e7f2f5c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -23,8 +23,8 @@ jobs: env: pypy3 name: ${{ matrix.env }} on Python ${{ matrix.python }} steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v3 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python }} - run: pip install tox diff --git a/paste/__init__.py b/paste/__init__.py index 4e2d6388..ac06d663 100644 --- a/paste/__init__.py +++ b/paste/__init__.py @@ -3,7 +3,7 @@ try: import pkg_resources pkg_resources.declare_namespace(__name__) -except ImportError: +except (AttributeError, ImportError): # don't prevent use of paste if pkg_resources isn't installed from pkgutil import extend_path __path__ = extend_path(__path__, __name__) diff --git a/paste/wsgilib.py b/paste/wsgilib.py index a7623272..384f6f2d 100644 --- a/paste/wsgilib.py +++ b/paste/wsgilib.py @@ -378,8 +378,9 @@ def flush(self): def write(self, value): if not value: return - raise AssertionError( - "No errors should be written (got: %r)" % value) + print(value) # for debugging + # raise AssertionError( + # "No errors should be written (got: %r)" % value) def writelines(self, seq): raise AssertionError( diff --git a/tests/cgiapp_data/form.cgi b/tests/cgiapp_data/form.cgi index c224d361..e751aa04 100755 --- a/tests/cgiapp_data/form.cgi +++ b/tests/cgiapp_data/form.cgi @@ -1,10 +1,15 @@ #!/usr/bin/env python -from paste.util.field_storage import FieldStorage - print('Content-type: text/plain') print('') +import sys +from os.path import dirname + +base_dir = dirname(dirname(dirname(__file__))) +sys.path.insert(0, base_dir) + +from paste.util.field_storage import FieldStorage class FormFieldStorage(FieldStorage): @@ -41,10 +46,8 @@ class FormFieldStorage(FieldStorage): raise error def __contains__(self, key): - superobj = super(FieldStorage, self) - for candidate in self._key_candidates(key): - if superobj.__contains__(candidate): + if super().__contains__(candidate): return True return False diff --git a/tests/test_cgiapp.py b/tests/test_cgiapp.py index d173aa2c..4895256b 100644 --- a/tests/test_cgiapp.py +++ b/tests/test_cgiapp.py @@ -9,52 +9,53 @@ data_dir = os.path.join(os.path.dirname(__file__), 'cgiapp_data') # these CGI scripts can't work on Windows or Jython -if sys.platform != 'win32' and not sys.platform.startswith('java'): - - # Ensure the CGI scripts are called with the same python interpreter. Put a - # symlink to the interpreter executable into the path... - def setup_module(): - global oldpath, pyexelink - oldpath = os.environ.get('PATH', None) - os.environ['PATH'] = data_dir + os.path.pathsep + oldpath - pyexelink = os.path.join(data_dir, "python") - try: - os.unlink(pyexelink) - except OSError: - pass - os.symlink(sys.executable, pyexelink) - - # ... and clean up again. - def teardown_module(): - global oldpath, pyexelink +if sys.platform == 'win32' or sys.platform.startswith('java'): + sys.exit(0) + +# Ensure the CGI scripts are called with the same python interpreter. +# Put a symlink to the interpreter executable into the path... +def setup_module(): + global oldpath, pyexelink + oldpath = os.environ.get('PATH', None) + os.environ['PATH'] = data_dir + os.path.pathsep + oldpath + pyexelink = os.path.join(data_dir, "python") + try: os.unlink(pyexelink) - if oldpath is not None: - os.environ['PATH'] = oldpath - else: - del os.environ['PATH'] - - def test_ok(): - app = TestApp(CGIApplication({}, script='ok.cgi', path=[data_dir])) - res = app.get('') - assert res.header('content-type') == 'text/html; charset=UTF-8' - assert res.full_status == '200 Okay' - assert 'This is the body' in res - - def test_form(): - app = TestApp(CGIApplication({}, script='form.cgi', path=[data_dir])) - res = app.post('', params={'name': b'joe'}, - upload_files=[('up', 'file.txt', b'x'*10000)]) - assert 'file.txt' in res - assert 'joe' in res - assert 'x'*10000 in res - - def test_error(): - app = TestApp(CGIApplication({}, script='error.cgi', path=[data_dir])) - pytest.raises(CGIError, app.get, '', status=500) - - def test_stderr(): - app = TestApp(CGIApplication({}, script='stderr.cgi', path=[data_dir])) - res = app.get('', expect_errors=True) - assert res.status == 500 - assert 'error' in res - assert 'some data' in res.errors + except OSError: + pass + os.symlink(sys.executable, pyexelink) + +# ... and clean up again. +def teardown_module(): + global oldpath, pyexelink + os.unlink(pyexelink) + if oldpath is not None: + os.environ['PATH'] = oldpath + else: + del os.environ['PATH'] + +def test_ok(): + app = TestApp(CGIApplication({}, script='ok.cgi', path=[data_dir])) + res = app.get('') + assert res.header('content-type') == 'text/html; charset=UTF-8' + assert res.full_status == '200 Okay' + assert 'This is the body' in res + +def test_form(): + app = TestApp(CGIApplication({}, script='form.cgi', path=[data_dir])) + res = app.post('', params={'name': b'joe'}, + upload_files=[('up', 'file.txt', b'x'*10000)]) + assert 'file.txt' in res + assert 'joe' in res + assert 'x'*10000 in res + +def test_error(): + app = TestApp(CGIApplication({}, script='error.cgi', path=[data_dir])) + pytest.raises(CGIError, app.get, '', status=500) + +def test_stderr(): + app = TestApp(CGIApplication({}, script='stderr.cgi', path=[data_dir])) + res = app.get('', expect_errors=True) + assert res.status == 500 + assert 'error' in res + assert 'some data' in res.errors From 2d85b162dfad5d8a50330a6e556f9a7f0f02d3db Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Mon, 22 Apr 2024 00:02:18 +0200 Subject: [PATCH 3/7] Add PSF copyright notice --- paste/util/field_storage.py | 10 ++++++++++ tests/test_util/test_field_storage.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/paste/util/field_storage.py b/paste/util/field_storage.py index 13ef9d2f..a88c08d5 100644 --- a/paste/util/field_storage.py +++ b/paste/util/field_storage.py @@ -1,5 +1,15 @@ """FieldStorage class from the deprecated cgi library.""" +# Copyright © 2001-2023 Python Software Foundation; All Rights Reserved. + +# History of this code: +# Michael McLay started the original cgi module. Steve Majewski changed +# the interface to SvFormContentDict and FormContentDict. The multipart +# parsing was inspired by code submitted by Andreas Paepcke. Guido van +# Rossum rewrote, reformatted and documented the module and maintained it +# until it was deprecated. Christoph Zwerschke copied this slightly +# modified and stripped-down version of the code into paste. + import locale import os import sys diff --git a/tests/test_util/test_field_storage.py b/tests/test_util/test_field_storage.py index b54b832d..8d34e2c7 100644 --- a/tests/test_util/test_field_storage.py +++ b/tests/test_util/test_field_storage.py @@ -1,3 +1,7 @@ +# Slightly modified tests from the original cgi test module. + +# Copyright © 2001-2023 Python Software Foundation; All Rights Reserved. + import sys import tempfile from collections import namedtuple From f24a9d9335d38fccbd814abdb005ddb9e917ef23 Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Mon, 22 Apr 2024 00:24:18 +0200 Subject: [PATCH 4/7] Mention that this was taken from the standard lib --- paste/util/field_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paste/util/field_storage.py b/paste/util/field_storage.py index a88c08d5..e37052ee 100644 --- a/paste/util/field_storage.py +++ b/paste/util/field_storage.py @@ -1,4 +1,4 @@ -"""FieldStorage class from the deprecated cgi library.""" +"""FieldStorage class from the deprecated standard library cgi module.""" # Copyright © 2001-2023 Python Software Foundation; All Rights Reserved. From f5d32d6a9470a3a1d68353daf299ff86d5bdd7fc Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Thu, 25 Apr 2024 17:16:02 +0200 Subject: [PATCH 5/7] Remove debug code --- paste/wsgilib.py | 5 ++--- tests/test_exceptions/test_reporter.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/paste/wsgilib.py b/paste/wsgilib.py index 384f6f2d..a7623272 100644 --- a/paste/wsgilib.py +++ b/paste/wsgilib.py @@ -378,9 +378,8 @@ def flush(self): def write(self, value): if not value: return - print(value) # for debugging - # raise AssertionError( - # "No errors should be written (got: %r)" % value) + raise AssertionError( + "No errors should be written (got: %r)" % value) def writelines(self, seq): raise AssertionError( diff --git a/tests/test_exceptions/test_reporter.py b/tests/test_exceptions/test_reporter.py index 5a06589b..92711dd4 100644 --- a/tests/test_exceptions/test_reporter.py +++ b/tests/test_exceptions/test_reporter.py @@ -30,7 +30,7 @@ def test_logger(): assert 0 rep.report(exc_data) content = open(fn).read() - assert len(content.splitlines()) == 4, len(content.splitlines()) + assert len(content.splitlines()) == 4 assert 'ValueError' in content assert 'int' in content assert 'test_reporter.py' in content From 60740b316f256d8786f36dcd621e51af3a14135d Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Thu, 25 Apr 2024 17:16:26 +0200 Subject: [PATCH 6/7] Correctly skip tests on Windows --- tests/test_cgiapp.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_cgiapp.py b/tests/test_cgiapp.py index 4895256b..c6aa9219 100644 --- a/tests/test_cgiapp.py +++ b/tests/test_cgiapp.py @@ -8,9 +8,10 @@ data_dir = os.path.join(os.path.dirname(__file__), 'cgiapp_data') -# these CGI scripts can't work on Windows or Jython -if sys.platform == 'win32' or sys.platform.startswith('java'): - sys.exit(0) +pytestmark = pytest.mark.skipif( + sys.platform == 'win32' or sys.platform.startswith('java'), + reason="CGI scripts can't work on Windows or Jython") + # Ensure the CGI scripts are called with the same python interpreter. # Put a symlink to the interpreter executable into the path... From 12fc30ad45c83b43533f3cbdf0e1c1d9a4c5e684 Mon Sep 17 00:00:00 2001 From: Christoph Zwerschke Date: Thu, 25 Apr 2024 17:20:55 +0200 Subject: [PATCH 7/7] Align with master branch --- paste/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/paste/__init__.py b/paste/__init__.py index ac06d663..ec8b0654 100644 --- a/paste/__init__.py +++ b/paste/__init__.py @@ -1,8 +1,13 @@ # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php + +import warnings + try: - import pkg_resources - pkg_resources.declare_namespace(__name__) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=DeprecationWarning) + import pkg_resources + pkg_resources.declare_namespace(__name__) except (AttributeError, ImportError): # don't prevent use of paste if pkg_resources isn't installed from pkgutil import extend_path