diff --git a/command.py b/command.py index 2a2ce138..22115ac2 100644 --- a/command.py +++ b/command.py @@ -268,8 +268,10 @@ class Command: cls._parallel_context = None @classmethod - def _SetParallelContext(cls, context): + def _InitParallelWorker(cls, context, initializer): cls._parallel_context = context + if initializer: + initializer() @classmethod def ExecuteInParallel( @@ -281,6 +283,7 @@ class Command: output=None, ordered=False, chunksize=WORKER_BATCH_SIZE, + initializer=None, ): """Helper for managing parallel execution boiler plate. @@ -307,6 +310,7 @@ class Command: ordered: Whether the jobs should be processed in order. chunksize: The number of jobs processed in batch by parallel workers. + initializer: Worker initializer. Returns: The |callback| function's results are returned. @@ -318,8 +322,8 @@ class Command: else: with multiprocessing.Pool( jobs, - initializer=cls._SetParallelContext, - initargs=(cls._parallel_context,), + initializer=cls._InitParallelWorker, + initargs=(cls._parallel_context, initializer), ) as pool: submit = pool.imap if ordered else pool.imap_unordered return callback( diff --git a/subcmds/abandon.py b/subcmds/abandon.py index e280d69e..3208be6b 100644 --- a/subcmds/abandon.py +++ b/subcmds/abandon.py @@ -70,8 +70,10 @@ It is equivalent to "git branch -D ". else: args.insert(0, "'All local branches'") - def _ExecuteOne(self, all_branches, nb, project): + @classmethod + def _ExecuteOne(cls, all_branches, nb, project_idx): """Abandon one project.""" + project = cls.get_parallel_context()["projects"][project_idx] if all_branches: branches = project.GetBranches() else: @@ -89,7 +91,7 @@ It is equivalent to "git branch -D ". if status is not None: ret[name] = status - return (ret, project, errors) + return (ret, project_idx, errors) def Execute(self, opt, args): nb = args[0].split() @@ -102,7 +104,8 @@ It is equivalent to "git branch -D ". _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only) def _ProcessResults(_pool, pm, states): - for results, project, errors in states: + for results, project_idx, errors in states: + project = all_projects[project_idx] for branch, status in results.items(): if status: success[branch].append(project) @@ -111,15 +114,18 @@ It is equivalent to "git branch -D ". aggregate_errors.extend(errors) pm.update(msg="") - self.ExecuteInParallel( - opt.jobs, - functools.partial(self._ExecuteOne, opt.all, nb), - all_projects, - callback=_ProcessResults, - output=Progress( - f"Abandon {nb}", len(all_projects), quiet=opt.quiet - ), - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = all_projects + self.ExecuteInParallel( + opt.jobs, + functools.partial(self._ExecuteOne, opt.all, nb), + range(len(all_projects)), + callback=_ProcessResults, + output=Progress( + f"Abandon {nb}", len(all_projects), quiet=opt.quiet + ), + chunksize=1, + ) width = max( itertools.chain( diff --git a/subcmds/branches.py b/subcmds/branches.py index 59b5cb28..08c6389c 100644 --- a/subcmds/branches.py +++ b/subcmds/branches.py @@ -98,6 +98,22 @@ is shown, then the branch appears in all projects. """ PARALLEL_JOBS = DEFAULT_LOCAL_JOBS + @classmethod + def _ExpandProjectToBranches(cls, project_idx): + """Expands a project into a list of branch names & associated info. + + Args: + project_idx: project.Project index + + Returns: + List[Tuple[str, git_config.Branch, int]] + """ + branches = [] + project = cls.get_parallel_context()["projects"][project_idx] + for name, b in project.GetBranches().items(): + branches.append((name, b, project_idx)) + return branches + def Execute(self, opt, args): projects = self.GetProjects( args, all_manifests=not opt.this_manifest_only @@ -107,17 +123,20 @@ is shown, then the branch appears in all projects. project_cnt = len(projects) def _ProcessResults(_pool, _output, results): - for name, b in itertools.chain.from_iterable(results): + for name, b, project_idx in itertools.chain.from_iterable(results): + b.project = projects[project_idx] if name not in all_branches: all_branches[name] = BranchInfo(name) all_branches[name].add(b) - self.ExecuteInParallel( - opt.jobs, - expand_project_to_branches, - projects, - callback=_ProcessResults, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = projects + self.ExecuteInParallel( + opt.jobs, + self._ExpandProjectToBranches, + range(len(projects)), + callback=_ProcessResults, + ) names = sorted(all_branches) @@ -191,19 +210,3 @@ is shown, then the branch appears in all projects. else: out.write(" in all projects") out.nl() - - -def expand_project_to_branches(project): - """Expands a project into a list of branch names & associated information. - - Args: - project: project.Project - - Returns: - List[Tuple[str, git_config.Branch]] - """ - branches = [] - for name, b in project.GetBranches().items(): - b.project = project - branches.append((name, b)) - return branches diff --git a/subcmds/checkout.py b/subcmds/checkout.py index 379bfa18..859ddf6c 100644 --- a/subcmds/checkout.py +++ b/subcmds/checkout.py @@ -20,7 +20,6 @@ from command import DEFAULT_LOCAL_JOBS from error import GitError from error import RepoExitError from progress import Progress -from project import Project from repo_logging import RepoLogger @@ -30,7 +29,7 @@ logger = RepoLogger(__file__) class CheckoutBranchResult(NamedTuple): # Whether the Project is on the branch (i.e. branch exists and no errors) result: bool - project: Project + project_idx: int error: Exception @@ -62,15 +61,17 @@ The command is equivalent to: if not args: self.Usage() - def _ExecuteOne(self, nb, project): + @classmethod + def _ExecuteOne(cls, nb, project_idx): """Checkout one project.""" error = None result = None + project = cls.get_parallel_context()["projects"][project_idx] try: result = project.CheckoutBranch(nb) except GitError as e: error = e - return CheckoutBranchResult(result, project, error) + return CheckoutBranchResult(result, project_idx, error) def Execute(self, opt, args): nb = args[0] @@ -83,22 +84,25 @@ The command is equivalent to: def _ProcessResults(_pool, pm, results): for result in results: + project = all_projects[result.project_idx] if result.error is not None: err.append(result.error) - err_projects.append(result.project) + err_projects.append(project) elif result.result: - success.append(result.project) + success.append(project) pm.update(msg="") - self.ExecuteInParallel( - opt.jobs, - functools.partial(self._ExecuteOne, nb), - all_projects, - callback=_ProcessResults, - output=Progress( - f"Checkout {nb}", len(all_projects), quiet=opt.quiet - ), - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = all_projects + self.ExecuteInParallel( + opt.jobs, + functools.partial(self._ExecuteOne, nb), + range(len(all_projects)), + callback=_ProcessResults, + output=Progress( + f"Checkout {nb}", len(all_projects), quiet=opt.quiet + ), + ) if err_projects: for p in err_projects: diff --git a/subcmds/diff.py b/subcmds/diff.py index d9d72b40..7bb0cbbd 100644 --- a/subcmds/diff.py +++ b/subcmds/diff.py @@ -40,7 +40,8 @@ to the Unix 'patch' command. help="paths are relative to the repository root", ) - def _ExecuteOne(self, absolute, local, project): + @classmethod + def _ExecuteOne(cls, absolute, local, project_idx): """Obtains the diff for a specific project. Args: @@ -48,12 +49,13 @@ to the Unix 'patch' command. local: a boolean, if True, the path is relative to the local (sub)manifest. If false, the path is relative to the outermost manifest. - project: Project to get status of. + project_idx: Project index to get status of. Returns: The status of the project. """ buf = io.StringIO() + project = cls.get_parallel_context()["projects"][project_idx] ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local) return (ret, buf.getvalue()) @@ -71,12 +73,15 @@ to the Unix 'patch' command. ret = 1 return ret - return self.ExecuteInParallel( - opt.jobs, - functools.partial( - self._ExecuteOne, opt.absolute, opt.this_manifest_only - ), - all_projects, - callback=_ProcessResults, - ordered=True, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = all_projects + return self.ExecuteInParallel( + opt.jobs, + functools.partial( + self._ExecuteOne, opt.absolute, opt.this_manifest_only + ), + range(len(all_projects)), + callback=_ProcessResults, + ordered=True, + chunksize=1, + ) diff --git a/subcmds/forall.py b/subcmds/forall.py index 287f2e04..e5fc9e80 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py @@ -15,7 +15,6 @@ import errno import functools import io -import multiprocessing import os import re import signal @@ -26,7 +25,6 @@ from color import Coloring from command import Command from command import DEFAULT_LOCAL_JOBS from command import MirrorSafeCommand -from command import WORKER_BATCH_SIZE from error import ManifestInvalidRevisionError from repo_logging import RepoLogger @@ -241,7 +239,6 @@ without iterating through the remaining projects. cmd.insert(cmd.index(cn) + 1, "--color") mirror = self.manifest.IsMirror - rc = 0 smart_sync_manifest_name = "smart_sync_override.xml" smart_sync_manifest_path = os.path.join( @@ -264,32 +261,41 @@ without iterating through the remaining projects. os.environ["REPO_COUNT"] = str(len(projects)) + def _ProcessResults(_pool, _output, results): + rc = 0 + first = True + for r, output in results: + if output: + if first: + first = False + elif opt.project_header: + print() + # To simplify the DoWorkWrapper, take care of automatic + # newlines. + end = "\n" + if output[-1] == "\n": + end = "" + print(output, end=end) + rc = rc or r + if r != 0 and opt.abort_on_errors: + raise Exception("Aborting due to previous error") + return rc + try: config = self.manifest.manifestProject.config - with multiprocessing.Pool(opt.jobs, InitWorker) as pool: - results_it = pool.imap( + with self.ParallelContext(): + self.get_parallel_context()["projects"] = projects + rc = self.ExecuteInParallel( + opt.jobs, functools.partial( - DoWorkWrapper, mirror, opt, cmd, shell, config + self.DoWorkWrapper, mirror, opt, cmd, shell, config ), - enumerate(projects), - chunksize=WORKER_BATCH_SIZE, + range(len(projects)), + callback=_ProcessResults, + ordered=True, + initializer=self.InitWorker, + chunksize=1, ) - first = True - for r, output in results_it: - if output: - if first: - first = False - elif opt.project_header: - print() - # To simplify the DoWorkWrapper, take care of automatic - # newlines. - end = "\n" - if output[-1] == "\n": - end = "" - print(output, end=end) - rc = rc or r - if r != 0 and opt.abort_on_errors: - raise Exception("Aborting due to previous error") except (KeyboardInterrupt, WorkerKeyboardInterrupt): # Catch KeyboardInterrupt raised inside and outside of workers rc = rc or errno.EINTR @@ -304,31 +310,31 @@ without iterating through the remaining projects. if rc != 0: sys.exit(rc) + @classmethod + def InitWorker(cls): + signal.signal(signal.SIGINT, signal.SIG_IGN) + + @classmethod + def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx): + """A wrapper around the DoWork() method. + + Catch the KeyboardInterrupt exceptions here and re-raise them as a + different, ``Exception``-based exception to stop it flooding the console + with stacktraces and making the parent hang indefinitely. + + """ + project = cls.get_parallel_context()["projects"][project_idx] + try: + return DoWork(project, mirror, opt, cmd, shell, project_idx, config) + except KeyboardInterrupt: + print("%s: Worker interrupted" % project.name) + raise WorkerKeyboardInterrupt() + class WorkerKeyboardInterrupt(Exception): """Keyboard interrupt exception for worker processes.""" -def InitWorker(): - signal.signal(signal.SIGINT, signal.SIG_IGN) - - -def DoWorkWrapper(mirror, opt, cmd, shell, config, args): - """A wrapper around the DoWork() method. - - Catch the KeyboardInterrupt exceptions here and re-raise them as a - different, ``Exception``-based exception to stop it flooding the console - with stacktraces and making the parent hang indefinitely. - - """ - cnt, project = args - try: - return DoWork(project, mirror, opt, cmd, shell, cnt, config) - except KeyboardInterrupt: - print("%s: Worker interrupted" % project.name) - raise WorkerKeyboardInterrupt() - - def DoWork(project, mirror, opt, cmd, shell, cnt, config): env = os.environ.copy() diff --git a/subcmds/grep.py b/subcmds/grep.py index b677b6bd..918651d9 100644 --- a/subcmds/grep.py +++ b/subcmds/grep.py @@ -23,7 +23,6 @@ from error import GitError from error import InvalidArgumentsError from error import SilentRepoExitError from git_command import GitCommand -from project import Project from repo_logging import RepoLogger @@ -40,7 +39,7 @@ class GrepColoring(Coloring): class ExecuteOneResult(NamedTuple): """Result from an execute instance.""" - project: Project + project_idx: int rc: int stdout: str stderr: str @@ -262,8 +261,10 @@ contain a line that matches both expressions: help="Show only file names not containing matching lines", ) - def _ExecuteOne(self, cmd_argv, project): + @classmethod + def _ExecuteOne(cls, cmd_argv, project_idx): """Process one project.""" + project = cls.get_parallel_context()["projects"][project_idx] try: p = GitCommand( project, @@ -274,7 +275,7 @@ contain a line that matches both expressions: verify_command=True, ) except GitError as e: - return ExecuteOneResult(project, -1, None, str(e), e) + return ExecuteOneResult(project_idx, -1, None, str(e), e) try: error = None @@ -282,10 +283,12 @@ contain a line that matches both expressions: except GitError as e: rc = 1 error = e - return ExecuteOneResult(project, rc, p.stdout, p.stderr, error) + return ExecuteOneResult(project_idx, rc, p.stdout, p.stderr, error) @staticmethod - def _ProcessResults(full_name, have_rev, opt, _pool, out, results): + def _ProcessResults( + full_name, have_rev, opt, projects, _pool, out, results + ): git_failed = False bad_rev = False have_match = False @@ -293,9 +296,10 @@ contain a line that matches both expressions: errors = [] for result in results: + project = projects[result.project_idx] if result.rc < 0: git_failed = True - out.project("--- project %s ---" % _RelPath(result.project)) + out.project("--- project %s ---" % _RelPath(project)) out.nl() out.fail("%s", result.stderr) out.nl() @@ -311,9 +315,7 @@ contain a line that matches both expressions: ): bad_rev = True else: - out.project( - "--- project %s ---" % _RelPath(result.project) - ) + out.project("--- project %s ---" % _RelPath(project)) out.nl() out.fail("%s", result.stderr.strip()) out.nl() @@ -331,13 +333,13 @@ contain a line that matches both expressions: rev, line = line.split(":", 1) out.write("%s", rev) out.write(":") - out.project(_RelPath(result.project)) + out.project(_RelPath(project)) out.write("/") out.write("%s", line) out.nl() elif full_name: for line in r: - out.project(_RelPath(result.project)) + out.project(_RelPath(project)) out.write("/") out.write("%s", line) out.nl() @@ -381,16 +383,19 @@ contain a line that matches both expressions: cmd_argv.extend(opt.revision) cmd_argv.append("--") - git_failed, bad_rev, have_match, errors = self.ExecuteInParallel( - opt.jobs, - functools.partial(self._ExecuteOne, cmd_argv), - projects, - callback=functools.partial( - self._ProcessResults, full_name, have_rev, opt - ), - output=out, - ordered=True, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = projects + git_failed, bad_rev, have_match, errors = self.ExecuteInParallel( + opt.jobs, + functools.partial(self._ExecuteOne, cmd_argv), + range(len(projects)), + callback=functools.partial( + self._ProcessResults, full_name, have_rev, opt, projects + ), + output=out, + ordered=True, + chunksize=1, + ) if git_failed: raise GrepCommandError( diff --git a/subcmds/prune.py b/subcmds/prune.py index f99082a4..18bfc680 100644 --- a/subcmds/prune.py +++ b/subcmds/prune.py @@ -27,8 +27,10 @@ class Prune(PagedCommand): """ PARALLEL_JOBS = DEFAULT_LOCAL_JOBS - def _ExecuteOne(self, project): + @classmethod + def _ExecuteOne(cls, project_idx): """Process one project.""" + project = cls.get_parallel_context()["projects"][project_idx] return project.PruneHeads() def Execute(self, opt, args): @@ -41,13 +43,15 @@ class Prune(PagedCommand): def _ProcessResults(_pool, _output, results): return list(itertools.chain.from_iterable(results)) - all_branches = self.ExecuteInParallel( - opt.jobs, - self._ExecuteOne, - projects, - callback=_ProcessResults, - ordered=True, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = projects + all_branches = self.ExecuteInParallel( + opt.jobs, + self._ExecuteOne, + range(len(projects)), + callback=_ProcessResults, + ordered=True, + ) if not all_branches: return diff --git a/subcmds/start.py b/subcmds/start.py index 56008f42..6dca7e4e 100644 --- a/subcmds/start.py +++ b/subcmds/start.py @@ -21,7 +21,6 @@ from error import RepoExitError from git_command import git from git_config import IsImmutable from progress import Progress -from project import Project from repo_logging import RepoLogger @@ -29,7 +28,7 @@ logger = RepoLogger(__file__) class ExecuteOneResult(NamedTuple): - project: Project + project_idx: int error: Exception @@ -80,18 +79,20 @@ revision specified in the manifest. if not git.check_ref_format("heads/%s" % nb): self.OptionParser.error("'%s' is not a valid name" % nb) - def _ExecuteOne(self, revision, nb, project): + @classmethod + def _ExecuteOne(cls, revision, nb, default_revisionExpr, project_idx): """Start one project.""" # If the current revision is immutable, such as a SHA1, a tag or # a change, then we can't push back to it. Substitute with # dest_branch, if defined; or with manifest default revision instead. branch_merge = "" error = None + project = cls.get_parallel_context()["projects"][project_idx] if IsImmutable(project.revisionExpr): if project.dest_branch: branch_merge = project.dest_branch else: - branch_merge = self.manifest.default.revisionExpr + branch_merge = default_revisionExpr try: project.StartBranch( @@ -100,7 +101,7 @@ revision specified in the manifest. except Exception as e: logger.error("error: unable to checkout %s: %s", project.name, e) error = e - return ExecuteOneResult(project, error) + return ExecuteOneResult(project_idx, error) def Execute(self, opt, args): nb = args[0] @@ -120,19 +121,28 @@ revision specified in the manifest. def _ProcessResults(_pool, pm, results): for result in results: if result.error: - err_projects.append(result.project) + project = all_projects[result.project_idx] + err_projects.append(project) err.append(result.error) pm.update(msg="") - self.ExecuteInParallel( - opt.jobs, - functools.partial(self._ExecuteOne, opt.revision, nb), - all_projects, - callback=_ProcessResults, - output=Progress( - f"Starting {nb}", len(all_projects), quiet=opt.quiet - ), - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = all_projects + self.ExecuteInParallel( + opt.jobs, + functools.partial( + self._ExecuteOne, + opt.revision, + nb, + self.manifest.default.revisionExpr, + ), + range(len(all_projects)), + callback=_ProcessResults, + output=Progress( + f"Starting {nb}", len(all_projects), quiet=opt.quiet + ), + chunksize=1, + ) if err_projects: for p in err_projects: diff --git a/subcmds/status.py b/subcmds/status.py index dac61ab6..cda73627 100644 --- a/subcmds/status.py +++ b/subcmds/status.py @@ -88,7 +88,8 @@ the following meanings: "projects", ) - def _StatusHelper(self, quiet, local, project): + @classmethod + def _StatusHelper(cls, quiet, local, project_idx): """Obtains the status for a specific project. Obtains the status for a project, redirecting the output to @@ -99,12 +100,13 @@ the following meanings: local: a boolean, if True, the path is relative to the local (sub)manifest. If false, the path is relative to the outermost manifest. - project: Project to get status of. + project_idx: Project index to get status of. Returns: The status of the project. """ buf = io.StringIO() + project = cls.get_parallel_context()["projects"][project_idx] ret = project.PrintWorkTreeStatus( quiet=quiet, output_redir=buf, local=local ) @@ -143,15 +145,18 @@ the following meanings: ret += 1 return ret - counter = self.ExecuteInParallel( - opt.jobs, - functools.partial( - self._StatusHelper, opt.quiet, opt.this_manifest_only - ), - all_projects, - callback=_ProcessResults, - ordered=True, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = all_projects + counter = self.ExecuteInParallel( + opt.jobs, + functools.partial( + self._StatusHelper, opt.quiet, opt.this_manifest_only + ), + range(len(all_projects)), + callback=_ProcessResults, + ordered=True, + chunksize=1, + ) if not opt.quiet and len(all_projects) == counter: print("nothing to commit (working directory clean)") diff --git a/subcmds/upload.py b/subcmds/upload.py index 8039a1cd..6344ee31 100644 --- a/subcmds/upload.py +++ b/subcmds/upload.py @@ -713,16 +713,17 @@ Gerrit Code Review: https://www.gerritcodereview.com/ merge_branch = p.stdout.strip() return merge_branch - @staticmethod - def _GatherOne(opt, project): + @classmethod + def _GatherOne(cls, opt, project_idx): """Figure out the upload status for |project|.""" + project = cls.get_parallel_context()["projects"][project_idx] if opt.current_branch: cbr = project.CurrentBranch up_branch = project.GetUploadableBranch(cbr) avail = [up_branch] if up_branch else None else: avail = project.GetUploadableBranches(opt.branch) - return (project, avail) + return (project_idx, avail) def Execute(self, opt, args): projects = self.GetProjects( @@ -732,8 +733,9 @@ Gerrit Code Review: https://www.gerritcodereview.com/ def _ProcessResults(_pool, _out, results): pending = [] for result in results: - project, avail = result + project_idx, avail = result if avail is None: + project = projects[project_idx] logger.error( 'repo: error: %s: Unable to upload branch "%s". ' "You might be able to fix the branch by running:\n" @@ -746,12 +748,14 @@ Gerrit Code Review: https://www.gerritcodereview.com/ pending.append(result) return pending - pending = self.ExecuteInParallel( - opt.jobs, - functools.partial(self._GatherOne, opt), - projects, - callback=_ProcessResults, - ) + with self.ParallelContext(): + self.get_parallel_context()["projects"] = projects + pending = self.ExecuteInParallel( + opt.jobs, + functools.partial(self._GatherOne, opt), + range(len(projects)), + callback=_ProcessResults, + ) if not pending: if opt.branch is None: