Source code

Revision control

Copy as Markdown

Other Tools

Test Info:

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import stat
import unittest
import mozunit
import six
import mozpack.path as mozpath
from mozpack.copier import FileCopier, FileRegistry, FileRegistrySubtree, Jarrer
from mozpack.errors import ErrorMessage
from mozpack.files import ExistingFile, GeneratedFile
from mozpack.mozjar import JarReader
from mozpack.test.test_files import MatchTestTemplate, MockDest, TestWithTmpDir
class BaseTestFileRegistry(MatchTestTemplate):
def add(self, path):
self.registry.add(path, GeneratedFile(path))
def do_check(self, pattern, result):
self.checked = True
if result:
self.assertTrue(self.registry.contains(pattern))
else:
self.assertFalse(self.registry.contains(pattern))
self.assertEqual(self.registry.match(pattern), result)
def do_test_file_registry(self, registry):
self.registry = registry
self.registry.add("foo", GeneratedFile(b"foo"))
bar = GeneratedFile(b"bar")
self.registry.add("bar", bar)
self.assertEqual(self.registry.paths(), ["foo", "bar"])
self.assertEqual(self.registry["bar"], bar)
self.assertRaises(
ErrorMessage, self.registry.add, "foo", GeneratedFile(b"foo2")
)
self.assertRaises(ErrorMessage, self.registry.remove, "qux")
self.assertRaises(
ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar")
)
self.assertRaises(
ErrorMessage, self.registry.add, "foo/bar/baz", GeneratedFile(b"foobar")
)
self.assertEqual(self.registry.paths(), ["foo", "bar"])
self.registry.remove("foo")
self.assertEqual(self.registry.paths(), ["bar"])
self.registry.remove("bar")
self.assertEqual(self.registry.paths(), [])
self.prepare_match_test()
self.do_match_test()
self.assertTrue(self.checked)
self.assertEqual(
self.registry.paths(),
[
"bar",
"foo/bar",
"foo/baz",
"foo/qux/1",
"foo/qux/bar",
"foo/qux/2/test",
"foo/qux/2/test2",
],
)
self.registry.remove("foo/qux")
self.assertEqual(self.registry.paths(), ["bar", "foo/bar", "foo/baz"])
self.registry.add("foo/qux", GeneratedFile(b"fooqux"))
self.assertEqual(
self.registry.paths(), ["bar", "foo/bar", "foo/baz", "foo/qux"]
)
self.registry.remove("foo/b*")
self.assertEqual(self.registry.paths(), ["bar", "foo/qux"])
self.assertEqual([f for f, c in self.registry], ["bar", "foo/qux"])
self.assertEqual(len(self.registry), 2)
self.add("foo/.foo")
self.assertTrue(self.registry.contains("foo/.foo"))
def do_test_registry_paths(self, registry):
self.registry = registry
# Can't add a file if it requires a directory in place of a
# file we also require.
self.registry.add("foo", GeneratedFile(b"foo"))
self.assertRaises(
ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar")
)
# Can't add a file if we already have a directory there.
self.registry.add("bar/baz", GeneratedFile(b"barbaz"))
self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
# Bump the count of things that require bar/ to 2.
self.registry.add("bar/zot", GeneratedFile(b"barzot"))
self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
# Drop the count of things that require bar/ to 1.
self.registry.remove("bar/baz")
self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
# Drop the count of things that require bar/ to 0.
self.registry.remove("bar/zot")
self.registry.add("bar/zot", GeneratedFile(b"barzot"))
class TestFileRegistry(BaseTestFileRegistry, unittest.TestCase):
def test_partial_paths(self):
cases = {
"foo/bar/baz/zot": ["foo/bar/baz", "foo/bar", "foo"],
"foo/bar": ["foo"],
"bar": [],
}
reg = FileRegistry()
for path, parts in six.iteritems(cases):
self.assertEqual(reg._partial_paths(path), parts)
def test_file_registry(self):
self.do_test_file_registry(FileRegistry())
def test_registry_paths(self):
self.do_test_registry_paths(FileRegistry())
def test_required_directories(self):
self.registry = FileRegistry()
self.registry.add("foo", GeneratedFile(b"foo"))
self.assertEqual(self.registry.required_directories(), set())
self.registry.add("bar/baz", GeneratedFile(b"barbaz"))
self.assertEqual(self.registry.required_directories(), {"bar"})
self.registry.add("bar/zot", GeneratedFile(b"barzot"))
self.assertEqual(self.registry.required_directories(), {"bar"})
self.registry.add("bar/zap/zot", GeneratedFile(b"barzapzot"))
self.assertEqual(self.registry.required_directories(), {"bar", "bar/zap"})
self.registry.remove("bar/zap/zot")
self.assertEqual(self.registry.required_directories(), {"bar"})
self.registry.remove("bar/baz")
self.assertEqual(self.registry.required_directories(), {"bar"})
self.registry.remove("bar/zot")
self.assertEqual(self.registry.required_directories(), set())
self.registry.add("x/y/z", GeneratedFile(b"xyz"))
self.assertEqual(self.registry.required_directories(), {"x", "x/y"})
class TestFileRegistrySubtree(BaseTestFileRegistry, unittest.TestCase):
def test_file_registry_subtree_base(self):
registry = FileRegistry()
self.assertEqual(registry, FileRegistrySubtree("", registry))
self.assertNotEqual(registry, FileRegistrySubtree("base", registry))
def create_registry(self):
registry = FileRegistry()
registry.add("foo/bar", GeneratedFile(b"foo/bar"))
registry.add("baz/qux", GeneratedFile(b"baz/qux"))
return FileRegistrySubtree("base/root", registry)
def test_file_registry_subtree(self):
self.do_test_file_registry(self.create_registry())
def test_registry_paths_subtree(self):
FileRegistry()
self.do_test_registry_paths(self.create_registry())
class TestFileCopier(TestWithTmpDir):
def all_dirs(self, base):
all_dirs = set()
for root, dirs, files in os.walk(base):
if not dirs:
all_dirs.add(mozpath.relpath(root, base))
return all_dirs
def all_files(self, base):
all_files = set()
for root, dirs, files in os.walk(base):
for f in files:
all_files.add(mozpath.join(mozpath.relpath(root, base), f))
return all_files
def test_file_copier(self):
copier = FileCopier()
copier.add("foo/bar", GeneratedFile(b"foobar"))
copier.add("foo/qux", GeneratedFile(b"fooqux"))
copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz"))
copier.add("bar", GeneratedFile(b"bar"))
copier.add("qux/foo", GeneratedFile(b"quxfoo"))
copier.add("qux/bar", GeneratedFile(b""))
result = copier.copy(self.tmpdir)
self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
self.assertEqual(
self.all_dirs(self.tmpdir), set(["foo/deep/nested/directory", "qux"])
)
self.assertEqual(
result.updated_files,
set(self.tmppath(p) for p in self.all_files(self.tmpdir)),
)
self.assertEqual(result.existing_files, set())
self.assertEqual(result.removed_files, set())
self.assertEqual(result.removed_directories, set())
copier.remove("foo")
copier.add("test", GeneratedFile(b"test"))
result = copier.copy(self.tmpdir)
self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
self.assertEqual(self.all_dirs(self.tmpdir), set(["qux"]))
self.assertEqual(
result.removed_files,
set(
self.tmppath(p)
for p in ("foo/bar", "foo/qux", "foo/deep/nested/directory/file")
),
)
def test_symlink_directory_replaced(self):
"""Directory symlinks in destination are replaced if they need to be
real directories."""
if not self.symlink_supported:
return
dest = self.tmppath("dest")
copier = FileCopier()
copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
os.makedirs(self.tmppath("dest/foo"))
dummy = self.tmppath("dummy")
os.mkdir(dummy)
link = self.tmppath("dest/foo/bar")
os.symlink(dummy, link)
result = copier.copy(dest)
st = os.lstat(link)
self.assertFalse(stat.S_ISLNK(st.st_mode))
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertEqual(self.all_files(dest), set(copier.paths()))
self.assertEqual(result.removed_directories, set())
self.assertEqual(len(result.updated_files), 1)
def test_remove_unaccounted_directory_symlinks(self):
"""Directory symlinks in destination that are not in the way are
deleted according to remove_unaccounted and
remove_all_directory_symlinks.
"""
if not self.symlink_supported:
return
dest = self.tmppath("dest")
copier = FileCopier()
copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
os.makedirs(self.tmppath("dest/foo"))
dummy = self.tmppath("dummy")
os.mkdir(dummy)
os.mkdir(self.tmppath("dest/zot"))
link = self.tmppath("dest/zot/zap")
os.symlink(dummy, link)
# If not remove_unaccounted but remove_empty_directories, then
# the symlinked directory remains (as does its containing
# directory).
result = copier.copy(
dest,
remove_unaccounted=False,
remove_empty_directories=True,
remove_all_directory_symlinks=False,
)
st = os.lstat(link)
self.assertTrue(stat.S_ISLNK(st.st_mode))
self.assertFalse(stat.S_ISDIR(st.st_mode))
self.assertEqual(self.all_files(dest), set(copier.paths()))
self.assertEqual(self.all_dirs(dest), set(["foo/bar"]))
self.assertEqual(result.removed_directories, set())
self.assertEqual(len(result.updated_files), 1)
# If remove_unaccounted but not remove_empty_directories, then
# only the symlinked directory is removed.
result = copier.copy(
dest,
remove_unaccounted=True,
remove_empty_directories=False,
remove_all_directory_symlinks=False,
)
st = os.lstat(self.tmppath("dest/zot"))
self.assertFalse(stat.S_ISLNK(st.st_mode))
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertEqual(result.removed_files, set([link]))
self.assertEqual(result.removed_directories, set())
self.assertEqual(self.all_files(dest), set(copier.paths()))
self.assertEqual(self.all_dirs(dest), set(["foo/bar", "zot"]))
# If remove_unaccounted and remove_empty_directories, then
# both the symlink and its containing directory are removed.
link = self.tmppath("dest/zot/zap")
os.symlink(dummy, link)
result = copier.copy(
dest,
remove_unaccounted=True,
remove_empty_directories=True,
remove_all_directory_symlinks=False,
)
self.assertEqual(result.removed_files, set([link]))
self.assertEqual(result.removed_directories, set([self.tmppath("dest/zot")]))
self.assertEqual(self.all_files(dest), set(copier.paths()))
self.assertEqual(self.all_dirs(dest), set(["foo/bar"]))
def test_permissions(self):
"""Ensure files without write permission can be deleted."""
with open(self.tmppath("dummy"), "a"):
pass
p = self.tmppath("no_perms")
with open(p, "a"):
pass
# Make file and directory unwritable. Reminder: making a directory
# unwritable prevents modifications (including deletes) from the list
# of files in that directory.
os.chmod(p, 0o400)
os.chmod(self.tmpdir, 0o400)
copier = FileCopier()
copier.add("dummy", GeneratedFile(b"content"))
result = copier.copy(self.tmpdir)
self.assertEqual(result.removed_files_count, 1)
self.assertFalse(os.path.exists(p))
def test_no_remove(self):
copier = FileCopier()
copier.add("foo", GeneratedFile(b"foo"))
with open(self.tmppath("bar"), "a"):
pass
os.mkdir(self.tmppath("emptydir"))
d = self.tmppath("populateddir")
os.mkdir(d)
with open(self.tmppath("populateddir/foo"), "a"):
pass
result = copier.copy(self.tmpdir, remove_unaccounted=False)
self.assertEqual(
self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"])
)
self.assertEqual(self.all_dirs(self.tmpdir), set(["populateddir"]))
self.assertEqual(result.removed_files, set())
self.assertEqual(result.removed_directories, set([self.tmppath("emptydir")]))
def test_no_remove_empty_directories(self):
copier = FileCopier()
copier.add("foo", GeneratedFile(b"foo"))
with open(self.tmppath("bar"), "a"):
pass
os.mkdir(self.tmppath("emptydir"))
d = self.tmppath("populateddir")
os.mkdir(d)
with open(self.tmppath("populateddir/foo"), "a"):
pass
result = copier.copy(
self.tmpdir, remove_unaccounted=False, remove_empty_directories=False
)
self.assertEqual(
self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"])
)
self.assertEqual(self.all_dirs(self.tmpdir), set(["emptydir", "populateddir"]))
self.assertEqual(result.removed_files, set())
self.assertEqual(result.removed_directories, set())
def test_optional_exists_creates_unneeded_directory(self):
"""Demonstrate that a directory not strictly required, but specified
as the path to an optional file, will be unnecessarily created.
This behaviour is wrong; fixing it is tracked by Bug 972432;
and this test exists to guard against unexpected changes in
behaviour.
"""
dest = self.tmppath("dest")
copier = FileCopier()
copier.add("foo/bar", ExistingFile(required=False))
result = copier.copy(dest)
st = os.lstat(self.tmppath("dest/foo"))
self.assertFalse(stat.S_ISLNK(st.st_mode))
self.assertTrue(stat.S_ISDIR(st.st_mode))
# What's worse, we have no record that dest was created.
self.assertEqual(len(result.updated_files), 0)
# But we do have an erroneous record of an optional file
# existing when it does not.
self.assertIn(self.tmppath("dest/foo/bar"), result.existing_files)
def test_remove_unaccounted_file_registry(self):
"""Test FileCopier.copy(remove_unaccounted=FileRegistry())"""
dest = self.tmppath("dest")
copier = FileCopier()
copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
copier.add("foo/bar/qux", GeneratedFile(b"foobarqux"))
copier.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga"))
copier.add("foo/toto/tata", GeneratedFile(b"footototata"))
os.makedirs(os.path.join(dest, "bar"))
with open(os.path.join(dest, "bar", "bar"), "w") as fh:
fh.write("barbar")
os.makedirs(os.path.join(dest, "foo", "toto"))
with open(os.path.join(dest, "foo", "toto", "toto"), "w") as fh:
fh.write("foototototo")
result = copier.copy(dest, remove_unaccounted=False)
self.assertEqual(
self.all_files(dest), set(copier.paths()) | {"foo/toto/toto", "bar/bar"}
)
self.assertEqual(
self.all_dirs(dest), {"foo/bar", "foo/hoge", "foo/toto", "bar"}
)
copier2 = FileCopier()
copier2.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga"))
# We expect only files copied from the first copier to be removed,
# not the extra file that was there beforehand.
result = copier2.copy(dest, remove_unaccounted=copier)
self.assertEqual(
self.all_files(dest), set(copier2.paths()) | {"foo/toto/toto", "bar/bar"}
)
self.assertEqual(self.all_dirs(dest), {"foo/hoge", "foo/toto", "bar"})
self.assertEqual(result.updated_files, {self.tmppath("dest/foo/hoge/fuga")})
self.assertEqual(result.existing_files, set())
self.assertEqual(
result.removed_files,
{
self.tmppath(p)
for p in ("dest/foo/bar/baz", "dest/foo/bar/qux", "dest/foo/toto/tata")
},
)
self.assertEqual(result.removed_directories, {self.tmppath("dest/foo/bar")})
class TestJarrer(unittest.TestCase):
def check_jar(self, dest, copier):
jar = JarReader(fileobj=dest)
self.assertEqual([f.filename for f in jar], copier.paths())
for f in jar:
self.assertEqual(f.uncompressed_data.read(), copier[f.filename].content)
def test_jarrer(self):
copier = Jarrer()
copier.add("foo/bar", GeneratedFile(b"foobar"))
copier.add("foo/qux", GeneratedFile(b"fooqux"))
copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz"))
copier.add("bar", GeneratedFile(b"bar"))
copier.add("qux/foo", GeneratedFile(b"quxfoo"))
copier.add("qux/bar", GeneratedFile(b""))
dest = MockDest()
copier.copy(dest)
self.check_jar(dest, copier)
copier.remove("foo")
copier.add("test", GeneratedFile(b"test"))
copier.copy(dest)
self.check_jar(dest, copier)
copier.remove("test")
copier.add("test", GeneratedFile(b"replaced-content"))
copier.copy(dest)
self.check_jar(dest, copier)
copier.copy(dest)
self.check_jar(dest, copier)
preloaded = ["qux/bar", "bar"]
copier.preload(preloaded)
copier.copy(dest)
dest.seek(0)
jar = JarReader(fileobj=dest)
self.assertEqual(
[f.filename for f in jar],
preloaded + [p for p in copier.paths() if p not in preloaded],
)
self.assertEqual(jar.last_preloaded, preloaded[-1])
def test_jarrer_compress(self):
copier = Jarrer()
copier.add("foo/bar", GeneratedFile(b"ffffff"))
copier.add("foo/qux", GeneratedFile(b"ffffff"), compress=False)
dest = MockDest()
copier.copy(dest)
self.check_jar(dest, copier)
dest.seek(0)
jar = JarReader(fileobj=dest)
self.assertTrue(jar["foo/bar"].compressed)
self.assertFalse(jar["foo/qux"].compressed)
if __name__ == "__main__":
mozunit.main()