# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for the project.py module."""

import contextlib
import os
from pathlib import Path
import shutil
import subprocess
import tempfile
import unittest

import error
import git_command
import git_config
import platform_utils
import project


@contextlib.contextmanager
def TempGitTree():
  """Create a new empty git checkout for testing."""
  # TODO(vapier): Convert this to tempfile.TemporaryDirectory once we drop
  # Python 2 support entirely.
  try:
    tempdir = tempfile.mkdtemp(prefix='repo-tests')

    # Tests need to assume, that main is default branch at init,
    # which is not supported in config until 2.28.
    cmd = ['git', 'init']
    if git_command.git_require((2, 28, 0)):
      cmd += ['--initial-branch=main']
    else:
      # Use template dir for init.
      templatedir = tempfile.mkdtemp(prefix='.test-template')
      with open(os.path.join(templatedir, 'HEAD'), 'w') as fp:
        fp.write('ref: refs/heads/main\n')
      cmd += ['--template', templatedir]
    subprocess.check_call(cmd, cwd=tempdir)
    yield tempdir
  finally:
    platform_utils.rmtree(tempdir)


class FakeProject(object):
  """A fake for Project for basic functionality."""

  def __init__(self, worktree):
    self.worktree = worktree
    self.gitdir = os.path.join(worktree, '.git')
    self.name = 'fakeproject'
    self.work_git = project.Project._GitGetByExec(
        self, bare=False, gitdir=self.gitdir)
    self.bare_git = project.Project._GitGetByExec(
        self, bare=True, gitdir=self.gitdir)
    self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir)


class ReviewableBranchTests(unittest.TestCase):
  """Check ReviewableBranch behavior."""

  def test_smoke(self):
    """A quick run through everything."""
    with TempGitTree() as tempdir:
      fakeproj = FakeProject(tempdir)

      # Generate some commits.
      with open(os.path.join(tempdir, 'readme'), 'w') as fp:
        fp.write('txt')
      fakeproj.work_git.add('readme')
      fakeproj.work_git.commit('-mAdd file')
      fakeproj.work_git.checkout('-b', 'work')
      fakeproj.work_git.rm('-f', 'readme')
      fakeproj.work_git.commit('-mDel file')

      # Start off with the normal details.
      rb = project.ReviewableBranch(
          fakeproj, fakeproj.config.GetBranch('work'), 'main')
      self.assertEqual('work', rb.name)
      self.assertEqual(1, len(rb.commits))
      self.assertIn('Del file', rb.commits[0])
      d = rb.unabbrev_commits
      self.assertEqual(1, len(d))
      short, long = next(iter(d.items()))
      self.assertTrue(long.startswith(short))
      self.assertTrue(rb.base_exists)
      # Hard to assert anything useful about this.
      self.assertTrue(rb.date)

      # Now delete the tracking branch!
      fakeproj.work_git.branch('-D', 'main')
      rb = project.ReviewableBranch(
          fakeproj, fakeproj.config.GetBranch('work'), 'main')
      self.assertEqual(0, len(rb.commits))
      self.assertFalse(rb.base_exists)
      # Hard to assert anything useful about this.
      self.assertTrue(rb.date)


class CopyLinkTestCase(unittest.TestCase):
  """TestCase for stub repo client checkouts.

  It'll have a layout like:
    tempdir/          # self.tempdir
      checkout/       # self.topdir
        git-project/  # self.worktree

  Attributes:
    tempdir: A dedicated temporary directory.
    worktree: The top of the repo client checkout.
    topdir: The top of a project checkout.
  """

  def setUp(self):
    self.tempdir = tempfile.mkdtemp(prefix='repo_tests')
    self.topdir = os.path.join(self.tempdir, 'checkout')
    self.worktree = os.path.join(self.topdir, 'git-project')
    os.makedirs(self.topdir)
    os.makedirs(self.worktree)

  def tearDown(self):
    shutil.rmtree(self.tempdir, ignore_errors=True)

  @staticmethod
  def touch(path):
    with open(path, 'w'):
      pass

  def assertExists(self, path, msg=None):
    """Make sure |path| exists."""
    if os.path.exists(path):
      return

    if msg is None:
      msg = ['path is missing: %s' % path]
      while path != '/':
        path = os.path.dirname(path)
        if not path:
          # If we're given something like "foo", abort once we get to "".
          break
        result = os.path.exists(path)
        msg.append('\tos.path.exists(%s): %s' % (path, result))
        if result:
          msg.append('\tcontents: %r' % os.listdir(path))
          break
      msg = '\n'.join(msg)

    raise self.failureException(msg)


