2019-06-18 21:23:39 +00:00
|
|
|
# Copyright (C) 2019 The Android Open Source Project
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
2019-07-27 01:14:55 +00:00
|
|
|
"""Unittests for the project.py module."""
|
|
|
|
|
2019-09-11 22:43:17 +00:00
|
|
|
import contextlib
|
|
|
|
import os
|
|
|
|
import shutil
|
|
|
|
import subprocess
|
|
|
|
import tempfile
|
2019-06-18 21:23:39 +00:00
|
|
|
import unittest
|
|
|
|
|
2019-08-02 19:57:57 +00:00
|
|
|
import error
|
2020-12-01 14:58:53 +00:00
|
|
|
import git_command
|
2019-09-11 22:43:17 +00:00
|
|
|
import git_config
|
2020-02-20 03:36:26 +00:00
|
|
|
import platform_utils
|
2019-06-18 21:23:39 +00:00
|
|
|
import project
|
|
|
|
|
|
|
|
|
2019-09-11 22:43:17 +00:00
|
|
|
@contextlib.contextmanager
def TempGitTree():
    """Create a new empty git checkout for testing.

    The checkout is created with `git init` and set up so that HEAD points at
    the `main` branch, either via --initial-branch or via a template HEAD file.

    Yields:
      The path to the temporary directory containing the checkout.  The
      directory (and any helper template directory) is removed on exit.
    """
    # TODO(vapier): Convert this to tempfile.TemporaryDirectory once we drop
    # Python 2 support entirely.
    # Create the directory before entering the try block so the cleanup code
    # never refers to an unbound name if mkdtemp itself fails.
    tempdir = tempfile.mkdtemp(prefix='repo-tests')
    templatedir = None
    try:
        # Tests need to assume that main is the default branch at init,
        # which is not supported in config until 2.28.
        cmd = ['git', 'init']
        if git_command.git_require((2, 28, 0)):
            cmd += ['--initial-branch=main']
        else:
            # Use template dir for init.
            templatedir = tempfile.mkdtemp(prefix='.test-template')
            with open(os.path.join(templatedir, 'HEAD'), 'w') as fp:
                fp.write('ref: refs/heads/main\n')
            cmd += ['--template', templatedir]
        subprocess.check_call(cmd, cwd=tempdir)
        yield tempdir
    finally:
        # Remove the template dir too; previously it was left behind on disk.
        # The nested finally guarantees the checkout is removed even when
        # cleaning up the template dir fails.
        try:
            if templatedir is not None:
                platform_utils.rmtree(templatedir)
        finally:
            platform_utils.rmtree(tempdir)
|
2019-09-11 22:43:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
class FakeProject(object):
    """Lightweight stand-in for Project covering only basic functionality."""

    def __init__(self, worktree):
        """Set up git helpers and config for the checkout at |worktree|.

        Args:
          worktree: Path to the checkout; its git dir is |worktree|/.git.
        """
        gitdir = os.path.join(worktree, '.git')
        # Reuse the real Project's git helper class for both flavors.
        make_git = project.Project._GitGetByExec

        self.worktree = worktree
        self.gitdir = gitdir
        self.name = 'fakeproject'
        self.work_git = make_git(self, bare=False, gitdir=gitdir)
        self.bare_git = make_git(self, bare=True, gitdir=gitdir)
        self.config = git_config.GitConfig.ForRepository(gitdir=gitdir)
|
|
|
|
|
|
|
|
|
|
|
|
class ReviewableBranchTests(unittest.TestCase):
    """Verify how ReviewableBranch behaves."""

    def test_smoke(self):
        """Run once through all of the ReviewableBranch properties."""
        with TempGitTree() as checkout:
            proj = FakeProject(checkout)
            git = proj.work_git

            # Create one commit, then branch off to 'work' and add another.
            readme = os.path.join(checkout, 'readme')
            with open(readme, 'w') as fp:
                fp.write('txt')
            git.add('readme')
            git.commit('-mAdd file')
            git.checkout('-b', 'work')
            git.rm('-f', 'readme')
            git.commit('-mDel file')

            # First pass: 'work' compared against an existing 'main' base.
            branch = project.ReviewableBranch(
                proj, proj.config.GetBranch('work'), 'main')
            self.assertEqual('work', branch.name)
            self.assertEqual(1, len(branch.commits))
            self.assertIn('Del file', branch.commits[0])
            unabbrev = branch.unabbrev_commits
            self.assertEqual(1, len(unabbrev))
            abbrev_id, full_id = next(iter(unabbrev.items()))
            self.assertTrue(full_id.startswith(abbrev_id))
            self.assertTrue(branch.base_exists)
            # The date is not predictable, so only check that it is set.
            self.assertTrue(branch.date)

            # Second pass: same comparison after deleting the base branch.
            git.branch('-D', 'main')
            branch = project.ReviewableBranch(
                proj, proj.config.GetBranch('work'), 'main')
            self.assertEqual(0, len(branch.commits))
            self.assertFalse(branch.base_exists)
            # The date is not predictable, so only check that it is set.
            self.assertTrue(branch.date)
|
2019-08-02 19:57:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
class CopyLinkTestCase(unittest.TestCase):
    """TestCase for stub repo client checkouts.

    It'll have a layout like:
      tempdir/          # self.tempdir
        checkout/       # self.topdir
          git-project/  # self.worktree

    Attributes:
      tempdir: A dedicated temporary directory.
      worktree: The top of a project checkout.
      topdir: The top of the repo client checkout.
    """

    def setUp(self):
        """Create the tempdir/checkout/git-project directory layout."""
        self.tempdir = tempfile.mkdtemp(prefix='repo_tests')
        self.topdir = os.path.join(self.tempdir, 'checkout')
        self.worktree = os.path.join(self.topdir, 'git-project')
        os.makedirs(self.topdir)
        os.makedirs(self.worktree)

    def tearDown(self):
        """Remove the temporary tree, ignoring any errors while doing so."""
        shutil.rmtree(self.tempdir, ignore_errors=True)

    @staticmethod
    def touch(path):
        """Create |path| as an empty file (truncating any existing file)."""
        with open(path, 'w'):
            pass

    def assertExists(self, path, msg=None):
        """Make sure |path| exists.

        Args:
          path: The path to check.
          msg: Optional failure message.  When omitted, a diagnostic message
              is built that walks up the parents of |path| until one exists
              and lists that directory's contents.

        Raises:
          self.failureException: If |path| does not exist.
        """
        if os.path.exists(path):
            return

        if msg is None:
            msg = ['path is missing: %s' % path]
            current = path
            while True:
                parent = os.path.dirname(current)
                # Stop at "" (for something like "foo") and at any filesystem
                # root, where dirname() returns its argument unchanged.  This
                # guarantees termination even when the root is not "/".
                if not parent or parent == current:
                    break
                current = parent
                result = os.path.exists(current)
                msg.append('\tos.path.exists(%s): %s' % (current, result))
                if result:
                    # Show what is in the nearest existing ancestor to help
                    # spot typos or misplaced files.
                    msg.append('\tcontents: %r' % os.listdir(current))
                    break
            msg = '\n'.join(msg)

        raise self.failureException(msg)
