status: improve parallel execution stability

The status command runs a bunch of jobs in parallel, and each one
is responsible for writing to stdout directly.  When running many
noisy jobs in parallel, output can get intermingled.  Pass down a
StringIO buffer for writing to so we can return the entire output
as a string so the main job can handle displaying it.  This fixes
interleaved output as well as making the output stable: we always
display results in the same project order now.  By switching from
map to imap, this ends up not really adding any overhead.

Bug: https://crbug.com/gerrit/12231
Change-Id: Ic18b07c8074c046ff36e306eb8d392fb34fb6eca
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297242
Tested-by: Mike Frysinger <vapier@google.com>
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
This commit is contained in:
Mike Frysinger 2021-02-16 01:45:39 -05:00
parent 6a2400a4d0
commit 7c871163c8
3 changed files with 22 additions and 13 deletions

View File

@ -23,6 +23,15 @@ from error import NoSuchProjectError
from error import InvalidProjectGroupsError from error import InvalidProjectGroupsError
# Number of projects to submit to a single worker process at a time.
# This number represents a tradeoff between the overhead of IPC and finer
# grained opportunity for parallelism. This particular value was chosen by
# iterating through powers of two until the overall performance no longer
# improved. The performance of this batch size is not a function of the
# number of cores on the system.
WORKER_BATCH_SIZE = 32
# How many jobs to run in parallel by default? This assumes the jobs are # How many jobs to run in parallel by default? This assumes the jobs are
# largely I/O bound and do not hit the network. # largely I/O bound and do not hit the network.
DEFAULT_LOCAL_JOBS = min(os.cpu_count(), 8) DEFAULT_LOCAL_JOBS = min(os.cpu_count(), 8)

View File

@ -16,15 +16,7 @@ import itertools
import multiprocessing import multiprocessing
import sys import sys
from color import Coloring from color import Coloring
from command import Command, DEFAULT_LOCAL_JOBS from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
# Number of projects to submit to a single worker process at a time.
# This number represents a tradeoff between the overhead of IPC and finer
# grained opportunity for parallelism. This particular value was chosen by
# iterating through powers of two until the overall performance no longer
# improved. The performance of this batch size is not a function of the
# number of cores on the system.
WORKER_BATCH_SIZE = 32
class BranchColoring(Coloring): class BranchColoring(Coloring):

View File

@ -14,10 +14,11 @@
import functools import functools
import glob import glob
import io
import multiprocessing import multiprocessing
import os import os
from command import DEFAULT_LOCAL_JOBS, PagedCommand from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
from color import Coloring from color import Coloring
import platform_utils import platform_utils
@ -99,7 +100,9 @@ the following meanings:
Returns: Returns:
The status of the project. The status of the project.
""" """
return project.PrintWorkTreeStatus(quiet=quiet) buf = io.StringIO()
ret = project.PrintWorkTreeStatus(quiet=quiet, output_redir=buf)
return (ret, buf.getvalue())
def _FindOrphans(self, dirs, proj_dirs, proj_dirs_parents, outstring): def _FindOrphans(self, dirs, proj_dirs, proj_dirs_parents, outstring):
"""find 'dirs' that are present in 'proj_dirs_parents' but not in 'proj_dirs'""" """find 'dirs' that are present in 'proj_dirs_parents' but not in 'proj_dirs'"""
@ -128,8 +131,13 @@ the following meanings:
counter += 1 counter += 1
else: else:
with multiprocessing.Pool(opt.jobs) as pool: with multiprocessing.Pool(opt.jobs) as pool:
states = pool.map(functools.partial(self._StatusHelper, opt.quiet), all_projects) states = pool.imap(functools.partial(self._StatusHelper, opt.quiet),
counter += states.count('CLEAN') all_projects, chunksize=WORKER_BATCH_SIZE)
for (state, output) in states:
if output:
print(output, end='')
if state == 'CLEAN':
counter += 1
if not opt.quiet and len(all_projects) == counter: if not opt.quiet and len(all_projects) == counter:
print('nothing to commit (working directory clean)') print('nothing to commit (working directory clean)')