sync: Show elapsed time for the longest syncing project

"Last synced: X" is printed only after a project finishes syncing.
Replace that with a message that shows the longest actively syncing
project.

Bug: https://crbug.com/gerrit/11293
Change-Id: I84c7873539d84999772cd554f426b44921521e85
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/372674
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Joanna Wang <jojwang@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
This commit is contained in:
Gavin Mak 2023-05-04 04:48:43 +00:00 committed by LUCI
parent 131fc96381
commit 551285fa35
5 changed files with 70 additions and 26 deletions

View File

@ -82,10 +82,10 @@ class Progress(object):
title, title,
total=0, total=0,
units="", units="",
print_newline=False,
delay=True, delay=True,
quiet=False, quiet=False,
show_elapsed=False, show_elapsed=False,
elide=False,
): ):
self._title = title self._title = title
self._total = total self._total = total
@ -93,7 +93,7 @@ class Progress(object):
self._start = time.time() self._start = time.time()
self._show = not delay self._show = not delay
self._units = units self._units = units
self._print_newline = print_newline self._elide = elide
# Only show the active jobs section if we run more than one in parallel. # Only show the active jobs section if we run more than one in parallel.
self._show_jobs = False self._show_jobs = False
self._active = 0 self._active = 0
@ -118,10 +118,18 @@ class Progress(object):
def _update_loop(self): def _update_loop(self):
while True: while True:
if self._update_event.is_set(): self.update(inc=0)
if self._update_event.wait(timeout=1):
return return
self.update(inc=0, msg=self._last_msg)
time.sleep(1) def _write(self, s):
s = "\r" + s
if self._elide:
col = os.get_terminal_size().columns
if len(s) > col:
s = s[: col - 1] + ".."
sys.stderr.write(s)
sys.stderr.flush()
def start(self, name): def start(self, name):
self._active += 1 self._active += 1
@ -133,8 +141,16 @@ class Progress(object):
self.update(msg="finished " + name) self.update(msg="finished " + name)
self._active -= 1 self._active -= 1
def update(self, inc=1, msg=""): def update(self, inc=1, msg=None):
"""Updates the progress indicator.
Args:
inc: The number of items completed.
msg: The message to display. If None, use the last message.
"""
self._done += inc self._done += inc
if msg is None:
msg = self._last_msg
self._last_msg = msg self._last_msg = msg
if _NOT_TTY or IsTraceToStderr(): if _NOT_TTY or IsTraceToStderr():
@ -148,10 +164,9 @@ class Progress(object):
return return
if self._total <= 0: if self._total <= 0:
sys.stderr.write( self._write(
"\r%s: %d,%s" % (self._title, self._done, CSI_ERASE_LINE_AFTER) "%s: %d,%s" % (self._title, self._done, CSI_ERASE_LINE_AFTER)
) )
sys.stderr.flush()
else: else:
p = (100 * self._done) / self._total p = (100 * self._done) / self._total
if self._show_jobs: if self._show_jobs:
@ -165,8 +180,8 @@ class Progress(object):
elapsed = f" {elapsed_str(elapsed_sec)} |" elapsed = f" {elapsed_str(elapsed_sec)} |"
else: else:
elapsed = "" elapsed = ""
sys.stderr.write( self._write(
"\r%s: %2d%% %s(%d%s/%d%s)%s %s%s%s" "%s: %2d%% %s(%d%s/%d%s)%s %s%s"
% ( % (
self._title, self._title,
p, p,
@ -178,10 +193,8 @@ class Progress(object):
elapsed, elapsed,
msg, msg,
CSI_ERASE_LINE_AFTER, CSI_ERASE_LINE_AFTER,
"\n" if self._print_newline else "",
) )
) )
sys.stderr.flush()
def end(self): def end(self):
self._update_event.set() self._update_event.set()
@ -190,15 +203,14 @@ class Progress(object):
duration = duration_str(time.time() - self._start) duration = duration_str(time.time() - self._start)
if self._total <= 0: if self._total <= 0:
sys.stderr.write( self._write(
"\r%s: %d, done in %s%s\n" "%s: %d, done in %s%s\n"
% (self._title, self._done, duration, CSI_ERASE_LINE_AFTER) % (self._title, self._done, duration, CSI_ERASE_LINE_AFTER)
) )
sys.stderr.flush()
else: else:
p = (100 * self._done) / self._total p = (100 * self._done) / self._total
sys.stderr.write( self._write(
"\r%s: %3d%% (%d%s/%d%s), done in %s%s\n" "%s: %3d%% (%d%s/%d%s), done in %s%s\n"
% ( % (
self._title, self._title,
p, p,
@ -210,4 +222,3 @@ class Progress(object):
CSI_ERASE_LINE_AFTER, CSI_ERASE_LINE_AFTER,
) )
) )
sys.stderr.flush()

View File

