sync: switch local checkout to multiprocessing

This avoids the GIL limitations of using threads for parallel processing.
In a CrOS checkout with ~1000 repos, the no-op case goes from ~6 sec down
to ~4 sec with -j8.  Not a big deal, but it shows that this actually
speeds things up, unlike the threading model.
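
As a toy illustration of the GIL effect (not part of this change; it uses
multiprocessing.dummy.Pool, the thread-backed twin of multiprocessing.Pool,
so the only variable is threads vs processes):

    # Pure-Python busy loop: threads serialize on the GIL, processes don't.
    import multiprocessing
    import multiprocessing.dummy
    import time

    def spin(n):
      while n:
        n -= 1

    if __name__ == '__main__':
      for label, pool_factory in (('threads', multiprocessing.dummy.Pool),
                                  ('processes', multiprocessing.Pool)):
        start = time.time()
        with pool_factory(8) as pool:
          pool.map(spin, [5_000_000] * 8)
        print('%s: %.1f sec' % (label, time.time() - start))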

This reworks the checkout logic to return results for processing in the
main thread instead of leaving every thread to do its own processing.
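
The shape of that rework, as a minimal standalone sketch (checkout_one and
checkout_all are hypothetical stand-ins; the real versions are _CheckoutOne
and _Checkout in the diff below): workers only compute and return a result
tuple, and the parent consumes results as they arrive, so printing, progress,
and error bookkeeping all stay in one process with no locks:

    import functools
    import multiprocessing
    import time

    def checkout_one(force_sync, project):
      # Worker process: do the work and return plain data; no printing,
      # no locks, no shared state.
      start = time.time()
      success = bool(project)  # stand-in for Sync_LocalHalf() + Finish()
      return (success, project, start, time.time())

    def checkout_all(projects, jobs=8, fail_fast=False):
      ret = True
      with multiprocessing.Pool(jobs) as pool:
        results = pool.imap_unordered(
            functools.partial(checkout_one, False), projects)
        # All progress/error bookkeeping happens here in the parent.
        for (success, project, start, finish) in results:
          if not success:
            ret = False
            if fail_fast:
              break
      return ret

    if __name__ == '__main__':
      print(checkout_all(['project%d' % i for i in range(20)]))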

Bug: https://crbug.com/gerrit/12389
Change-Id: I143e5e3f7158e83ea67e2d14e5552153a874248a
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298063
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>

@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import functools
 import http.cookiejar as cookielib
 import io
 import json
+import multiprocessing
 import netrc
 from optparse import SUPPRESS_HELP
 import os
@@ -56,7 +58,7 @@ import git_superproject
 import gitc_utils
 from project import Project
 from project import RemoteSpec
-from command import Command, MirrorSafeCommand
+from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE
 from error import RepoChangedException, GitError, ManifestParseError
 import platform_utils
 from project import SyncBuffer
@@ -71,10 +73,6 @@ class _FetchError(Exception):
   """Internal error thrown in _FetchHelper() when we don't want stack trace."""
 
 
-class _CheckoutError(Exception):
-  """Internal error thrown in _CheckoutOne() when we don't want stack trace."""
-
-
 class Sync(Command, MirrorSafeCommand):
   jobs = 1
   common = True
@@ -457,149 +455,80 @@ later is required to fix a server side protocol bug.
 
     return fetched
 
-  def _CheckoutWorker(self, opt, sem, project, *args, **kwargs):
-    """Main function of the fetch threads.
-
-    Delegates most of the work to _CheckoutOne.
-
-    Args:
-      opt: Program options returned from optparse. See _Options().
-      projects: Projects to fetch.
-      sem: We'll release() this semaphore when we exit so that another thread
-          can be started up.
-      *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the
-          _CheckoutOne docstring for details.
-    """
-    try:
-      return self._CheckoutOne(opt, project, *args, **kwargs)
-    finally:
-      sem.release()
-
-  def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results):
+  def _CheckoutOne(self, opt, project):
     """Checkout work tree for one project
 
     Args:
      opt: Program options returned from optparse. See _Options().
       project: Project object for the project to checkout.
-      lock: Lock for accessing objects that are shared amongst multiple
-          _CheckoutWorker() threads.
-      pm: Instance of a Project object.  We will call pm.update() (with our
-          lock held).
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      err_results: A list of strings, paths to git repos where checkout
-          failed.
 
     Returns:
       Whether the fetch was successful.
     """
-    # We'll set to true once we've locked the lock.
-    did_lock = False
-
-    # Encapsulate everything in a try/except/finally so that:
-    # - We always set err_event in the case of an exception.
-    # - We always make sure we unlock the lock if we locked it.
     start = time.time()
     syncbuf = SyncBuffer(self.manifest.manifestProject.config,
                          detach_head=opt.detach_head)
     success = False
-    with lock:
-      pm.start(project.name)
     try:
-      try:
-        project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
+      project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
+      success = syncbuf.Finish()
+    except Exception as e:
+      print('error: Cannot checkout %s: %s: %s' %
+            (project.name, type(e).__name__, str(e)),
+            file=sys.stderr)
+      raise
 
-        # Lock around all the rest of the code, since printing, updating a set
-        # and Progress.update() are not thread safe.
-        lock.acquire()
-        success = syncbuf.Finish()
-        did_lock = True
+    if not success:
+      print('error: Cannot checkout %s' % (project.name), file=sys.stderr)
+    finish = time.time()
+    return (success, project, start, finish)
 
-        if not success:
-          err_event.set()
-          print('error: Cannot checkout %s' % (project.name),
-                file=sys.stderr)
-          raise _CheckoutError()
-      except _CheckoutError:
-        pass
-      except Exception as e:
-        print('error: Cannot checkout %s: %s: %s' %
-              (project.name, type(e).__name__, str(e)),
-              file=sys.stderr)
-        err_event.set()
-        raise
-    finally:
-      if not did_lock:
-        lock.acquire()
-      if not success:
-        err_results.append(project.relpath)
-      pm.finish(project.name)
-      lock.release()
-      finish = time.time()
-      self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
-                             start, finish, success)
-
-    return success
-
-  def _Checkout(self, all_projects, opt, err_event, err_results):
+  def _Checkout(self, all_projects, opt, err_results):
     """Checkout projects listed in all_projects
 
     Args:
       all_projects: List of all projects that should be checked out.
       opt: Program options returned from optparse. See _Options().
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      err_results: A list of strings, paths to git repos where checkout
-          failed.
+      err_results: A list of strings, paths to git repos where checkout failed.
     """
+    ret = True
 
-    # Perform checkouts in multiple threads when we are using partial clone.
-    # Without partial clone, all needed git objects are already downloaded,
-    # in this situation it's better to use only one process because the checkout
-    # would be mostly disk I/O; with partial clone, the objects are only
-    # downloaded when demanded (at checkout time), which is similar to the
-    # Sync_NetworkHalf case and parallelism would be helpful.
-    if self.manifest.CloneFilter:
-      syncjobs = self.jobs
-    else:
-      syncjobs = 1
+    # Only checkout projects with worktrees.
+    all_projects = [x for x in all_projects if x.worktree]
 
-    lock = _threading.Lock()
     pm = Progress('Checking out', len(all_projects))
 
-    threads = set()
-    sem = _threading.Semaphore(syncjobs)
+    def _ProcessResults(results):
+      for (success, project, start, finish) in results:
+        self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
+                               start, finish, success)
+        # Check for any errors before running any more tasks.
+        # ...we'll let existing threads finish, though.
+        if not success:
+          err_results.append(project.relpath)
+          if opt.fail_fast:
+            return False
+        pm.update(msg=project.name)
+      return True
 
-    for project in all_projects:
-      # Check for any errors before running any more tasks.
-      # ...we'll let existing threads finish, though.
-      if err_event.is_set() and opt.fail_fast:
-        break
-
-      sem.acquire()
-      if project.worktree:
-        kwargs = dict(opt=opt,
-                      sem=sem,
-                      project=project,
-                      lock=lock,
-                      pm=pm,
-                      err_event=err_event,
-                      err_results=err_results)
-        if syncjobs > 1:
-          t = _threading.Thread(target=self._CheckoutWorker,
-                                kwargs=kwargs)
-          # Ensure that Ctrl-C will not freeze the repo process.
-          t.daemon = True
-          threads.add(t)
-          t.start()
-        else:
-          self._CheckoutWorker(**kwargs)
-
-    for t in threads:
-      t.join()
-
+    # NB: Multiprocessing is heavy, so don't spin it up for one job.
+    if len(all_projects) == 1 or opt.jobs == 1:
+      if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects):
+        ret = False
+    else:
+      with multiprocessing.Pool(opt.jobs) as pool:
+        results = pool.imap_unordered(
+            functools.partial(self._CheckoutOne, opt),
+            all_projects,
+            chunksize=WORKER_BATCH_SIZE)
+        if not _ProcessResults(results):
+          ret = False
+          pool.close()
+
     pm.end()
+    return ret
 
   def _GCProjects(self, projects, opt, err_event):
     gc_gitdirs = {}
     for project in projects:
@@ -946,7 +875,6 @@ later is required to fix a server side protocol bug.
 
     err_network_sync = False
     err_update_projects = False
-    err_checkout = False
 
     self._fetch_times = _FetchTimes(self.manifest)
     if not opt.local_only:
@@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug.
         sys.exit(1)
 
     err_results = []
-    self._Checkout(all_projects, opt, err_event, err_results)
-    if err_event.is_set():
-      err_checkout = True
-      # NB: We don't exit here because this is the last step.
+    # NB: We don't exit here because this is the last step.
+    err_checkout = not self._Checkout(all_projects, opt, err_results)
+    if err_checkout:
+      err_event.set()
 
     # If there's a notice that's supposed to print at the end of the sync, print
     # it now...