class CopyFile(CopyLinkTestCase):
  """Check _CopyFile handling."""

  def CopyFile(self, src, dest):
    return project._CopyFile(self.worktree, src, self.topdir, dest)

  def test_basic(self):
    """Basic test of copying a file from a project to the toplevel."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    cf = self.CopyFile('foo.txt', 'foo')
    cf._Copy()
    self.assertExists(os.path.join(self.topdir, 'foo'))

  def test_src_subdir(self):
    """Copy a file from a subdir of a project."""
    src = os.path.join(self.worktree, 'bar', 'foo.txt')
    os.makedirs(os.path.dirname(src))
    self.touch(src)
    cf = self.CopyFile('bar/foo.txt', 'new.txt')
    cf._Copy()
    self.assertExists(os.path.join(self.topdir, 'new.txt'))

  def test_dest_subdir(self):
    """Copy a file to a subdir of a checkout."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    cf = self.CopyFile('foo.txt', 'sub/dir/new.txt')
    self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
    cf._Copy()
    self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt'))

  def test_update(self):
    """Make sure changed files get copied again."""
    src = os.path.join(self.worktree, 'foo.txt')
    dest = os.path.join(self.topdir, 'bar')
    with open(src, 'w') as f:
      f.write('1st')
    cf = self.CopyFile('foo.txt', 'bar')
    cf._Copy()
    self.assertExists(dest)
    with open(dest) as f:
      self.assertEqual(f.read(), '1st')

    with open(src, 'w') as f:
      f.write('2nd!')
    cf._Copy()
    with open(dest) as f:
      self.assertEqual(f.read(), '2nd!')

  def test_src_block_symlink(self):
    """Do not allow reading from a symlinked path."""
    src = os.path.join(self.worktree, 'foo.txt')
    sym = os.path.join(self.worktree, 'sym')
    self.touch(src)
    platform_utils.symlink('foo.txt', sym)
    self.assertExists(sym)
    cf = self.CopyFile('sym', 'foo')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)

  def test_src_block_symlink_traversal(self):
    """Do not allow reading through a symlink dir."""
    realfile = os.path.join(self.tempdir, 'file.txt')
    self.touch(realfile)
    src = os.path.join(self.worktree, 'bar', 'file.txt')
    platform_utils.symlink(self.tempdir, os.path.join(self.worktree, 'bar'))
    self.assertExists(src)
    cf = self.CopyFile('bar/file.txt', 'foo')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)

  def test_src_block_copy_from_dir(self):
    """Do not allow copying from a directory."""
    src = os.path.join(self.worktree, 'dir')
    os.makedirs(src)
    cf = self.CopyFile('dir', 'foo')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)

  def test_dest_block_symlink(self):
    """Do not allow writing to a symlink."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    platform_utils.symlink('dest', os.path.join(self.topdir, 'sym'))
    cf = self.CopyFile('foo.txt', 'sym')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)

  def test_dest_block_symlink_traversal(self):
    """Do not allow writing through a symlink dir."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    platform_utils.symlink(tempfile.gettempdir(),
                           os.path.join(self.topdir, 'sym'))
    cf = self.CopyFile('foo.txt', 'sym/foo.txt')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)

  def test_src_block_copy_to_dir(self):
    """Do not allow copying to a directory."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    os.makedirs(os.path.join(self.topdir, 'dir'))
    cf = self.CopyFile('foo.txt', 'dir')
    self.assertRaises(error.ManifestInvalidPathError, cf._Copy)


class LinkFile(CopyLinkTestCase):
  """Check _LinkFile handling."""

  def LinkFile(self, src, dest):
    return project._LinkFile(self.worktree, src, self.topdir, dest)

  def test_basic(self):
    """Basic test of linking a file from a project into the toplevel."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    lf = self.LinkFile('foo.txt', 'foo')
    lf._Link()
    dest = os.path.join(self.topdir, 'foo')
    self.assertExists(dest)
    self.assertTrue(os.path.islink(dest))
    self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest))

  def test_src_subdir(self):
    """Link to a file in a subdir of a project."""
    src = os.path.join(self.worktree, 'bar', 'foo.txt')
    os.makedirs(os.path.dirname(src))
    self.touch(src)
    lf = self.LinkFile('bar/foo.txt', 'foo')
    lf._Link()
    self.assertExists(os.path.join(self.topdir, 'foo'))

  def test_src_self(self):
    """Link to the project itself."""
    dest = os.path.join(self.topdir, 'foo', 'bar')
    lf = self.LinkFile('.', 'foo/bar')
    lf._Link()
    self.assertExists(dest)
    self.assertEqual(os.path.join('..', 'git-project'), os.readlink(dest))

  def test_dest_subdir(self):
    """Link a file to a subdir of a checkout."""
    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    lf = self.LinkFile('foo.txt', 'sub/dir/foo/bar')
    self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
    lf._Link()
    self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar'))

  def test_src_block_relative(self):
    """Do not allow relative symlinks."""
    BAD_SOURCES = (
        './',
        '..',
        '../',
        'foo/.',
        'foo/./bar',
        'foo/..',
        'foo/../foo',
    )
    for src in BAD_SOURCES:
      lf = self.LinkFile(src, 'foo')
      self.assertRaises(error.ManifestInvalidPathError, lf._Link)

  def test_update(self):
    """Make sure changed targets get updated."""
    dest = os.path.join(self.topdir, 'sym')

    src = os.path.join(self.worktree, 'foo.txt')
    self.touch(src)
    lf = self.LinkFile('foo.txt', 'sym')
    lf._Link()
    self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest))

    # Point the symlink somewhere else.
    os.unlink(dest)
    platform_utils.symlink(self.tempdir, dest)
    lf._Link()
    self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest))


