Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/utils/ports/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .ports import (
split_port,
build_port_bindings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think to_port_range and add_port_mapping should be imported here - they're only used within ports.py, and I can't see them being useful outside it

) # flake8: noqa
84 changes: 84 additions & 0 deletions docker/utils/ports/ports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@


def add_port_mapping(port_bindings, internal_port, external):
if internal_port in port_bindings:
port_bindings[internal_port].append(external)
else:
port_bindings[internal_port] = [external]


def add_port(port_bindings, internal_port_range, external_range):
if external_range is None:
for internal_port in internal_port_range:
add_port_mapping(port_bindings, internal_port, None)
else:
ports = zip(internal_port_range, external_range)
for internal_port, external_port in ports:
add_port_mapping(port_bindings, internal_port, external_port)


def build_port_bindings(ports):
port_bindings = {}
for port in ports:
internal_port_range, external_range = split_port(port)
add_port(port_bindings, internal_port_range, external_range)
return port_bindings


def to_port_range(port):
if not port:
return None

protocol = ""
if "/" in port:
parts = port.split("/")
if len(parts) != 2:
raise ValueError('Invalid port "%s", should be '
'[[remote_ip:]remote_port[-remote_port]:]'
'port[/protocol]' % port)
port, protocol = parts
protocol = "/" + protocol

parts = str(port).split('-')

if len(parts) == 1:
return ["%s%s" % (port, protocol)]

if len(parts) == 2:
full_port_range = range(int(parts[0]), int(parts[1]) + 1)
return ["%s%s" % (p, protocol) for p in full_port_range]

raise ValueError('Invalid port range "%s", should be '
'port or startport-endport' % port)


def split_port(port):
parts = str(port).split(':')
if not 1 <= len(parts) <= 3:
raise ValueError('Invalid port "%s", should be '
'[[remote_ip:]remote_port:]port[/protocol]' % port)

if len(parts) == 1:
internal_port, = parts
return to_port_range(internal_port), None
if len(parts) == 2:
external_port, internal_port = parts

internal_range = to_port_range(internal_port)
external_range = to_port_range(external_port)
if len(internal_range) != len(external_range):
raise ValueError('Port ranges don\'t match in length')

return internal_range, external_range

external_ip, external_port, internal_port = parts
internal_range = to_port_range(internal_port)
external_range = to_port_range(external_port)
if not external_range:
external_range = [None] * len(internal_range)

if len(internal_range) != len(external_range):
raise ValueError('Port ranges don\'t match in length')

return internal_range, [(external_ip, ex_port or None)
for ex_port in external_range]
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
name="docker-py",
version=version,
description="Python client for Docker.",
packages=['docker', 'docker.auth', 'docker.unixconn', 'docker.utils',
packages=['docker', 'docker.auth', 'docker.unixconn', 'docker.utils', 'docker.utils.ports',
'docker.ssladapter'],
install_requires=requirements,
tests_require=test_requirements,
Expand Down
97 changes: 97 additions & 0 deletions tests/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
create_host_config
)
from docker.utils.ports import build_port_bindings, split_port
from docker.auth import resolve_authconfig


Expand Down Expand Up @@ -165,6 +166,102 @@ def test_resolve_authconfig(self):
resolve_authconfig(auth_config, 'does.not.exist') is None
)

def test_split_port_with_host_ip(self):
internal_port, external_port = split_port("127.0.0.1:1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("127.0.0.1", "1000")])

def test_split_port_with_protocol(self):
internal_port, external_port = split_port("127.0.0.1:1000:2000/udp")
self.assertEqual(internal_port, ["2000/udp"])
self.assertEqual(external_port, [("127.0.0.1", "1000")])

def test_split_port_with_host_ip_no_port(self):
internal_port, external_port = split_port("127.0.0.1::2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("127.0.0.1", None)])

def test_split_port_range_with_host_ip_no_port(self):
internal_port, external_port = split_port("127.0.0.1::2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port,
[("127.0.0.1", None), ("127.0.0.1", None)])

def test_split_port_with_host_port(self):
internal_port, external_port = split_port("1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, ["1000"])

def test_split_port_range_with_host_port(self):
internal_port, external_port = split_port("1000-1001:2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port, ["1000", "1001"])

def test_split_port_no_host_port(self):
internal_port, external_port = split_port("2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, None)

def test_split_port_range_no_host_port(self):
internal_port, external_port = split_port("2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port, None)

def test_split_port_range_with_protocol(self):
internal_port, external_port = split_port(
"127.0.0.1:1000-1001:2000-2001/udp")
self.assertEqual(internal_port, ["2000/udp", "2001/udp"])
self.assertEqual(external_port,
[("127.0.0.1", "1000"), ("127.0.0.1", "1001")])

def test_split_port_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000:tcp"))

def test_non_matching_length_port_ranges(self):
self.assertRaises(
ValueError,
lambda: split_port("0.0.0.0:1000-1010:2000-2002/tcp")
)

def test_port_and_range_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000-2002/tcp"))

def test_build_port_bindings_with_one_port(self):
port_bindings = build_port_bindings(["127.0.0.1:1000:1000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])

def test_build_port_bindings_with_matching_internal_ports(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:1000"])
self.assertEqual(port_bindings["1000"],
[("127.0.0.1", "1000"), ("127.0.0.1", "2000")])

def test_build_port_bindings_with_nonmatching_internal_ports(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])

def test_build_port_bindings_with_port_range(self):
port_bindings = build_port_bindings(["127.0.0.1:1000-1001:1000-1001"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["1001"], [("127.0.0.1", "1001")])

def test_build_port_bindings_with_matching_internal_port_ranges(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000-1001:1000-1001", "127.0.0.1:2000-2001:1000-1001"])
self.assertEqual(port_bindings["1000"],
[("127.0.0.1", "1000"), ("127.0.0.1", "2000")])
self.assertEqual(port_bindings["1001"],
[("127.0.0.1", "1001"), ("127.0.0.1", "2001")])

def test_build_port_bindings_with_nonmatching_internal_port_ranges(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])

if __name__ == '__main__':
unittest.main()