sync: Always use WORKER_BATCH_SIZE
With 551285fa35, the comment about the number of workers no longer
stands - the dict is shared among the worker processes and real-time
information is available.
Using 2.7k projects as the baseline, a chunk size of 4 takes close to
5 minutes. A chunk size of 32 takes this down to 40s - a reduction of
roughly 8x, which matches the increase in chunk size.
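
For illustration, a minimal sketch of the chunk-size math, assuming the
WORKER_BATCH_SIZE cap of 32 used by repo sync and the ~2.7k-project
baseline above (this standalone copy of _chunksize is only a demo, not
the code in the diff):

    WORKER_BATCH_SIZE = 32  # upper bound on the chunk size

    def _chunksize(projects: int, jobs: int) -> int:
        # Aim for projects/jobs items per chunk, but never exceed
        # WORKER_BATCH_SIZE and never go below 1.
        return min(max(1, projects // jobs), WORKER_BATCH_SIZE)

    print(_chunksize(2700, 8))    # 32 - capped at WORKER_BATCH_SIZE
    print(_chunksize(2700, 64))   # 32 - still capped
    print(_chunksize(2700, 128))  # 21 - below the cap only for very large --jobs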
R=gavinmak@google.com
Bug: b/371638995
Change-Id: Ida5fd8f7abc44b3b82c02aa0f7f7ae01dff5eb07
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/438523
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Tested-by: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Gavin Mak <gavinmak@google.com>
parent f7f9dd4deb
commit 454fdaf119

project.py | 19
@@ -2396,26 +2396,25 @@ class Project:
         try:
             # if revision (sha or tag) is not present then following function
             # throws an error.
+            revs = [f"{self.revisionExpr}^0"]
+            upstream_rev = None
+            if self.upstream:
+                upstream_rev = self.GetRemote().ToLocal(self.upstream)
+                revs.append(upstream_rev)
+
             self.bare_git.rev_list(
                 "-1",
                 "--missing=allow-any",
-                "%s^0" % self.revisionExpr,
+                *revs,
                 "--",
                 log_as_error=False,
             )
+
             if self.upstream:
-                rev = self.GetRemote().ToLocal(self.upstream)
-                self.bare_git.rev_list(
-                    "-1",
-                    "--missing=allow-any",
-                    "%s^0" % rev,
-                    "--",
-                    log_as_error=False,
-                )
                 self.bare_git.merge_base(
                     "--is-ancestor",
                     self.revisionExpr,
-                    rev,
+                    upstream_rev,
                     log_as_error=False,
                 )
             return True
@@ -131,6 +131,11 @@ def _SafeCheckoutOrder(checkouts: List[Project]) -> List[List[Project]]:
     return res
 
 
+def _chunksize(projects: int, jobs: int) -> int:
+    """Calculate chunk size for the given number of projects and jobs."""
+    return min(max(1, projects // jobs), WORKER_BATCH_SIZE)
+
+
 class _FetchOneResult(NamedTuple):
     """_FetchOne return value.
 
@@ -819,7 +824,6 @@ later is required to fix a server side protocol bug.
     def _Fetch(self, projects, opt, err_event, ssh_proxy, errors):
         ret = True
 
-        jobs = opt.jobs_network
         fetched = set()
         remote_fetched = set()
         pm = Progress(
@@ -849,6 +853,8 @@ later is required to fix a server side protocol bug.
             objdir_project_map.setdefault(project.objdir, []).append(project)
         projects_list = list(objdir_project_map.values())
 
+        jobs = min(opt.jobs_network, len(projects_list))
+
         def _ProcessResults(results_sets):
             ret = True
             for results in results_sets:
@@ -888,35 +894,22 @@ later is required to fix a server side protocol bug.
         Sync.ssh_proxy = None
 
         # NB: Multiprocessing is heavy, so don't spin it up for one job.
-        if len(projects_list) == 1 or jobs == 1:
+        if jobs == 1:
             self._FetchInitChild(ssh_proxy)
             if not _ProcessResults(
                 self._FetchProjectList(opt, x) for x in projects_list
             ):
                 ret = False
         else:
-            # Favor throughput over responsiveness when quiet. It seems that
-            # imap() will yield results in batches relative to chunksize, so
-            # even as the children finish a sync, we won't see the result until
-            # one child finishes ~chunksize jobs. When using a large --jobs
-            # with large chunksize, this can be jarring as there will be a large
-            # initial delay where repo looks like it isn't doing anything and
-            # sits at 0%, but then suddenly completes a lot of jobs all at once.
-            # Since this code is more network bound, we can accept a bit more
-            # CPU overhead with a smaller chunksize so that the user sees more
-            # immediate & continuous feedback.
-            if opt.quiet:
-                chunksize = WORKER_BATCH_SIZE
-            else:
+            if not opt.quiet:
                 pm.update(inc=0, msg="warming up")
-                chunksize = 4
             with multiprocessing.Pool(
                 jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
             ) as pool:
                 results = pool.imap_unordered(
                     functools.partial(self._FetchProjectList, opt),
                     projects_list,
-                    chunksize=chunksize,
+                    chunksize=_chunksize(len(projects_list), jobs),
                 )
                 if not _ProcessResults(results):
                     ret = False
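
As background for the chunksize argument above, a small self-contained
sketch of how Pool.imap_unordered hands out work in chunks; the worker
function and numbers here are hypothetical, not repo code:

    import multiprocessing


    def _square(x):
        # Hypothetical stand-in for _FetchProjectList.
        return x * x


    if __name__ == "__main__":
        items = list(range(100))
        with multiprocessing.Pool(4) as pool:
            # Each worker pulls `chunksize` items at a time; a larger chunk
            # means less inter-process overhead, but results surface in
            # coarser bursts, which is why the old code used a small
            # chunksize when progress output was wanted.
            for result in pool.imap_unordered(_square, items, chunksize=32):
                pass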
@@ -355,6 +355,30 @@ class SafeCheckoutOrder(unittest.TestCase):
         )
 
 
+class Chunksize(unittest.TestCase):
+    """Tests for _chunksize."""
+
+    def test_single_project(self):
+        """Single project."""
+        self.assertEqual(sync._chunksize(1, 1), 1)
+
+    def test_low_project_count(self):
+        """Multiple projects, low number of projects to sync."""
+        self.assertEqual(sync._chunksize(10, 1), 10)
+        self.assertEqual(sync._chunksize(10, 2), 5)
+        self.assertEqual(sync._chunksize(10, 4), 2)
+        self.assertEqual(sync._chunksize(10, 8), 1)
+        self.assertEqual(sync._chunksize(10, 16), 1)
+
+    def test_high_project_count(self):
+        """Multiple projects, high number of projects to sync."""
+        self.assertEqual(sync._chunksize(2800, 1), 32)
+        self.assertEqual(sync._chunksize(2800, 16), 32)
+        self.assertEqual(sync._chunksize(2800, 32), 32)
+        self.assertEqual(sync._chunksize(2800, 64), 32)
+        self.assertEqual(sync._chunksize(2800, 128), 21)
+
+
 class GetPreciousObjectsState(unittest.TestCase):
     """Tests for _GetPreciousObjectsState."""
 