sync: Add orchestration logic for --interleaved

Introduce the parallel orchestration framework for `repo sync
--interleaved`.

The new logic respects project dependencies by processing projects in
hierarchical levels: every project in a level can sync in parallel, but
each level must finish before the next begins. Projects that share a git
object directory are grouped into a single work item and processed
serially to avoid races. The familiar fetch progress bar UX is reused.
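
To make the scheduling model concrete, here is a minimal standalone
sketch of the idea. The `Project` tuple and the depth-based
`checkout_levels` helper are hypothetical simplifications for
illustration only; the real implementation uses `_SafeCheckoutOrder`
and dispatches each bucket to a worker via `ExecuteInParallel`:

    import collections
    from typing import List, NamedTuple

    class Project(NamedTuple):
        # Hypothetical stand-in for the real Project class.
        relpath: str
        objdir: str

    def checkout_levels(projects: List[Project]) -> List[List[Project]]:
        # Simplified stand-in for _SafeCheckoutOrder: order projects so
        # that "foo" lands in an earlier level than "foo/bar".
        by_depth = collections.defaultdict(list)
        for p in projects:
            by_depth[p.relpath.count("/")].append(p)
        return [by_depth[d] for d in sorted(by_depth)]

    def plan(projects: List[Project]) -> List[List[List[Project]]]:
        # Within each level, bucket projects by shared git object
        # directory. Each bucket becomes one work item that a worker
        # processes serially; distinct buckets may run in parallel.
        schedule = []
        for level in checkout_levels(projects):
            buckets = collections.defaultdict(list)
            for p in level:
                buckets[p.objdir].append(p)
            schedule.append(list(buckets.values()))
        return schedule

    projs = [
        Project("projA", "common_objdir"),
        Project("projB", "objB"),
        Project("projC", "common_objdir"),
        Project("projA/sub", "objA_sub"),
    ]
    for i, level in enumerate(plan(projs), 1):
        print(f"level {i}:", [[p.relpath for p in b] for b in level])
    # level 1: [['projA', 'projC'], ['projB']]
    # level 2: [['projA/sub']]

Buckets within a level touch disjoint object directories, so they may
run in parallel; levels are serialized so a parent project such as
projA is in place before a nested one such as projA/sub.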

Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281
Commit-Queue: Gavin Mak <gavinmak@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Author: Gavin Mak
Date: 2025-06-17 10:54:41 -07:00 (committed by LUCI)
Commit: b4b323a8bd (parent: f91f4462e6)
2 changed files with 398 additions and 21 deletions

File 1 of 2:

@@ -25,7 +25,7 @@ from pathlib import Path
 import sys
 import tempfile
 import time
-from typing import List, NamedTuple, Set, Union
+from typing import List, NamedTuple, Optional, Set, Union
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple):
     finish: float
 
 
+class _SyncResult(NamedTuple):
+    """Individual project sync result for interleaved mode.
+
+    Attributes:
+        relpath (str): The project's relative path from the repo client top.
+        fetch_success (bool): True if the fetch operation was successful.
+        checkout_success (bool): True if the checkout operation was
+            successful.
+        fetch_error (Optional[Exception]): The Exception from a failed fetch,
+            or None.
+        checkout_error (Optional[Exception]): The Exception from a failed
+            checkout, or None.
+        fetch_start (Optional[float]): The time.time() when fetch started.
+        fetch_finish (Optional[float]): The time.time() when fetch finished.
+        checkout_start (Optional[float]): The time.time() when checkout
+            started.
+        checkout_finish (Optional[float]): The time.time() when checkout
+            finished.
+    """
+
+    relpath: str
+    fetch_success: bool
+    checkout_success: bool
+    fetch_error: Optional[Exception]
+    checkout_error: Optional[Exception]
+    fetch_start: Optional[float]
+    fetch_finish: Optional[float]
+    checkout_start: Optional[float]
+    checkout_finish: Optional[float]
+
+
+class _InterleavedSyncResult(NamedTuple):
+    """Result of an interleaved sync.
+
+    Attributes:
+        results (List[_SyncResult]): A list of results, one for each project
+            processed. Empty if the worker failed before creating results.
+    """
+
+    results: List[_SyncResult]
+
+
 class SuperprojectError(SyncError):
     """Superproject sync repo."""
@@ -837,15 +880,7 @@ later is required to fix a server side protocol bug.
         )
 
         sync_event = _threading.Event()
-
-        def _MonitorSyncLoop():
-            while True:
-                pm.update(inc=0, msg=self._GetSyncProgressMessage())
-                if sync_event.wait(timeout=1):
-                    return
-
-        sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
-        sync_progress_thread.daemon = True
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
         def _ProcessResults(pool, pm, results_sets):
             ret = True
@@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug.
             all_manifests=not opt.this_manifest_only,
         )
 
+        # Log the repo projects by existing and new.
+        existing = [x for x in all_projects if x.Exists]
+        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
+        mp.config.SetString(
+            "repo.newprojectcount", str(len(all_projects) - len(existing))
+        )
+
+        self._fetch_times = _FetchTimes(manifest)
+        self._local_sync_state = LocalSyncState(manifest)
+
         if opt.interleaved:
             sync_method = self._SyncInterleaved
         else:
@@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug.
         if not opt.quiet:
             print("repo sync has finished successfully.")
 
+    def _CreateSyncProgressThread(
+        self, pm: Progress, stop_event: _threading.Event
+    ) -> _threading.Thread:
+        """Creates and returns a daemon thread to update a Progress object.
+
+        The returned thread is not yet started. The thread will periodically
+        update the progress bar with information from _GetSyncProgressMessage
+        until the stop_event is set.
+
+        Args:
+            pm: The Progress object to update.
+            stop_event: The threading.Event to signal the monitor to stop.
+
+        Returns:
+            The configured _threading.Thread object.
+        """
+
+        def _monitor_loop():
+            """The target function for the monitor thread."""
+            while True:
+                # Update the progress bar with the current status message.
+                pm.update(inc=0, msg=self._GetSyncProgressMessage())
+                # Wait for 1 second or until the stop_event is set.
+                if stop_event.wait(timeout=1):
+                    return
+
+        return _threading.Thread(target=_monitor_loop, daemon=True)
+
     def _SyncPhased(
         self,
         opt,
@@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug.
         err_update_projects = False
         err_update_linkfiles = False
 
-        # Log the repo projects by existing and new.
-        existing = [x for x in all_projects if x.Exists]
-        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
-        mp.config.SetString(
-            "repo.newprojectcount", str(len(all_projects) - len(existing))
-        )
-
-        self._fetch_times = _FetchTimes(manifest)
-        self._local_sync_state = LocalSyncState(manifest)
         if not opt.local_only:
             with multiprocessing.Manager() as manager:
                 with ssh.ProxyManager(manager) as ssh_proxy:
@@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug.
             )
             raise SyncError(aggregate_errors=errors)
 
+    @classmethod
+    def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
+        """Worker for interleaved sync.
+
+        This function is responsible for syncing a group of projects that
+        share a git object directory.
+
+        Args:
+            opt: Program options returned from optparse. See _Options().
+            project_indices: A list of indices into the projects list stored
+                in the parallel context.
+
+        Returns:
+            An `_InterleavedSyncResult` containing the results for each
+            project.
+        """
+        results = []
+        context = cls.get_parallel_context()
+        projects = context["projects"]
+        sync_dict = context["sync_dict"]
+
+        assert project_indices, "_SyncProjectList called with no indices."
+
+        # Use the first project as the representative for the progress bar.
+        first_project = projects[project_indices[0]]
+        key = f"{first_project.name} @ {first_project.relpath}"
+        start_time = time.time()
+        sync_dict[key] = start_time
+
+        try:
+            for idx in project_indices:
+                project = projects[idx]
+                # For now, simulate a successful sync.
+                # TODO(b/421935613): Perform the actual git fetch and checkout.
+                results.append(
+                    _SyncResult(
+                        relpath=project.relpath,
+                        fetch_success=True,
+                        checkout_success=True,
+                        fetch_error=None,
+                        checkout_error=None,
+                        fetch_start=None,
+                        fetch_finish=None,
+                        checkout_start=None,
+                        checkout_finish=None,
+                    )
+                )
+        finally:
+            del sync_dict[key]
+
+        return _InterleavedSyncResult(results=results)
+
+    def _ProcessSyncInterleavedResults(
+        self,
+        synced_relpaths: Set[str],
+        err_event: _threading.Event,
+        errors: List[Exception],
+        opt: optparse.Values,
+        pool: Optional[multiprocessing.Pool],
+        pm: Progress,
+        results_sets: List[_InterleavedSyncResult],
+    ):
+        """Callback to process results from interleaved sync workers."""
+        ret = True
+        for result_group in results_sets:
+            for result in result_group.results:
+                pm.update()
+                if result.fetch_success and result.checkout_success:
+                    synced_relpaths.add(result.relpath)
+                else:
+                    ret = False
+                    err_event.set()
+                    if result.fetch_error:
+                        errors.append(result.fetch_error)
+                    if result.checkout_error:
+                        errors.append(result.checkout_error)
+            if not ret and opt.fail_fast:
+                if pool:
+                    pool.close()
+                break
+        return ret
+
     def _SyncInterleaved(
         self,
         opt,
@@ -2026,7 +2172,116 @@ later is required to fix a server side protocol bug.
         2. Projects that share git objects are processed serially to prevent
            race conditions.
         """
-        raise NotImplementedError("Interleaved sync is not implemented yet.")
+        err_event = multiprocessing.Event()
+        synced_relpaths = set()
+        project_list = list(all_projects)
+        pm = Progress(
+            "Syncing",
+            len(project_list),
+            delay=False,
+            quiet=opt.quiet,
+            show_elapsed=True,
+            elide=True,
+        )
+        previously_pending_relpaths = set()
+
+        sync_event = _threading.Event()
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
+
+        with self.ParallelContext():
+            # TODO(gavinmak): Use multiprocessing.Queue instead of dict.
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+            sync_progress_thread.start()
+
+            try:
+                # Outer loop for dynamic project discovery (e.g., submodules).
+                # It continues until no unsynced projects remain.
+                while True:
+                    projects_to_sync = [
+                        p
+                        for p in project_list
+                        if p.relpath not in synced_relpaths
+                    ]
+                    if not projects_to_sync:
+                        break
+
+                    pending_relpaths = {p.relpath for p in projects_to_sync}
+                    if previously_pending_relpaths == pending_relpaths:
+                        logger.error(
+                            "Stall detected in interleaved sync, not all "
+                            "projects could be synced."
+                        )
+                        err_event.set()
+                        break
+                    previously_pending_relpaths = pending_relpaths
+
+                    # Update the projects list for workers in the current pass.
+                    self.get_parallel_context()["projects"] = projects_to_sync
+                    project_index_map = {
+                        p: i for i, p in enumerate(projects_to_sync)
+                    }
+
+                    # Inner loop to process projects in a hierarchical order.
+                    # This iterates through levels of project dependencies
+                    # (e.g. 'foo' then 'foo/bar'). All projects in one level
+                    # can be processed in parallel, but we must wait for a
+                    # level to complete before starting the next.
+                    for level_projects in _SafeCheckoutOrder(projects_to_sync):
+                        if not level_projects:
+                            continue
+
+                        objdir_project_map = collections.defaultdict(list)
+                        for p in level_projects:
+                            objdir_project_map[p.objdir].append(
+                                project_index_map[p]
+                            )
+                        work_items = list(objdir_project_map.values())
+                        if not work_items:
+                            continue
+
+                        jobs = max(1, min(opt.jobs, len(work_items)))
+                        callback = functools.partial(
+                            self._ProcessSyncInterleavedResults,
+                            synced_relpaths,
+                            err_event,
+                            errors,
+                            opt,
+                        )
+                        if not self.ExecuteInParallel(
+                            jobs,
+                            functools.partial(self._SyncProjectList, opt),
+                            work_items,
+                            callback=callback,
+                            output=pm,
+                            chunksize=1,
+                        ):
+                            err_event.set()
+
+                        if err_event.is_set() and opt.fail_fast:
+                            raise SyncFailFastError(aggregate_errors=errors)
+
+                    self._ReloadManifest(None, manifest)
+                    project_list = self.GetProjects(
+                        args,
+                        missing_ok=True,
+                        submodules_ok=opt.fetch_submodules,
+                        manifest=manifest,
+                        all_manifests=not opt.this_manifest_only,
+                    )
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
+
+        pm.end()
+
+        if err_event.is_set():
+            logger.error(
+                "error: Unable to fully sync the tree in interleaved mode."
+            )
+            raise SyncError(aggregate_errors=errors)
+
 def _PostRepoUpgrade(manifest, quiet=False):
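
One wiring detail worth noting: _ProcessSyncInterleavedResults is handed
to ExecuteInParallel as a functools.partial that pre-binds the first four
arguments, so the executor only supplies pool, pm, and the results list,
matching the _ProcessResults(pool, pm, results_sets) shape used by phased
sync above. A small sketch of that binding (the names here are
illustrative, not the real API):

    import functools

    def process_results(synced, err_event, errors, opt, pool, pm, results):
        # The first four parameters are pre-bound when the pass is
        # scheduled; the executor fills in the trailing three when
        # worker results arrive.
        return bool(results)

    callback = functools.partial(process_results, set(), None, [], None)
    print(callback(pool=None, pm=None, results=[["fake_result"]]))  # True

This binding is also why the unit test below can recover the
synced_relpaths set through callback.args[0].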

File 2 of 2:

@@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase):
 class FakeProject:
-    def __init__(self, relpath):
+    def __init__(self, relpath, name=None, objdir=None):
         self.relpath = relpath
+        self.name = name or relpath
+        self.objdir = objdir or relpath
 
     def __str__(self):
         return f"project: {self.relpath}"
@@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase):
             self.cmd.Execute(self.opt, [])
         self.assertIn(self.sync_local_half_error, e.aggregate_errors)
         self.assertIn(self.sync_network_half_error, e.aggregate_errors)
+
+
+class InterleavedSyncTest(unittest.TestCase):
+    """Tests for interleaved sync."""
+
+    def setUp(self):
+        """Set up a sync command with mocks."""
+        self.repodir = tempfile.mkdtemp(".repo")
+        self.manifest = mock.MagicMock(repodir=self.repodir)
+        self.manifest.repoProject.LastFetch = time.time()
+        self.manifest.repoProject.worktree = self.repodir
+        self.manifest.manifestProject.worktree = self.repodir
+        self.manifest.IsArchive = False
+        self.manifest.CloneBundle = False
+        self.manifest.default.sync_j = 1
+
+        self.cmd = sync.Sync(manifest=self.manifest)
+        self.cmd.outer_manifest = self.manifest
+
+        # Mock projects.
+        self.projA = FakeProject("projA", objdir="objA")
+        self.projB = FakeProject("projB", objdir="objB")
+        self.projA_sub = FakeProject(
+            "projA/sub", name="projA_sub", objdir="objA_sub"
+        )
+        self.projC = FakeProject("projC", objdir="objC")
+
+        # Mock methods that are not part of the core interleaved sync logic.
+        mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start()
+        mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start()
+        mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start()
+        mock.patch.object(sync, "_PostRepoUpgrade").start()
+        mock.patch.object(sync, "_PostRepoFetch").start()
+
+    def tearDown(self):
+        """Clean up resources."""
+        shutil.rmtree(self.repodir)
+        mock.patch.stopall()
+
+    def test_interleaved_fail_fast(self):
+        """Test that --fail-fast is respected in interleaved mode."""
+        opt, args = self.cmd.OptionParser.parse_args(
+            ["--interleaved", "--fail-fast", "-j2"]
+        )
+        opt.quiet = True
+
+        # With projA/sub, _SafeCheckoutOrder creates two batches:
+        # 1. [projA, projB]
+        # 2. [projA/sub]
+        # We want to fail on the first batch and ensure the second isn't run.
+        all_projects = [self.projA, self.projB, self.projA_sub]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        # Mock ExecuteInParallel to simulate a failed run on the first batch
+        # of projects.
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", return_value=False
+        ).start()
+
+        with self.assertRaises(sync.SyncFailFastError):
+            self.cmd._SyncInterleaved(
+                opt,
+                args,
+                [],
+                self.manifest,
+                self.manifest.manifestProject,
+                all_projects,
+                {},
+            )
+
+        execute_mock.assert_called_once()
+
+    def test_interleaved_shared_objdir_serial(self):
+        """Test that projects with shared objdir are processed serially."""
+        opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"])
+        opt.quiet = True
+
+        # Set up projects with a shared objdir.
+        self.projA.objdir = "common_objdir"
+        self.projC.objdir = "common_objdir"
+
+        all_projects = [self.projA, self.projB, self.projC]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        def execute_side_effect(jobs, target, work_items, **kwargs):
+            # The callback is a partial object. The first arg is the set we
+            # need to update to avoid the stall detection.
+            synced_relpaths_set = kwargs["callback"].args[0]
+            projects_in_pass = self.cmd.get_parallel_context()["projects"]
+            for item in work_items:
+                for project_idx in item:
+                    synced_relpaths_set.add(
+                        projects_in_pass[project_idx].relpath
+                    )
+            return True
+
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", side_effect=execute_side_effect
+        ).start()
+
+        self.cmd._SyncInterleaved(
+            opt,
+            args,
+            [],
+            self.manifest,
+            self.manifest.manifestProject,
+            all_projects,
+            {},
+        )
+
+        execute_mock.assert_called_once()
+        jobs_arg, _, work_items = execute_mock.call_args.args
+        self.assertEqual(jobs_arg, 2)
+        work_items_sets = {frozenset(item) for item in work_items}
+        expected_sets = {frozenset([0, 2]), frozenset([1])}
+        self.assertEqual(work_items_sets, expected_sets)