mirror of
https://gerrit.googlesource.com/git-repo
synced 2025-01-22 16:14:28 +00:00
e6a202f790
Reject paths in <copyfile> & <linkfile> that try to use symlinks or non-file or non-dirs. We don't fully validate <linkfile> when src is a glob as it's a bit complicated -- any component in the src could be the glob. We make sure the destination is a directory, and that any paths in that dir are created as symlinks. So while this can be used to read any path, it can't be abused to write to any paths. Bug: https://crbug.com/gerrit/11218 Change-Id: I68b6d789b5ca4e43f569e75e8b293b3e13d3224b Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/233074 Tested-by: Mike Frysinger <vapier@google.com> Reviewed-by: Mike Frysinger <vapier@google.com> Reviewed-by: Michael Mortensen <mmortensen@google.com>
341 lines
11 KiB
Python
341 lines
11 KiB
Python
# -*- coding:utf-8 -*-
|
|
#
|
|
# Copyright (C) 2019 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Unittests for the project.py module."""
|
|
|
|
from __future__ import print_function
|
|
|
|
import contextlib
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
|
|
import error
|
|
import git_config
|
|
import project
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def TempGitTree():
|
|
"""Create a new empty git checkout for testing."""
|
|
# TODO(vapier): Convert this to tempfile.TemporaryDirectory once we drop
|
|
# Python 2 support entirely.
|
|
try:
|
|
tempdir = tempfile.mkdtemp(prefix='repo-tests')
|
|
subprocess.check_call(['git', 'init'], cwd=tempdir)
|
|
yield tempdir
|
|
finally:
|
|
shutil.rmtree(tempdir)
|
|
|
|
|
|
class RepoHookShebang(unittest.TestCase):
|
|
"""Check shebang parsing in RepoHook."""
|
|
|
|
def test_no_shebang(self):
|
|
"""Lines w/out shebangs should be rejected."""
|
|
DATA = (
|
|
'',
|
|
'# -*- coding:utf-8 -*-\n',
|
|
'#\n# foo\n',
|
|
'# Bad shebang in script\n#!/foo\n'
|
|
)
|
|
for data in DATA:
|
|
self.assertIsNone(project.RepoHook._ExtractInterpFromShebang(data))
|
|
|
|
def test_direct_interp(self):
|
|
"""Lines whose shebang points directly to the interpreter."""
|
|
DATA = (
|
|
('#!/foo', '/foo'),
|
|
('#! /foo', '/foo'),
|
|
('#!/bin/foo ', '/bin/foo'),
|
|
('#! /usr/foo ', '/usr/foo'),
|
|
('#! /usr/foo -args', '/usr/foo'),
|
|
)
|
|
for shebang, interp in DATA:
|
|
self.assertEqual(project.RepoHook._ExtractInterpFromShebang(shebang),
|
|
interp)
|
|
|
|
def test_env_interp(self):
|
|
"""Lines whose shebang launches through `env`."""
|
|
DATA = (
|
|
('#!/usr/bin/env foo', 'foo'),
|
|
('#!/bin/env foo', 'foo'),
|
|
('#! /bin/env /bin/foo ', '/bin/foo'),
|
|
)
|
|
for shebang, interp in DATA:
|
|
self.assertEqual(project.RepoHook._ExtractInterpFromShebang(shebang),
|
|
interp)
|
|
|
|
|
|
class FakeProject(object):
|
|
"""A fake for Project for basic functionality."""
|
|
|
|
def __init__(self, worktree):
|
|
self.worktree = worktree
|
|
self.gitdir = os.path.join(worktree, '.git')
|
|
self.name = 'fakeproject'
|
|
self.work_git = project.Project._GitGetByExec(
|
|
self, bare=False, gitdir=self.gitdir)
|
|
self.bare_git = project.Project._GitGetByExec(
|
|
self, bare=True, gitdir=self.gitdir)
|
|
self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir)
|
|
|
|
|
|
class ReviewableBranchTests(unittest.TestCase):
|
|
"""Check ReviewableBranch behavior."""
|
|
|
|
def test_smoke(self):
|
|
"""A quick run through everything."""
|
|
with TempGitTree() as tempdir:
|
|
fakeproj = FakeProject(tempdir)
|
|
|
|
# Generate some commits.
|
|
with open(os.path.join(tempdir, 'readme'), 'w') as fp:
|
|
fp.write('txt')
|
|
fakeproj.work_git.add('readme')
|
|
fakeproj.work_git.commit('-mAdd file')
|
|
fakeproj.work_git.checkout('-b', 'work')
|
|
fakeproj.work_git.rm('-f', 'readme')
|
|
fakeproj.work_git.commit('-mDel file')
|
|
|
|
# Start off with the normal details.
|
|
rb = project.ReviewableBranch(
|
|
fakeproj, fakeproj.config.GetBranch('work'), 'master')
|
|
self.assertEqual('work', rb.name)
|
|
self.assertEqual(1, len(rb.commits))
|
|
self.assertIn('Del file', rb.commits[0])
|
|
d = rb.unabbrev_commits
|
|
self.assertEqual(1, len(d))
|
|
short, long = next(iter(d.items()))
|
|
self.assertTrue(long.startswith(short))
|
|
self.assertTrue(rb.base_exists)
|
|
# Hard to assert anything useful about this.
|
|
self.assertTrue(rb.date)
|
|
|
|
# Now delete the tracking branch!
|
|
fakeproj.work_git.branch('-D', 'master')
|
|
rb = project.ReviewableBranch(
|
|
fakeproj, fakeproj.config.GetBranch('work'), 'master')
|
|
self.assertEqual(0, len(rb.commits))
|
|
self.assertFalse(rb.base_exists)
|
|
# Hard to assert anything useful about this.
|
|
self.assertTrue(rb.date)
|
|
|
|
|
|
class CopyLinkTestCase(unittest.TestCase):
|
|
"""TestCase for stub repo client checkouts.
|
|
|
|
It'll have a layout like:
|
|
tempdir/ # self.tempdir
|
|
checkout/ # self.topdir
|
|
git-project/ # self.worktree
|
|
|
|
Attributes:
|
|
tempdir: A dedicated temporary directory.
|
|
worktree: The top of the repo client checkout.
|
|
topdir: The top of a project checkout.
|
|
"""
|
|
|
|
def setUp(self):
|
|
self.tempdir = tempfile.mkdtemp(prefix='repo_tests')
|
|
self.topdir = os.path.join(self.tempdir, 'checkout')
|
|
self.worktree = os.path.join(self.topdir, 'git-project')
|
|
os.makedirs(self.topdir)
|
|
os.makedirs(self.worktree)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tempdir, ignore_errors=True)
|
|
|
|
@staticmethod
|
|
def touch(path):
|
|
with open(path, 'w') as f:
|
|
pass
|
|
|
|
def assertExists(self, path, msg=None):
|
|
"""Make sure |path| exists."""
|
|
if os.path.exists(path):
|
|
return
|
|
|
|
if msg is None:
|
|
msg = ['path is missing: %s' % path]
|
|
while path != '/':
|
|
path = os.path.dirname(path)
|
|
if not path:
|
|
# If we're given something like "foo", abort once we get to "".
|
|
break
|
|
result = os.path.exists(path)
|
|
msg.append('\tos.path.exists(%s): %s' % (path, result))
|
|
if result:
|
|
msg.append('\tcontents: %r' % os.listdir(path))
|
|
break
|
|
msg = '\n'.join(msg)
|
|
|
|
raise self.failureException(msg)
|
|
|
|
|
|
class CopyFile(CopyLinkTestCase):
|
|
"""Check _CopyFile handling."""
|
|
|
|
def CopyFile(self, src, dest):
|
|
return project._CopyFile(self.worktree, src, self.topdir, dest)
|
|
|
|
def test_basic(self):
|
|
"""Basic test of copying a file from a project to the toplevel."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
cf = self.CopyFile('foo.txt', 'foo')
|
|
cf._Copy()
|
|
self.assertExists(os.path.join(self.topdir, 'foo'))
|
|
|
|
def test_src_subdir(self):
|
|
"""Copy a file from a subdir of a project."""
|
|
src = os.path.join(self.worktree, 'bar', 'foo.txt')
|
|
os.makedirs(os.path.dirname(src))
|
|
self.touch(src)
|
|
cf = self.CopyFile('bar/foo.txt', 'new.txt')
|
|
cf._Copy()
|
|
self.assertExists(os.path.join(self.topdir, 'new.txt'))
|
|
|
|
def test_dest_subdir(self):
|
|
"""Copy a file to a subdir of a checkout."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
cf = self.CopyFile('foo.txt', 'sub/dir/new.txt')
|
|
self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
|
|
cf._Copy()
|
|
self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt'))
|
|
|
|
def test_update(self):
|
|
"""Make sure changed files get copied again."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
dest = os.path.join(self.topdir, 'bar')
|
|
with open(src, 'w') as f:
|
|
f.write('1st')
|
|
cf = self.CopyFile('foo.txt', 'bar')
|
|
cf._Copy()
|
|
self.assertExists(dest)
|
|
with open(dest) as f:
|
|
self.assertEqual(f.read(), '1st')
|
|
|
|
with open(src, 'w') as f:
|
|
f.write('2nd!')
|
|
cf._Copy()
|
|
with open(dest) as f:
|
|
self.assertEqual(f.read(), '2nd!')
|
|
|
|
def test_src_block_symlink(self):
|
|
"""Do not allow reading from a symlinked path."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
sym = os.path.join(self.worktree, 'sym')
|
|
self.touch(src)
|
|
os.symlink('foo.txt', sym)
|
|
self.assertExists(sym)
|
|
cf = self.CopyFile('sym', 'foo')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
def test_src_block_symlink_traversal(self):
|
|
"""Do not allow reading through a symlink dir."""
|
|
src = os.path.join(self.worktree, 'bar', 'passwd')
|
|
os.symlink('/etc', os.path.join(self.worktree, 'bar'))
|
|
self.assertExists(src)
|
|
cf = self.CopyFile('bar/foo.txt', 'foo')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
def test_src_block_dir(self):
|
|
"""Do not allow copying from a directory."""
|
|
src = os.path.join(self.worktree, 'dir')
|
|
os.makedirs(src)
|
|
cf = self.CopyFile('dir', 'foo')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
def test_dest_block_symlink(self):
|
|
"""Do not allow writing to a symlink."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
os.symlink('dest', os.path.join(self.topdir, 'sym'))
|
|
cf = self.CopyFile('foo.txt', 'sym')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
def test_dest_block_symlink_traversal(self):
|
|
"""Do not allow writing through a symlink dir."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
os.symlink('/tmp', os.path.join(self.topdir, 'sym'))
|
|
cf = self.CopyFile('foo.txt', 'sym/foo.txt')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
def test_src_block_dir(self):
|
|
"""Do not allow copying to a directory."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
os.makedirs(os.path.join(self.topdir, 'dir'))
|
|
cf = self.CopyFile('foo.txt', 'dir')
|
|
self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
|
|
|
|
|
|
class LinkFile(CopyLinkTestCase):
|
|
"""Check _LinkFile handling."""
|
|
|
|
def LinkFile(self, src, dest):
|
|
return project._LinkFile(self.worktree, src, self.topdir, dest)
|
|
|
|
def test_basic(self):
|
|
"""Basic test of linking a file from a project into the toplevel."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
lf = self.LinkFile('foo.txt', 'foo')
|
|
lf._Link()
|
|
dest = os.path.join(self.topdir, 'foo')
|
|
self.assertExists(dest)
|
|
self.assertTrue(os.path.islink(dest))
|
|
self.assertEqual('git-project/foo.txt', os.readlink(dest))
|
|
|
|
def test_src_subdir(self):
|
|
"""Link to a file in a subdir of a project."""
|
|
src = os.path.join(self.worktree, 'bar', 'foo.txt')
|
|
os.makedirs(os.path.dirname(src))
|
|
self.touch(src)
|
|
lf = self.LinkFile('bar/foo.txt', 'foo')
|
|
lf._Link()
|
|
self.assertExists(os.path.join(self.topdir, 'foo'))
|
|
|
|
def test_dest_subdir(self):
|
|
"""Link a file to a subdir of a checkout."""
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
lf = self.LinkFile('foo.txt', 'sub/dir/foo/bar')
|
|
self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
|
|
lf._Link()
|
|
self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar'))
|
|
|
|
def test_update(self):
|
|
"""Make sure changed targets get updated."""
|
|
dest = os.path.join(self.topdir, 'sym')
|
|
|
|
src = os.path.join(self.worktree, 'foo.txt')
|
|
self.touch(src)
|
|
lf = self.LinkFile('foo.txt', 'sym')
|
|
lf._Link()
|
|
self.assertEqual('git-project/foo.txt', os.readlink(dest))
|
|
|
|
# Point the symlink somewhere else.
|
|
os.unlink(dest)
|
|
os.symlink('/', dest)
|
|
lf._Link()
|
|
self.assertEqual('git-project/foo.txt', os.readlink(dest))
|