# Copyright (C) 2019 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Unittests for the project.py module.""" import contextlib import os from pathlib import Path import subprocess import tempfile import unittest import error import manifest_xml import git_command import git_config import platform_utils import project @contextlib.contextmanager def TempGitTree(): """Create a new empty git checkout for testing.""" with tempfile.TemporaryDirectory(prefix="repo-tests") as tempdir: # Tests need to assume, that main is default branch at init, # which is not supported in config until 2.28. cmd = ["git", "init"] if git_command.git_require((2, 28, 0)): cmd += ["--initial-branch=main"] else: # Use template dir for init. templatedir = tempfile.mkdtemp(prefix=".test-template") with open(os.path.join(templatedir, "HEAD"), "w") as fp: fp.write("ref: refs/heads/main\n") cmd += ["--template", templatedir] subprocess.check_call(cmd, cwd=tempdir) yield tempdir class FakeProject(object): """A fake for Project for basic functionality.""" def __init__(self, worktree): self.worktree = worktree self.gitdir = os.path.join(worktree, ".git") self.name = "fakeproject" self.work_git = project.Project._GitGetByExec( self, bare=False, gitdir=self.gitdir ) self.bare_git = project.Project._GitGetByExec( self, bare=True, gitdir=self.gitdir ) self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir) class ReviewableBranchTests(unittest.TestCase): """Check ReviewableBranch behavior.""" def test_smoke(self): """A quick run through everything.""" with TempGitTree() as tempdir: fakeproj = FakeProject(tempdir) # Generate some commits. with open(os.path.join(tempdir, "readme"), "w") as fp: fp.write("txt") fakeproj.work_git.add("readme") fakeproj.work_git.commit("-mAdd file") fakeproj.work_git.checkout("-b", "work") fakeproj.work_git.rm("-f", "readme") fakeproj.work_git.commit("-mDel file") # Start off with the normal details. rb = project.ReviewableBranch( fakeproj, fakeproj.config.GetBranch("work"), "main" ) self.assertEqual("work", rb.name) self.assertEqual(1, len(rb.commits)) self.assertIn("Del file", rb.commits[0]) d = rb.unabbrev_commits self.assertEqual(1, len(d)) short, long = next(iter(d.items())) self.assertTrue(long.startswith(short)) self.assertTrue(rb.base_exists) # Hard to assert anything useful about this. self.assertTrue(rb.date) # Now delete the tracking branch! fakeproj.work_git.branch("-D", "main") rb = project.ReviewableBranch( fakeproj, fakeproj.config.GetBranch("work"), "main" ) self.assertEqual(0, len(rb.commits)) self.assertFalse(rb.base_exists) # Hard to assert anything useful about this. self.assertTrue(rb.date) class CopyLinkTestCase(unittest.TestCase): """TestCase for stub repo client checkouts. It'll have a layout like this: tempdir/ # self.tempdir checkout/ # self.topdir git-project/ # self.worktree Attributes: tempdir: A dedicated temporary directory. worktree: The top of the repo client checkout. topdir: The top of a project checkout. """ def setUp(self): self.tempdirobj = tempfile.TemporaryDirectory(prefix="repo_tests") self.tempdir = self.tempdirobj.name self.topdir = os.path.join(self.tempdir, "checkout") self.worktree = os.path.join(self.topdir, "git-project") os.makedirs(self.topdir) os.makedirs(self.worktree) def tearDown(self): self.tempdirobj.cleanup() @staticmethod def touch(path): with open(path, "w"): pass def assertExists(self, path, msg=None): """Make sure |path| exists.""" if os.path.exists(path): return if msg is None: msg = ["path is missing: %s" % path] while path != "/": path = os.path.dirname(path) if not path: # If we're given something like "foo", abort once we get to # "". break result = os.path.exists(path) msg.append("\tos.path.exists(%s): %s" % (path, result)) if result: msg.append("\tcontents: %r" % os.listdir(path)) break msg = "\n".join(msg) raise self.failureException(msg) class CopyFile(CopyLinkTestCase): """Check _CopyFile handling.""" def CopyFile(self, src, dest): return project._CopyFile(self.worktree, src, self.topdir, dest) def test_basic(self): """Basic test of copying a file from a project to the toplevel.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) cf = self.CopyFile("foo.txt", "foo") cf._Copy() self.assertExists(os.path.join(self.topdir, "foo")) def test_src_subdir(self): """Copy a file from a subdir of a project.""" src = os.path.join(self.worktree, "bar", "foo.txt") os.makedirs(os.path.dirname(src)) self.touch(src) cf = self.CopyFile("bar/foo.txt", "new.txt") cf._Copy() self.assertExists(os.path.join(self.topdir, "new.txt")) def test_dest_subdir(self): """Copy a file to a subdir of a checkout.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) cf = self.CopyFile("foo.txt", "sub/dir/new.txt") self.assertFalse(os.path.exists(os.path.join(self.topdir, "sub"))) cf._Copy() self.assertExists(os.path.join(self.topdir, "sub", "dir", "new.txt")) def test_update(self): """Make sure changed files get copied again.""" src = os.path.join(self.worktree, "foo.txt") dest = os.path.join(self.topdir, "bar") with open(src, "w") as f: f.write("1st") cf = self.CopyFile("foo.txt", "bar") cf._Copy() self.assertExists(dest) with open(dest) as f: self.assertEqual(f.read(), "1st") with open(src, "w") as f: f.write("2nd!") cf._Copy() with open(dest) as f: self.assertEqual(f.read(), "2nd!") def test_src_block_symlink(self): """Do not allow reading from a symlinked path.""" src = os.path.join(self.worktree, "foo.txt") sym = os.path.join(self.worktree, "sym") self.touch(src) platform_utils.symlink("foo.txt", sym) self.assertExists(sym) cf = self.CopyFile("sym", "foo") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_symlink_traversal(self): """Do not allow reading through a symlink dir.""" realfile = os.path.join(self.tempdir, "file.txt") self.touch(realfile) src = os.path.join(self.worktree, "bar", "file.txt") platform_utils.symlink(self.tempdir, os.path.join(self.worktree, "bar")) self.assertExists(src) cf = self.CopyFile("bar/file.txt", "foo") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_copy_from_dir(self): """Do not allow copying from a directory.""" src = os.path.join(self.worktree, "dir") os.makedirs(src) cf = self.CopyFile("dir", "foo") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_dest_block_symlink(self): """Do not allow writing to a symlink.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) platform_utils.symlink("dest", os.path.join(self.topdir, "sym")) cf = self.CopyFile("foo.txt", "sym") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_dest_block_symlink_traversal(self): """Do not allow writing through a symlink dir.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) platform_utils.symlink( tempfile.gettempdir(), os.path.join(self.topdir, "sym") ) cf = self.CopyFile("foo.txt", "sym/foo.txt") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_copy_to_dir(self): """Do not allow copying to a directory.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) os.makedirs(os.path.join(self.topdir, "dir")) cf = self.CopyFile("foo.txt", "dir") self.assertRaises(error.ManifestInvalidPathError, cf._Copy) class LinkFile(CopyLinkTestCase): """Check _LinkFile handling.""" def LinkFile(self, src, dest): return project._LinkFile(self.worktree, src, self.topdir, dest) def test_basic(self): """Basic test of linking a file from a project into the toplevel.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) lf = self.LinkFile("foo.txt", "foo") lf._Link() dest = os.path.join(self.topdir, "foo") self.assertExists(dest) self.assertTrue(os.path.islink(dest)) self.assertEqual( os.path.join("git-project", "foo.txt"), os.readlink(dest) ) def test_src_subdir(self): """Link to a file in a subdir of a project.""" src = os.path.join(self.worktree, "bar", "foo.txt") os.makedirs(os.path.dirname(src)) self.touch(src) lf = self.LinkFile("bar/foo.txt", "foo") lf._Link() self.assertExists(os.path.join(self.topdir, "foo")) def test_src_self(self): """Link to the project itself.""" dest = os.path.join(self.topdir, "foo", "bar") lf = self.LinkFile(".", "foo/bar") lf._Link() self.assertExists(dest) self.assertEqual(os.path.join("..", "git-project"), os.readlink(dest)) def test_dest_subdir(self): """Link a file to a subdir of a checkout.""" src = os.path.join(self.worktree, "foo.txt") self.touch(src) lf = self.LinkFile("foo.txt", "sub/dir/foo/bar") self.assertFalse(os.path.exists(os.path.join(self.topdir, "sub"))) lf._Link() self.assertExists(os.path.join(self.topdir, "sub", "dir", "foo", "bar")) def test_src_block_relative(self): """Do not allow relative symlinks.""" BAD_SOURCES = ( "./", "..", "../", "foo/.", "foo/./bar", "foo/..", "foo/../foo", ) for src in BAD_SOURCES: lf = self.LinkFile(src, "foo") self.assertRaises(error.ManifestInvalidPathError, lf._Link) def test_update(self): """Make sure changed targets get updated.""" dest = os.path.join(self.topdir, "sym") src = os.path.join(self.worktree, "foo.txt") self.touch(src) lf = self.LinkFile("foo.txt", "sym") lf._Link() self.assertEqual( os.path.join("git-project", "foo.txt"), os.readlink(dest) ) # Point the symlink somewhere else. os.unlink(dest) platform_utils.symlink(self.tempdir, dest) lf._Link() self.assertEqual( os.path.join("git-project", "foo.txt"), os.readlink(dest) ) class MigrateWorkTreeTests(unittest.TestCase): """Check _MigrateOldWorkTreeGitDir handling.""" _SYMLINKS = { "config", "description", "hooks", "info", "logs", "objects", "packed-refs", "refs", "rr-cache", "shallow", "svn", } _FILES = { "COMMIT_EDITMSG", "FETCH_HEAD", "HEAD", "index", "ORIG_HEAD", "unknown-file-should-be-migrated", } _CLEAN_FILES = { "a-vim-temp-file~", "#an-emacs-temp-file#", } @classmethod @contextlib.contextmanager def _simple_layout(cls): """Create a simple repo client checkout to test against.""" with tempfile.TemporaryDirectory() as tempdir: tempdir = Path(tempdir) gitdir = tempdir / ".repo/projects/src/test.git" gitdir.mkdir(parents=True) cmd = ["git", "init", "--bare", str(gitdir)] subprocess.check_call(cmd) dotgit = tempdir / "src/test/.git" dotgit.mkdir(parents=True) for name in cls._SYMLINKS: (dotgit / name).symlink_to( f"../../../.repo/projects/src/test.git/{name}" ) for name in cls._FILES | cls._CLEAN_FILES: (dotgit / name).write_text(name) yield tempdir def test_standard(self): """Migrate a standard checkout that we expect.""" with self._simple_layout() as tempdir: dotgit = tempdir / "src/test/.git" project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) # Make sure the dir was transformed into a symlink. self.assertTrue(dotgit.is_symlink()) self.assertEqual( os.readlink(dotgit), os.path.normpath("../../.repo/projects/src/test.git"), ) # Make sure files were moved over. gitdir = tempdir / ".repo/projects/src/test.git" for name in self._FILES: self.assertEqual(name, (gitdir / name).read_text()) # Make sure files were removed. for name in self._CLEAN_FILES: self.assertFalse((gitdir / name).exists()) def test_unknown(self): """A checkout with unknown files should abort.""" with self._simple_layout() as tempdir: dotgit = tempdir / "src/test/.git" (tempdir / ".repo/projects/src/test.git/random-file").write_text( "one" ) (dotgit / "random-file").write_text("two") with self.assertRaises(error.GitError): project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) # Make sure no content was actually changed. self.assertTrue(dotgit.is_dir()) for name in self._FILES: self.assertTrue((dotgit / name).is_file()) for name in self._CLEAN_FILES: self.assertTrue((dotgit / name).is_file()) for name in self._SYMLINKS: self.assertTrue((dotgit / name).is_symlink()) class ManifestPropertiesFetchedCorrectly(unittest.TestCase): """Ensure properties are fetched properly.""" def setUpManifest(self, tempdir): repodir = os.path.join(tempdir, ".repo") manifest_dir = os.path.join(repodir, "manifests") manifest_file = os.path.join(repodir, manifest_xml.MANIFEST_FILE_NAME) os.mkdir(repodir) os.mkdir(manifest_dir) manifest = manifest_xml.XmlManifest(repodir, manifest_file) return project.ManifestProject( manifest, "test/manifest", os.path.join(tempdir, ".git"), tempdir ) def test_manifest_config_properties(self): """Test we are fetching the manifest config properties correctly.""" with TempGitTree() as tempdir: fakeproj = self.setUpManifest(tempdir) # Set property using the expected Set method, then ensure # the porperty functions are using the correct Get methods. fakeproj.config.SetString( "manifest.standalone", "https://chicken/manifest.git" ) self.assertEqual( fakeproj.standalone_manifest_url, "https://chicken/manifest.git" ) fakeproj.config.SetString( "manifest.groups", "test-group, admin-group" ) self.assertEqual( fakeproj.manifest_groups, "test-group, admin-group" ) fakeproj.config.SetString("repo.reference", "mirror/ref") self.assertEqual(fakeproj.reference, "mirror/ref") fakeproj.config.SetBoolean("repo.dissociate", False) self.assertFalse(fakeproj.dissociate) fakeproj.config.SetBoolean("repo.archive", False) self.assertFalse(fakeproj.archive) fakeproj.config.SetBoolean("repo.mirror", False) self.assertFalse(fakeproj.mirror) fakeproj.config.SetBoolean("repo.worktree", False) self.assertFalse(fakeproj.use_worktree) fakeproj.config.SetBoolean("repo.clonebundle", False) self.assertFalse(fakeproj.clone_bundle) fakeproj.config.SetBoolean("repo.submodules", False) self.assertFalse(fakeproj.submodules) fakeproj.config.SetBoolean("repo.git-lfs", False) self.assertFalse(fakeproj.git_lfs) fakeproj.config.SetBoolean("repo.superproject", False) self.assertFalse(fakeproj.use_superproject) fakeproj.config.SetBoolean("repo.partialclone", False) self.assertFalse(fakeproj.partial_clone) fakeproj.config.SetString("repo.depth", "48") self.assertEqual(fakeproj.depth, "48") fakeproj.config.SetString("repo.clonefilter", "blob:limit=10M") self.assertEqual(fakeproj.clone_filter, "blob:limit=10M") fakeproj.config.SetString( "repo.partialcloneexclude", "third_party/big_repo" ) self.assertEqual( fakeproj.partial_clone_exclude, "third_party/big_repo" ) fakeproj.config.SetString("manifest.platform", "auto") self.assertEqual(fakeproj.manifest_platform, "auto")