class MigrateWorkTreeTests(unittest.TestCase):
  """Check _MigrateOldWorkTreeGitDir handling."""

  _SYMLINKS = {
      'config', 'description', 'hooks', 'info', 'logs', 'objects',
      'packed-refs', 'refs', 'rr-cache', 'shallow', 'svn',
  }
  _FILES = {
      'COMMIT_EDITMSG', 'FETCH_HEAD', 'HEAD', 'index', 'ORIG_HEAD',
      'unknown-file-should-be-migrated',
  }
  _CLEAN_FILES = {
      'a-vim-temp-file~', '#an-emacs-temp-file#',
  }

  @classmethod
  @contextlib.contextmanager
  def _simple_layout(cls):
    """Create a simple repo client checkout to test against."""
    with tempfile.TemporaryDirectory() as tempdir:
      tempdir = Path(tempdir)

      gitdir = tempdir / '.repo/projects/src/test.git'
      gitdir.mkdir(parents=True)
      cmd = ['git', 'init', '--bare', str(gitdir)]
      subprocess.check_call(cmd)

      dotgit = tempdir / 'src/test/.git'
      dotgit.mkdir(parents=True)
      for name in cls._SYMLINKS:
        (dotgit / name).symlink_to(f'../../../.repo/projects/src/test.git/{name}')
      for name in cls._FILES | cls._CLEAN_FILES:
        (dotgit / name).write_text(name)

      yield tempdir

  def test_standard(self):
    """Migrate a standard checkout that we expect."""
    with self._simple_layout() as tempdir:
      dotgit = tempdir / 'src/test/.git'
      project.Project._MigrateOldWorkTreeGitDir(str(dotgit))

      # Make sure the dir was transformed into a symlink.
      self.assertTrue(dotgit.is_symlink())
      self.assertEqual(os.readlink(dotgit), '../../.repo/projects/src/test.git')

      # Make sure files were moved over.
      gitdir = tempdir / '.repo/projects/src/test.git'
      for name in self._FILES:
        self.assertEqual(name, (gitdir / name).read_text())
      # Make sure files were removed.
      for name in self._CLEAN_FILES:
        self.assertFalse((gitdir / name).exists())

  def test_unknown(self):
    """A checkout with unknown files should abort."""
    with self._simple_layout() as tempdir:
      dotgit = tempdir / 'src/test/.git'
      (tempdir / '.repo/projects/src/test.git/random-file').write_text('one')
      (dotgit / 'random-file').write_text('two')
      with self.assertRaises(error.GitError):
        project.Project._MigrateOldWorkTreeGitDir(str(dotgit))

      # Make sure no content was actually changed.
      self.assertTrue(dotgit.is_dir())
      for name in self._FILES:
        self.assertTrue((dotgit / name).is_file())
      for name in self._CLEAN_FILES:
        self.assertTrue((dotgit / name).is_file())
      for name in self._SYMLINKS:
        self.assertTrue((dotgit / name).is_symlink())