|
|
|
|
|
|
|
|
|
|
|
|
class CopyFile(CopyLinkTestCase):
    """Tests covering project._CopyFile."""

    def CopyFile(self, src, dest):
        """Build a _CopyFile going from the project worktree to the topdir."""
        return project._CopyFile(self.worktree, src, self.topdir, dest)

    def test_basic(self):
        """Copy a file from the project into the top of the checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        copier = self.CopyFile('foo.txt', 'foo')
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'foo'))

    def test_src_subdir(self):
        """Copy a file that lives in a subdir of the project."""
        source = os.path.join(self.worktree, 'bar', 'foo.txt')
        os.makedirs(os.path.dirname(source))
        self.touch(source)
        copier = self.CopyFile('bar/foo.txt', 'new.txt')
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'new.txt'))

    def test_dest_subdir(self):
        """Copy a file into a subdir of the checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        copier = self.CopyFile('foo.txt', 'sub/dir/new.txt')
        # The destination directory must not exist before the copy runs.
        subdir = os.path.join(self.topdir, 'sub')
        self.assertFalse(os.path.exists(subdir))
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt'))

    def test_update(self):
        """Changed source files must be copied again."""
        source = os.path.join(self.worktree, 'foo.txt')
        target = os.path.join(self.topdir, 'bar')

        with open(source, 'w') as fp:
            fp.write('1st')
        copier = self.CopyFile('foo.txt', 'bar')
        copier._Copy()
        self.assertExists(target)
        with open(target) as fp:
            self.assertEqual(fp.read(), '1st')

        # Rewrite the source and copy again with the same object.
        with open(source, 'w') as fp:
            fp.write('2nd!')
        copier._Copy()
        with open(target) as fp:
            self.assertEqual(fp.read(), '2nd!')

    def test_src_block_symlink(self):
        """Reading from a symlinked path is rejected."""
        link = os.path.join(self.worktree, 'sym')
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        platform_utils.symlink('foo.txt', link)
        self.assertExists(link)
        copier = self.CopyFile('sym', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_symlink_traversal(self):
        """Reading through a symlinked directory is rejected."""
        self.touch(os.path.join(self.tempdir, 'file.txt'))
        # 'bar' inside the project is a symlink to the temp directory.
        linked_dir = os.path.join(self.worktree, 'bar')
        platform_utils.symlink(self.tempdir, linked_dir)
        self.assertExists(os.path.join(self.worktree, 'bar', 'file.txt'))
        copier = self.CopyFile('bar/file.txt', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_copy_from_dir(self):
        """Copying from a directory is rejected."""
        os.makedirs(os.path.join(self.worktree, 'dir'))
        copier = self.CopyFile('dir', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_dest_block_symlink(self):
        """Writing to a symlink is rejected."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        platform_utils.symlink('dest', os.path.join(self.topdir, 'sym'))
        copier = self.CopyFile('foo.txt', 'sym')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_dest_block_symlink_traversal(self):
        """Writing through a symlinked directory is rejected."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        link = os.path.join(self.topdir, 'sym')
        platform_utils.symlink(tempfile.gettempdir(), link)
        copier = self.CopyFile('foo.txt', 'sym/foo.txt')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_copy_to_dir(self):
        """Copying onto an existing directory is rejected."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        os.makedirs(os.path.join(self.topdir, 'dir'))
        copier = self.CopyFile('foo.txt', 'dir')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()
|
|
|
|
|
|
|
|
|
|
|
|
class LinkFile(CopyLinkTestCase):
    """Tests covering project._LinkFile."""

    def LinkFile(self, src, dest):
        """Build a _LinkFile going from the project worktree to the topdir."""
        return project._LinkFile(self.worktree, src, self.topdir, dest)

    def test_basic(self):
        """Link a file from the project into the top of the checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'foo')
        linker._Link()
        link = os.path.join(self.topdir, 'foo')
        self.assertExists(link)
        self.assertTrue(os.path.islink(link))
        expected = os.path.join('git-project', 'foo.txt')
        self.assertEqual(expected, os.readlink(link))

    def test_src_subdir(self):
        """Link to a file that lives in a subdir of the project."""
        source = os.path.join(self.worktree, 'bar', 'foo.txt')
        os.makedirs(os.path.dirname(source))
        self.touch(source)
        linker = self.LinkFile('bar/foo.txt', 'foo')
        linker._Link()
        self.assertExists(os.path.join(self.topdir, 'foo'))

    def test_src_self(self):
        """Link to the project directory itself."""
        link = os.path.join(self.topdir, 'foo', 'bar')
        linker = self.LinkFile('.', 'foo/bar')
        linker._Link()
        self.assertExists(link)
        expected = os.path.join('..', 'git-project')
        self.assertEqual(expected, os.readlink(link))

    def test_dest_subdir(self):
        """Link a file into a subdir of the checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'sub/dir/foo/bar')
        # The destination directory must not exist before the link is made.
        subdir = os.path.join(self.topdir, 'sub')
        self.assertFalse(os.path.exists(subdir))
        linker._Link()
        self.assertExists(
            os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar'))

    def test_src_block_relative(self):
        """Relative symlink sources are rejected."""
        bad_sources = (
            './',
            '..',
            '../',
            'foo/.',
            'foo/./bar',
            'foo/..',
            'foo/../foo',
        )
        for bad_src in bad_sources:
            linker = self.LinkFile(bad_src, 'foo')
            with self.assertRaises(error.ManifestInvalidPathError):
                linker._Link()

    def test_update(self):
        """A link whose target changed gets pointed back at the source."""
        link = os.path.join(self.topdir, 'sym')
        expected = os.path.join('git-project', 'foo.txt')

        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'sym')
        linker._Link()
        self.assertEqual(expected, os.readlink(link))

        # Replace the link with one pointing elsewhere, then link again.
        os.unlink(link)
        platform_utils.symlink(self.tempdir, link)
        linker._Link()
        self.assertEqual(expected, os.readlink(link))
|