Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 61 additions & 78 deletions tests/unittests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
import os
import platform
import re
import shutil
import stat
import tempfile
from collections import deque
from contextlib import nullcontext as does_not_raise
from pathlib import Path
Expand Down Expand Up @@ -1791,7 +1789,7 @@ def restorecon(self, path, recursive):
self.restored.append(path)


class TestGetCfgOptionListOrStr(helpers.TestCase):
class TestGetCfgOptionListOrStr:
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
Expand Down Expand Up @@ -1823,15 +1821,10 @@ def test_value_is_none(self):
assert [] == result


class TestWriteFile(helpers.TestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)

def test_basic_usage(self):
class TestWriteFile:
def test_basic_usage(self, tmp_path):
"""Verify basic usage with default args."""
path = os.path.join(self.tmp, "NewFile.txt")
path = os.path.join(str(tmp_path), "NewFile.txt")
contents = "Hey there"

util.write_file(path, contents)
Expand All @@ -1844,9 +1837,9 @@ def test_basic_usage(self):
file_stat = os.stat(path)
assert 0o644 == stat.S_IMODE(file_stat.st_mode)

def test_dir_is_created_if_required(self):
def test_dir_is_created_if_required(self, tmp_path):
"""Verify that directories are created is required."""
dirname = os.path.join(self.tmp, "subdir")
dirname = os.path.join(str(tmp_path), "subdir")
path = os.path.join(dirname, "NewFile.txt")
contents = "Hey there"

Expand All @@ -1855,9 +1848,9 @@ def test_dir_is_created_if_required(self):
assert os.path.isdir(dirname)
assert os.path.isfile(path)

def test_dir_ownership(self):
def test_dir_ownership(self, tmp_path):
"""Verify that directories is created with appropriate ownership."""
dirname = os.path.join(self.tmp, "subdir", "subdir2")
dirname = os.path.join(str(tmp_path), "subdir", "subdir2")
path = os.path.join(dirname, "NewFile.txt")
contents = "Hey there"
user = "foo"
Expand All @@ -1869,14 +1862,14 @@ def test_dir_ownership(self):
util.write_file(path, contents, user=user, group=group)

calls = [
mock.call(os.path.join(self.tmp, "subdir"), user, group),
mock.call(os.path.join(str(tmp_path), "subdir"), user, group),
mock.call(Path(dirname), user, group),
]
mockobj.assert_has_calls(calls, any_order=False)

def test_dir_is_not_created_if_ensure_dir_false(self):
def test_dir_is_not_created_if_ensure_dir_false(self, tmp_path):
"""Verify directories are not created if ensure_dir_exists is False."""
dirname = os.path.join(self.tmp, "subdir")
dirname = os.path.join(str(tmp_path), "subdir")
path = os.path.join(dirname, "NewFile.txt")
contents = "Hey there"

Expand All @@ -1885,9 +1878,9 @@ def test_dir_is_not_created_if_ensure_dir_false(self):

assert not os.path.isdir(dirname)

def test_explicit_mode(self):
def test_explicit_mode(self, tmp_path):
"""Verify explicit file mode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")
path = os.path.join(str(tmp_path), "NewFile.txt")
contents = "Hey there"

util.write_file(path, contents, mode=0o666)
Expand All @@ -1897,10 +1890,10 @@ def test_explicit_mode(self):
file_stat = os.stat(path)
assert 0o666 == stat.S_IMODE(file_stat.st_mode)

def test_preserve_mode_no_existing(self):
def test_preserve_mode_no_existing(self, tmp_path):
"""Verify that file is created with mode 0o644 if preserve_mode
is true and there is no prior existing file."""
path = os.path.join(self.tmp, "NewFile.txt")
path = os.path.join(str(tmp_path), "NewFile.txt")
contents = "Hey there"

util.write_file(path, contents, preserve_mode=True)
Expand All @@ -1910,10 +1903,10 @@ def test_preserve_mode_no_existing(self):
file_stat = os.stat(path)
assert 0o644 == stat.S_IMODE(file_stat.st_mode)

def test_preserve_mode_with_existing(self):
def test_preserve_mode_with_existing(self, tmp_path):
"""Verify that file is created using mode of existing file
if preserve_mode is true."""
path = os.path.join(self.tmp, "NewFile.txt")
path = os.path.join(str(tmp_path), "NewFile.txt")
contents = "Hey there"

open(path, "w").close()
Expand All @@ -1926,9 +1919,9 @@ def test_preserve_mode_with_existing(self):
file_stat = os.stat(path)
assert 0o666 == stat.S_IMODE(file_stat.st_mode)

def test_custom_omode(self):
def test_custom_omode(self, tmp_path):
"""Verify custom omode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")
path = os.path.join(str(tmp_path), "NewFile.txt")
contents = "Hey there"

