diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 9e533f6..7e7f2f5 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -23,8 +23,8 @@ jobs:
env: pypy3
name: ${{ matrix.env }} on Python ${{ matrix.python }}
steps:
- - uses: actions/checkout@v3
- - uses: actions/setup-python@v3
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}
- run: pip install tox
diff --git a/docs/do-it-yourself-framework.txt b/docs/do-it-yourself-framework.txt
index 23b85ae..95b869b 100644
--- a/docs/do-it-yourself-framework.txt
+++ b/docs/do-it-yourself-framework.txt
@@ -128,8 +128,7 @@ fields::
'name="name">']
The ``parse_formvars`` function just takes the WSGI environment and
-calls the `cgi `_
-module (the ``FieldStorage`` class) and turns that into a MultiDict.
+turns that into a MultiDict which may contain ``FieldStorage`` instances.
Now For a Framework
===================
diff --git a/paste/__init__.py b/paste/__init__.py
index 4e2d638..ec8b065 100644
--- a/paste/__init__.py
+++ b/paste/__init__.py
@@ -1,9 +1,14 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+
+import warnings
+
try:
- import pkg_resources
- pkg_resources.declare_namespace(__name__)
-except ImportError:
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=DeprecationWarning)
+ import pkg_resources
+ pkg_resources.declare_namespace(__name__)
+except (AttributeError, ImportError):
# don't prevent use of paste if pkg_resources isn't installed
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
diff --git a/paste/request.py b/paste/request.py
index bb0d9ee..612e648 100644
--- a/paste/request.py
+++ b/paste/request.py
@@ -17,12 +17,12 @@
* resolve_relative_url(url, environ)
"""
-import cgi
from collections.abc import MutableMapping as DictMixin
from urllib import parse as urlparse
from urllib.parse import quote, parse_qsl
from http.cookies import SimpleCookie, CookieError
+from paste.util.field_storage import FieldStorage
from paste.util.multidict import MultiDict
__all__ = ['get_cookies', 'get_cookie_dict', 'parse_querystring',
@@ -151,16 +151,16 @@ def parse_formvars(environ, include_get_vars=True, encoding=None, errors=None):
# fake_out_cgi requests
formvars = MultiDict()
ct = environ.get('CONTENT_TYPE', '').partition(';')[0].lower()
- use_cgi = ct in ('', 'application/x-www-form-urlencoded',
- 'multipart/form-data')
- # FieldStorage assumes a default CONTENT_LENGTH of -1, but a
- # default of 0 is better:
+ use_cgi = ct in (
+ '', 'application/x-www-form-urlencoded', 'multipart/form-data')
+ # FieldStorage assumes a default CONTENT_LENGTH of -1,
+ # but a default of 0 is better:
if not environ.get('CONTENT_LENGTH'):
environ['CONTENT_LENGTH'] = '0'
if use_cgi:
- # Prevent FieldStorage from parsing QUERY_STRING during GET/HEAD
- # requests
- old_query_string = environ.get('QUERY_STRING','')
+ # Prevent FieldStorage from parsing QUERY_STRING
+ # during GET/HEAD # requests
+ old_query_string = environ.get('QUERY_STRING', '')
environ['QUERY_STRING'] = ''
inp = environ['wsgi.input']
kwparms = {}
@@ -168,10 +168,9 @@ def parse_formvars(environ, include_get_vars=True, encoding=None, errors=None):
kwparms['encoding'] = encoding
if errors:
kwparms['errors'] = errors
- fs = cgi.FieldStorage(fp=inp,
- environ=environ,
- keep_blank_values=True,
- **kwparms)
+ fs = FieldStorage(
+ fp=inp, environ=environ, keep_blank_values=True,
+ **kwparms)
environ['QUERY_STRING'] = old_query_string
if isinstance(fs.value, list):
for name in fs.keys():
@@ -384,20 +383,6 @@ def keys(self):
def __contains__(self, item):
return self._trans_name(item) in self.environ
-def _cgi_FieldStorage__repr__patch(self):
- """ monkey patch for FieldStorage.__repr__
-
- Unbelievely, the default __repr__ on FieldStorage reads
- the entire file content instead of being sane about it.
- This is a simple replacement that doesn't do that
- """
- if self.file:
- return "FieldStorage(%r, %r)" % (
- self.name, self.filename)
- return "FieldStorage(%r, %r, %r)" % (
- self.name, self.filename, self.value)
-
-cgi.FieldStorage.__repr__ = _cgi_FieldStorage__repr__patch
if __name__ == '__main__':
import doctest
diff --git a/paste/util/field_storage.py b/paste/util/field_storage.py
new file mode 100644
index 0000000..e37052e
--- /dev/null
+++ b/paste/util/field_storage.py
@@ -0,0 +1,657 @@
+"""FieldStorage class from the deprecated standard library cgi module."""
+
+# Copyright © 2001-2023 Python Software Foundation; All Rights Reserved.
+
+# History of this code:
+# Michael McLay started the original cgi module. Steve Majewski changed
+# the interface to SvFormContentDict and FormContentDict. The multipart
+# parsing was inspired by code submitted by Andreas Paepcke. Guido van
+# Rossum rewrote, reformatted and documented the module and maintained it
+# until it was deprecated. Christoph Zwerschke copied this slightly
+# modified and stripped-down version of the code into paste.
+
+import locale
+import os
+import sys
+import urllib.parse
+import tempfile
+
+from collections.abc import Mapping
+from email.message import Message
+from email.parser import FeedParser
+from io import StringIO, BytesIO, TextIOWrapper
+
+__all__ = ['FieldStorage', 'parse_header']
+
+# Maximum input we will accept when REQUEST_METHOD is POST
+# 0 ==> unlimited input
+maxlen = 0
+
+
+def _parseparam(s):
+ while s[:1] == ';':
+ s = s[1:]
+ end = s.find(';')
+ while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
+ end = s.find(';', end + 1)
+ if end < 0:
+ end = len(s)
+ f = s[:end]
+ yield f.strip()
+ s = s[end:]
+
+
+def parse_header(line):
+ """Parse a Content-type like header.
+
+ Return the main content-type and a dictionary of options.
+ """
+ parts = _parseparam(';' + line)
+ key = parts.__next__()
+ pdict = {}
+ for p in parts:
+ i = p.find('=')
+ if i >= 0:
+ name = p[:i].strip().lower()
+ value = p[i+1:].strip()
+ if len(value) >= 2 and value[0] == value[-1] == '"':
+ value = value[1:-1]
+ value = value.replace('\\\\', '\\').replace('\\"', '"')
+ pdict[name] = value
+ return key, pdict
+
+
+def valid_boundary(s):
+ import re
+ if isinstance(s, bytes):
+ _vb_pattern = b'^[ -~]{0,200}[!-~]$'
+ else:
+ _vb_pattern = '^[ -~]{0,200}[!-~]$'
+ return re.match(_vb_pattern, s)
+
+
+class MiniFieldStorage:
+ """Like FieldStorage, for use when no file uploads are possible."""
+
+ # Dummy attributes
+ filename = None
+ list = None
+ type = None
+ file = None
+ type_options = {}
+ disposition = None
+ disposition_options = {}
+ headers = {}
+
+ def __init__(self, name, value):
+ """Constructor from field name and value."""
+ self.name = name
+ self.value = value
+ # self.file = StringIO(value)
+
+ def __repr__(self):
+ """Return printable representation."""
+ return f'MiniFieldStorage({self.name!r}, {self.value!r})'
+
+
+class FieldStorage:
+ """Store a sequence of fields, reading multipart/form-data.
+
+ This class provides naming, typing, files stored on disk, and
+ more. At the top level, it is accessible like a dictionary, whose
+ keys are the field names. (Note: None can occur as a field name.)
+ The items are either a Python list (if there's multiple values) or
+ another FieldStorage or MiniFieldStorage object. If it's a single
+ object, it has the following attributes:
+
+ name: the field name, if specified; otherwise None
+
+ filename: the filename, if specified; otherwise None; this is the
+ client side filename, *not* the file name on which it is
+ stored (that's a temporary file you don't deal with)
+
+ value: the value as a *string*; for file uploads, this
+ transparently reads the file every time you request the value
+ and returns *bytes*
+
+ file: the file(-like) object from which you can read the data *as
+ bytes* ; None if the data is stored a simple string
+
+ type: the content-type, or None if not specified
+
+ type_options: dictionary of options specified on the content-type
+ line
+
+ disposition: content-disposition, or None if not specified
+
+ disposition_options: dictionary of corresponding options
+
+ headers: a dictionary(-like) object (sometimes email.message.Message or a
+ subclass thereof) containing *all* headers
+
+ The class is subclassable, mostly for the purpose of overriding
+ the make_file() method, which is called internally to come up with
+ a file open for reading and writing. This makes it possible to
+ override the default choice of storing all files in a temporary
+ directory and unlinking them as soon as they have been opened.
+ """
+ def __init__(self, fp=None, headers=None, outerboundary=b'',
+ environ=os.environ,
+ keep_blank_values=False, strict_parsing=False,
+ limit=None, encoding='utf-8', errors='replace',
+ max_num_fields=None, separator='&'):
+ """Constructor. Read multipart/* until last part.
+
+ Arguments, all optional:
+
+ fp : file pointer; default: sys.stdin.buffer
+ (not used when the request method is GET)
+ Can be :
+ 1. a TextIOWrapper object
+ 2. an object whose read() and readline() methods return bytes
+
+ headers : header dictionary-like object; default:
+ taken from environ as per CGI spec
+
+ outerboundary : terminating multipart boundary
+ (for internal use only)
+
+ environ : environment dictionary; default: os.environ
+
+ keep_blank_values: flag indicating whether blank values in
+ percent-encoded forms should be treated as blank strings.
+ A true value indicates that blanks should be retained as
+ blank strings. The default false value indicates that
+ blank values are to be ignored and treated as if they were
+ not included.
+
+ strict_parsing: flag indicating what to do with parsing errors.
+ If false (the default), errors are silently ignored.
+ If true, errors raise a ValueError exception.
+
+ limit : used internally to read parts of multipart/form-data forms,
+ to exit from the reading loop when reached. It is the difference
+ between the form content-length and the number of bytes already
+ read
+
+ encoding, errors : the encoding and error handler used to decode the
+ binary stream to strings. Must be the same as the charset defined
+ for the page sending the form (content-type : meta http-equiv or
+ header)
+
+ max_num_fields: int. If set, then __init__ throws a ValueError
+ if there are more than n fields read by parse_qsl().
+ """
+ method = 'GET'
+ self.keep_blank_values = keep_blank_values
+ self.strict_parsing = strict_parsing
+ self.max_num_fields = max_num_fields
+ self.separator = separator
+ if 'REQUEST_METHOD' in environ:
+ method = environ['REQUEST_METHOD'].upper()
+ self.qs_on_post = None
+ if method == 'GET' or method == 'HEAD':
+ if 'QUERY_STRING' in environ:
+ qs = environ['QUERY_STRING']
+ elif sys.argv[1:]:
+ qs = sys.argv[1]
+ else:
+ qs = ''
+ qs = qs.encode(locale.getpreferredencoding(), 'surrogateescape')
+ fp = BytesIO(qs)
+ if headers is None:
+ headers = {'content-type':
+ 'application/x-www-form-urlencoded'}
+ if headers is None:
+ headers = {}
+ if method == 'POST':
+ # Set default content-type for POST to what's traditional
+ headers['content-type'] = 'application/x-www-form-urlencoded'
+ if 'CONTENT_TYPE' in environ:
+ headers['content-type'] = environ['CONTENT_TYPE']
+ if 'QUERY_STRING' in environ:
+ self.qs_on_post = environ['QUERY_STRING']
+ if 'CONTENT_LENGTH' in environ:
+ headers['content-length'] = environ['CONTENT_LENGTH']
+ else:
+ if not (isinstance(headers, (Mapping, Message))):
+ raise TypeError('headers must be mapping or an instance'
+ ' of email.message.Message')
+ self.headers = headers
+ if fp is None:
+ self.fp = sys.stdin.buffer
+ # self.fp.read() must return bytes
+ elif isinstance(fp, TextIOWrapper):
+ self.fp = fp.buffer
+ else:
+ if not (hasattr(fp, 'read') and hasattr(fp, 'readline')):
+ raise TypeError('fp must be file pointer')
+ self.fp = fp
+
+ self.encoding = encoding
+ self.errors = errors
+
+ if not isinstance(outerboundary, bytes):
+ raise TypeError('outerboundary must be bytes,'
+ f' not {type(outerboundary).__name__}')
+ self.outerboundary = outerboundary
+
+ self.bytes_read = 0
+ self.limit = limit
+
+ # Process content-disposition header
+ cdisp, pdict = '', {}
+ if 'content-disposition' in self.headers:
+ cdisp, pdict = parse_header(self.headers['content-disposition'])
+ self.disposition = cdisp
+ self.disposition_options = pdict
+ self.name = None
+ if 'name' in pdict:
+ self.name = pdict['name']
+ self.filename = None
+ if 'filename' in pdict:
+ self.filename = pdict['filename']
+ self._binary_file = self.filename is not None
+
+ # Process content-type header
+ #
+ # Honor any existing content-type header. But if there is no
+ # content-type header, use some sensible defaults. Assume
+ # outerboundary is '' at the outer level, but something non-false
+ # inside a multi-part. The default for an inner part is text/plain,
+ # but for an outer part it should be urlencoded. This should catch
+ # bogus clients which erroneously forget to include a content-type
+ # header.
+ #
+ # See below for what we do if there does exist a content-type header,
+ # but it happens to be something we don't understand.
+ if 'content-type' in self.headers:
+ ctype, pdict = parse_header(self.headers['content-type'])
+ elif self.outerboundary or method != 'POST':
+ ctype, pdict = 'text/plain', {}
+ else:
+ ctype, pdict = 'application/x-www-form-urlencoded', {}
+ self.type = ctype
+ self.type_options = pdict
+ if 'boundary' in pdict:
+ self.innerboundary = pdict['boundary'].encode(self.encoding,
+ self.errors)
+ else:
+ self.innerboundary = b''
+
+ clen = -1
+ if 'content-length' in self.headers:
+ try:
+ clen = int(self.headers['content-length'])
+ except ValueError:
+ pass
+ if maxlen and clen > maxlen:
+ raise ValueError('Maximum content length exceeded')
+ self.length = clen
+ if self.limit is None and clen >= 0:
+ self.limit = clen
+
+ self.list = self.file = None
+ self.done = 0
+ if ctype == 'application/x-www-form-urlencoded':
+ self.read_urlencoded()
+ elif ctype[:10] == 'multipart/':
+ self.read_multi(environ, keep_blank_values, strict_parsing)
+ else:
+ self.read_single()
+
+ def __del__(self):
+ try:
+ self.file.close()
+ except AttributeError:
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.file.close()
+
+ def __repr__(self):
+ """Return a printable representation."""
+ value = '...' if self.file else self.value # avoid reading the file
+ return f'FieldStorage({self.name!r}, {self.filename!r}, {value!r})'
+
+ def __iter__(self):
+ return iter(self.keys())
+
+ def __getattr__(self, name):
+ if name != 'value':
+ raise AttributeError(name)
+ if self.file:
+ self.file.seek(0)
+ value = self.file.read()
+ self.file.seek(0)
+ elif self.list is not None:
+ value = self.list
+ else:
+ value = None
+ return value
+
+ def __getitem__(self, key):
+ """Dictionary style indexing."""
+ if self.list is None:
+ raise TypeError('not indexable')
+ found = []
+ for item in self.list:
+ if item.name == key:
+ found.append(item)
+ if not found:
+ raise KeyError(key)
+ if len(found) == 1:
+ return found[0]
+ else:
+ return found
+
+ def getvalue(self, key, default=None):
+ """Dictionary style get() method, including 'value' lookup."""
+ if key in self:
+ value = self[key]
+ if isinstance(value, list):
+ return [x.value for x in value]
+ else:
+ return value.value
+ else:
+ return default
+
+ def getfirst(self, key, default=None):
+ """ Return the first value received."""
+ if key in self:
+ value = self[key]
+ if isinstance(value, list):
+ return value[0].value
+ else:
+ return value.value
+ else:
+ return default
+
+ def getlist(self, key):
+ """ Return list of received values."""
+ if key in self:
+ value = self[key]
+ if isinstance(value, list):
+ return [x.value for x in value]
+ else:
+ return [value.value]
+ else:
+ return []
+
+ def keys(self):
+ """Dictionary style keys() method."""
+ if self.list is None:
+ raise TypeError('not indexable')
+ return list(set(item.name for item in self.list))
+
+ def __contains__(self, key):
+ """Dictionary style __contains__ method."""
+ if self.list is None:
+ raise TypeError('not indexable')
+ return any(item.name == key for item in self.list)
+
+ def __len__(self):
+ """Dictionary style len(x) support."""
+ return len(self.keys())
+
+ def __bool__(self):
+ if self.list is None:
+ raise TypeError('Cannot be converted to bool.')
+ return bool(self.list)
+
+ def read_urlencoded(self):
+ """Internal: read data in query string format."""
+ qs = self.fp.read(self.length)
+ if not isinstance(qs, bytes):
+ raise ValueError(
+ f'{self.fp} should return bytes, got {type(qs).__name__}')
+ qs = qs.decode(self.encoding, self.errors)
+ if self.qs_on_post:
+ qs += '&' + self.qs_on_post
+ query = urllib.parse.parse_qsl(
+ qs, self.keep_blank_values, self.strict_parsing,
+ encoding=self.encoding, errors=self.errors,
+ max_num_fields=self.max_num_fields, separator=self.separator)
+ self.list = [MiniFieldStorage(key, value) for key, value in query]
+ self.skip_lines()
+
+ FieldStorageClass = None
+
+ def read_multi(self, environ, keep_blank_values, strict_parsing):
+ """Internal: read a part that is itself multipart."""
+ ib = self.innerboundary
+ if not valid_boundary(ib):
+ raise ValueError(f'Invalid boundary in multipart form: {ib!r}')
+ self.list = []
+ if self.qs_on_post:
+ query = urllib.parse.parse_qsl(
+ self.qs_on_post, self.keep_blank_values, self.strict_parsing,
+ encoding=self.encoding, errors=self.errors,
+ max_num_fields=self.max_num_fields, separator=self.separator)
+ self.list.extend(MiniFieldStorage(key, value) for key, value in query)
+
+ klass = self.FieldStorageClass or self.__class__
+ first_line = self.fp.readline() # bytes
+ if not isinstance(first_line, bytes):
+ raise ValueError(
+ f'{self.fp} should return bytes,'
+ f' got {type(first_line).__name__}')
+ self.bytes_read += len(first_line)
+
+ # Ensure that we consume the file until we've hit our inner boundary
+ while (first_line.strip() != (b'--' + self.innerboundary) and
+ first_line):
+ first_line = self.fp.readline()
+ self.bytes_read += len(first_line)
+
+ # Propagate max_num_fields into the subclass appropriately
+ max_num_fields = self.max_num_fields
+ if max_num_fields is not None:
+ max_num_fields -= len(self.list)
+
+ while True:
+ parser = FeedParser()
+ hdr_text = b''
+ while True:
+ data = self.fp.readline()
+ hdr_text += data
+ if not data.strip():
+ break
+ if not hdr_text:
+ break
+ # parser takes strings, not bytes
+ self.bytes_read += len(hdr_text)
+ parser.feed(hdr_text.decode(self.encoding, self.errors))
+ headers = parser.close()
+
+ # Some clients add Content-Length for part headers, ignore them
+ if 'content-length' in headers:
+ del headers['content-length']
+
+ limit = (None if self.limit is None
+ else self.limit - self.bytes_read)
+ part = klass(self.fp, headers, ib, environ, keep_blank_values,
+ strict_parsing, limit,
+ self.encoding, self.errors, max_num_fields, self.separator)
+
+ if max_num_fields is not None:
+ max_num_fields -= 1
+ if part.list:
+ max_num_fields -= len(part.list)
+ if max_num_fields < 0:
+ raise ValueError('Max number of fields exceeded')
+
+ self.bytes_read += part.bytes_read
+ self.list.append(part)
+ if part.done or self.bytes_read >= self.length > 0:
+ break
+ self.skip_lines()
+
+ def read_single(self):
+ """Internal: read an atomic part."""
+ if self.length >= 0:
+ self.read_binary()
+ self.skip_lines()
+ else:
+ self.read_lines()
+ self.file.seek(0)
+
+ bufsize = 8*1024 # I/O buffering size for copy to file
+
+ def read_binary(self):
+ """Internal: read binary data."""
+ self.file = self.make_file()
+ todo = self.length
+ if todo >= 0:
+ while todo > 0:
+ data = self.fp.read(min(todo, self.bufsize)) # bytes
+ if not isinstance(data, bytes):
+ raise ValueError(
+ f'{self.fp} should return bytes,'
+ f' got {type(data).__name__}')
+ self.bytes_read += len(data)
+ if not data:
+ self.done = -1
+ break
+ self.file.write(data)
+ todo = todo - len(data)
+
+ def read_lines(self):
+ """Internal: read lines until EOF or outerboundary."""
+ if self._binary_file:
+ self.file = self.__file = BytesIO() # store data as bytes for files
+ else:
+ self.file = self.__file = StringIO() # as strings for other fields
+ if self.outerboundary:
+ self.read_lines_to_outerboundary()
+ else:
+ self.read_lines_to_eof()
+
+ def __write(self, line):
+ """line is always bytes, not string"""
+ if self.__file is not None:
+ if self.__file.tell() + len(line) > 1000:
+ self.file = self.make_file()
+ data = self.__file.getvalue()
+ self.file.write(data)
+ self.__file = None
+ if self._binary_file:
+ # keep bytes
+ self.file.write(line)
+ else:
+ # decode to string
+ self.file.write(line.decode(self.encoding, self.errors))
+
+ def read_lines_to_eof(self):
+ """Internal: read lines until EOF."""
+ while 1:
+ line = self.fp.readline(1 << 16) # bytes
+ self.bytes_read += len(line)
+ if not line:
+ self.done = -1
+ break
+ self.__write(line)
+
+ def read_lines_to_outerboundary(self):
+ """Internal: read lines until outerboundary.
+ Data is read as bytes: boundaries and line ends must be converted
+ to bytes for comparisons.
+ """
+ next_boundary = b'--' + self.outerboundary
+ last_boundary = next_boundary + b'--'
+ delim = b''
+ last_line_lfend = True
+ _read = 0
+ while 1:
+
+ if self.limit is not None and 0 <= self.limit <= _read:
+ break
+ line = self.fp.readline(1 << 16) # bytes
+ self.bytes_read += len(line)
+ _read += len(line)
+ if not line:
+ self.done = -1
+ break
+ if delim == b'\r':
+ line = delim + line
+ delim = b''
+ if line.startswith(b'--') and last_line_lfend:
+ strippedline = line.rstrip()
+ if strippedline == next_boundary:
+ break
+ if strippedline == last_boundary:
+ self.done = 1
+ break
+ odelim = delim
+ if line.endswith(b'\r\n'):
+ delim = b'\r\n'
+ line = line[:-2]
+ last_line_lfend = True
+ elif line.endswith(b'\n'):
+ delim = b'\n'
+ line = line[:-1]
+ last_line_lfend = True
+ elif line.endswith(b'\r'):
+ # We may interrupt \r\n sequences if they span the 2**16
+ # byte boundary
+ delim = b'\r'
+ line = line[:-1]
+ last_line_lfend = False
+ else:
+ delim = b''
+ last_line_lfend = False
+ self.__write(odelim + line)
+
+ def skip_lines(self):
+ """Internal: skip lines until outer boundary if defined."""
+ if not self.outerboundary or self.done:
+ return
+ next_boundary = b'--' + self.outerboundary
+ last_boundary = next_boundary + b'--'
+ last_line_lfend = True
+ while True:
+ line = self.fp.readline(1 << 16)
+ self.bytes_read += len(line)
+ if not line:
+ self.done = -1
+ break
+ if line.endswith(b'--') and last_line_lfend:
+ strippedline = line.strip()
+ if strippedline == next_boundary:
+ break
+ if strippedline == last_boundary:
+ self.done = 1
+ break
+ last_line_lfend = line.endswith(b'\n')
+
+ def make_file(self):
+ """Overridable: return a readable & writable file.
+
+ The file will be used as follows:
+ - data is written to it
+ - seek(0)
+ - data is read from it
+
+ The file is opened in binary mode for files, in text mode
+ for other fields
+
+ This version opens a temporary file for reading and writing,
+ and immediately deletes (unlinks) it. The trick (on Unix!) is
+ that the file can still be used, but it can't be opened by
+ another process, and it will automatically be deleted when it
+ is closed or when the current process terminates.
+
+ If you want a more permanent file, you derive a class which
+ overrides this method. If you want a visible temporary file
+ that is nevertheless automatically deleted when the script
+ terminates, try defining a __del__ method in a derived class
+ which unlinks the temporary files you have created.
+ """
+ if self._binary_file:
+ return tempfile.TemporaryFile('wb+')
+ else:
+ return tempfile.TemporaryFile(
+ 'w+', encoding=self.encoding, newline='\n')
diff --git a/paste/util/multidict.py b/paste/util/multidict.py
index 2db5125..ce9d57e 100644
--- a/paste/util/multidict.py
+++ b/paste/util/multidict.py
@@ -1,10 +1,12 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-import cgi
import copy
import sys
from collections.abc import MutableMapping as DictMixin
+from .field_storage import FieldStorage
+
+
class MultiDict(DictMixin):
"""
@@ -256,7 +258,7 @@ def _decode_value(self, value):
``FieldStorage`` objects are specially handled.
"""
- if isinstance(value, cgi.FieldStorage):
+ if isinstance(value, FieldStorage):
# decode FieldStorage's field name and filename
decode_name = self.decode_keys and isinstance(value.name, bytes)
if decode_name:
diff --git a/tests/cgiapp_data/form.cgi b/tests/cgiapp_data/form.cgi
index 2e87f6c..e751aa0 100755
--- a/tests/cgiapp_data/form.cgi
+++ b/tests/cgiapp_data/form.cgi
@@ -1,74 +1,59 @@
#!/usr/bin/env python
-import sys
-
-# Quiet warnings in this CGI so that it does not upset tests.
-if not sys.warnoptions:
- import warnings
- warnings.simplefilter("ignore")
-
-# TODO: cgi is deprecated and will go away in Python 3.13.
-import cgi
-
print('Content-type: text/plain')
print('')
-if sys.version_info.major >= 3:
- # Python 3: cgi.FieldStorage keeps some field names as unicode and some as
- # the repr() of byte strings, duh.
-
- class FieldStorage(cgi.FieldStorage):
-
- def _key_candidates(self, key):
- yield key
-
- try:
- # assume bytes, coerce to str
- try:
- yield key.decode(self.encoding)
- except UnicodeDecodeError:
- pass
- except AttributeError:
- # assume str, coerce to bytes
- try:
- yield key.encode(self.encoding)
- except UnicodeEncodeError:
- pass
+import sys
+from os.path import dirname
- def __getitem__(self, key):
+base_dir = dirname(dirname(dirname(__file__)))
+sys.path.insert(0, base_dir)
- superobj = super(FieldStorage, self)
+from paste.util.field_storage import FieldStorage
- error = None
+class FormFieldStorage(FieldStorage):
- for candidate in self._key_candidates(key):
- if isinstance(candidate, bytes):
- # ouch
- candidate = repr(candidate)
- try:
- return superobj.__getitem__(candidate)
- except KeyError as e:
- if error is None:
- error = e
+ def _key_candidates(self, key):
+ yield key
- # fall through, re-raise the first KeyError
- raise error
+ try:
+ # assume bytes, coerce to str
+ try:
+ yield key.decode(self.encoding)
+ except UnicodeDecodeError:
+ pass
+ except AttributeError:
+ # assume str, coerce to bytes
+ try:
+ yield key.encode(self.encoding)
+ except UnicodeEncodeError:
+ pass
- def __contains__(self, key):
- superobj = super(FieldStorage, self)
+ def __getitem__(self, key):
+ error = None
- for candidate in self._key_candidates(key):
- if superobj.__contains__(candidate):
- return True
- return False
+ for candidate in self._key_candidates(key):
+ if isinstance(candidate, bytes):
+ # ouch
+ candidate = repr(candidate)
+ try:
+ return super().__getitem__(candidate)
+ except KeyError as e:
+ if error is None:
+ error = e
-else: # PY2
+ # fall through, re-raise the first KeyError
+ raise error
- FieldStorage = cgi.FieldStorage
+ def __contains__(self, key):
+ for candidate in self._key_candidates(key):
+ if super().__contains__(candidate):
+ return True
+ return False
form = FieldStorage()
-print('Filename: %s' % form['up'].filename)
-print('Name: %s' % form['name'].value)
-print('Content: %s' % form['up'].file.read())
+print('Filename:', form['up'].filename)
+print('Name:', form['name'].value)
+print('Content:', form['up'].file.read())
diff --git a/tests/test_cgiapp.py b/tests/test_cgiapp.py
index d173aa2..c6aa921 100644
--- a/tests/test_cgiapp.py
+++ b/tests/test_cgiapp.py
@@ -8,53 +8,55 @@
data_dir = os.path.join(os.path.dirname(__file__), 'cgiapp_data')
-# these CGI scripts can't work on Windows or Jython
-if sys.platform != 'win32' and not sys.platform.startswith('java'):
-
- # Ensure the CGI scripts are called with the same python interpreter. Put a
- # symlink to the interpreter executable into the path...
- def setup_module():
- global oldpath, pyexelink
- oldpath = os.environ.get('PATH', None)
- os.environ['PATH'] = data_dir + os.path.pathsep + oldpath
- pyexelink = os.path.join(data_dir, "python")
- try:
- os.unlink(pyexelink)
- except OSError:
- pass
- os.symlink(sys.executable, pyexelink)
-
- # ... and clean up again.
- def teardown_module():
- global oldpath, pyexelink
+pytestmark = pytest.mark.skipif(
+ sys.platform == 'win32' or sys.platform.startswith('java'),
+ reason="CGI scripts can't work on Windows or Jython")
+
+
+# Ensure the CGI scripts are called with the same python interpreter.
+# Put a symlink to the interpreter executable into the path...
+def setup_module():
+ global oldpath, pyexelink
+ oldpath = os.environ.get('PATH', None)
+ os.environ['PATH'] = data_dir + os.path.pathsep + oldpath
+ pyexelink = os.path.join(data_dir, "python")
+ try:
os.unlink(pyexelink)
- if oldpath is not None:
- os.environ['PATH'] = oldpath
- else:
- del os.environ['PATH']
-
- def test_ok():
- app = TestApp(CGIApplication({}, script='ok.cgi', path=[data_dir]))
- res = app.get('')
- assert res.header('content-type') == 'text/html; charset=UTF-8'
- assert res.full_status == '200 Okay'
- assert 'This is the body' in res
-
- def test_form():
- app = TestApp(CGIApplication({}, script='form.cgi', path=[data_dir]))
- res = app.post('', params={'name': b'joe'},
- upload_files=[('up', 'file.txt', b'x'*10000)])
- assert 'file.txt' in res
- assert 'joe' in res
- assert 'x'*10000 in res
-
- def test_error():
- app = TestApp(CGIApplication({}, script='error.cgi', path=[data_dir]))
- pytest.raises(CGIError, app.get, '', status=500)
-
- def test_stderr():
- app = TestApp(CGIApplication({}, script='stderr.cgi', path=[data_dir]))
- res = app.get('', expect_errors=True)
- assert res.status == 500
- assert 'error' in res
- assert 'some data' in res.errors
+ except OSError:
+ pass
+ os.symlink(sys.executable, pyexelink)
+
+# ... and clean up again.
+def teardown_module():
+ global oldpath, pyexelink
+ os.unlink(pyexelink)
+ if oldpath is not None:
+ os.environ['PATH'] = oldpath
+ else:
+ del os.environ['PATH']
+
+def test_ok():
+ app = TestApp(CGIApplication({}, script='ok.cgi', path=[data_dir]))
+ res = app.get('')
+ assert res.header('content-type') == 'text/html; charset=UTF-8'
+ assert res.full_status == '200 Okay'
+ assert 'This is the body' in res
+
+def test_form():
+ app = TestApp(CGIApplication({}, script='form.cgi', path=[data_dir]))
+ res = app.post('', params={'name': b'joe'},
+ upload_files=[('up', 'file.txt', b'x'*10000)])
+ assert 'file.txt' in res
+ assert 'joe' in res
+ assert 'x'*10000 in res
+
+def test_error():
+ app = TestApp(CGIApplication({}, script='error.cgi', path=[data_dir]))
+ pytest.raises(CGIError, app.get, '', status=500)
+
+def test_stderr():
+ app = TestApp(CGIApplication({}, script='stderr.cgi', path=[data_dir]))
+ res = app.get('', expect_errors=True)
+ assert res.status == 500
+ assert 'error' in res
+ assert 'some data' in res.errors
diff --git a/tests/test_exceptions/test_reporter.py b/tests/test_exceptions/test_reporter.py
index 5a06589..92711dd 100644
--- a/tests/test_exceptions/test_reporter.py
+++ b/tests/test_exceptions/test_reporter.py
@@ -30,7 +30,7 @@ def test_logger():
assert 0
rep.report(exc_data)
content = open(fn).read()
- assert len(content.splitlines()) == 4, len(content.splitlines())
+ assert len(content.splitlines()) == 4
assert 'ValueError' in content
assert 'int' in content
assert 'test_reporter.py' in content
diff --git a/tests/test_multidict.py b/tests/test_multidict.py
index ed6c920..938aebf 100644
--- a/tests/test_multidict.py
+++ b/tests/test_multidict.py
@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
# (c) 2007 Ian Bicking and Philip Jenvey; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-import cgi
import gc
import io
import pytest
+from paste.util.field_storage import FieldStorage
from paste.util.multidict import MultiDict, UnicodeMultiDict
def test_dict():
@@ -146,13 +146,13 @@ def assert_unicode_item(obj):
assert isinstance(item[0], tuple)
assert isinstance(item[1], tuple)
- fs = cgi.FieldStorage()
+ fs = FieldStorage()
fs.name = 'thefile'
fs.filename = 'hello.txt'
fs.file = io.BytesIO(b'hello')
d[k('f')] = fs
ufs = d[k('f')]
- assert isinstance(ufs, cgi.FieldStorage)
+ assert isinstance(ufs, FieldStorage)
assert ufs.name == fs.name
assert isinstance(ufs.name, str)
assert ufs.filename == fs.filename
diff --git a/tests/test_util/test_field_storage.py b/tests/test_util/test_field_storage.py
new file mode 100644
index 0000000..8d34e2c
--- /dev/null
+++ b/tests/test_util/test_field_storage.py
@@ -0,0 +1,542 @@
+# Slightly modified tests from the original cgi test module.
+
+# Copyright © 2001-2023 Python Software Foundation; All Rights Reserved.
+
+import sys
+import tempfile
+from collections import namedtuple
+from io import BytesIO
+
+import pytest
+
+from paste.util import field_storage
+from paste.util.field_storage import FieldStorage, parse_header
+
+
+class HackedSysModule:
+ # The regression test will have real values in sys.argv, which
+ # will completely confuse the test of the field_storage module
+ argv = []
+ stdin = sys.stdin
+
+
+field_storage.sys = HackedSysModule()
+
+
+parse_strict_test_cases = [
+ ("", {}),
+ ("&", ValueError("bad query field: ''")),
+ ("&&", ValueError("bad query field: ''")),
+ # Should the next few really be valid?
+ ("=", {}),
+ ("=&=", {}),
+ # This rest seem to make sense
+ ("=a", {'': ['a']}),
+ ("&=a", ValueError("bad query field: ''")),
+ ("=a&", ValueError("bad query field: ''")),
+ ("=&a", ValueError("bad query field: 'a'")),
+ ("b=a", {'b': ['a']}),
+ ("b+=a", {'b ': ['a']}),
+ ("a=b=a", {'a': ['b=a']}),
+ ("a=+b=a", {'a': [' b=a']}),
+ ("&b=a", ValueError("bad query field: ''")),
+ ("b&=a", ValueError("bad query field: 'b'")),
+ ("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
+ ("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
+ ("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
+ ("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r"
+ "&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b"
+ "&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
+ {'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
+ 'cuyer': ['r'],
+ 'expire': ['964546263'],
+ 'kid': ['130003.300038'],
+ 'lobale': ['en-US'],
+ 'order_id': ['0bb2e248638833d48cb7fed300000f1b'],
+ 'ss': ['env'],
+ 'view': ['bustomer'],
+ }),
+
+ ("group_id=5470&set=custom&_assigned_to=31392&_status=1"
+ "&_category=100&SUBMIT=Browse",
+ {'SUBMIT': ['Browse'],
+ '_assigned_to': ['31392'],
+ '_category': ['100'],
+ '_status': ['1'],
+ 'group_id': ['5470'],
+ 'set': ['custom'],
+ })
+ ]
+
+
+def gen_result(data, environ):
+ encoding = 'latin-1'
+ fake_stdin = BytesIO(data.encode(encoding))
+ fake_stdin.seek(0)
+ form = FieldStorage(fp=fake_stdin, environ=environ, encoding=encoding)
+ return {k: form.getlist(k) if isinstance(v, list) else v.value
+ for k, v in dict(form).items()}
+
+
+def test_fieldstorage_properties():
+ fs = FieldStorage()
+ assert not fs
+ assert "FieldStorage" in repr(fs)
+ assert list(fs) == list(fs.keys())
+ fs.list.append(namedtuple('MockFieldStorage', 'name')('fieldvalue'))
+ assert fs
+
+
+def test_fieldstorage_invalid():
+ with pytest.raises(TypeError):
+ FieldStorage("not-a-file-obj", environ={"REQUEST_METHOD": "PUT"})
+ with pytest.raises(TypeError):
+ FieldStorage("foo", "bar")
+ fs = FieldStorage(headers={'content-type': 'text/plain'})
+ with pytest.raises(TypeError):
+ bool(fs)
+
+
+def test_strict():
+ for orig, expect in parse_strict_test_cases:
+ env = {'QUERY_STRING': orig}
+ fs = FieldStorage(environ=env)
+ if isinstance(expect, dict):
+ # test dict interface
+ assert len(expect) == len(fs)
+ assert len(expect.keys()) == len(fs.keys())
+ assert fs.getvalue("nonexistent field", "default") == "default"
+ # test individual fields
+ for key in expect.keys():
+ expect_val = expect[key]
+ assert key in fs
+ if len(expect_val) > 1:
+ assert fs.getvalue(key) == expect_val
+ else:
+ assert fs.getvalue(key) == expect_val[0]
+
+
+def test_separator():
+ parse_semicolon = [
+ ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
+ ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
+ (";", ValueError("bad query field: ''")),
+ (";;", ValueError("bad query field: ''")),
+ ("=;a", ValueError("bad query field: 'a'")),
+ (";b=a", ValueError("bad query field: ''")),
+ ("b;=a", ValueError("bad query field: 'b'")),
+ ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
+ ("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
+ ]
+ for orig, expect in parse_semicolon:
+ env = {'QUERY_STRING': orig}
+ fs = FieldStorage(separator=';', environ=env)
+ if isinstance(expect, dict):
+ for key in expect.keys():
+ expect_val = expect[key]
+ assert key in fs
+ if len(expect_val) > 1:
+ assert fs.getvalue(key) == expect_val
+ else:
+ assert fs.getvalue(key) == expect_val[0]
+
+
+def test_fieldstorage_readline():
+ # FieldStorage uses readline, which has the capacity to read all
+ # contents of the input file into memory; we use readline's size argument
+ # to prevent that for files that do not contain any newlines in
+ # non-GET/HEAD requests
+ class TestReadlineFile:
+ def __init__(self, file):
+ self.file = file
+ self.numcalls = 0
+
+ def readline(self, size=None):
+ self.numcalls += 1
+ if size:
+ return self.file.readline(size)
+ else:
+ return self.file.readline()
+
+ def __getattr__(self, name):
+ file = self.__dict__['file']
+ a = getattr(file, name)
+ if not isinstance(a, int):
+ setattr(self, name, a)
+ return a
+
+ f = TestReadlineFile(tempfile.TemporaryFile("wb+"))
+ try:
+ f.write(b'x' * 256 * 1024)
+ f.seek(0)
+ env = {'REQUEST_METHOD': 'PUT'}
+ fs = FieldStorage(fp=f, environ=env)
+ try:
+ # if we're not chunking properly, readline is only called twice
+ # (by read_binary); if we are chunking properly, it will be called
+ # 5 times as long as the chunk size is 1 << 16.
+ assert f.numcalls >= 2
+ finally:
+ fs.file.close()
+ finally:
+ f.close()
+
+
+def test_fieldstorage_multipart():
+ # Test basic FieldStorage multipart parsing
+ env = {
+ 'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}',
+ 'CONTENT_LENGTH': '558'}
+ fp = BytesIO(POSTDATA.encode('latin-1'))
+ fs = FieldStorage(fp, environ=env, encoding="latin-1")
+ assert len(fs.list) == 4
+ expect = [{'name': 'id', 'filename': None, 'value': '1234'},
+ {'name': 'title', 'filename': None, 'value': ''},
+ {'name': 'file', 'filename': 'test.txt',
+ 'value': b'Testing 123.\n'},
+ {'name': 'submit', 'filename': None, 'value': ' Add '}]
+ for x in range(len(fs.list)):
+ for k, exp in expect[x].items():
+ got = getattr(fs.list[x], k)
+ assert got == exp
+
+
+def test_fieldstorage_multipart_leading_whitespace():
+ env = {
+ 'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}',
+ 'CONTENT_LENGTH': '560'}
+ # Add some leading whitespace to our post data that will cause the
+ # first line to not be the inner boundary.
+ fp = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
+ fs = FieldStorage(fp, environ=env, encoding="latin-1")
+ assert len(fs.list) == 4
+ expect = [{'name': 'id', 'filename': None, 'value': '1234'},
+ {'name': 'title', 'filename': None, 'value': ''},
+ {'name': 'file', 'filename': 'test.txt',
+ 'value': b'Testing 123.\n'},
+ {'name': 'submit', 'filename': None, 'value': ' Add '}]
+ for x in range(len(fs.list)):
+ for k, exp in expect[x].items():
+ got = getattr(fs.list[x], k)
+ assert got == exp
+
+
+def test_fieldstorage_multipart_non_ascii():
+ # Test basic FieldStorage multipart parsing
+ env = {'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY}',
+ 'CONTENT_LENGTH': '558'}
+ for encoding in ['iso-8859-1', 'utf-8']:
+ fp = BytesIO(POSTDATA_NON_ASCII.encode(encoding))
+ fs = FieldStorage(fp, environ=env, encoding=encoding)
+ assert len(fs.list) == 1
+ expect = [{'name': 'id', 'filename': None, 'value': '\xe7\xf1\x80'}]
+ for x in range(len(fs.list)):
+ for k, exp in expect[x].items():
+ got = getattr(fs.list[x], k)
+ assert got == exp
+
+
+def test_fieldstorage_multipart_maxline():
+ # Issue #18167
+ maxline = 1 << 16 - 1
+
+ def check(content):
+ data = """---123
+Content-Disposition: form-data; name="upload"; filename="fake.txt"
+Content-Type: text/plain
+
+{}
+---123--
+""".replace('\n', '\r\n').format(content)
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
+ 'REQUEST_METHOD': 'POST',
+ }
+
+ assert gen_result(data, environ) == {
+ 'upload': content.encode('latin1')}
+ check('x' * maxline)
+ check('x' * maxline + '\r')
+ check('x' * maxline + '\r' + 'y' * maxline)
+
+
+def test_fieldstorage_multipart_w3c():
+ # Test basic FieldStorage multipart parsing (W3C sample)
+ env = {
+ 'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': f'multipart/form-data; boundary={BOUNDARY_W3}',
+ 'CONTENT_LENGTH': str(len(POSTDATA_W3))}
+ fp = BytesIO(POSTDATA_W3.encode('latin-1'))
+ fs = FieldStorage(fp, environ=env, encoding="latin-1")
+ assert len(fs.list) == 2
+ assert fs.list[0].name == 'submit-name'
+ assert fs.list[0].value == 'Larry'
+ assert fs.list[1].name == 'files'
+ files = fs.list[1].value
+ assert len(files) == 2
+ expect = [{'name': None, 'filename': 'file1.txt',
+ 'value': b'... contents of file1.txt ...'},
+ {'name': None, 'filename': 'file2.gif',
+ 'value': b'...contents of file2.gif...'}]
+ for x in range(len(files)):
+ for k, exp in expect[x].items():
+ got = getattr(files[x], k)
+ assert got == exp
+
+
+def test_fieldstorage_part_content_length():
+ boundary = "JfISa01"
+ postdata = """--JfISa01
+Content-Disposition: form-data; name="submit-name"
+Content-Length: 5
+
+Larry
+--JfISa01"""
+ env = {
+ 'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': f'multipart/form-data; boundary={boundary}',
+ 'CONTENT_LENGTH': str(len(postdata))}
+ fp = BytesIO(postdata.encode('latin-1'))
+ fs = FieldStorage(fp, environ=env, encoding="latin-1")
+ assert len(fs.list) == 1
+ assert fs.list[0].name == 'submit-name'
+ assert fs.list[0].value == 'Larry'
+
+
+def test_field_storage_multipart_no_content_length():
+ fp = BytesIO(b"""--MyBoundary
+Content-Disposition: form-data; name="my-arg"; filename="foo"
+
+Test
+
+--MyBoundary--
+""")
+ env = {
+ "REQUEST_METHOD": "POST",
+ "CONTENT_TYPE": "multipart/form-data; boundary=MyBoundary",
+ "wsgi.input": fp,
+ }
+ fields = FieldStorage(fp, environ=env)
+
+ assert len(fields["my-arg"].file.read()) == 5
+
+
+def test_fieldstorage_as_context_manager():
+ fp = BytesIO(b'x' * 10)
+ env = {'REQUEST_METHOD': 'PUT'}
+ with FieldStorage(fp=fp, environ=env) as fs:
+ content = fs.file.read()
+ assert fs.file.closed is False
+ assert fs.file.closed is True
+ assert content == 'x' * 10
+ with pytest.raises(ValueError, match='I/O operation on closed file'):
+ fs.file.read()
+
+
+_qs_result = {
+ 'key1': 'value1',
+ 'key2': ['value2x', 'value2y'],
+ 'key3': 'value3',
+ 'key4': 'value4'
+}
+
+
+def test_qs_and_url_encode():
+ data = "key2=value2x&key3=value3&key4=value4"
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
+ 'QUERY_STRING': 'key1=value1&key2=value2y',
+ 'REQUEST_METHOD': 'POST',
+ }
+ assert gen_result(data, environ) == _qs_result
+
+
+def test_max_num_fields():
+ # For application/x-www-form-urlencoded
+ data = '&'.join(['a=a']*11)
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
+ 'REQUEST_METHOD': 'POST',
+ }
+
+ with pytest.raises(ValueError):
+ FieldStorage(
+ fp=BytesIO(data.encode()),
+ environ=environ,
+ max_num_fields=10,
+ )
+
+ # For multipart/form-data
+ data = """---123
+Content-Disposition: form-data; name="a"
+
+3
+---123
+Content-Type: application/x-www-form-urlencoded
+
+a=4
+---123
+Content-Type: application/x-www-form-urlencoded
+
+a=5
+---123--
+"""
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
+ 'QUERY_STRING': 'a=1&a=2',
+ 'REQUEST_METHOD': 'POST',
+ }
+
+ # 2 GET entities
+ # 1 top level POST entities
+ # 1 entity within the second POST entity
+ # 1 entity within the third POST entity
+ with pytest.raises(ValueError):
+ FieldStorage(
+ fp=BytesIO(data.encode()),
+ environ=environ,
+ max_num_fields=4,
+ )
+ FieldStorage(
+ fp=BytesIO(data.encode()),
+ environ=environ,
+ max_num_fields=5,
+ )
+
+
+def test_qs_and_form_data():
+ data = """---123
+Content-Disposition: form-data; name="key2"
+
+value2y
+---123
+Content-Disposition: form-data; name="key3"
+
+value3
+---123
+Content-Disposition: form-data; name="key4"
+
+value4
+---123--
+"""
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
+ 'QUERY_STRING': 'key1=value1&key2=value2x',
+ 'REQUEST_METHOD': 'POST',
+ }
+ assert gen_result(data, environ) == _qs_result
+
+
+def test_qs_and_form_data_file():
+ data = """---123
+Content-Disposition: form-data; name="key2"
+
+value2y
+---123
+Content-Disposition: form-data; name="key3"
+
+value3
+---123
+Content-Disposition: form-data; name="key4"
+
+value4
+---123
+Content-Disposition: form-data; name="upload"; filename="fake.txt"
+Content-Type: text/plain
+
+this is the content of the fake file
+
+---123--
+"""
+ environ = {
+ 'CONTENT_LENGTH': str(len(data)),
+ 'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
+ 'QUERY_STRING': 'key1=value1&key2=value2x',
+ 'REQUEST_METHOD': 'POST',
+ }
+ result = {**_qs_result,
+ 'upload': b'this is the content of the fake file\n'}
+ assert gen_result(data, environ) == result
+
+
+def test_parse_header():
+ assert parse_header("text/plain") == ("text/plain", {})
+ assert parse_header("text/vnd.just.made.this.up ; ") == (
+ "text/vnd.just.made.this.up", {})
+ assert parse_header("text/plain;charset=us-ascii") == (
+ "text/plain", {"charset": "us-ascii"})
+ assert parse_header('text/plain ; charset="us-ascii"') == (
+ "text/plain", {"charset": "us-ascii"})
+ assert parse_header('text/plain ; charset="us-ascii"; another=opt') == (
+ "text/plain", {"charset": "us-ascii", "another": "opt"})
+ assert parse_header('attachment; filename="silly.txt"') == (
+ "attachment", {"filename": "silly.txt"})
+ assert parse_header('attachment; filename="strange;name"') == (
+ "attachment", {"filename": "strange;name"})
+ assert parse_header('attachment; filename="strange;name";size=123;') == (
+ "attachment", {"filename": "strange;name", "size": "123"})
+ assert parse_header('form-data; name="files"; filename="fo\\"o;bar"') == (
+ "form-data", {"name": "files", "filename": 'fo"o;bar'})
+
+
+BOUNDARY = "---------------------------721837373350705526688164684"
+POSTDATA = """-----------------------------721837373350705526688164684
+Content-Disposition: form-data; name="id"
+
+1234
+-----------------------------721837373350705526688164684
+Content-Disposition: form-data; name="title"
+
+
+-----------------------------721837373350705526688164684
+Content-Disposition: form-data; name="file"; filename="test.txt"
+Content-Type: text/plain
+
+Testing 123.
+
+-----------------------------721837373350705526688164684
+Content-Disposition: form-data; name="submit"
+
+ Add\x20
+-----------------------------721837373350705526688164684--
+"""
+
+POSTDATA_NON_ASCII = """-----------------------------721837373350705526688164684
+Content-Disposition: form-data; name="id"
+
+\xe7\xf1\x80
+-----------------------------721837373350705526688164684
+"""
+
+# http://www.w3.org/TR/html401/interact/forms.html#h-17.13.4
+BOUNDARY_W3 = "AaB03x"
+POSTDATA_W3 = """--AaB03x
+Content-Disposition: form-data; name="submit-name"
+
+Larry
+--AaB03x
+Content-Disposition: form-data; name="files"
+Content-Type: multipart/mixed; boundary=BbC04y
+
+--BbC04y
+Content-Disposition: file; filename="file1.txt"
+Content-Type: text/plain
+
+... contents of file1.txt ...
+--BbC04y
+Content-Disposition: file; filename="file2.gif"
+Content-Type: image/gif
+Content-Transfer-Encoding: binary
+
+...contents of file2.gif...
+--BbC04y--
+--AaB03x--
+"""
diff --git a/tests/test_wsgiwrappers.py b/tests/test_wsgiwrappers.py
index 1bdf1b0..ba09cec 100644
--- a/tests/test_wsgiwrappers.py
+++ b/tests/test_wsgiwrappers.py
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
# (c) 2007 Philip Jenvey; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-import cgi
import io
from paste.fixture import TestApp
from paste.wsgiwrappers import WSGIRequest, WSGIResponse
+from paste.util.field_storage import FieldStorage
class AssertApp(object):
def __init__(self, assertfunc):
@@ -76,7 +76,7 @@ def handle_fileupload(environ, start_response):
assert len(request.POST) == 1
assert isinstance(request.POST.keys()[0], str)
fs = request.POST['thefile']
- assert isinstance(fs, cgi.FieldStorage)
+ assert isinstance(fs, FieldStorage)
assert isinstance(fs.filename, str)
assert fs.filename == '寿司.txt'
assert fs.value == b'Sushi'
@@ -85,7 +85,7 @@ def handle_fileupload(environ, start_response):
assert len(request.POST) == 1
assert isinstance(request.POST.keys()[0], str)
fs = request.POST['thefile']
- assert isinstance(fs, cgi.FieldStorage)
+ assert isinstance(fs, FieldStorage)
assert isinstance(fs.filename, str)
assert fs.filename == u'寿司.txt'
assert fs.value == b'Sushi'