# Copyright (C) 2019 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Unittests for the project.py module.""" import contextlib import os from pathlib import Path import subprocess import tempfile import unittest import error import manifest_xml import git_command import git_config import platform_utils import project @contextlib.contextmanager def TempGitTree(): """Create a new empty git checkout for testing.""" with tempfile.TemporaryDirectory(prefix='repo-tests') as tempdir: # Tests need to assume, that main is default branch at init, # which is not supported in config until 2.28. cmd = ['git', 'init'] if git_command.git_require((2, 28, 0)): cmd += ['--initial-branch=main'] else: # Use template dir for init. templatedir = tempfile.mkdtemp(prefix='.test-template') with open(os.path.join(templatedir, 'HEAD'), 'w') as fp: fp.write('ref: refs/heads/main\n') cmd += ['--template', templatedir] subprocess.check_call(cmd, cwd=tempdir) yield tempdir class FakeProject(object): """A fake for Project for basic functionality.""" def __init__(self, worktree): self.worktree = worktree self.gitdir = os.path.join(worktree, '.git') self.name = 'fakeproject' self.work_git = project.Project._GitGetByExec( self, bare=False, gitdir=self.gitdir) self.bare_git = project.Project._GitGetByExec( self, bare=True, gitdir=self.gitdir) self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir) class ReviewableBranchTests(unittest.TestCase): """Check ReviewableBranch behavior.""" def test_smoke(self): """A quick run through everything.""" with TempGitTree() as tempdir: fakeproj = FakeProject(tempdir) # Generate some commits. with open(os.path.join(tempdir, 'readme'), 'w') as fp: fp.write('txt') fakeproj.work_git.add('readme') fakeproj.work_git.commit('-mAdd file') fakeproj.work_git.checkout('-b', 'work') fakeproj.work_git.rm('-f', 'readme') fakeproj.work_git.commit('-mDel file') # Start off with the normal details. rb = project.ReviewableBranch( fakeproj, fakeproj.config.GetBranch('work'), 'main') self.assertEqual('work', rb.name) self.assertEqual(1, len(rb.commits)) self.assertIn('Del file', rb.commits[0]) d = rb.unabbrev_commits self.assertEqual(1, len(d)) short, long = next(iter(d.items())) self.assertTrue(long.startswith(short)) self.assertTrue(rb.base_exists) # Hard to assert anything useful about this. self.assertTrue(rb.date) # Now delete the tracking branch! fakeproj.work_git.branch('-D', 'main') rb = project.ReviewableBranch( fakeproj, fakeproj.config.GetBranch('work'), 'main') self.assertEqual(0, len(rb.commits)) self.assertFalse(rb.base_exists) # Hard to assert anything useful about this. self.assertTrue(rb.date) class CopyLinkTestCase(unittest.TestCase): """TestCase for stub repo client checkouts. It'll have a layout like: tempdir/ # self.tempdir checkout/ # self.topdir git-project/ # self.worktree Attributes: tempdir: A dedicated temporary directory. worktree: The top of the repo client checkout. topdir: The top of a project checkout. """ def setUp(self): self.tempdirobj = tempfile.TemporaryDirectory(prefix='repo_tests') self.tempdir = self.tempdirobj.name self.topdir = os.path.join(self.tempdir, 'checkout') self.worktree = os.path.join(self.topdir, 'git-project') os.makedirs(self.topdir) os.makedirs(self.worktree) def tearDown(self): self.tempdirobj.cleanup() @staticmethod def touch(path): with open(path, 'w'): pass def assertExists(self, path, msg=None): """Make sure |path| exists.""" if os.path.exists(path): return if msg is None: msg = ['path is missing: %s' % path] while path != '/': path = os.path.dirname(path) if not path: # If we're given something like "foo", abort once we get to "". break result = os.path.exists(path) msg.append('\tos.path.exists(%s): %s' % (path, result)) if result: msg.append('\tcontents: %r' % os.listdir(path)) break msg = '\n'.join(msg) raise self.failureException(msg) class CopyFile(CopyLinkTestCase): """Check _CopyFile handling.""" def CopyFile(self, src, dest): return project._CopyFile(self.worktree, src, self.topdir, dest) def test_basic(self): """Basic test of copying a file from a project to the toplevel.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) cf = self.CopyFile('foo.txt', 'foo') cf._Copy() self.assertExists(os.path.join(self.topdir, 'foo')) def test_src_subdir(self): """Copy a file from a subdir of a project.""" src = os.path.join(self.worktree, 'bar', 'foo.txt') os.makedirs(os.path.dirname(src)) self.touch(src) cf = self.CopyFile('bar/foo.txt', 'new.txt') cf._Copy() self.assertExists(os.path.join(self.topdir, 'new.txt')) def test_dest_subdir(self): """Copy a file to a subdir of a checkout.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) cf = self.CopyFile('foo.txt', 'sub/dir/new.txt') self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub'))) cf._Copy() self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt')) def test_update(self): """Make sure changed files get copied again.""" src = os.path.join(self.worktree, 'foo.txt') dest = os.path.join(self.topdir, 'bar') with open(src, 'w') as f: f.write('1st') cf = self.CopyFile('foo.txt', 'bar') cf._Copy() self.assertExists(dest) with open(dest) as f: self.assertEqual(f.read(), '1st') with open(src, 'w') as f: f.write('2nd!') cf._Copy() with open(dest) as f: self.assertEqual(f.read(), '2nd!') def test_src_block_symlink(self): """Do not allow reading from a symlinked path.""" src = os.path.join(self.worktree, 'foo.txt') sym = os.path.join(self.worktree, 'sym') self.touch(src) platform_utils.symlink('foo.txt', sym) self.assertExists(sym) cf = self.CopyFile('sym', 'foo') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_symlink_traversal(self): """Do not allow reading through a symlink dir.""" realfile = os.path.join(self.tempdir, 'file.txt') self.touch(realfile) src = os.path.join(self.worktree, 'bar', 'file.txt') platform_utils.symlink(self.tempdir, os.path.join(self.worktree, 'bar')) self.assertExists(src) cf = self.CopyFile('bar/file.txt', 'foo') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_copy_from_dir(self): """Do not allow copying from a directory.""" src = os.path.join(self.worktree, 'dir') os.makedirs(src) cf = self.CopyFile('dir', 'foo') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_dest_block_symlink(self): """Do not allow writing to a symlink.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) platform_utils.symlink('dest', os.path.join(self.topdir, 'sym')) cf = self.CopyFile('foo.txt', 'sym') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_dest_block_symlink_traversal(self): """Do not allow writing through a symlink dir.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) platform_utils.symlink(tempfile.gettempdir(), os.path.join(self.topdir, 'sym')) cf = self.CopyFile('foo.txt', 'sym/foo.txt') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) def test_src_block_copy_to_dir(self): """Do not allow copying to a directory.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) os.makedirs(os.path.join(self.topdir, 'dir')) cf = self.CopyFile('foo.txt', 'dir') self.assertRaises(error.ManifestInvalidPathError, cf._Copy) class LinkFile(CopyLinkTestCase): """Check _LinkFile handling.""" def LinkFile(self, src, dest): return project._LinkFile(self.worktree, src, self.topdir, dest) def test_basic(self): """Basic test of linking a file from a project into the toplevel.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) lf = self.LinkFile('foo.txt', 'foo') lf._Link() dest = os.path.join(self.topdir, 'foo') self.assertExists(dest) self.assertTrue(os.path.islink(dest)) self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) def test_src_subdir(self): """Link to a file in a subdir of a project.""" src = os.path.join(self.worktree, 'bar', 'foo.txt') os.makedirs(os.path.dirname(src)) self.touch(src) lf = self.LinkFile('bar/foo.txt', 'foo') lf._Link() self.assertExists(os.path.join(self.topdir, 'foo')) def test_src_self(self): """Link to the project itself.""" dest = os.path.join(self.topdir, 'foo', 'bar') lf = self.LinkFile('.', 'foo/bar') lf._Link() self.assertExists(dest) self.assertEqual(os.path.join('..', 'git-project'), os.readlink(dest)) def test_dest_subdir(self): """Link a file to a subdir of a checkout.""" src = os.path.join(self.worktree, 'foo.txt') self.touch(src) lf = self.LinkFile('foo.txt', 'sub/dir/foo/bar') self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub'))) lf._Link() self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar')) def test_src_block_relative(self): """Do not allow relative symlinks.""" BAD_SOURCES = ( './', '..', '../', 'foo/.', 'foo/./bar', 'foo/..', 'foo/../foo', ) for src in BAD_SOURCES: lf = self.LinkFile(src, 'foo') self.assertRaises(error.ManifestInvalidPathError, lf._Link) def test_update(self): """Make sure changed targets get updated.""" dest = os.path.join(self.topdir, 'sym') src = os.path.join(self.worktree, 'foo.txt') self.touch(src) lf = self.LinkFile('foo.txt', 'sym') lf._Link() self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) # Point the symlink somewhere else. os.unlink(dest) platform_utils.symlink(self.tempdir, dest) lf._Link() self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) class MigrateWorkTreeTests(unittest.TestCase): """Check _MigrateOldWorkTreeGitDir handling.""" _SYMLINKS = { 'config', 'description', 'hooks', 'info', 'logs', 'objects', 'packed-refs', 'refs', 'rr-cache', 'shallow', 'svn', } _FILES = { 'COMMIT_EDITMSG', 'FETCH_HEAD', 'HEAD', 'index', 'ORIG_HEAD', 'unknown-file-should-be-migrated', } _CLEAN_FILES = { 'a-vim-temp-file~', '#an-emacs-temp-file#', } @classmethod @contextlib.contextmanager def _simple_layout(cls): """Create a simple repo client checkout to test against.""" with tempfile.TemporaryDirectory() as tempdir: tempdir = Path(tempdir) gitdir = tempdir / '.repo/projects/src/test.git' gitdir.mkdir(parents=True) cmd = ['git', 'init', '--bare', str(gitdir)] subprocess.check_call(cmd) dotgit = tempdir / 'src/test/.git' dotgit.mkdir(parents=True) for name in cls._SYMLINKS: (dotgit / name).symlink_to(f'../../../.repo/projects/src/test.git/{name}') for name in cls._FILES | cls._CLEAN_FILES: (dotgit / name).write_text(name) yield tempdir def test_standard(self): """Migrate a standard checkout that we expect.""" with self._simple_layout() as tempdir: dotgit = tempdir / 'src/test/.git' project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) # Make sure the dir was transformed into a symlink. self.assertTrue(dotgit.is_symlink()) self.assertEqual(os.readlink(dotgit), os.path.normpath('../../.repo/projects/src/test.git')) # Make sure files were moved over. gitdir = tempdir / '.repo/projects/src/test.git' for name in self._FILES: self.assertEqual(name, (gitdir / name).read_text()) # Make sure files were removed. for name in self._CLEAN_FILES: self.assertFalse((gitdir / name).exists()) def test_unknown(self): """A checkout with unknown files should abort.""" with self._simple_layout() as tempdir: dotgit = tempdir / 'src/test/.git' (tempdir / '.repo/projects/src/test.git/random-file').write_text('one') (dotgit / 'random-file').write_text('two') with self.assertRaises(error.GitError): project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) # Make sure no content was actually changed. self.assertTrue(dotgit.is_dir()) for name in self._FILES: self.assertTrue((dotgit / name).is_file()) for name in self._CLEAN_FILES: self.assertTrue((dotgit / name).is_file()) for name in self._SYMLINKS: self.assertTrue((dotgit / name).is_symlink()) class ManifestPropertiesFetchedCorrectly(unittest.TestCase): """Ensure properties are fetched properly.""" def setUpManifest(self, tempdir): repodir = os.path.join(tempdir, '.repo') manifest_dir = os.path.join(repodir, 'manifests') manifest_file = os.path.join( repodir, manifest_xml.MANIFEST_FILE_NAME) local_manifest_dir = os.path.join( repodir, manifest_xml.LOCAL_MANIFESTS_DIR_NAME) os.mkdir(repodir) os.mkdir(manifest_dir) manifest = manifest_xml.XmlManifest(repodir, manifest_file) return project.ManifestProject( manifest, 'test/manifest', os.path.join(tempdir, '.git'), tempdir) def test_manifest_config_properties(self): """Test we are fetching the manifest config properties correctly.""" with TempGitTree() as tempdir: fakeproj = self.setUpManifest(tempdir) # Set property using the expected Set method, then ensure # the porperty functions are using the correct Get methods. fakeproj.config.SetString( 'manifest.standalone', 'https://chicken/manifest.git') self.assertEqual( fakeproj.standalone_manifest_url, 'https://chicken/manifest.git') fakeproj.config.SetString('manifest.groups', 'test-group, admin-group') self.assertEqual(fakeproj.manifest_groups, 'test-group, admin-group') fakeproj.config.SetString('repo.reference', 'mirror/ref') self.assertEqual(fakeproj.reference, 'mirror/ref') fakeproj.config.SetBoolean('repo.dissociate', False) self.assertFalse(fakeproj.dissociate) fakeproj.config.SetBoolean('repo.archive', False) self.assertFalse(fakeproj.archive) fakeproj.config.SetBoolean('repo.mirror', False) self.assertFalse(fakeproj.mirror) fakeproj.config.SetBoolean('repo.worktree', False) self.assertFalse(fakeproj.use_worktree) fakeproj.config.SetBoolean('repo.clonebundle', False) self.assertFalse(fakeproj.clone_bundle) fakeproj.config.SetBoolean('repo.submodules', False) self.assertFalse(fakeproj.submodules) fakeproj.config.SetBoolean('repo.git-lfs', False) self.assertFalse(fakeproj.git_lfs) fakeproj.config.SetBoolean('repo.superproject', False) self.assertFalse(fakeproj.use_superproject) fakeproj.config.SetBoolean('repo.partialclone', False) self.assertFalse(fakeproj.partial_clone) fakeproj.config.SetString('repo.depth', '48') self.assertEqual(fakeproj.depth, '48') fakeproj.config.SetString('repo.clonefilter', 'blob:limit=10M') self.assertEqual(fakeproj.clone_filter, 'blob:limit=10M') fakeproj.config.SetString('repo.partialcloneexclude', 'third_party/big_repo') self.assertEqual(fakeproj.partial_clone_exclude, 'third_party/big_repo') fakeproj.config.SetString('manifest.platform', 'auto') self.assertEqual(fakeproj.manifest_platform, 'auto')