# Create file first with basic content
Expand All @@ -1942,9 +1935,9 @@ def test_custom_omode(self):
create_contents = f.read()
assert "LINE1\nHey there" == create_contents

def test_restorecon_if_possible_is_called(self):
def test_restorecon_if_possible_is_called(self, tmp_path):
"""Make sure the selinux guard is called correctly."""
my_file = os.path.join(self.tmp, "my_file")
my_file = os.path.join(str(tmp_path), "my_file")
with open(my_file, "w") as fp:
fp.write("My Content")

Expand All @@ -1962,70 +1955,65 @@ def test_restorecon_if_possible_is_called(self):
mockobj.assert_called_once_with("selinux")


class TestDeleteDirContents(helpers.TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)

class TestDeleteDirContents:
def assertDirEmpty(self, dirname):
assert [] == os.listdir(dirname)

def test_does_not_delete_dir(self):
def test_does_not_delete_dir(self, tmp_path):
"""Ensure directory itself is not deleted."""
util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

assert os.path.isdir(self.tmp)
self.assertDirEmpty(self.tmp)
assert os.path.isdir(str(tmp_path))
self.assertDirEmpty(str(tmp_path))

def test_deletes_files(self):
def test_deletes_files(self, tmp_path):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
with open(os.path.join(str(tmp_path), "new_file.txt"), "wb") as f:
f.write(b"DELETE ME")

util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

self.assertDirEmpty(self.tmp)
self.assertDirEmpty(str(tmp_path))

def test_deletes_empty_dirs(self):
def test_deletes_empty_dirs(self, tmp_path):
"""Empty directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
os.mkdir(os.path.join(str(tmp_path), "new_dir"))

util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

self.assertDirEmpty(self.tmp)
self.assertDirEmpty(str(tmp_path))

def test_deletes_nested_dirs(self):
def test_deletes_nested_dirs(self, tmp_path):
"""Nested directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
os.mkdir(os.path.join(self.tmp, "new_dir", "new_subdir"))
os.mkdir(os.path.join(str(tmp_path), "new_dir"))
os.mkdir(os.path.join(str(tmp_path), "new_dir", "new_subdir"))

util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

self.assertDirEmpty(self.tmp)
self.assertDirEmpty(str(tmp_path))

