sync: Implement --interleaved sync worker

For each assigned project, the worker sequentially calls
Sync_NetworkHalf and Sync_LocalHalf, respecting the --local-only and
--network-only flags. To prevent scrambled progress bars, all stderr
output from the checkout phase is captured (and shown only with
--verbose).
Result objects now carry status and timing information from the worker
for state updates.
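
A minimal sketch of the checkout-capture pattern (illustration only,
using the same stdlib pieces as the worker; the names below are not
part of the change):

    import contextlib
    import io
    import sys

    buf = io.StringIO()
    with contextlib.redirect_stderr(buf):
        # Anything written to stderr here lands in the buffer instead
        # of interleaving with other workers' progress output.
        print("checkout noise", file=sys.stderr)
    stderr_text = buf.getvalue()  # surfaced later, only with --verbose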

Bug: 421935613
Change-Id: I398602e08a375e974a8914e5fa48ffae673dda9b
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483301
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>

@@ -13,6 +13,7 @@
# limitations under the License.
import collections
import contextlib
import functools
import http.cookiejar as cookielib
import io
@@ -198,33 +199,41 @@ class _SyncResult(NamedTuple):
"""Individual project sync result for interleaved mode.
Attributes:
project_index (int): The index of the project in the shared list.
relpath (str): The project's relative path from the repo client top.
remote_fetched (bool): True if the remote was actually queried.
fetch_success (bool): True if the fetch operation was successful.
checkout_success (bool): True if the checkout operation was
successful.
fetch_error (Optional[Exception]): The Exception from a failed fetch,
or None.
checkout_error (Optional[Exception]): The Exception from a failed
checkout, or None.
fetch_start (Optional[float]): The time.time() when fetch started.
fetch_finish (Optional[float]): The time.time() when fetch finished.
checkout_success (bool): True if the checkout operation was
successful.
checkout_error (Optional[Exception]): The Exception from a failed
checkout, or None.
checkout_start (Optional[float]): The time.time() when checkout
started.
checkout_finish (Optional[float]): The time.time() when checkout
finished.
stderr_text (str): The combined output from both fetch and checkout.
"""
    project_index: int
    relpath: str

    remote_fetched: bool
    fetch_success: bool
    fetch_error: Optional[Exception]
    fetch_start: Optional[float]
    fetch_finish: Optional[float]

    checkout_success: bool
    checkout_error: Optional[Exception]
    checkout_start: Optional[float]
    checkout_finish: Optional[float]

    stderr_text: str
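
    # Illustration only (not part of the change): with --local-only the
    # worker skips the network half entirely, so a hypothetical result
    # would mark the fetch successful with no remote contact and no
    # fetch timing data (timestamps below are made up):
    #
    #     _SyncResult(
    #         project_index=0,
    #         relpath="foo/bar",
    #         remote_fetched=False,
    #         fetch_success=True,
    #         fetch_error=None,
    #         fetch_start=None,
    #         fetch_finish=None,
    #         checkout_success=True,
    #         checkout_error=None,
    #         checkout_start=1718300000.0,
    #         checkout_finish=1718300001.5,
    #         stderr_text="",
    #     )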
class _InterleavedSyncResult(NamedTuple):
"""Result of an interleaved sync.
@@ -996,6 +1005,7 @@ later is required to fix a server side protocol bug.
err_event.set()
# Call self update, unless requested not to
# TODO(b/42193561): Extract repo update logic to ExecuteHelper.
if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0":
_PostRepoFetch(rp, opt.repo_verify)
if opt.network_only:
@@ -1176,6 +1186,16 @@ later is required to fix a server side protocol bug.
self._local_sync_state.Save()
return proc_res and not err_results
def _PrintManifestNotices(self, opt):
"""Print all manifest notices, but only once."""
printed_notices = set()
# Print all manifest notices, but only once.
# Sort by path_prefix to ensure consistent ordering.
for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
if m.notice and m.notice not in printed_notices:
print(m.notice)
printed_notices.add(m.notice)
@staticmethod
def _GetPreciousObjectsState(project: Project, opt):
"""Get the preciousObjects state for the project.
@@ -2032,14 +2052,7 @@ later is required to fix a server side protocol bug.
if err_checkout:
err_event.set()
self._PrintManifestNotices(opt)
# If we saw an error, exit with code 1 so that other scripts can check.
if err_event.is_set():
@@ -2067,6 +2080,139 @@ later is required to fix a server side protocol bug.
)
raise SyncError(aggregate_errors=errors)
@classmethod
def _SyncOneProject(cls, opt, project_index, project) -> _SyncResult:
"""Syncs a single project for interleaved sync."""
fetch_success = False
remote_fetched = False
fetch_error = None
fetch_start = None
fetch_finish = None
network_output = ""
if opt.local_only:
fetch_success = True
else:
fetch_start = time.time()
network_output_capture = io.StringIO()
try:
ssh_proxy = cls.get_parallel_context().get("ssh_proxy")
sync_result = project.Sync_NetworkHalf(
quiet=opt.quiet,
verbose=opt.verbose,
output_redir=network_output_capture,
current_branch_only=cls._GetCurrentBranchOnly(
opt, project.manifest
),
force_sync=opt.force_sync,
clone_bundle=opt.clone_bundle,
tags=opt.tags,
archive=project.manifest.IsArchive,
optimized_fetch=opt.optimized_fetch,
retry_fetches=opt.retry_fetches,
prune=opt.prune,
ssh_proxy=ssh_proxy,
clone_filter=project.manifest.CloneFilter,
partial_clone_exclude=project.manifest.PartialCloneExclude,
clone_filter_for_depth=project.manifest.CloneFilterForDepth,
)
fetch_success = sync_result.success
remote_fetched = sync_result.remote_fetched
fetch_error = sync_result.error
except KeyboardInterrupt:
logger.error(
"Keyboard interrupt while processing %s", project.name
)
except GitError as e:
fetch_error = e
logger.error("error.GitError: Cannot fetch %s", e)
except Exception as e:
fetch_error = e
logger.error(
"error: Cannot fetch %s (%s: %s)",
project.name,
type(e).__name__,
e,
)
finally:
fetch_finish = time.time()
network_output = network_output_capture.getvalue()
checkout_success = False
checkout_error = None
checkout_start = None
checkout_finish = None
checkout_stderr = ""
if fetch_success and not opt.network_only:
checkout_start = time.time()
stderr_capture = io.StringIO()
try:
with contextlib.redirect_stderr(stderr_capture):
syncbuf = SyncBuffer(
project.manifest.manifestProject.config,
detach_head=opt.detach_head,
)
local_half_errors = []
project.Sync_LocalHalf(
syncbuf,
force_sync=opt.force_sync,
force_checkout=opt.force_checkout,
force_rebase=opt.rebase,
errors=local_half_errors,
verbose=opt.verbose,
)
checkout_success = syncbuf.Finish()
if local_half_errors:
checkout_error = SyncError(
aggregate_errors=local_half_errors
)
except KeyboardInterrupt:
logger.error(
"Keyboard interrupt while processing %s", project.name
)
except GitError as e:
checkout_error = e
logger.error(
"error.GitError: Cannot checkout %s: %s", project.name, e
)
except Exception as e:
checkout_error = e
logger.error(
"error: Cannot checkout %s: %s: %s",
project.name,
type(e).__name__,
e,
)
finally:
checkout_finish = time.time()
checkout_stderr = stderr_capture.getvalue()
elif fetch_success:
checkout_success = True
# Consolidate all captured output.
captured_parts = []
if network_output:
captured_parts.append(network_output)
if checkout_stderr:
captured_parts.append(checkout_stderr)
stderr_text = "\n".join(captured_parts)
return _SyncResult(
project_index=project_index,
relpath=project.relpath,
fetch_success=fetch_success,
remote_fetched=remote_fetched,
checkout_success=checkout_success,
fetch_error=fetch_error,
checkout_error=checkout_error,
stderr_text=stderr_text.strip(),
fetch_start=fetch_start,
fetch_finish=fetch_finish,
checkout_start=checkout_start,
checkout_finish=checkout_finish,
)
@classmethod
def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
"""Worker for interleaved sync.
@@ -2092,27 +2238,12 @@ later is required to fix a server side protocol bug.
# Use the first project as the representative for the progress bar.
first_project = projects[project_indices[0]]
key = f"{first_project.name} @ {first_project.relpath}"
sync_dict[key] = time.time()
try:
for idx in project_indices:
project = projects[idx]
results.append(cls._SyncOneProject(opt, idx, project))
finally:
del sync_dict[key]
@@ -2130,9 +2261,39 @@ later is required to fix a server side protocol bug.
):
"""Callback to process results from interleaved sync workers."""
ret = True
projects = self.get_parallel_context()["projects"]
for result_group in results_sets:
for result in result_group.results:
pm.update()
project = projects[result.project_index]
if opt.verbose and result.stderr_text:
pm.display_message(result.stderr_text)
if result.fetch_start:
self._fetch_times.Set(
project,
result.fetch_finish - result.fetch_start,
)
self._local_sync_state.SetFetchTime(project)
self.event_log.AddSync(
project,
event_log.TASK_SYNC_NETWORK,
result.fetch_start,
result.fetch_finish,
result.fetch_success,
)
if result.checkout_start:
if result.checkout_success:
self._local_sync_state.SetCheckoutTime(project)
self.event_log.AddSync(
project,
event_log.TASK_SYNC_LOCAL,
result.checkout_start,
result.checkout_finish,
result.checkout_success,
)
if result.fetch_success and result.checkout_success:
synced_relpaths.add(result.relpath)
else:
@@ -2188,96 +2349,110 @@ later is required to fix a server side protocol bug.
sync_event = _threading.Event()
sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
        with multiprocessing.Manager() as manager, ssh.ProxyManager(
            manager
        ) as ssh_proxy:
            ssh_proxy.sock()
            with self.ParallelContext():
                self.get_parallel_context()["ssh_proxy"] = ssh_proxy

                # TODO(gavinmak): Use multiprocessing.Queue instead of dict.
                self.get_parallel_context()[
                    "sync_dict"
                ] = multiprocessing.Manager().dict()
                sync_progress_thread.start()

                try:
                    # Outer loop for dynamic project discovery. This continues
                    # until no unsynced projects remain.
                    while True:
                        projects_to_sync = [
                            p
                            for p in project_list
                            if p.relpath not in synced_relpaths
                        ]
                        if not projects_to_sync:
                            break

                        pending_relpaths = {p.relpath for p in projects_to_sync}
                        if previously_pending_relpaths == pending_relpaths:
                            logger.error(
                                "Stall detected in interleaved sync, not all "
                                "projects could be synced."
                            )
                            err_event.set()
                            break
                        previously_pending_relpaths = pending_relpaths

                        # Update the projects list for workers in the
                        # current pass.
                        self.get_parallel_context()[
                            "projects"
                        ] = projects_to_sync
                        project_index_map = {
                            p: i for i, p in enumerate(projects_to_sync)
                        }

                        # Inner loop to process projects in a hierarchical
                        # order. This iterates through levels of project
                        # dependencies (e.g. 'foo' then 'foo/bar'). All
                        # projects in one level can be processed in parallel,
                        # but we must wait for a level to complete before
                        # starting the next.
                        for level_projects in _SafeCheckoutOrder(
                            projects_to_sync
                        ):
                            if not level_projects:
                                continue

                            objdir_project_map = collections.defaultdict(list)
                            for p in level_projects:
                                objdir_project_map[p.objdir].append(
                                    project_index_map[p]
                                )
                            work_items = list(objdir_project_map.values())
                            if not work_items:
                                continue

                            jobs = max(1, min(opt.jobs, len(work_items)))
                            callback = functools.partial(
                                self._ProcessSyncInterleavedResults,
                                synced_relpaths,
                                err_event,
                                errors,
                                opt,
                            )
                            if not self.ExecuteInParallel(
                                jobs,
                                functools.partial(self._SyncProjectList, opt),
                                work_items,
                                callback=callback,
                                output=pm,
                                chunksize=1,
                            ):
                                err_event.set()

                            if err_event.is_set() and opt.fail_fast:
                                raise SyncFailFastError(aggregate_errors=errors)

                        self._ReloadManifest(None, manifest)
                        project_list = self.GetProjects(
                            args,
                            missing_ok=True,
                            submodules_ok=opt.fetch_submodules,
                            manifest=manifest,
                            all_manifests=not opt.this_manifest_only,
                        )
                finally:
                    sync_event.set()
                    sync_progress_thread.join()
pm.end()
# TODO(b/421935613): Add the manifest loop block from PhasedSync.
if not self.outer_client.manifest.IsArchive:
self._GCProjects(project_list, opt, err_event)
self._PrintManifestNotices(opt)
if err_event.is_set():
# TODO(b/421935613): Log errors better like SyncPhased.
logger.error(
"error: Unable to fully sync the tree in interleaved mode."
)