-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest.py
More file actions
87 lines (78 loc) · 2.61 KB
/
request.py
File metadata and controls
87 lines (78 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import urllib.parse
from utils import log
import json
class Request:
def __init__(self):
self.remote_addr = None
self.method = 'GET'
self.path = '/'
self.query = None
self.protocol = 'HTTP/1.1'
self.headers = None
self.cookies = None
self.body = None
@staticmethod
def from_http_and_address(http_text, address):
req = http_text.split('\r\n\r\n', 1)
log('req', req)
head = req[0]
body = req[1] if len(req) == 2 else ''
request_line, *request_headers = head.split('\r\n')
request_line_parts = request_line.split(' ', 2)
# in case of '/'
path_with_query = request_line_parts[1].rstrip('/') or '/'
path, query = Request.parse_path(path_with_query)
# compose new request object
request = Request()
request.remote_addr = address
request.method = request_line_parts[0]
request.path = path
request.query = query
request.protocol = request_line_parts[-1]
request.set_headers(request_headers)
request.body = body
return request
def parse_cookies(self):
self.cookies = {}
c = self.headers.get('Cookie', '')
for item in c.split('; '):
if '=' in item:
k, v = item.split('=')
self.cookies[k] = v
def set_headers(self, fields):
self.headers = {}
for field in fields:
k, v = field.split(': ', 1)
self.headers[k] = v
self.parse_cookies()
@property
def form(self):
"""
Parse the body of the request and returns a form
:return: a dict parsed from the body of the request
"""
if not hasattr(self, '_request_form'):
form = {}
if self.body:
kvs = self.body.split('&')
for kv in kvs:
k, v = kv.split('=')
k, v = urllib.parse.unquote_plus(k), urllib.parse.unquote_plus(v)
form[k] = v
setattr(self, '_request_form', form)
return getattr(self, '_request_form')
@property
def json(self):
return json.loads(self.body)
@staticmethod
def parse_path(path: str):
query = {}
if '?' in path:
path, query_string = path.split('?', 1)
for item in query_string.split('&'):
k, v = item.split('=')
query[k] = v
log('path and query is', path, query)
return path, query
def __repr__(self):
return self.__class__.__name__ + repr(self.__dict__)