From 1d8d622812243e686556187ba67f8fb5320a679e Mon Sep 17 00:00:00 2001 From: Shreenidhi Shedi Date: Sun, 20 Feb 2022 11:21:28 +0530 Subject: [PATCH] Check for existing symlink while force creating symlink If a dead symlink by the same name is present, os.path.exists returns false. Signed-off-by: Shreenidhi Shedi Signed-off-by: Shreenidhi Shedi --- cloudinit/util.py | 2 +- tests/unittests/test_util.py | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/cloudinit/util.py b/cloudinit/util.py index 569fc215836..1fadd196291 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -1886,7 +1886,7 @@ def is_link(path): def sym_link(source, link, force=False): LOG.debug("Creating symbolic link from %r => %r", link, source) - if force and os.path.exists(link): + if force and os.path.lexists(link): del_file(link) os.symlink(source, link) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 3765511bec7..3f3079b04e7 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -361,6 +361,52 @@ def test_mount_is_ro(self, m_mount_info): self.assertEqual(is_rw, False) +class TestSymlink(CiTestCase): + def test_sym_link_simple(self): + tmpd = self.tmp_dir() + link = self.tmp_path("link", tmpd) + target = self.tmp_path("target", tmpd) + util.write_file(target, "hello") + + util.sym_link(target, link) + self.assertTrue(os.path.exists(link)) + self.assertTrue(os.path.islink(link)) + + def test_sym_link_source_exists(self): + tmpd = self.tmp_dir() + link = self.tmp_path("link", tmpd) + target = self.tmp_path("target", tmpd) + util.write_file(target, "hello") + + util.sym_link(target, link) + self.assertTrue(os.path.exists(link)) + + util.sym_link(target, link, force=True) + self.assertTrue(os.path.exists(link)) + + def test_sym_link_dangling_link(self): + tmpd = self.tmp_dir() + link = self.tmp_path("link", tmpd) + target = self.tmp_path("target", tmpd) + + util.sym_link(target, link) + self.assertTrue(os.path.islink(link)) + self.assertFalse(os.path.exists(link)) + + util.sym_link(target, link, force=True) + self.assertTrue(os.path.islink(link)) + self.assertFalse(os.path.exists(link)) + + def test_sym_link_create_dangling(self): + tmpd = self.tmp_dir() + link = self.tmp_path("link", tmpd) + target = self.tmp_path("target", tmpd) + + util.sym_link(target, link) + self.assertTrue(os.path.islink(link)) + self.assertFalse(os.path.exists(link)) + + class TestUptime(CiTestCase): @mock.patch("cloudinit.util.boottime") @mock.patch("cloudinit.util.os.path.exists")