mirror of
https://gerrit.googlesource.com/git-repo
synced 2025-01-08 16:14:26 +00:00
sync: switch network fetch to multiprocessing
This avoids GIL limitations with using threads for parallel processing. This reworks the fetch logic to return results for processing in the main thread instead of leaving every thread to do its own processing. We have to tweak the chunking logic a little here because multiprocessing favors batching over returning immediate results when using a larger value for chunksize. When a single job can be quite slow, this tradeoff is not good UX. Bug: https://crbug.com/gerrit/12389 Change-Id: I0f0512d15ad7332d1eb28aff52c29d378acc9e1d Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298642 Reviewed-by: Chris Mcdonald <cjmcdonald@google.com> Tested-by: Mike Frysinger <vapier@google.com>
This commit is contained in:
parent
d246d1fee7
commit
b2fa30a2b8
@ -42,12 +42,12 @@ def duration_str(total):
|
|||||||
|
|
||||||
|
|
||||||
class Progress(object):
|
class Progress(object):
|
||||||
def __init__(self, title, total=0, units='', print_newline=False):
|
def __init__(self, title, total=0, units='', print_newline=False, delay=True):
|
||||||
self._title = title
|
self._title = title
|
||||||
self._total = total
|
self._total = total
|
||||||
self._done = 0
|
self._done = 0
|
||||||
self._start = time()
|
self._start = time()
|
||||||
self._show = False
|
self._show = not delay
|
||||||
self._units = units
|
self._units = units
|
||||||
self._print_newline = print_newline
|
self._print_newline = print_newline
|
||||||
# Only show the active jobs section if we run more than one in parallel.
|
# Only show the active jobs section if we run more than one in parallel.
|
||||||
|
165
subcmds/sync.py
165
subcmds/sync.py
@ -45,11 +45,6 @@ except ImportError:
|
|||||||
def _rlimit_nofile():
|
def _rlimit_nofile():
|
||||||
return (256, 256)
|
return (256, 256)
|
||||||
|
|
||||||
try:
|
|
||||||
import multiprocessing
|
|
||||||
except ImportError:
|
|
||||||
multiprocessing = None
|
|
||||||
|
|
||||||
import event_log
|
import event_log
|
||||||
from git_command import GIT, git_require
|
from git_command import GIT, git_require
|
||||||
from git_config import GetUrlCookieFile
|
from git_config import GetUrlCookieFile
|
||||||
@ -69,10 +64,6 @@ from manifest_xml import GitcManifest
|
|||||||
_ONE_DAY_S = 24 * 60 * 60
|
_ONE_DAY_S = 24 * 60 * 60
|
||||||
|
|
||||||
|
|
||||||
class _FetchError(Exception):
|
|
||||||
"""Internal error thrown in _FetchHelper() when we don't want stack trace."""
|
|
||||||
|
|
||||||
|
|
||||||
class Sync(Command, MirrorSafeCommand):
|
class Sync(Command, MirrorSafeCommand):
|
||||||
jobs = 1
|
jobs = 1
|
||||||
common = True
|
common = True
|
||||||
@ -315,59 +306,33 @@ later is required to fix a server side protocol bug.
|
|||||||
self._ReloadManifest(manifest_path)
|
self._ReloadManifest(manifest_path)
|
||||||
return manifest_path
|
return manifest_path
|
||||||
|
|
||||||
def _FetchProjectList(self, opt, projects, sem, *args, **kwargs):
|
def _FetchProjectList(self, opt, projects):
|
||||||
"""Main function of the fetch threads.
|
"""Main function of the fetch worker.
|
||||||
|
|
||||||
|
The projects we're given share the same underlying git object store, so we
|
||||||
|
have to fetch them in serial.
|
||||||
|
|
||||||
Delegates most of the work to _FetchHelper.
|
Delegates most of the work to _FetchHelper.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
opt: Program options returned from optparse. See _Options().
|
opt: Program options returned from optparse. See _Options().
|
||||||
projects: Projects to fetch.
|
projects: Projects to fetch.
|
||||||
sem: We'll release() this semaphore when we exit so that another thread
|
|
||||||
can be started up.
|
|
||||||
*args, **kwargs: Remaining arguments to pass to _FetchHelper. See the
|
|
||||||
_FetchHelper docstring for details.
|
|
||||||
"""
|
"""
|
||||||
try:
|
return [self._FetchOne(opt, x) for x in projects]
|
||||||
for project in projects:
|
|
||||||
success = self._FetchHelper(opt, project, *args, **kwargs)
|
|
||||||
if not success and opt.fail_fast:
|
|
||||||
break
|
|
||||||
finally:
|
|
||||||
sem.release()
|
|
||||||
|
|
||||||
def _FetchHelper(self, opt, project, lock, fetched, pm, err_event,
|
def _FetchOne(self, opt, project):
|
||||||
clone_filter):
|
|
||||||
"""Fetch git objects for a single project.
|
"""Fetch git objects for a single project.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
opt: Program options returned from optparse. See _Options().
|
opt: Program options returned from optparse. See _Options().
|
||||||
project: Project object for the project to fetch.
|
project: Project object for the project to fetch.
|
||||||
lock: Lock for accessing objects that are shared amongst multiple
|
|
||||||
_FetchHelper() threads.
|
|
||||||
fetched: set object that we will add project.gitdir to when we're done
|
|
||||||
(with our lock held).
|
|
||||||
pm: Instance of a Project object. We will call pm.update() (with our
|
|
||||||
lock held).
|
|
||||||
err_event: We'll set this event in the case of an error (after printing
|
|
||||||
out info about the error).
|
|
||||||
clone_filter: Filter for use in a partial clone.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Whether the fetch was successful.
|
Whether the fetch was successful.
|
||||||
"""
|
"""
|
||||||
# We'll set to true once we've locked the lock.
|
|
||||||
did_lock = False
|
|
||||||
|
|
||||||
# Encapsulate everything in a try/except/finally so that:
|
|
||||||
# - We always set err_event in the case of an exception.
|
|
||||||
# - We always make sure we unlock the lock if we locked it.
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
success = False
|
success = False
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
with lock:
|
|
||||||
pm.start(project.name)
|
|
||||||
try:
|
|
||||||
try:
|
try:
|
||||||
success = project.Sync_NetworkHalf(
|
success = project.Sync_NetworkHalf(
|
||||||
quiet=opt.quiet,
|
quiet=opt.quiet,
|
||||||
@ -380,83 +345,80 @@ later is required to fix a server side protocol bug.
|
|||||||
optimized_fetch=opt.optimized_fetch,
|
optimized_fetch=opt.optimized_fetch,
|
||||||
retry_fetches=opt.retry_fetches,
|
retry_fetches=opt.retry_fetches,
|
||||||
prune=opt.prune,
|
prune=opt.prune,
|
||||||
clone_filter=clone_filter)
|
clone_filter=self.manifest.CloneFilter)
|
||||||
self._fetch_times.Set(project, time.time() - start)
|
|
||||||
|
|
||||||
# Lock around all the rest of the code, since printing, updating a set
|
|
||||||
# and Progress.update() are not thread safe.
|
|
||||||
lock.acquire()
|
|
||||||
did_lock = True
|
|
||||||
|
|
||||||
output = buf.getvalue()
|
output = buf.getvalue()
|
||||||
if opt.verbose and output:
|
if opt.verbose and output:
|
||||||
pm.update(inc=0, msg=output.rstrip())
|
print('\n' + output.rstrip())
|
||||||
|
|
||||||
if not success:
|
if not success:
|
||||||
err_event.set()
|
|
||||||
print('error: Cannot fetch %s from %s'
|
print('error: Cannot fetch %s from %s'
|
||||||
% (project.name, project.remote.url),
|
% (project.name, project.remote.url),
|
||||||
file=sys.stderr)
|
file=sys.stderr)
|
||||||
if opt.fail_fast:
|
|
||||||
raise _FetchError()
|
|
||||||
|
|
||||||
fetched.add(project.gitdir)
|
|
||||||
except _FetchError:
|
|
||||||
pass
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print('error: Cannot fetch %s (%s: %s)'
|
print('error: Cannot fetch %s (%s: %s)'
|
||||||
% (project.name, type(e).__name__, str(e)), file=sys.stderr)
|
% (project.name, type(e).__name__, str(e)), file=sys.stderr)
|
||||||
err_event.set()
|
|
||||||
raise
|
raise
|
||||||
finally:
|
|
||||||
if not did_lock:
|
|
||||||
lock.acquire()
|
|
||||||
pm.finish(project.name)
|
|
||||||
lock.release()
|
|
||||||
finish = time.time()
|
|
||||||
self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
|
|
||||||
start, finish, success)
|
|
||||||
|
|
||||||
return success
|
finish = time.time()
|
||||||
|
return (success, project, start, finish)
|
||||||
|
|
||||||
def _Fetch(self, projects, opt, err_event):
|
def _Fetch(self, projects, opt, err_event):
|
||||||
|
ret = True
|
||||||
|
|
||||||
fetched = set()
|
fetched = set()
|
||||||
lock = _threading.Lock()
|
pm = Progress('Fetching', len(projects), delay=False)
|
||||||
pm = Progress('Fetching', len(projects))
|
|
||||||
|
|
||||||
objdir_project_map = dict()
|
objdir_project_map = dict()
|
||||||
for project in projects:
|
for project in projects:
|
||||||
objdir_project_map.setdefault(project.objdir, []).append(project)
|
objdir_project_map.setdefault(project.objdir, []).append(project)
|
||||||
|
projects_list = list(objdir_project_map.values())
|
||||||
|
|
||||||
threads = set()
|
def _ProcessResults(results_sets):
|
||||||
sem = _threading.Semaphore(self.jobs)
|
ret = True
|
||||||
for project_list in objdir_project_map.values():
|
for results in results_sets:
|
||||||
|
for (success, project, start, finish) in results:
|
||||||
|
self._fetch_times.Set(project, finish - start)
|
||||||
|
self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
|
||||||
|
start, finish, success)
|
||||||
# Check for any errors before running any more tasks.
|
# Check for any errors before running any more tasks.
|
||||||
# ...we'll let existing threads finish, though.
|
# ...we'll let existing jobs finish, though.
|
||||||
if err_event.is_set() and opt.fail_fast:
|
if not success:
|
||||||
break
|
ret = False
|
||||||
|
|
||||||
sem.acquire()
|
|
||||||
kwargs = dict(opt=opt,
|
|
||||||
projects=project_list,
|
|
||||||
sem=sem,
|
|
||||||
lock=lock,
|
|
||||||
fetched=fetched,
|
|
||||||
pm=pm,
|
|
||||||
err_event=err_event,
|
|
||||||
clone_filter=self.manifest.CloneFilter)
|
|
||||||
if self.jobs > 1:
|
|
||||||
t = _threading.Thread(target=self._FetchProjectList,
|
|
||||||
kwargs=kwargs)
|
|
||||||
# Ensure that Ctrl-C will not freeze the repo process.
|
|
||||||
t.daemon = True
|
|
||||||
threads.add(t)
|
|
||||||
t.start()
|
|
||||||
else:
|
else:
|
||||||
self._FetchProjectList(**kwargs)
|
fetched.add(project.gitdir)
|
||||||
|
pm.update(msg=project.name)
|
||||||
|
if not ret and opt.fail_fast:
|
||||||
|
break
|
||||||
|
return ret
|
||||||
|
|
||||||
for t in threads:
|
# NB: Multiprocessing is heavy, so don't spin it up for one job.
|
||||||
t.join()
|
if len(projects_list) == 1 or opt.jobs == 1:
|
||||||
|
if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list):
|
||||||
|
ret = False
|
||||||
|
else:
|
||||||
|
# Favor throughput over responsiveness when quiet. It seems that imap()
|
||||||
|
# will yield results in batches relative to chunksize, so even as the
|
||||||
|
# children finish a sync, we won't see the result until one child finishes
|
||||||
|
# ~chunksize jobs. When using a large --jobs with large chunksize, this
|
||||||
|
# can be jarring as there will be a large initial delay where repo looks
|
||||||
|
# like it isn't doing anything and sits at 0%, but then suddenly completes
|
||||||
|
# a lot of jobs all at once. Since this code is more network bound, we
|
||||||
|
# can accept a bit more CPU overhead with a smaller chunksize so that the
|
||||||
|
# user sees more immediate & continuous feedback.
|
||||||
|
if opt.quiet:
|
||||||
|
chunksize = WORKER_BATCH_SIZE
|
||||||
|
else:
|
||||||
|
pm.update(inc=0, msg='warming up')
|
||||||
|
chunksize = 4
|
||||||
|
with multiprocessing.Pool(opt.jobs) as pool:
|
||||||
|
results = pool.imap_unordered(
|
||||||
|
functools.partial(self._FetchProjectList, opt),
|
||||||
|
projects_list,
|
||||||
|
chunksize=chunksize)
|
||||||
|
if not _ProcessResults(results):
|
||||||
|
ret = False
|
||||||
|
pool.close()
|
||||||
|
|
||||||
pm.end()
|
pm.end()
|
||||||
self._fetch_times.Save()
|
self._fetch_times.Save()
|
||||||
@ -464,7 +426,7 @@ later is required to fix a server side protocol bug.
|
|||||||
if not self.manifest.IsArchive:
|
if not self.manifest.IsArchive:
|
||||||
self._GCProjects(projects, opt, err_event)
|
self._GCProjects(projects, opt, err_event)
|
||||||
|
|
||||||
return fetched
|
return (ret, fetched)
|
||||||
|
|
||||||
def _CheckoutOne(self, opt, project):
|
def _CheckoutOne(self, opt, project):
|
||||||
"""Checkout work tree for one project
|
"""Checkout work tree for one project
|
||||||
@ -514,7 +476,7 @@ later is required to fix a server side protocol bug.
|
|||||||
self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
|
self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
|
||||||
start, finish, success)
|
start, finish, success)
|
||||||
# Check for any errors before running any more tasks.
|
# Check for any errors before running any more tasks.
|
||||||
# ...we'll let existing threads finish, though.
|
# ...we'll let existing jobs finish, though.
|
||||||
if not success:
|
if not success:
|
||||||
err_results.append(project.relpath)
|
err_results.append(project.relpath)
|
||||||
if opt.fail_fast:
|
if opt.fail_fast:
|
||||||
@ -894,7 +856,9 @@ later is required to fix a server side protocol bug.
|
|||||||
to_fetch.extend(all_projects)
|
to_fetch.extend(all_projects)
|
||||||
to_fetch.sort(key=self._fetch_times.Get, reverse=True)
|
to_fetch.sort(key=self._fetch_times.Get, reverse=True)
|
||||||
|
|
||||||
fetched = self._Fetch(to_fetch, opt, err_event)
|
success, fetched = self._Fetch(to_fetch, opt, err_event)
|
||||||
|
if not success:
|
||||||
|
err_event.set()
|
||||||
|
|
||||||
_PostRepoFetch(rp, opt.repo_verify)
|
_PostRepoFetch(rp, opt.repo_verify)
|
||||||
if opt.network_only:
|
if opt.network_only:
|
||||||
@ -923,7 +887,10 @@ later is required to fix a server side protocol bug.
|
|||||||
if previously_missing_set == missing_set:
|
if previously_missing_set == missing_set:
|
||||||
break
|
break
|
||||||
previously_missing_set = missing_set
|
previously_missing_set = missing_set
|
||||||
fetched.update(self._Fetch(missing, opt, err_event))
|
success, new_fetched = self._Fetch(to_fetch, opt, err_event)
|
||||||
|
if not success:
|
||||||
|
err_event.set()
|
||||||
|
fetched.update(new_fetched)
|
||||||
|
|
||||||
# If we saw an error, exit with code 1 so that other scripts can check.
|
# If we saw an error, exit with code 1 so that other scripts can check.
|
||||||
if err_event.is_set():
|
if err_event.is_set():
|
||||||
|
Loading…
Reference in New Issue
Block a user