-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdockermon.py
More file actions
executable file
·151 lines (120 loc) · 4.19 KB
/
dockermon.py
File metadata and controls
executable file
·151 lines (120 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
"""docker monitor using docker /events HTTP streaming API"""
from contextlib import closing
from functools import partial
from socket import socket, AF_UNIX
from subprocess import Popen, PIPE
from sys import stdout, version_info
import json
import shlex
if version_info[:2] < (3, 0):
from httplib import OK as HTTP_OK
from urlparse import urlparse
else:
from http.client import OK as HTTP_OK
from urllib.parse import urlparse
__version__ = '0.2.2'
bufsize = 1024
default_sock_url = 'ipc:///var/run/docker.sock'
class DockermonError(Exception):
pass
def read_http_header(sock):
"""Read HTTP header from socket, return header and rest of data."""
buf = []
hdr_end = '\r\n\r\n'
while True:
buf.append(sock.recv(bufsize).decode('utf-8'))
data = ''.join(buf)
i = data.find(hdr_end)
if i == -1:
continue
return data[:i], data[i + len(hdr_end):]
def header_status(header):
"""Parse HTTP status line, return status (int) and reason."""
status_line = header[:header.find('\r')]
# 'HTTP/1.1 200 OK' -> (200, 'OK')
fields = status_line.split(None, 2)
return int(fields[1]), fields[2]
def connect(url):
"""Connect to UNIX or TCP socket.
url can be either tcp://<host>:port or ipc://<path>
"""
url = urlparse(url)
if url.scheme == 'tcp':
sock = socket()
netloc = tuple(url.netloc.rsplit(':', 1))
hostname = socket.gethostname()
elif url.scheme == 'ipc':
sock = socket(AF_UNIX)
netloc = url.path
hostname = 'localhost'
else:
raise ValueError('unknown socket type: %s' % url.scheme)
sock.connect(netloc)
return sock, hostname
def watch(callback, url=default_sock_url):
"""Watch docker events. Will call callback with each new event (dict).
url can be either tcp://<host>:port or ipc://<path>
"""
sock, hostname = connect(url)
request = 'GET /events HTTP/1.1\nHost: %s\n\n' % hostname
request = request.encode('utf-8')
with closing(sock):
sock.sendall(request)
header, payload = read_http_header(sock)
status, reason = header_status(header)
if status != HTTP_OK:
raise DockermonError('bad HTTP status: %s %s' % (status, reason))
# Messages are \r\n<size in hex><JSON payload>\r\n
buf = [payload]
while True:
chunk = sock.recv(bufsize)
if not chunk:
raise EOFError('socket closed')
buf.append(chunk.decode('utf-8'))
data = ''.join(buf)
i = data.find('\r\n')
if i == -1:
continue
size = int(data[:i], 16)
start = i + 2 # Skip initial \r\n
if len(data) < start + size + 2:
continue
payload = data[start:start+size]
callback(json.loads(payload))
buf = [data[start+size+2:]] # Skip \r\n suffix
def print_callback(msg):
"""Print callback, prints message to stdout as JSON in one line."""
json.dump(msg, stdout)
stdout.write('\n')
stdout.flush()
def prog_callback(prog, msg):
"""Program callback, calls prog with message in stdin"""
pipe = Popen(prog, stdin=PIPE)
data = json.dumps(msg)
pipe.stdin.write(data.encode('utf-8'))
pipe.stdin.close()
if __name__ == '__main__':
from argparse import ArgumentParser
parser = ArgumentParser(description=__doc__)
parser.add_argument('--prog', default=None,
help='program to call (e.g. "jq --unbuffered .")')
parser.add_argument(
'--socket-url', default=default_sock_url,
help='socket url (ipc:///path/to/sock or tcp:///host:port)')
parser.add_argument(
'--version', help='print version and exit',
action='store_true', default=False)
args = parser.parse_args()
if args.version:
print('dockermon %s' % __version__)
raise SystemExit
if args.prog:
prog = shlex.split(args.prog)
callback = partial(prog_callback, prog)
else:
callback = print_callback
try:
watch(callback, args.socket_url)
except (KeyboardInterrupt, EOFError):
pass