def test_deletes_non_empty_dirs(self):
def test_deletes_non_empty_dirs(self, tmp_path):
"""Non-empty directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
os.mkdir(os.path.join(str(tmp_path), "new_dir"))
f_name = os.path.join(str(tmp_path), "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
f.write(b"DELETE ME")

util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

self.assertDirEmpty(self.tmp)
self.assertDirEmpty(str(tmp_path))

def test_deletes_symlinks(self):
def test_deletes_symlinks(self, tmp_path):
"""Symlinks should be deleted."""
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
file_name = os.path.join(str(tmp_path), "new_file.txt")
link_name = os.path.join(str(tmp_path), "new_file_link.txt")
with open(file_name, "wb") as f:
f.write(b"DELETE ME")
os.symlink(file_name, link_name)

util.delete_dir_contents(self.tmp)
util.delete_dir_contents(str(tmp_path))

self.assertDirEmpty(self.tmp)
self.assertDirEmpty(str(tmp_path))


class TestDelDir:
Expand Down Expand Up @@ -2067,14 +2055,14 @@ def test_del_dir_generic_errors(self, mocker):
assert mock_rmtree.call_count == 1


class TestKeyValStrings(helpers.TestCase):
class TestKeyValStrings:
def test_keyval_str_to_dict(self):
expected = {"1": "one", "2": "one+one", "ro": True}
cmdline = "1=one ro 2=one+one"
assert expected == util.keyval_str_to_dict(cmdline)


class TestGetCmdline(helpers.TestCase):
class TestGetCmdline:
def test_cmdline_reads_debug_env(self):
with mock.patch.dict(
"os.environ", values={"DEBUG_PROC_CMDLINE": "abcd 123"}
Expand Down Expand Up @@ -2454,7 +2442,7 @@ def test_given_log_level_used(self):
assert log_level, mock.ANY == logger.log.call_args[0]


class TestMessageFromString(helpers.TestCase):
class TestMessageFromString:
def test_unicode_not_messed_up(self):
roundtripped = util.message_from_string("\n").as_string()
assert "\x00" not in roundtripped
Expand Down Expand Up @@ -2638,25 +2626,20 @@ def fake_response(url, timeout, retries):
] == m_read.call_args_list


class TestReadSeededWithoutVendorData(helpers.TestCase):
def setUp(self):
super(TestReadSeededWithoutVendorData, self).setUp()
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)

def test_unicode_not_messed_up(self):
class TestReadSeededWithoutVendorData:
def test_unicode_not_messed_up(self, tmp_path):
ud = b"userdatablob"
vd = None
network = b"test: 'true'"
helpers.populate_dir(
self.tmp,
str(tmp_path),
{
"meta-data": "key1: val1",
"user-data": ud,
"network-config": network,
},
)
sdir = self.tmp + os.path.sep
sdir = str(tmp_path) + os.path.sep
found_md, found_ud, found_vd, found_network = util.read_seeded(sdir)

assert found_md == {"key1": "val1"}
Expand All @@ -2665,7 +2648,7 @@ def test_unicode_not_messed_up(self):
assert found_network == {"test": "true"}


class TestEncode(helpers.TestCase):
class TestEncode:
"""Test the encoding functions"""

def test_decode_binary_plain_text_with_hex(self):
Expand All @@ -2674,7 +2657,7 @@ def test_decode_binary_plain_text_with_hex(self):
assert text == blob


class TestProcessExecutionError(helpers.TestCase):
class TestProcessExecutionError:
template = (
"{description}\n"
"Command: {cmd}\n"
Expand Down Expand Up @@ -2815,7 +2798,7 @@ def test_system_image_config_dir_is_snappy(self, mocker):
assert util.system_is_snappy() is True


class TestLoadShellContent(helpers.TestCase):
class TestLoadShellContent:
def test_comments_handled_correctly(self):
"""Shell comments should be allowed in the content."""
assert {
Expand All @@ -2839,7 +2822,7 @@ def test_comments_handled_correctly(self):
@skipIf(
not util.is_Linux(), "These tests don't make sense on non-Linux systems."
)
class TestGetProcEnv(helpers.TestCase):
class TestGetProcEnv:
"""test get_proc_env."""

null = b"\x00"
Expand Down Expand Up @@ -2885,7 +2868,7 @@ def test_non_existing_file_returns_empty_dict(self, m_load_file):
assert 1 == m_load_file.call_count


class TestGetProcPpid(helpers.TestCase):
class TestGetProcPpid:
"""test get_proc_ppid"""

@skipIf(not util.is_Linux(), "/proc/$pid/stat is not useful on not-Linux")
Expand Down