forall: greatly speed up processing overhead

With the recent commit 0501b29e7a
("status: Use multiprocessing for `repo status -j<num>` instead of
threading"), the limitation with project serialization no longer
applies.  It turns out that ad-hoc logic is expensive.  In the CrOS
checkout (~1000 projects w/8 jobs by default), it adds about ~7sec
overhead to all invocations.  With a fast nop run:
	time repo forall -j8 -c true
This goes from ~11sec to ~4sec -- more than 50% speedup.

Change-Id: Ie6bcccd21eef20440692751b7ebd36c890d5bbcc
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298724
Reviewed-by: Michael Mortensen <mmortensen@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
This commit is contained in:
Mike Frysinger 2021-03-01 02:06:10 -05:00
parent 819c73954f
commit 13cb7f799d

View File

@ -158,31 +158,6 @@ without iterating through the remaining projects.
def WantPager(self, opt): def WantPager(self, opt):
return opt.project_header and opt.jobs == 1 return opt.project_header and opt.jobs == 1
def _SerializeProject(self, project):
""" Serialize a project._GitGetByExec instance.
project._GitGetByExec is not pickle-able. Instead of trying to pass it
around between processes, make a dict ourselves containing only the
attributes that we need.
"""
if not self.manifest.IsMirror:
lrev = project.GetRevisionId()
else:
lrev = None
return {
'name': project.name,
'relpath': project.relpath,
'remote_name': project.remote.name,
'lrev': lrev,
'rrev': project.revisionExpr,
'annotations': dict((a.name, a.value) for a in project.annotations),
'gitdir': project.gitdir,
'worktree': project.worktree,
'upstream': project.upstream,
'dest_branch': project.dest_branch,
}
def ValidateOptions(self, opt, args): def ValidateOptions(self, opt, args):
if not opt.command: if not opt.command:
self.Usage() self.Usage()
@ -242,7 +217,7 @@ without iterating through the remaining projects.
with multiprocessing.Pool(opt.jobs, InitWorker) as pool: with multiprocessing.Pool(opt.jobs, InitWorker) as pool:
results_it = pool.imap( results_it = pool.imap(
functools.partial(DoWorkWrapper, mirror, opt, cmd, shell, config), functools.partial(DoWorkWrapper, mirror, opt, cmd, shell, config),
enumerate(self._SerializeProject(x) for x in projects), enumerate(projects),
chunksize=WORKER_BATCH_SIZE) chunksize=WORKER_BATCH_SIZE)
first = True first = True
for (r, output) in results_it: for (r, output) in results_it:
@ -292,7 +267,7 @@ def DoWorkWrapper(mirror, opt, cmd, shell, config, args):
try: try:
return DoWork(project, mirror, opt, cmd, shell, cnt, config) return DoWork(project, mirror, opt, cmd, shell, cnt, config)
except KeyboardInterrupt: except KeyboardInterrupt:
print('%s: Worker interrupted' % project['name']) print('%s: Worker interrupted' % project.name)
raise WorkerKeyboardInterrupt() raise WorkerKeyboardInterrupt()
@ -304,22 +279,22 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config):
val = '' val = ''
env[name] = val env[name] = val
setenv('REPO_PROJECT', project['name']) setenv('REPO_PROJECT', project.name)
setenv('REPO_PATH', project['relpath']) setenv('REPO_PATH', project.relpath)
setenv('REPO_REMOTE', project['remote_name']) setenv('REPO_REMOTE', project.remote.name)
setenv('REPO_LREV', project['lrev']) setenv('REPO_LREV', '' if mirror else project.GetRevisionId())
setenv('REPO_RREV', project['rrev']) setenv('REPO_RREV', project.revisionExpr)
setenv('REPO_UPSTREAM', project['upstream']) setenv('REPO_UPSTREAM', project.upstream)
setenv('REPO_DEST_BRANCH', project['dest_branch']) setenv('REPO_DEST_BRANCH', project.dest_branch)
setenv('REPO_I', str(cnt + 1)) setenv('REPO_I', str(cnt + 1))
for name in project['annotations']: for annotation in project.annotations:
setenv("REPO__%s" % (name), project['annotations'][name]) setenv("REPO__%s" % (annotation.name), annotation.value)
if mirror: if mirror:
setenv('GIT_DIR', project['gitdir']) setenv('GIT_DIR', project.gitdir)
cwd = project['gitdir'] cwd = project.gitdir
else: else:
cwd = project['worktree'] cwd = project.worktree
if not os.path.exists(cwd): if not os.path.exists(cwd):
# Allow the user to silently ignore missing checkouts so they can run on # Allow the user to silently ignore missing checkouts so they can run on
@ -330,7 +305,7 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config):
output = '' output = ''
if ((opt.project_header and opt.verbose) if ((opt.project_header and opt.verbose)
or not opt.project_header): or not opt.project_header):
output = 'skipping %s/' % project['relpath'] output = 'skipping %s/' % project.relpath
return (1, output) return (1, output)
if opt.verbose: if opt.verbose:
@ -350,9 +325,9 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config):
out = ForallColoring(config) out = ForallColoring(config)
out.redirect(buf) out.redirect(buf)
if mirror: if mirror:
project_header_path = project['name'] project_header_path = project.name
else: else:
project_header_path = project['relpath'] project_header_path = project.relpath
out.project('project %s/' % project_header_path) out.project('project %s/' % project_header_path)
out.nl() out.nl()
buf.write(output) buf.write(output)