diff --git a/subcmds/sync.py b/subcmds/sync.py
index d1b631ae..0db96b54 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import functools
 import http.cookiejar as cookielib
 import io
 import json
+import multiprocessing
 import netrc
 from optparse import SUPPRESS_HELP
 import os
@@ -56,7 +58,7 @@ import git_superproject
 import gitc_utils
 from project import Project
 from project import RemoteSpec
-from command import Command, MirrorSafeCommand
+from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE
 from error import RepoChangedException, GitError, ManifestParseError
 import platform_utils
 from project import SyncBuffer
@@ -71,10 +73,6 @@ class _FetchError(Exception):
   """Internal error thrown in _FetchHelper() when we don't want stack trace."""
 
 
-class _CheckoutError(Exception):
-  """Internal error thrown in _CheckoutOne() when we don't want stack trace."""
-
-
 class Sync(Command, MirrorSafeCommand):
   jobs = 1
   common = True
@@ -457,149 +455,80 @@ later is required to fix a server side protocol bug.
     return fetched
 
-  def _CheckoutWorker(self, opt, sem, project, *args, **kwargs):
-    """Main function of the fetch threads.
-
-    Delegates most of the work to _CheckoutOne.
-
-    Args:
-      opt: Program options returned from optparse.  See _Options().
-      projects: Projects to fetch.
-      sem: We'll release() this semaphore when we exit so that another thread
-          can be started up.
-      *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the
-          _CheckoutOne docstring for details.
-    """
-    try:
-      return self._CheckoutOne(opt, project, *args, **kwargs)
-    finally:
-      sem.release()
-
-  def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results):
+  def _CheckoutOne(self, opt, project):
     """Checkout work tree for one project
 
     Args:
       opt: Program options returned from optparse.  See _Options().
       project: Project object for the project to checkout.
-      lock: Lock for accessing objects that are shared amongst multiple
-          _CheckoutWorker() threads.
-      pm: Instance of a Project object.  We will call pm.update() (with our
-          lock held).
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      err_results: A list of strings, paths to git repos where checkout
-          failed.
 
     Returns:
       Whether the fetch was successful.
     """
-    # We'll set to true once we've locked the lock.
-    did_lock = False
-
-    # Encapsulate everything in a try/except/finally so that:
-    # - We always set err_event in the case of an exception.
-    # - We always make sure we unlock the lock if we locked it.
     start = time.time()
     syncbuf = SyncBuffer(self.manifest.manifestProject.config,
                          detach_head=opt.detach_head)
     success = False
-    with lock:
-      pm.start(project.name)
     try:
-      try:
-        project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
+      project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
+      success = syncbuf.Finish()
+    except Exception as e:
+      print('error: Cannot checkout %s: %s: %s' %
+            (project.name, type(e).__name__, str(e)),
+            file=sys.stderr)
+      raise
 
-        # Lock around all the rest of the code, since printing, updating a set
-        # and Progress.update() are not thread safe.
-        lock.acquire()
-        success = syncbuf.Finish()
-        did_lock = True
+    if not success:
+      print('error: Cannot checkout %s' % (project.name), file=sys.stderr)
+    finish = time.time()
+    return (success, project, start, finish)
 
-        if not success:
-          err_event.set()
-          print('error: Cannot checkout %s' % (project.name),
-                file=sys.stderr)
-          raise _CheckoutError()
-      except _CheckoutError:
-        pass
-      except Exception as e:
-        print('error: Cannot checkout %s: %s: %s' %
-              (project.name, type(e).__name__, str(e)),
-              file=sys.stderr)
-        err_event.set()
-        raise
-    finally:
-      if not did_lock:
-        lock.acquire()
-      if not success:
-        err_results.append(project.relpath)
-      pm.finish(project.name)
-      lock.release()
-      finish = time.time()
-      self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
-                             start, finish, success)
-
-    return success
-
-  def _Checkout(self, all_projects, opt, err_event, err_results):
+  def _Checkout(self, all_projects, opt, err_results):
     """Checkout projects listed in all_projects
 
     Args:
       all_projects: List of all projects that should be checked out.
       opt: Program options returned from optparse.  See _Options().
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      err_results: A list of strings, paths to git repos where checkout
-          failed.
+      err_results: A list of strings, paths to git repos where checkout failed.
     """
+    ret = True
 
-    # Perform checkouts in multiple threads when we are using partial clone.
-    # Without partial clone, all needed git objects are already downloaded,
-    # in this situation it's better to use only one process because the checkout
-    # would be mostly disk I/O; with partial clone, the objects are only
-    # downloaded when demanded (at checkout time), which is similar to the
-    # Sync_NetworkHalf case and parallelism would be helpful.
-    if self.manifest.CloneFilter:
-      syncjobs = self.jobs
-    else:
-      syncjobs = 1
+    # Only checkout projects with worktrees.
+    all_projects = [x for x in all_projects if x.worktree]
 
-    lock = _threading.Lock()
     pm = Progress('Checking out', len(all_projects))
 
-    threads = set()
-    sem = _threading.Semaphore(syncjobs)
+    def _ProcessResults(results):
+      for (success, project, start, finish) in results:
+        self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
+                               start, finish, success)
+        # Check for any errors before running any more tasks.
+        # ...we'll let existing threads finish, though.
+        if not success:
+          err_results.append(project.relpath)
+          if opt.fail_fast:
+            return False
+        pm.update(msg=project.name)
+      return True
 
-    for project in all_projects:
-      # Check for any errors before running any more tasks.
-      # ...we'll let existing threads finish, though.
-      if err_event.is_set() and opt.fail_fast:
-        break
-
-      sem.acquire()
-      if project.worktree:
-        kwargs = dict(opt=opt,
-                      sem=sem,
-                      project=project,
-                      lock=lock,
-                      pm=pm,
-                      err_event=err_event,
-                      err_results=err_results)
-        if syncjobs > 1:
-          t = _threading.Thread(target=self._CheckoutWorker,
-                                kwargs=kwargs)
-          # Ensure that Ctrl-C will not freeze the repo process.
-          t.daemon = True
-          threads.add(t)
-          t.start()
-        else:
-          self._CheckoutWorker(**kwargs)
-
-    for t in threads:
-      t.join()
+    # NB: Multiprocessing is heavy, so don't spin it up for one job.
+    if len(all_projects) == 1 or opt.jobs == 1:
+      if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects):
+        ret = False
+    else:
+      with multiprocessing.Pool(opt.jobs) as pool:
+        results = pool.imap_unordered(
            functools.partial(self._CheckoutOne, opt),
+            all_projects,
+            chunksize=WORKER_BATCH_SIZE)
+        if not _ProcessResults(results):
+          ret = False
+          pool.close()
 
     pm.end()
 
+    return ret
+
   def _GCProjects(self, projects, opt, err_event):
     gc_gitdirs = {}
     for project in projects:
@@ -946,7 +875,6 @@ later is required to fix a server side protocol bug.
 
     err_network_sync = False
     err_update_projects = False
-    err_checkout = False
 
     self._fetch_times = _FetchTimes(self.manifest)
     if not opt.local_only:
@@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug.
       sys.exit(1)
 
     err_results = []
-    self._Checkout(all_projects, opt, err_event, err_results)
-    if err_event.is_set():
-      err_checkout = True
-      # NB: We don't exit here because this is the last step.
+    # NB: We don't exit here because this is the last step.
+    err_checkout = not self._Checkout(all_projects, opt, err_results)
+    if err_checkout:
+      err_event.set()
 
     # If there's a notice that's supposed to print at the end of the sync, print
     # it now...
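
Note: the core pattern this patch adopts is worth seeing in isolation. Below is a
minimal, self-contained sketch of it, not the real repo code: a picklable worker
that returns a plain result tuple, functools.partial to bind the invariant
argument, and Pool.imap_unordered with a chunksize so results stream back as
workers finish while all shared-state bookkeeping (error list, fail-fast) stays
in the parent process and needs no locks. The names _checkout_one and checkout,
and the batch size of 32, are illustrative assumptions; repo imports its actual
WORKER_BATCH_SIZE from command.py.

import functools
import multiprocessing
import time

WORKER_BATCH_SIZE = 32  # assumed value; repo defines its own in command.py


def _checkout_one(opt, project):
  """Hypothetical worker: must be picklable and touch no shared state."""
  start = time.time()
  success = True
  try:
    pass  # ... per-project work would happen here ...
  except Exception:
    success = False
  return (success, project, start, time.time())


def checkout(all_projects, opt, err_results):
  """Hypothetical driver mirroring the structure of _Checkout() above."""
  ret = True

  def _process_results(results):
    # Runs only in the parent process, so appending to err_results and
    # honoring fail_fast need no synchronization.
    for (success, project, start, finish) in results:
      if not success:
        err_results.append(project)
        if opt.fail_fast:
          return False
    return True

  # Multiprocessing startup is not free; skip it when it cannot help.
  if len(all_projects) == 1 or opt.jobs == 1:
    ret = _process_results(_checkout_one(opt, x) for x in all_projects)
  else:
    with multiprocessing.Pool(opt.jobs) as pool:
      results = pool.imap_unordered(
          functools.partial(_checkout_one, opt),  # binds opt; projects are mapped
          all_projects,
          chunksize=WORKER_BATCH_SIZE)
      ret = _process_results(results)
  return ret


if __name__ == '__main__':
  import argparse
  opt = argparse.Namespace(jobs=4, fail_fast=False)
  errs = []
  print(checkout(['proj%d' % i for i in range(8)], opt, errs), errs)

One design note: imap_unordered (rather than map) lets the parent update
progress and react to failures as each project completes instead of waiting for
all of them, and chunksize batches small tasks to amortize pickling/IPC
overhead. That is also why the worker returns a tuple rather than mutating
shared objects the way the old threaded code did: worker processes do not share
memory with the parent.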