@ -90,7 +90,7 @@ It is equivalent to "git branch -D <branchname>".
success[branch].append(project) success[branch].append(project)
else: else:
err[branch].append(project) err[branch].append(project)
pm.update() pm.update(msg="")
self.ExecuteInParallel( self.ExecuteInParallel(
opt.jobs, opt.jobs,

View File

@ -58,7 +58,7 @@ The command is equivalent to:
success.append(project) success.append(project)
else: else:
err.append(project) err.append(project)
pm.update() pm.update(msg="")
self.ExecuteInParallel( self.ExecuteInParallel(
opt.jobs, opt.jobs,

View File

@ -142,14 +142,14 @@ revision specified in the manifest.
sync_buf = SyncBuffer(self.manifest.manifestProject.config) sync_buf = SyncBuffer(self.manifest.manifestProject.config)
project.Sync_LocalHalf(sync_buf) project.Sync_LocalHalf(sync_buf)
project.revisionId = gitc_project.old_revision project.revisionId = gitc_project.old_revision
pm.update() pm.update(msg="")
pm.end() pm.end()
def _ProcessResults(_pool, pm, results): def _ProcessResults(_pool, pm, results):
for result, project in results: for result, project in results:
if not result: if not result:
err.append(project) err.append(project)
pm.update() pm.update(msg="")
self.ExecuteInParallel( self.ExecuteInParallel(
opt.jobs, opt.jobs,

View File

@ -66,7 +66,7 @@ from command import (
from error import RepoChangedException, GitError from error import RepoChangedException, GitError
import platform_utils import platform_utils
from project import SyncBuffer from project import SyncBuffer
from progress import Progress from progress import Progress, elapsed_str
from repo_trace import Trace from repo_trace import Trace
import ssh import ssh
from wrapper import Wrapper from wrapper import Wrapper
@ -596,7 +596,7 @@ later is required to fix a server side protocol bug.
The projects we're given share the same underlying git object store, so The projects we're given share the same underlying git object store, so
we have to fetch them in serial. we have to fetch them in serial.
Delegates most of the work to _FetchHelper. Delegates most of the work to _FetchOne.
Args: Args:
opt: Program options returned from optparse. See _Options(). opt: Program options returned from optparse. See _Options().
@ -615,6 +615,8 @@ later is required to fix a server side protocol bug.
Whether the fetch was successful. Whether the fetch was successful.
""" """
start = time.time() start = time.time()
k = f"{project.name} @ {project.relpath}"
self._sync_dict[k] = start
success = False success = False
remote_fetched = False remote_fetched = False
buf = io.StringIO() buf = io.StringIO()
@ -660,15 +662,31 @@ later is required to fix a server side protocol bug.
% (project.name, type(e).__name__, str(e)), % (project.name, type(e).__name__, str(e)),
file=sys.stderr, file=sys.stderr,
) )
del self._sync_dict[k]
raise raise
finish = time.time() finish = time.time()
del self._sync_dict[k]
return _FetchOneResult(success, project, start, finish, remote_fetched) return _FetchOneResult(success, project, start, finish, remote_fetched)
@classmethod @classmethod
def _FetchInitChild(cls, ssh_proxy): def _FetchInitChild(cls, ssh_proxy):
cls.ssh_proxy = ssh_proxy cls.ssh_proxy = ssh_proxy
def _GetLongestSyncMessage(self):
if len(self._sync_dict) == 0:
return None
earliest_time = float("inf")
earliest_proj = None
for project, t in self._sync_dict.items():
if t < earliest_time:
earliest_time = t
earliest_proj = project
elapsed = time.time() - earliest_time
return f"{elapsed_str(elapsed)} {earliest_proj}"
def _Fetch(self, projects, opt, err_event, ssh_proxy): def _Fetch(self, projects, opt, err_event, ssh_proxy):
ret = True ret = True
@ -681,8 +699,22 @@ later is required to fix a server side protocol bug.
delay=False, delay=False,
quiet=opt.quiet, quiet=opt.quiet,
show_elapsed=True, show_elapsed=True,
elide=True,
) )
self._sync_dict = multiprocessing.Manager().dict()
sync_event = _threading.Event()
def _MonitorSyncLoop():
while True:
pm.update(inc=0, msg=self._GetLongestSyncMessage())
if sync_event.wait(timeout=1):
return
sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
sync_progress_thread.daemon = True
sync_progress_thread.start()
objdir_project_map = dict() objdir_project_map = dict()
for project in projects: for project in projects:
objdir_project_map.setdefault(project.objdir, []).append(project) objdir_project_map.setdefault(project.objdir, []).append(project)
@ -712,7 +744,7 @@ later is required to fix a server side protocol bug.
ret = False ret = False
else: else:
fetched.add(project.gitdir) fetched.add(project.gitdir)
pm.update(msg=f"Last synced: {project.name}") pm.update()
if not ret and opt.fail_fast: if not ret and opt.fail_fast:
break break
return ret return ret
@ -764,6 +796,7 @@ later is required to fix a server side protocol bug.
# crash. # crash.
del Sync.ssh_proxy del Sync.ssh_proxy
sync_event.set()
pm.end() pm.end()
self._fetch_times.Save() self._fetch_times.Save()