abandon/start: add --jobs support

Use multiprocessing to run in parallel.  When operating on multiple
projects, this can greatly speed things up.  Across 1000 repos, it
goes from ~30sec to ~3sec with the default -j8.

Change-Id: I0dc62d704c022dd02cac0bd67fe79224f4e34095
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297484
Tested-by: Mike Frysinger <vapier@google.com>
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
This commit is contained in:
Mike Frysinger 2021-02-18 23:37:33 -05:00
parent 8d2a6df1fd
commit 8dbc07aced
2 changed files with 83 additions and 38 deletions

View File

@ -13,9 +13,12 @@
# limitations under the License. # limitations under the License.
from collections import defaultdict from collections import defaultdict
import functools
import itertools
import multiprocessing
import sys import sys
from command import Command from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
from git_command import git from git_command import git
from progress import Progress from progress import Progress
@ -31,8 +34,10 @@ deleting it (and all its history) from your local repository.
It is equivalent to "git branch -D <branchname>". It is equivalent to "git branch -D <branchname>".
""" """
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
def _Options(self, p): def _Options(self, p):
super()._Options(p)
p.add_option('-q', '--quiet', p.add_option('-q', '--quiet',
action='store_true', default=False, action='store_true', default=False,
help='be quiet') help='be quiet')
@ -51,35 +56,49 @@ It is equivalent to "git branch -D <branchname>".
else: else:
args.insert(0, "'All local branches'") args.insert(0, "'All local branches'")
def _ExecuteOne(self, opt, nb, project):
"""Abandon one project."""
if opt.all:
branches = project.GetBranches()
else:
branches = [nb]
ret = {}
for name in branches:
status = project.AbandonBranch(name)
if status is not None:
ret[name] = status
return (ret, project)
def Execute(self, opt, args): def Execute(self, opt, args):
nb = args[0] nb = args[0]
err = defaultdict(list) err = defaultdict(list)
success = defaultdict(list) success = defaultdict(list)
all_projects = self.GetProjects(args[1:]) all_projects = self.GetProjects(args[1:])
pm = Progress('Abandon %s' % nb, len(all_projects)) def _ProcessResults(states):
for project in all_projects: for (results, project) in states:
pm.update() for branch, status in results.items():
if opt.all:
branches = list(project.GetBranches().keys())
else:
branches = [nb]
for name in branches:
status = project.AbandonBranch(name)
if status is not None:
if status: if status:
success[name].append(project) success[branch].append(project)
else: else:
err[name].append(project) err[branch].append(project)
pm.update()
pm = Progress('Abandon %s' % nb, len(all_projects))
# NB: Multiprocessing is heavy, so don't spin it up for one job.
if len(all_projects) == 1 or opt.jobs == 1:
_ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects)
else:
with multiprocessing.Pool(opt.jobs) as pool:
states = pool.imap_unordered(
functools.partial(self._ExecuteOne, opt, nb), all_projects,
chunksize=WORKER_BATCH_SIZE)
_ProcessResults(states)
pm.end() pm.end()
width = 25 width = max(itertools.chain(
for name in branches: [25], (len(x) for x in itertools.chain(success, err))))
if width < len(name):
width = len(name)
if err: if err:
for br in err.keys(): for br in err.keys():
err_msg = "error: cannot abandon %s" % br err_msg = "error: cannot abandon %s" % br

View File

