Compare commits

...

7 Commits

Author SHA1 Message Date
6b8e9fc8db sync: clarify job flags when using interleaved
--jobs-network and --jobs-checkout are ignored with --interleaved.

Bug: 421935613
Change-Id: Ib69413993c4f970b385bd09318972716e5ac3324
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/485021
Tested-by: Gavin Mak <gavinmak@google.com>
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
2025-06-18 15:23:59 -07:00
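
In practice this means interleaved mode sizes its worker pool from --jobs
alone; the per-phase knobs are accepted but have no effect. A condensed
sketch (hypothetical helper name, mirroring the jobs computation in the
sync.py hunk further down):

    def interleaved_jobs(opt_jobs, work_items):
        # Only --jobs bounds the pool size (capped by the number of
        # work items); --jobs-network and --jobs-checkout are ignored.
        return max(1, min(opt_jobs, len(work_items)))
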
7b6ffed4ae sync: Implement --interleaved sync worker
For each assigned project, the worker sequentially calls
Sync_NetworkHalf and Sync_LocalHalf, respecting --local-only and
--network-only flags. To prevent scrambled progress bars, all stderr
output from the checkout phase is captured (shown with --verbose).
Result objects now carry status and timing information from the worker
for state updates.

Bug: 421935613
Change-Id: I398602e08a375e974a8914e5fa48ffae673dda9b
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483301
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
2025-06-18 10:26:27 -07:00
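
A minimal sketch of the per-project flow this message describes, with
simplified Sync_NetworkHalf/Sync_LocalHalf signatures (the real worker,
_SyncOneProject, appears in the sync.py hunk below):

    import contextlib
    import io

    def sync_one(project, opt):
        """Sketch: fetch then check out one project."""
        fetched = True
        if not opt.local_only:
            # Network half: query the remote, unless --local-only.
            fetched = project.Sync_NetworkHalf(quiet=opt.quiet).success
        stderr_text = ""
        if fetched and not opt.network_only:
            # Local half: update the worktree, unless --network-only.
            # Capture stderr so parallel workers do not scramble the
            # progress bar; it is surfaced later only with --verbose.
            capture = io.StringIO()
            with contextlib.redirect_stderr(capture):
                project.Sync_LocalHalf(opt)
            stderr_text = capture.getvalue()
        return fetched, stderr_text
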
b4b323a8bd sync: Add orchestration logic for --interleaved
Introduce the parallel orchestration framework for `repo sync
--interleaved`.

The new logic respects project dependencies by processing them in
hierarchical levels. Projects sharing a git object directory are grouped
and processed serially. Also reuse the familiar fetch progress bar UX.

Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281
Commit-Queue: Gavin Mak <gavinmak@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
2025-06-17 16:13:36 -07:00
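
The two constraints reduce to a simple plan: iterate dependency levels in
order, and within a level merge projects that share an object directory
into one serial work item. A sketch with hypothetical inputs (the real
code uses _SafeCheckoutOrder and an objdir map, visible in the sync.py
hunk below):

    import collections

    def plan_work(levels):
        # `levels` yields project lists outer-first (e.g. ['foo'] before
        # ['foo/bar']). Each yielded batch is a list of work items;
        # items in a batch may run in parallel, but projects inside one
        # work item share a git object directory and run serially.
        for level in levels:
            by_objdir = collections.defaultdict(list)
            for project in level:
                by_objdir[project.objdir].append(project)
            yield list(by_objdir.values())
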
f91f4462e6 upload: fix FileNotFoundError when no superproject
Upload raises a FileNotFoundError when superproject is not in use because
it tries to access the superproject's repo_id before checking whether
superproject was actually enabled.

Reorder the logic to check use_superproject first.

Change-Id: I65cd2adab481e799dd7bb75e1a83553ad6e34d8d
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/484401
Tested-by: Gavin Mak <gavinmak@google.com>
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Mike Frysinger <vapier@google.com>
2025-06-17 13:31:02 -07:00
85352825ff sync: Add scaffolding for interleaved sync
Prepare for an interleaved fetch and checkout mode for `repo sync`. The
goal of the new mode is to significantly speed up syncs by running fetch
and checkout operations in parallel for different projects, rather than
waiting for all fetches to complete before starting any checkouts.

Bug: 421935613
Change-Id: I8c66d1e790c7bba6280e409b95238c5e4e61a9c8
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/482821
Reviewed-by: Scott Lee <ddoman@google.com>
Commit-Queue: Gavin Mak <gavinmak@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
2025-06-11 16:31:35 -07:00
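
Schematically, the difference between the existing phased flow and the
planned interleaved flow (toy stand-ins for fetch and checkout, not
repo's actual API):

    from concurrent.futures import ThreadPoolExecutor

    def fetch(project): ...

    def checkout(project): ...

    def phased(projects):
        for p in projects:
            fetch(p)  # all network work completes first...
        for p in projects:
            checkout(p)  # ...before any local work begins

    def interleaved(projects):
        def one(p):
            fetch(p)  # fetch and checkout run back-to-back per project,
            checkout(p)  # with different projects in flight concurrently

        with ThreadPoolExecutor() as pool:
            list(pool.map(one, projects))
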
b262d0e461 info: fix mismatched format args and wrong symbol name
Bug: 416589884
Change-Id: Icbaade585932f0cbb51367e07925ef606f089697
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/482762
Commit-Queue: Scott Lee <ddoman@google.com>
Reviewed-by: Mike Frysinger <vapier@google.com>
Lint: Scott Lee <ddoman@google.com>
Tested-by: Scott Lee <ddoman@google.com>
2025-06-10 12:38:23 -07:00
044e52e236 hooks: add internal check for external hook API
Add an internal check to make sure we always follow the API we've
documented for external authors.  Since the internal call is a bit
ad-hoc, it can be easy to miss a call site.

Change-Id: Ie8cd298d1fc34f10f3c5eb353512a3e881f42252
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/481721
Reviewed-by: Nasser Grainawi <nasser.grainawi@oss.qualcomm.com>
Reviewed-by: Gavin Mak <gavinmak@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
Commit-Queue: Mike Frysinger <vapier@google.com>
2025-06-06 11:12:13 -07:00
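
For reference, a pre-upload hook conforming to the checked API might look
like the sketch below (argument names follow the _API_ARGS set added in
the hooks hunk further down; repo-hooks.md remains the authoritative
contract):

    def main(project_list, worktree_list=None, **kwargs):
        # repo calls pre-upload hooks with exactly these kwargs
        # (per _API_ARGS: {"project_list", "worktree_list"}).
        for name, path in zip(project_list, worktree_list or []):
            print(f"pre-upload: checking {name} at {path}")
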
8 changed files with 901 additions and 53 deletions

View File

@@ -147,7 +147,8 @@ class Superproject:
"git rev-parse call failed, command: git {}, "
"return code: {}, stderr: {}",
cmd,
p.stdwerr,
retval,
p.stderr,
)
return None
return p.stdout

View File

@@ -22,6 +22,12 @@ from error import HookError
from git_refs import HEAD
# The API we've documented to hook authors. Keep in sync with repo-hooks.md.
_API_ARGS = {
"pre-upload": {"project_list", "worktree_list"},
}
class RepoHook:
"""A RepoHook contains information about a script to run as a hook.
@@ -56,6 +62,7 @@ class RepoHook:
hooks_project,
repo_topdir,
manifest_url,
bug_url=None,
bypass_hooks=False,
allow_all_hooks=False,
ignore_hooks=False,
@@ -75,6 +82,7 @@
run with CWD as this directory.
If you have a manifest, this is manifest.topdir.
manifest_url: The URL to the manifest git repo.
bug_url: The URL to report issues.
bypass_hooks: If True, then 'Do not run the hook'.
allow_all_hooks: If True, then 'Run the hook without prompting'.
ignore_hooks: If True, then 'Do not abort action if hooks fail'.
@@ -85,6 +93,7 @@
self._hooks_project = hooks_project
self._repo_topdir = repo_topdir
self._manifest_url = manifest_url
self._bug_url = bug_url
self._bypass_hooks = bypass_hooks
self._allow_all_hooks = allow_all_hooks
self._ignore_hooks = ignore_hooks
@@ -414,6 +423,20 @@ class RepoHook:
ignore the result through the option combinations as listed in
AddHookOptionGroup().
"""
# Make sure our own callers use the documented API.
exp_kwargs = _API_ARGS.get(self._hook_type, set())
got_kwargs = set(kwargs.keys())
if exp_kwargs != got_kwargs:
print(
"repo internal error: "
f"hook '{self._hook_type}' called incorrectly\n"
f" got: {sorted(got_kwargs)}\n"
f" expected: {sorted(exp_kwargs)}\n"
f"Please file a bug: {self._bug_url}",
file=sys.stderr,
)
return False
# Do not do anything in case bypass_hooks is set, or
# no-op if there is no hooks project or if hook is disabled.
if (
@@ -472,6 +495,7 @@
"manifest_url": manifest.manifestProject.GetRemote(
"origin"
).url,
"bug_url": manifest.contactinfo.bugurl,
}
)
return cls(*args, **kwargs)

View File

@@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man.
.TH REPO "1" "September 2024" "repo smartsync" "Repo Manual"
.TH REPO "1" "June 2025" "repo smartsync" "Repo Manual"
.SH NAME
repo \- repo smartsync - manual page for repo smartsync
.SH SYNOPSIS
@@ -20,11 +20,12 @@ number of CPU cores)
.TP
\fB\-\-jobs\-network\fR=\fI\,JOBS\/\fR
number of network jobs to run in parallel (defaults to
\fB\-\-jobs\fR or 1)
\fB\-\-jobs\fR or 1). Ignored when \fB\-\-interleaved\fR is set
.TP
\fB\-\-jobs\-checkout\fR=\fI\,JOBS\/\fR
number of local checkout jobs to run in parallel
(defaults to \fB\-\-jobs\fR or 8)
(defaults to \fB\-\-jobs\fR or 8). Ignored when \fB\-\-interleaved\fR
is set
.TP
\fB\-f\fR, \fB\-\-force\-broken\fR
obsolete option (to be deleted in the future)
@@ -58,6 +59,9 @@ only update working tree, don't fetch
use the existing manifest checkout as\-is. (do not
update to the latest revision)
.TP
\fB\-\-interleaved\fR
fetch and checkout projects in parallel (experimental)
.TP
\fB\-n\fR, \fB\-\-network\-only\fR
fetch only, don't update working tree
.TP

View File

@@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man.
.TH REPO "1" "September 2024" "repo sync" "Repo Manual"
.TH REPO "1" "June 2025" "repo sync" "Repo Manual"
.SH NAME
repo \- repo sync - manual page for repo sync
.SH SYNOPSIS
@@ -20,11 +20,12 @@ number of CPU cores)
.TP
\fB\-\-jobs\-network\fR=\fI\,JOBS\/\fR
number of network jobs to run in parallel (defaults to
\fB\-\-jobs\fR or 1)
\fB\-\-jobs\fR or 1). Ignored when \fB\-\-interleaved\fR is set
.TP
\fB\-\-jobs\-checkout\fR=\fI\,JOBS\/\fR
number of local checkout jobs to run in parallel
(defaults to \fB\-\-jobs\fR or 8)
(defaults to \fB\-\-jobs\fR or 8). Ignored when \fB\-\-interleaved\fR
is set
.TP
\fB\-f\fR, \fB\-\-force\-broken\fR
obsolete option (to be deleted in the future)
@@ -58,6 +59,9 @@ only update working tree, don't fetch
use the existing manifest checkout as\-is. (do not
update to the latest revision)
.TP
\fB\-\-interleaved\fR
fetch and checkout projects in parallel (experimental)
.TP
\fB\-n\fR, \fB\-\-network\-only\fR
fetch only, don't update working tree
.TP

View File

@@ -195,6 +195,21 @@ class Progress:
)
)
def display_message(self, msg):
"""Clears the current progress line and prints a message above it.
The progress bar is then redrawn on the next line.
"""
if not _TTY or IsTraceToStderr() or self._quiet:
return
# Erase the current line, print the message with a newline,
# and then immediately redraw the progress bar on the new line.
sys.stderr.write("\r" + CSI_ERASE_LINE)
sys.stderr.write(msg + "\n")
sys.stderr.flush()
self.update(inc=0)
def end(self):
self._update_event.set()
if not _TTY or IsTraceToStderr() or self._quiet:
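
A usage sketch for the new display_message(), assuming the constructor
and update() signatures used elsewhere in this diff:

    from progress import Progress

    pm = Progress("Syncing", 4, delay=False)
    pm.update(msg="projA")
    # Print captured worker output above the bar; the bar redraws below.
    pm.display_message("warning: projA emitted stderr during checkout")
    pm.end()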

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import collections
import contextlib
import functools
import http.cookiejar as cookielib
import io
@@ -25,7 +26,7 @@ from pathlib import Path
import sys
import tempfile
import time
from typing import List, NamedTuple, Set, Union
from typing import List, NamedTuple, Optional, Set, Union
import urllib.error
import urllib.parse
import urllib.request
@@ -194,6 +195,57 @@ class _CheckoutOneResult(NamedTuple):
finish: float
class _SyncResult(NamedTuple):
"""Individual project sync result for interleaved mode.
Attributes:
project_index (int): The index of the project in the shared list.
relpath (str): The project's relative path from the repo client top.
remote_fetched (bool): True if the remote was actually queried.
fetch_success (bool): True if the fetch operation was successful.
fetch_error (Optional[Exception]): The Exception from a failed fetch,
or None.
fetch_start (Optional[float]): The time.time() when fetch started.
fetch_finish (Optional[float]): The time.time() when fetch finished.
checkout_success (bool): True if the checkout operation was
successful.
checkout_error (Optional[Exception]): The Exception from a failed
checkout, or None.
checkout_start (Optional[float]): The time.time() when checkout
started.
checkout_finish (Optional[float]): The time.time() when checkout
finished.
stderr_text (str): The combined output from both fetch and checkout.
"""
project_index: int
relpath: str
remote_fetched: bool
fetch_success: bool
fetch_error: Optional[Exception]
fetch_start: Optional[float]
fetch_finish: Optional[float]
checkout_success: bool
checkout_error: Optional[Exception]
checkout_start: Optional[float]
checkout_finish: Optional[float]
stderr_text: str
class _InterleavedSyncResult(NamedTuple):
"""Result of an interleaved sync.
Attributes:
results (List[_SyncResult]): A list of results, one for each project
processed. Empty if the worker failed before creating results.
"""
results: List[_SyncResult]
class SuperprojectError(SyncError):
"""Superproject sync repo."""
@@ -359,7 +411,7 @@ later is required to fix a server side protocol bug.
type=int,
metavar="JOBS",
help="number of network jobs to run in parallel (defaults to "
"--jobs or 1)",
"--jobs or 1). Ignored when --interleaved is set",
)
p.add_option(
"--jobs-checkout",
@@ -367,7 +419,8 @@ later is required to fix a server side protocol bug.
type=int,
metavar="JOBS",
help="number of local checkout jobs to run in parallel (defaults "
f"to --jobs or {DEFAULT_LOCAL_JOBS})",
f"to --jobs or {DEFAULT_LOCAL_JOBS}). Ignored when --interleaved "
"is set",
)
p.add_option(
@@ -423,6 +476,11 @@ later is required to fix a server side protocol bug.
help="use the existing manifest checkout as-is. "
"(do not update to the latest revision)",
)
p.add_option(
"--interleaved",
action="store_true",
help="fetch and checkout projects in parallel (experimental)",
)
p.add_option(
"-n",
"--network-only",
@@ -832,15 +890,7 @@ later is required to fix a server side protocol bug.
)
sync_event = _threading.Event()
def _MonitorSyncLoop():
while True:
pm.update(inc=0, msg=self._GetSyncProgressMessage())
if sync_event.wait(timeout=1):
return
sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
sync_progress_thread.daemon = True
sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
def _ProcessResults(pool, pm, results_sets):
ret = True
@@ -956,6 +1006,7 @@ later is required to fix a server side protocol bug.
err_event.set()
# Call self update, unless requested not to
# TODO(b/421935613): Extract repo update logic to ExecuteHelper.
if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0":
_PostRepoFetch(rp, opt.repo_verify)
if opt.network_only:
@@ -1136,6 +1187,16 @@ later is required to fix a server side protocol bug.
self._local_sync_state.Save()
return proc_res and not err_results
def _PrintManifestNotices(self, opt):
"""Print all manifest notices, but only once."""
printed_notices = set()
# Print all manifest notices, but only once.
# Sort by path_prefix to ensure consistent ordering.
for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
if m.notice and m.notice not in printed_notices:
print(m.notice)
printed_notices.add(m.notice)
@staticmethod
def _GetPreciousObjectsState(project: Project, opt):
"""Get the preciousObjects state for the project.
@@ -1772,8 +1833,6 @@ later is required to fix a server side protocol bug.
e,
)
err_event = multiprocessing.Event()
rp = manifest.repoProject
rp.PreSync()
cb = rp.CurrentBranch
@@ -1825,10 +1884,6 @@ later is required to fix a server side protocol bug.
all_manifests=not opt.this_manifest_only,
)
err_network_sync = False
err_update_projects = False
err_update_linkfiles = False
# Log the repo projects by existing and new.
existing = [x for x in all_projects if x.Exists]
mp.config.SetString("repo.existingprojectcount", str(len(existing)))
@@ -1838,6 +1893,97 @@ later is required to fix a server side protocol bug.
self._fetch_times = _FetchTimes(manifest)
self._local_sync_state = LocalSyncState(manifest)
if opt.interleaved:
sync_method = self._SyncInterleaved
else:
sync_method = self._SyncPhased
sync_method(
opt,
args,
errors,
manifest,
mp,
all_projects,
superproject_logging_data,
)
# Log the previous sync analysis state from the config.
self.git_event_log.LogDataConfigEvents(
mp.config.GetSyncAnalysisStateData(), "previous_sync_state"
)
# Update and log with the new sync analysis state.
mp.config.UpdateSyncAnalysisState(opt, superproject_logging_data)
self.git_event_log.LogDataConfigEvents(
mp.config.GetSyncAnalysisStateData(), "current_sync_state"
)
self._local_sync_state.PruneRemovedProjects()
if self._local_sync_state.IsPartiallySynced():
logger.warning(
"warning: Partial syncs are not supported. For the best "
"experience, sync the entire tree."
)
if not opt.quiet:
print("repo sync has finished successfully.")
def _CreateSyncProgressThread(
self, pm: Progress, stop_event: _threading.Event
) -> _threading.Thread:
"""Creates and returns a daemon thread to update a Progress object.
The returned thread is not yet started. The thread will periodically
update the progress bar with information from _GetSyncProgressMessage
until the stop_event is set.
Args:
pm: The Progress object to update.
stop_event: The threading.Event to signal the monitor to stop.
Returns:
The configured _threading.Thread object.
"""
def _monitor_loop():
"""The target function for the monitor thread."""
while True:
# Update the progress bar with the current status message.
pm.update(inc=0, msg=self._GetSyncProgressMessage())
# Wait for 1 second or until the stop_event is set.
if stop_event.wait(timeout=1):
return
return _threading.Thread(target=_monitor_loop, daemon=True)
def _SyncPhased(
self,
opt,
args,
errors,
manifest,
mp,
all_projects,
superproject_logging_data,
):
"""Sync projects by separating network and local operations.
This method performs sync in two distinct, sequential phases:
1. Network Phase: Fetches updates for all projects from their remotes.
2. Local Phase: Checks out the updated revisions into the local
worktrees for all projects.
This approach ensures that the local work-tree is not modified until
all network operations are complete, providing a transactional-like
safety net for the checkout state.
"""
err_event = multiprocessing.Event()
err_network_sync = False
err_update_projects = False
err_update_linkfiles = False
if not opt.local_only:
with multiprocessing.Manager() as manager:
with ssh.ProxyManager(manager) as ssh_proxy:
@@ -1907,14 +2053,7 @@ later is required to fix a server side protocol bug.
if err_checkout:
err_event.set()
printed_notices = set()
# If there's a notice that's supposed to print at the end of the sync,
# print it now... But avoid printing duplicate messages, and preserve
# order.
for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
if m.notice and m.notice not in printed_notices:
print(m.notice)
printed_notices.add(m.notice)
self._PrintManifestNotices(opt)
# If we saw an error, exit with code 1 so that other scripts can check.
if err_event.is_set():
@@ -1942,26 +2081,383 @@ later is required to fix a server side protocol bug.
)
raise SyncError(aggregate_errors=errors)
# Log the previous sync analysis state from the config.
self.git_event_log.LogDataConfigEvents(
mp.config.GetSyncAnalysisStateData(), "previous_sync_state"
@classmethod
def _SyncOneProject(cls, opt, project_index, project) -> _SyncResult:
"""Syncs a single project for interleaved sync."""
fetch_success = False
remote_fetched = False
fetch_error = None
fetch_start = None
fetch_finish = None
network_output = ""
if opt.local_only:
fetch_success = True
else:
fetch_start = time.time()
network_output_capture = io.StringIO()
try:
ssh_proxy = cls.get_parallel_context().get("ssh_proxy")
sync_result = project.Sync_NetworkHalf(
quiet=opt.quiet,
verbose=opt.verbose,
output_redir=network_output_capture,
current_branch_only=cls._GetCurrentBranchOnly(
opt, project.manifest
),
force_sync=opt.force_sync,
clone_bundle=opt.clone_bundle,
tags=opt.tags,
archive=project.manifest.IsArchive,
optimized_fetch=opt.optimized_fetch,
retry_fetches=opt.retry_fetches,
prune=opt.prune,
ssh_proxy=ssh_proxy,
clone_filter=project.manifest.CloneFilter,
partial_clone_exclude=project.manifest.PartialCloneExclude,
clone_filter_for_depth=project.manifest.CloneFilterForDepth,
)
fetch_success = sync_result.success
remote_fetched = sync_result.remote_fetched
fetch_error = sync_result.error
except KeyboardInterrupt:
logger.error(
"Keyboard interrupt while processing %s", project.name
)
except GitError as e:
fetch_error = e
logger.error("error.GitError: Cannot fetch %s", e)
except Exception as e:
fetch_error = e
logger.error(
"error: Cannot fetch %s (%s: %s)",
project.name,
type(e).__name__,
e,
)
finally:
fetch_finish = time.time()
network_output = network_output_capture.getvalue()
checkout_success = False
checkout_error = None
checkout_start = None
checkout_finish = None
checkout_stderr = ""
if fetch_success and not opt.network_only:
checkout_start = time.time()
stderr_capture = io.StringIO()
try:
with contextlib.redirect_stderr(stderr_capture):
syncbuf = SyncBuffer(
project.manifest.manifestProject.config,
detach_head=opt.detach_head,
)
local_half_errors = []
project.Sync_LocalHalf(
syncbuf,
force_sync=opt.force_sync,
force_checkout=opt.force_checkout,
force_rebase=opt.rebase,
errors=local_half_errors,
verbose=opt.verbose,
)
checkout_success = syncbuf.Finish()
if local_half_errors:
checkout_error = SyncError(
aggregate_errors=local_half_errors
)
except KeyboardInterrupt:
logger.error(
"Keyboard interrupt while processing %s", project.name
)
except GitError as e:
checkout_error = e
logger.error(
"error.GitError: Cannot checkout %s: %s", project.name, e
)
except Exception as e:
checkout_error = e
logger.error(
"error: Cannot checkout %s: %s: %s",
project.name,
type(e).__name__,
e,
)
finally:
checkout_finish = time.time()
checkout_stderr = stderr_capture.getvalue()
elif fetch_success:
checkout_success = True
# Consolidate all captured output.
captured_parts = []
if network_output:
captured_parts.append(network_output)
if checkout_stderr:
captured_parts.append(checkout_stderr)
stderr_text = "\n".join(captured_parts)
return _SyncResult(
project_index=project_index,
relpath=project.relpath,
fetch_success=fetch_success,
remote_fetched=remote_fetched,
checkout_success=checkout_success,
fetch_error=fetch_error,
checkout_error=checkout_error,
stderr_text=stderr_text.strip(),
fetch_start=fetch_start,
fetch_finish=fetch_finish,
checkout_start=checkout_start,
checkout_finish=checkout_finish,
)
# Update and log with the new sync analysis state.
mp.config.UpdateSyncAnalysisState(opt, superproject_logging_data)
self.git_event_log.LogDataConfigEvents(
mp.config.GetSyncAnalysisStateData(), "current_sync_state"
)
@classmethod
def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
"""Worker for interleaved sync.
self._local_sync_state.PruneRemovedProjects()
if self._local_sync_state.IsPartiallySynced():
logger.warning(
"warning: Partial syncs are not supported. For the best "
"experience, sync the entire tree."
This function is responsible for syncing a group of projects that share
a git object directory.
Args:
opt: Program options returned from optparse. See _Options().
project_indices: A list of indices into the projects list stored in
the parallel context.
Returns:
An `_InterleavedSyncResult` containing the results for each project.
"""
results = []
context = cls.get_parallel_context()
projects = context["projects"]
sync_dict = context["sync_dict"]
assert project_indices, "_SyncProjectList called with no indices."
# Use the first project as the representative for the progress bar.
first_project = projects[project_indices[0]]
key = f"{first_project.name} @ {first_project.relpath}"
sync_dict[key] = time.time()
try:
for idx in project_indices:
project = projects[idx]
results.append(cls._SyncOneProject(opt, idx, project))
finally:
del sync_dict[key]
return _InterleavedSyncResult(results=results)
def _ProcessSyncInterleavedResults(
self,
synced_relpaths: Set[str],
err_event: _threading.Event,
errors: List[Exception],
opt: optparse.Values,
pool: Optional[multiprocessing.Pool],
pm: Progress,
results_sets: List[_InterleavedSyncResult],
):
"""Callback to process results from interleaved sync workers."""
ret = True
projects = self.get_parallel_context()["projects"]
for result_group in results_sets:
for result in result_group.results:
pm.update()
project = projects[result.project_index]
if opt.verbose and result.stderr_text:
pm.display_message(result.stderr_text)
if result.fetch_start:
self._fetch_times.Set(
project,
result.fetch_finish - result.fetch_start,
)
self._local_sync_state.SetFetchTime(project)
self.event_log.AddSync(
project,
event_log.TASK_SYNC_NETWORK,
result.fetch_start,
result.fetch_finish,
result.fetch_success,
)
if result.checkout_start:
if result.checkout_success:
self._local_sync_state.SetCheckoutTime(project)
self.event_log.AddSync(
project,
event_log.TASK_SYNC_LOCAL,
result.checkout_start,
result.checkout_finish,
result.checkout_success,
)
if result.fetch_success and result.checkout_success:
synced_relpaths.add(result.relpath)
else:
ret = False
err_event.set()
if result.fetch_error:
errors.append(result.fetch_error)
if result.checkout_error:
errors.append(result.checkout_error)
if not ret and opt.fail_fast:
if pool:
pool.close()
break
return ret
def _SyncInterleaved(
self,
opt,
args,
errors,
manifest,
mp,
all_projects,
superproject_logging_data,
):
"""Sync projects by performing network and local operations in parallel.
This method processes each project (or groups of projects that share git
objects) independently. For each project, it performs the fetch and
checkout operations back-to-back. These independent tasks are run in
parallel.
It respects two constraints for correctness:
1. Projects in nested directories (e.g. 'foo' and 'foo/bar') are
processed in hierarchical order.
2. Projects that share git objects are processed serially to prevent
race conditions.
"""
err_event = multiprocessing.Event()
synced_relpaths = set()
project_list = list(all_projects)
pm = Progress(
"Syncing",
len(project_list),
delay=False,
quiet=opt.quiet,
show_elapsed=True,
elide=True,
)
previously_pending_relpaths = set()
sync_event = _threading.Event()
sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
with multiprocessing.Manager() as manager, ssh.ProxyManager(
manager
) as ssh_proxy:
ssh_proxy.sock()
with self.ParallelContext():
self.get_parallel_context()["ssh_proxy"] = ssh_proxy
# TODO(gavinmak): Use multiprocessing.Queue instead of dict.
self.get_parallel_context()[
"sync_dict"
] = multiprocessing.Manager().dict()
sync_progress_thread.start()
try:
# Outer loop for dynamic project discovery. This continues
# until no unsynced projects remain.
while True:
projects_to_sync = [
p
for p in project_list
if p.relpath not in synced_relpaths
]
if not projects_to_sync:
break
pending_relpaths = {p.relpath for p in projects_to_sync}
if previously_pending_relpaths == pending_relpaths:
logger.error(
"Stall detected in interleaved sync, not all "
"projects could be synced."
)
err_event.set()
break
previously_pending_relpaths = pending_relpaths
self.get_parallel_context()[
"projects"
] = projects_to_sync
project_index_map = {
p: i for i, p in enumerate(projects_to_sync)
}
# Inner loop to process projects in a hierarchical
# order. This iterates through levels of project
# dependencies (e.g. 'foo' then 'foo/bar'). All projects
# in one level can be processed in parallel, but we must
# wait for a level to complete before starting the next.
for level_projects in _SafeCheckoutOrder(
projects_to_sync
):
if not level_projects:
continue
objdir_project_map = collections.defaultdict(list)
for p in level_projects:
objdir_project_map[p.objdir].append(
project_index_map[p]
)
work_items = list(objdir_project_map.values())
if not work_items:
continue
jobs = max(1, min(opt.jobs, len(work_items)))
callback = functools.partial(
self._ProcessSyncInterleavedResults,
synced_relpaths,
err_event,
errors,
opt,
)
if not self.ExecuteInParallel(
jobs,
functools.partial(self._SyncProjectList, opt),
work_items,
callback=callback,
output=pm,
chunksize=1,
):
err_event.set()
if err_event.is_set() and opt.fail_fast:
raise SyncFailFastError(aggregate_errors=errors)
self._ReloadManifest(None, manifest)
project_list = self.GetProjects(
args,
missing_ok=True,
submodules_ok=opt.fetch_submodules,
manifest=manifest,
all_manifests=not opt.this_manifest_only,
)
finally:
sync_event.set()
sync_progress_thread.join()
pm.end()
# TODO(b/421935613): Add the manifest loop block from PhasedSync.
if not self.outer_client.manifest.IsArchive:
self._GCProjects(project_list, opt, err_event)
self._PrintManifestNotices(opt)
if err_event.is_set():
# TODO(b/421935613): Log errors better like SyncPhased.
logger.error(
"error: Unable to fully sync the tree in interleaved mode."
)
if not opt.quiet:
print("repo sync has finished successfully.")
raise SyncError(aggregate_errors=errors)
def _PostRepoUpgrade(manifest, quiet=False):

View File

@@ -627,9 +627,12 @@ Gerrit Code Review: https://www.gerritcodereview.com/
# If using superproject, add the root repo as a push option.
manifest = branch.project.manifest
push_options = list(opt.push_options)
sp = manifest.superproject
if sp and sp.repo_id and manifest.manifestProject.use_superproject:
push_options.append(f"custom-keyed-value=rootRepo:{sp.repo_id}")
if manifest.manifestProject.use_superproject:
sp = manifest.superproject
if sp:
r_id = sp.repo_id
if r_id:
push_options.append(f"custom-keyed-value=rootRepo:{r_id}")
branch.UploadForReview(
people,

View File

@@ -305,8 +305,20 @@ class LocalSyncState(unittest.TestCase):
class FakeProject:
def __init__(self, relpath):
def __init__(self, relpath, name=None, objdir=None):
self.relpath = relpath
self.name = name or relpath
self.objdir = objdir or relpath
self.use_git_worktrees = False
self.UseAlternates = False
self.manifest = mock.MagicMock()
self.manifest.GetProjectsWithName.return_value = [self]
self.config = mock.MagicMock()
self.EnableRepositoryExtension = mock.MagicMock()
def RelPath(self, local=None):
return self.relpath
def __str__(self):
return f"project: {self.relpath}"
@@ -513,3 +525,292 @@ class SyncCommand(unittest.TestCase):
self.cmd.Execute(self.opt, [])
self.assertIn(self.sync_local_half_error, e.aggregate_errors)
self.assertIn(self.sync_network_half_error, e.aggregate_errors)
class InterleavedSyncTest(unittest.TestCase):
"""Tests for interleaved sync."""
def setUp(self):
"""Set up a sync command with mocks."""
self.repodir = tempfile.mkdtemp(".repo")
self.manifest = mock.MagicMock(repodir=self.repodir)
self.manifest.repoProject.LastFetch = time.time()
self.manifest.repoProject.worktree = self.repodir
self.manifest.manifestProject.worktree = self.repodir
self.manifest.IsArchive = False
self.manifest.CloneBundle = False
self.manifest.default.sync_j = 1
self.outer_client = mock.MagicMock()
self.outer_client.manifest.IsArchive = False
self.cmd = sync.Sync(
manifest=self.manifest, outer_client=self.outer_client
)
self.cmd.outer_manifest = self.manifest
# Mock projects.
self.projA = FakeProject("projA", objdir="objA")
self.projB = FakeProject("projB", objdir="objB")
self.projA_sub = FakeProject(
"projA/sub", name="projA_sub", objdir="objA_sub"
)
self.projC = FakeProject("projC", objdir="objC")
# Mock methods that are not part of the core interleaved sync logic.
mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start()
mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start()
mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start()
mock.patch.object(sync, "_PostRepoUpgrade").start()
mock.patch.object(sync, "_PostRepoFetch").start()
# Mock parallel context for worker tests.
self.parallel_context_patcher = mock.patch(
"subcmds.sync.Sync.get_parallel_context"
)
self.mock_get_parallel_context = self.parallel_context_patcher.start()
self.sync_dict = {}
self.mock_context = {
"projects": [],
"sync_dict": self.sync_dict,
}
self.mock_get_parallel_context.return_value = self.mock_context
# Mock _GetCurrentBranchOnly for worker tests.
mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
def tearDown(self):
"""Clean up resources."""
shutil.rmtree(self.repodir)
mock.patch.stopall()
def test_interleaved_fail_fast(self):
"""Test that --fail-fast is respected in interleaved mode."""
opt, args = self.cmd.OptionParser.parse_args(
["--interleaved", "--fail-fast", "-j2"]
)
opt.quiet = True
# With projA/sub, _SafeCheckoutOrder creates two batches:
# 1. [projA, projB]
# 2. [projA/sub]
# We want to fail on the first batch and ensure the second isn't run.
all_projects = [self.projA, self.projB, self.projA_sub]
mock.patch.object(
self.cmd, "GetProjects", return_value=all_projects
).start()
# Mock ExecuteInParallel to simulate a failed run on the first batch of
# projects.
execute_mock = mock.patch.object(
self.cmd, "ExecuteInParallel", return_value=False
).start()
with self.assertRaises(sync.SyncFailFastError):
self.cmd._SyncInterleaved(
opt,
args,
[],
self.manifest,
self.manifest.manifestProject,
all_projects,
{},
)
execute_mock.assert_called_once()
def test_interleaved_shared_objdir_serial(self):
"""Test that projects with shared objdir are processed serially."""
opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"])
opt.quiet = True
# Setup projects with a shared objdir.
self.projA.objdir = "common_objdir"
self.projC.objdir = "common_objdir"
all_projects = [self.projA, self.projB, self.projC]
mock.patch.object(
self.cmd, "GetProjects", return_value=all_projects
).start()
def execute_side_effect(jobs, target, work_items, **kwargs):
# The callback is a partial object. The first arg is the set we
# need to update to avoid the stall detection.
synced_relpaths_set = kwargs["callback"].args[0]
projects_in_pass = self.cmd.get_parallel_context()["projects"]
for item in work_items:
for project_idx in item:
synced_relpaths_set.add(
projects_in_pass[project_idx].relpath
)
return True
execute_mock = mock.patch.object(
self.cmd, "ExecuteInParallel", side_effect=execute_side_effect
).start()
self.cmd._SyncInterleaved(
opt,
args,
[],
self.manifest,
self.manifest.manifestProject,
all_projects,
{},
)
execute_mock.assert_called_once()
jobs_arg, _, work_items = execute_mock.call_args.args
self.assertEqual(jobs_arg, 2)
work_items_sets = {frozenset(item) for item in work_items}
expected_sets = {frozenset([0, 2]), frozenset([1])}
self.assertEqual(work_items_sets, expected_sets)
def _get_opts(self, args=None):
"""Helper to get default options for worker tests."""
if args is None:
args = ["--interleaved"]
opt, _ = self.cmd.OptionParser.parse_args(args)
# Set defaults for options used by the worker.
opt.quiet = True
opt.verbose = False
opt.force_sync = False
opt.clone_bundle = False
opt.tags = False
opt.optimized_fetch = False
opt.retry_fetches = 0
opt.prune = False
opt.detach_head = False
opt.force_checkout = False
opt.rebase = False
return opt
def test_worker_successful_sync(self):
"""Test _SyncProjectList with a successful fetch and checkout."""
opt = self._get_opts()
project = self.projA
project.Sync_NetworkHalf = mock.Mock(
return_value=SyncNetworkHalfResult(error=None, remote_fetched=True)
)
project.Sync_LocalHalf = mock.Mock()
project.manifest.manifestProject.config = mock.MagicMock()
self.mock_context["projects"] = [project]
with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer:
mock_sync_buf_instance = mock.MagicMock()
mock_sync_buf_instance.Finish.return_value = True
mock_sync_buffer.return_value = mock_sync_buf_instance
result_obj = self.cmd._SyncProjectList(opt, [0])
self.assertEqual(len(result_obj.results), 1)
result = result_obj.results[0]
self.assertTrue(result.fetch_success)
self.assertTrue(result.checkout_success)
self.assertIsNone(result.fetch_error)
self.assertIsNone(result.checkout_error)
project.Sync_NetworkHalf.assert_called_once()
project.Sync_LocalHalf.assert_called_once()
def test_worker_fetch_fails(self):
"""Test _SyncProjectList with a failed fetch."""
opt = self._get_opts()
project = self.projA
fetch_error = GitError("Fetch failed")
project.Sync_NetworkHalf = mock.Mock(
return_value=SyncNetworkHalfResult(
error=fetch_error, remote_fetched=False
)
)
project.Sync_LocalHalf = mock.Mock()
self.mock_context["projects"] = [project]
result_obj = self.cmd._SyncProjectList(opt, [0])
result = result_obj.results[0]
self.assertFalse(result.fetch_success)
self.assertFalse(result.checkout_success)
self.assertEqual(result.fetch_error, fetch_error)
self.assertIsNone(result.checkout_error)
project.Sync_NetworkHalf.assert_called_once()
project.Sync_LocalHalf.assert_not_called()
def test_worker_fetch_fails_exception(self):
"""Test _SyncProjectList with an exception during fetch."""
opt = self._get_opts()
project = self.projA
fetch_error = GitError("Fetch failed")
project.Sync_NetworkHalf = mock.Mock(side_effect=fetch_error)
project.Sync_LocalHalf = mock.Mock()
self.mock_context["projects"] = [project]
result_obj = self.cmd._SyncProjectList(opt, [0])
result = result_obj.results[0]
self.assertFalse(result.fetch_success)
self.assertFalse(result.checkout_success)
self.assertEqual(result.fetch_error, fetch_error)
project.Sync_NetworkHalf.assert_called_once()
project.Sync_LocalHalf.assert_not_called()
def test_worker_checkout_fails(self):
"""Test _SyncProjectList with an exception during checkout."""
opt = self._get_opts()
project = self.projA
project.Sync_NetworkHalf = mock.Mock(
return_value=SyncNetworkHalfResult(error=None, remote_fetched=True)
)
checkout_error = GitError("Checkout failed")
project.Sync_LocalHalf = mock.Mock(side_effect=checkout_error)
project.manifest.manifestProject.config = mock.MagicMock()
self.mock_context["projects"] = [project]
with mock.patch("subcmds.sync.SyncBuffer"):
result_obj = self.cmd._SyncProjectList(opt, [0])
result = result_obj.results[0]
self.assertTrue(result.fetch_success)
self.assertFalse(result.checkout_success)
self.assertIsNone(result.fetch_error)
self.assertEqual(result.checkout_error, checkout_error)
project.Sync_NetworkHalf.assert_called_once()
project.Sync_LocalHalf.assert_called_once()
def test_worker_local_only(self):
"""Test _SyncProjectList with --local-only."""
opt = self._get_opts(["--interleaved", "--local-only"])
project = self.projA
project.Sync_NetworkHalf = mock.Mock()
project.Sync_LocalHalf = mock.Mock()
project.manifest.manifestProject.config = mock.MagicMock()
self.mock_context["projects"] = [project]
with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer:
mock_sync_buf_instance = mock.MagicMock()
mock_sync_buf_instance.Finish.return_value = True
mock_sync_buffer.return_value = mock_sync_buf_instance
result_obj = self.cmd._SyncProjectList(opt, [0])
result = result_obj.results[0]
self.assertTrue(result.fetch_success)
self.assertTrue(result.checkout_success)
project.Sync_NetworkHalf.assert_not_called()
project.Sync_LocalHalf.assert_called_once()
def test_worker_network_only(self):
"""Test _SyncProjectList with --network-only."""
opt = self._get_opts(["--interleaved", "--network-only"])
project = self.projA
project.Sync_NetworkHalf = mock.Mock(
return_value=SyncNetworkHalfResult(error=None, remote_fetched=True)
)
project.Sync_LocalHalf = mock.Mock()
self.mock_context["projects"] = [project]
result_obj = self.cmd._SyncProjectList(opt, [0])
result = result_obj.results[0]
self.assertTrue(result.fetch_success)
self.assertTrue(result.checkout_success)
project.Sync_NetworkHalf.assert_called_once()
project.Sync_LocalHalf.assert_not_called()