diff --git a/subcmds/sync.py b/subcmds/sync.py index 36ef16db..eac0556d 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py @@ -39,6 +39,10 @@ from project import R_HEADS from project import SyncBuffer from progress import Progress +class _FetchError(Exception): + """Internal error thrown in _FetchHelper() when we don't want stack trace.""" + pass + class Sync(Command, MirrorSafeCommand): jobs = 1 common = True @@ -135,20 +139,60 @@ later is required to fix a server side protocol bug. dest='repo_upgraded', action='store_true', help=SUPPRESS_HELP) - def _FetchHelper(self, opt, project, lock, fetched, pm, sem): - if not project.Sync_NetworkHalf(quiet=opt.quiet): - print >>sys.stderr, 'error: Cannot fetch %s' % project.name - if opt.force_broken: - print >>sys.stderr, 'warn: --force-broken, continuing to sync' - else: - sem.release() - sys.exit(1) + def _FetchHelper(self, opt, project, lock, fetched, pm, sem, err_event): + """Main function of the fetch threads when jobs are > 1. - lock.acquire() - fetched.add(project.gitdir) - pm.update() - lock.release() - sem.release() + Args: + opt: Program options returned from optparse. See _Options(). + project: Project object for the project to fetch. + lock: Lock for accessing objects that are shared amongst multiple + _FetchHelper() threads. + fetched: set object that we will add project.gitdir to when we're done + (with our lock held). + pm: Instance of a Project object. We will call pm.update() (with our + lock held). + sem: We'll release() this semaphore when we exit so that another thread + can be started up. + err_event: We'll set this event in the case of an error (after printing + out info about the error). + """ + # We'll set to true once we've locked the lock. + did_lock = False + + # Encapsulate everything in a try/except/finally so that: + # - We always set err_event in the case of an exception. + # - We always make sure we call sem.release(). + # - We always make sure we unlock the lock if we locked it. + try: + success = project.Sync_NetworkHalf(quiet=opt.quiet) + + # Lock around all the rest of the code, since printing, updating a set + # and Progress.update() are not thread safe. + lock.acquire() + did_lock = True + + if not success: + print >>sys.stderr, 'error: Cannot fetch %s' % project.name + if opt.force_broken: + print >>sys.stderr, 'warn: --force-broken, continuing to sync' + else: + raise _FetchError() + + fetched.add(project.gitdir) + pm.update() + except BaseException, e: + # Notify the _Fetch() function about all errors. + err_event.set() + + # If we got our own _FetchError, we don't want a stack trace. + # However, if we got something else (something in Sync_NetworkHalf?), + # we'd like one (so re-raise after we've set err_event). + if not isinstance(e, _FetchError): + raise + finally: + if did_lock: + lock.release() + sem.release() def _Fetch(self, projects, opt): fetched = set() @@ -169,7 +213,13 @@ later is required to fix a server side protocol bug. threads = set() lock = _threading.Lock() sem = _threading.Semaphore(self.jobs) + err_event = _threading.Event() for project in projects: + # Check for any errors before starting any new threads. + # ...we'll let existing threads finish, though. + if err_event.is_set(): + break + sem.acquire() t = _threading.Thread(target = self._FetchHelper, args = (opt, @@ -177,13 +227,19 @@ later is required to fix a server side protocol bug. lock, fetched, pm, - sem)) + sem, + err_event)) threads.add(t) t.start() for t in threads: t.join() + # If we saw an error, exit with code 1 so that other scripts can check. + if err_event.is_set(): + print >>sys.stderr, '\nerror: Exited sync due to fetch errors' + sys.exit(1) + pm.end() for project in projects: project.bare_git.gc('--auto')