@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools
import multiprocessing
import os import os
import sys import sys
from command import Command from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
from git_config import IsImmutable from git_config import IsImmutable
from git_command import git from git_command import git
import gitc_utils import gitc_utils
@ -33,8 +35,10 @@ class Start(Command):
'%prog' begins a new branch of development, starting from the '%prog' begins a new branch of development, starting from the
revision specified in the manifest. revision specified in the manifest.
""" """
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
def _Options(self, p): def _Options(self, p):
super()._Options(p)
p.add_option('--all', p.add_option('--all',
dest='all', action='store_true', dest='all', action='store_true',
help='begin branch in all projects') help='begin branch in all projects')
@ -51,6 +55,26 @@ revision specified in the manifest.
if not git.check_ref_format('heads/%s' % nb): if not git.check_ref_format('heads/%s' % nb):
self.OptionParser.error("'%s' is not a valid name" % nb) self.OptionParser.error("'%s' is not a valid name" % nb)
def _ExecuteOne(self, opt, nb, project):
    """Start one project.

    Args:
        opt: Parsed options; only ``opt.revision`` is read here.
        nb: Name of the branch to start.
        project: Project to operate on.

    Returns:
        A ``(ret, project)`` tuple.  ``ret`` is whatever
        ``project.StartBranch`` returned, or ``False`` if it raised.
    """
    # If the current revision is immutable, such as a SHA1, a tag or
    # a change, then we can't push back to it.  Substitute with
    # dest_branch, if defined; or with manifest default revision instead.
    branch_merge = ''
    if IsImmutable(project.revisionExpr):
        if project.dest_branch:
            branch_merge = project.dest_branch
        else:
            branch_merge = self.manifest.default.revisionExpr

    try:
        ret = project.StartBranch(
            nb, branch_merge=branch_merge, revision=opt.revision)
    except Exception as e:
        # Deliberately broad: a failure in one project is reported and
        # recorded as a failed result instead of propagating.
        print('error: unable to checkout %s: %s' % (project.name, e),
              file=sys.stderr)
        ret = False
    return (ret, project)
def Execute(self, opt, args): def Execute(self, opt, args):
nb = args[0] nb = args[0]
err = [] err = []
@ -82,11 +106,8 @@ revision specified in the manifest.
if not os.path.exists(os.getcwd()): if not os.path.exists(os.getcwd()):
os.chdir(self.manifest.topdir) os.chdir(self.manifest.topdir)
pm = Progress('Starting %s' % nb, len(all_projects)) pm = Progress('Syncing %s' % nb, len(all_projects))
for project in all_projects: for project in all_projects:
pm.update()
if self.gitc_manifest:
gitc_project = self.gitc_manifest.paths[project.relpath] gitc_project = self.gitc_manifest.paths[project.relpath]
# Sync projects that have not been opened. # Sync projects that have not been opened.
if not gitc_project.already_synced: if not gitc_project.already_synced:
@ -99,20 +120,25 @@ revision specified in the manifest.
sync_buf = SyncBuffer(self.manifest.manifestProject.config) sync_buf = SyncBuffer(self.manifest.manifestProject.config)
project.Sync_LocalHalf(sync_buf) project.Sync_LocalHalf(sync_buf)
project.revisionId = gitc_project.old_revision project.revisionId = gitc_project.old_revision
pm.update()
pm.end()
# If the current revision is immutable, such as a SHA1, a tag or def _ProcessResults(results):
# a change, then we can't push back to it. Substitute with for (result, project) in results:
# dest_branch, if defined; or with manifest default revision instead. if not result:
branch_merge = '' err.append(project)
if IsImmutable(project.revisionExpr): pm.update()
if project.dest_branch:
branch_merge = project.dest_branch
else:
branch_merge = self.manifest.default.revisionExpr
if not project.StartBranch( pm = Progress('Starting %s' % nb, len(all_projects))
nb, branch_merge=branch_merge, revision=opt.revision): # NB: Multiprocessing is heavy, so don't spin it up for one job.
err.append(project) if len(all_projects) == 1 or opt.jobs == 1:
_ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects)
else:
with multiprocessing.Pool(opt.jobs) as pool:
results = pool.imap_unordered(
functools.partial(self._ExecuteOne, opt, nb), all_projects,
chunksize=WORKER_BATCH_SIZE)
_ProcessResults(results)
pm.end() pm.end()
if err: if err: