# # Copyright (C) 2008 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import itertools import os import re import sys import urlparse import xml.dom.minidom from git_config import GitConfig from git_refs import R_HEADS, HEAD from project import RemoteSpec, Project, MetaProject from error import ManifestParseError MANIFEST_FILE_NAME = 'manifest.xml' LOCAL_MANIFEST_NAME = 'local_manifest.xml' LOCAL_MANIFESTS_DIR_NAME = 'local_manifests' urlparse.uses_relative.extend(['ssh', 'git']) urlparse.uses_netloc.extend(['ssh', 'git']) class _Default(object): """Project defaults within the manifest.""" revisionExpr = None remote = None sync_j = 1 sync_c = False sync_s = False class _XmlRemote(object): def __init__(self, name, alias=None, fetch=None, manifestUrl=None, review=None): self.name = name self.fetchUrl = fetch self.manifestUrl = manifestUrl self.remoteAlias = alias self.reviewUrl = review self.resolvedFetchUrl = self._resolveFetchUrl() def __eq__(self, other): return self.__dict__ == other.__dict__ def __ne__(self, other): return self.__dict__ != other.__dict__ def _resolveFetchUrl(self): url = self.fetchUrl.rstrip('/') manifestUrl = self.manifestUrl.rstrip('/') p = manifestUrl.startswith('persistent-http') if p: manifestUrl = manifestUrl[len('persistent-'):] # urljoin will get confused if there is no scheme in the base url # ie, if manifestUrl is of the form <hostname:port> if manifestUrl.find(':') != manifestUrl.find('/') - 1: manifestUrl = 'gopher://' + manifestUrl url = urlparse.urljoin(manifestUrl, url) url = re.sub(r'^gopher://', '', url) if p: url = 'persistent-' + url return url def ToRemoteSpec(self, projectName): url = self.resolvedFetchUrl.rstrip('/') + '/' + projectName remoteName = self.name if self.remoteAlias: remoteName = self.remoteAlias return RemoteSpec(remoteName, url, self.reviewUrl) class XmlManifest(object): """manages the repo configuration file""" def __init__(self, repodir): self.repodir = os.path.abspath(repodir) self.topdir = os.path.dirname(self.repodir) self.manifestFile = os.path.join(self.repodir, MANIFEST_FILE_NAME) self.globalConfig = GitConfig.ForUser() self.repoProject = MetaProject(self, 'repo', gitdir = os.path.join(repodir, 'repo/.git'), worktree = os.path.join(repodir, 'repo')) self.manifestProject = MetaProject(self, 'manifests', gitdir = os.path.join(repodir, 'manifests.git'), worktree = os.path.join(repodir, 'manifests')) self._Unload() def Override(self, name): """Use a different manifest, just for the current instantiation. """ path = os.path.join(self.manifestProject.worktree, name) if not os.path.isfile(path): raise ManifestParseError('manifest %s not found' % name) old = self.manifestFile try: self.manifestFile = path self._Unload() self._Load() finally: self.manifestFile = old def Link(self, name): """Update the repo metadata to use a different manifest. """ self.Override(name) try: if os.path.lexists(self.manifestFile): os.remove(self.manifestFile) os.symlink('manifests/%s' % name, self.manifestFile) except OSError as e: raise ManifestParseError('cannot link manifest %s: %s' % (name, str(e))) def _RemoteToXml(self, r, doc, root): e = doc.createElement('remote') root.appendChild(e) e.setAttribute('name', r.name) e.setAttribute('fetch', r.fetchUrl) if r.reviewUrl is not None: e.setAttribute('review', r.reviewUrl) def Save(self, fd, peg_rev=False, peg_rev_upstream=True): """Write the current manifest out to the given file descriptor. """ mp = self.manifestProject groups = mp.config.GetString('manifest.groups') if groups: groups = [x for x in re.split(r'[,\s]+', groups) if x] doc = xml.dom.minidom.Document() root = doc.createElement('manifest') doc.appendChild(root) # Save out the notice. There's a little bit of work here to give it the # right whitespace, which assumes that the notice is automatically indented # by 4 by minidom. if self.notice: notice_element = root.appendChild(doc.createElement('notice')) notice_lines = self.notice.splitlines() indented_notice = ('\n'.join(" "*4 + line for line in notice_lines))[4:] notice_element.appendChild(doc.createTextNode(indented_notice)) d = self.default sort_remotes = list(self.remotes.keys()) sort_remotes.sort() for r in sort_remotes: self._RemoteToXml(self.remotes[r], doc, root) if self.remotes: root.appendChild(doc.createTextNode('')) have_default = False e = doc.createElement('default') if d.remote: have_default = True e.setAttribute('remote', d.remote.name) if d.revisionExpr: have_default = True e.setAttribute('revision', d.revisionExpr) if d.sync_j > 1: have_default = True e.setAttribute('sync-j', '%d' % d.sync_j) if d.sync_c: have_default = True e.setAttribute('sync-c', 'true') if d.sync_s: have_default = True e.setAttribute('sync-s', 'true') if have_default: root.appendChild(e) root.appendChild(doc.createTextNode('')) if self._manifest_server: e = doc.createElement('manifest-server') e.setAttribute('url', self._manifest_server) root.appendChild(e) root.appendChild(doc.createTextNode('')) def output_projects(parent, parent_node, projects): for p in projects: output_project(parent, parent_node, self.projects[p]) def output_project(parent, parent_node, p): if not p.MatchesGroups(groups): return name = p.name relpath = p.relpath if parent: name = self._UnjoinName(parent.name, name) relpath = self._UnjoinRelpath(parent.relpath, relpath) e = doc.createElement('project') parent_node.appendChild(e) e.setAttribute('name', name) if relpath != name: e.setAttribute('path', relpath) if not d.remote or p.remote.name != d.remote.name: e.setAttribute('remote', p.remote.name) if peg_rev: if self.IsMirror: value = p.bare_git.rev_parse(p.revisionExpr + '^0') else: value = p.work_git.rev_parse(HEAD + '^0') e.setAttribute('revision', value) if peg_rev_upstream and value != p.revisionExpr: # Only save the origin if the origin is not a sha1, and the default # isn't our value, and the if the default doesn't already have that # covered. e.setAttribute('upstream', p.revisionExpr) elif not d.revisionExpr or p.revisionExpr != d.revisionExpr: e.setAttribute('revision', p.revisionExpr) for c in p.copyfiles: ce = doc.createElement('copyfile') ce.setAttribute('src', c.src) ce.setAttribute('dest', c.dest) e.appendChild(ce) default_groups = ['all', 'name:%s' % p.name, 'path:%s' % p.relpath] egroups = [g for g in p.groups if g not in default_groups] if egroups: e.setAttribute('groups', ','.join(egroups)) for a in p.annotations: if a.keep == "true": ae = doc.createElement('annotation') ae.setAttribute('name', a.name) ae.setAttribute('value', a.value) e.appendChild(ae) if p.sync_c: e.setAttribute('sync-c', 'true') if p.sync_s: e.setAttribute('sync-s', 'true') if p.subprojects: sort_projects = [subp.name for subp in p.subprojects] sort_projects.sort() output_projects(p, e, sort_projects) sort_projects = [key for key in self.projects.keys() if not self.projects[key].parent] sort_projects.sort() output_projects(None, root, sort_projects) if self._repo_hooks_project: root.appendChild(doc.createTextNode('')) e = doc.createElement('repo-hooks') e.setAttribute('in-project', self._repo_hooks_project.name) e.setAttribute('enabled-list', ' '.join(self._repo_hooks_project.enabled_repo_hooks)) root.appendChild(e) doc.writexml(fd, '', ' ', '\n', 'UTF-8') @property def projects(self): self._Load() return self._projects @property def remotes(self): self._Load() return self._remotes @property def default(self): self._Load() return self._default @property def repo_hooks_project(self): self._Load() return self._repo_hooks_project @property def notice(self): self._Load() return self._notice @property def manifest_server(self): self._Load() return self._manifest_server @property def IsMirror(self): return self.manifestProject.config.GetBoolean('repo.mirror') def _Unload(self): self._loaded = False self._projects = {} self._remotes = {} self._default = None self._repo_hooks_project = None self._notice = None self.branch = None self._manifest_server = None def _Load(self): if not self._loaded: m = self.manifestProject b = m.GetBranch(m.CurrentBranch).merge if b is not None and b.startswith(R_HEADS): b = b[len(R_HEADS):] self.branch = b nodes = [] nodes.append(self._ParseManifestXml(self.manifestFile, self.manifestProject.worktree)) local = os.path.join(self.repodir, LOCAL_MANIFEST_NAME) if os.path.exists(local): print('warning: %s is deprecated; put local manifests in `%s` instead' % (LOCAL_MANIFEST_NAME, os.path.join(self.repodir, LOCAL_MANIFESTS_DIR_NAME)), file=sys.stderr) nodes.append(self._ParseManifestXml(local, self.repodir)) local_dir = os.path.abspath(os.path.join(self.repodir, LOCAL_MANIFESTS_DIR_NAME)) try: for local_file in sorted(os.listdir(local_dir)): if local_file.endswith('.xml'): local = os.path.join(local_dir, local_file) nodes.append(self._ParseManifestXml(local, self.repodir)) except OSError: pass try: self._ParseManifest(nodes) except ManifestParseError as e: # There was a problem parsing, unload ourselves in case they catch # this error and try again later, we will show the correct error self._Unload() raise e if self.IsMirror: self._AddMetaProjectMirror(self.repoProject) self._AddMetaProjectMirror(self.manifestProject) self._loaded = True def _ParseManifestXml(self, path, include_root): try: root = xml.dom.minidom.parse(path) except (OSError, xml.parsers.expat.ExpatError) as e: raise ManifestParseError("error parsing manifest %s: %s" % (path, e)) if not root or not root.childNodes: raise ManifestParseError("no root node in %s" % (path,)) for manifest in root.childNodes: if manifest.nodeName == 'manifest': break else: raise ManifestParseError("no <manifest> in %s" % (path,)) nodes = [] for node in manifest.childNodes: # pylint:disable=W0631 # We only get here if manifest is initialised if node.nodeName == 'include': name = self._reqatt(node, 'name') fp = os.path.join(include_root, name) if not os.path.isfile(fp): raise ManifestParseError, \ "include %s doesn't exist or isn't a file" % \ (name,) try: nodes.extend(self._ParseManifestXml(fp, include_root)) # should isolate this to the exact exception, but that's # tricky. actual parsing implementation may vary. except (KeyboardInterrupt, RuntimeError, SystemExit): raise except Exception as e: raise ManifestParseError( "failed parsing included manifest %s: %s", (name, e)) else: nodes.append(node) return nodes def _ParseManifest(self, node_list): for node in itertools.chain(*node_list): if node.nodeName == 'remote': remote = self._ParseRemote(node) if remote: if remote.name in self._remotes: if remote != self._remotes[remote.name]: raise ManifestParseError( 'remote %s already exists with different attributes' % (remote.name)) else: self._remotes[remote.name] = remote for node in itertools.chain(*node_list): if node.nodeName == 'default': if self._default is not None: raise ManifestParseError( 'duplicate default in %s' % (self.manifestFile)) self._default = self._ParseDefault(node) if self._default is None: self._default = _Default() for node in itertools.chain(*node_list): if node.nodeName == 'notice': if self._notice is not None: raise ManifestParseError( 'duplicate notice in %s' % (self.manifestFile)) self._notice = self._ParseNotice(node) for node in itertools.chain(*node_list): if node.nodeName == 'manifest-server': url = self._reqatt(node, 'url') if self._manifest_server is not None: raise ManifestParseError( 'duplicate manifest-server in %s' % (self.manifestFile)) self._manifest_server = url def recursively_add_projects(project): if self._projects.get(project.name): raise ManifestParseError( 'duplicate project %s in %s' % (project.name, self.manifestFile)) self._projects[project.name] = project for subproject in project.subprojects: recursively_add_projects(subproject) for node in itertools.chain(*node_list): if node.nodeName == 'project': project = self._ParseProject(node) recursively_add_projects(project) if node.nodeName == 'repo-hooks': # Get the name of the project and the (space-separated) list of enabled. repo_hooks_project = self._reqatt(node, 'in-project') enabled_repo_hooks = self._reqatt(node, 'enabled-list').split() # Only one project can be the hooks project if self._repo_hooks_project is not None: raise ManifestParseError( 'duplicate repo-hooks in %s' % (self.manifestFile)) # Store a reference to the Project. try: self._repo_hooks_project = self._projects[repo_hooks_project] except KeyError: raise ManifestParseError( 'project %s not found for repo-hooks' % (repo_hooks_project)) # Store the enabled hooks in the Project object. self._repo_hooks_project.enabled_repo_hooks = enabled_repo_hooks if node.nodeName == 'remove-project': name = self._reqatt(node, 'name') try: del self._projects[name] except KeyError: raise ManifestParseError('remove-project element specifies non-existent ' 'project: %s' % name) # If the manifest removes the hooks project, treat it as if it deleted # the repo-hooks element too. if self._repo_hooks_project and (self._repo_hooks_project.name == name): self._repo_hooks_project = None def _AddMetaProjectMirror(self, m): name = None m_url = m.GetRemote(m.remote.name).url if m_url.endswith('/.git'): raise ManifestParseError, 'refusing to mirror %s' % m_url if self._default and self._default.remote: url = self._default.remote.resolvedFetchUrl if not url.endswith('/'): url += '/' if m_url.startswith(url): remote = self._default.remote name = m_url[len(url):] if name is None: s = m_url.rindex('/') + 1 manifestUrl = self.manifestProject.config.GetString('remote.origin.url') remote = _XmlRemote('origin', fetch=m_url[:s], manifestUrl=manifestUrl) name = m_url[s:] if name.endswith('.git'): name = name[:-4] if name not in self._projects: m.PreSync() gitdir = os.path.join(self.topdir, '%s.git' % name) project = Project(manifest = self, name = name, remote = remote.ToRemoteSpec(name), gitdir = gitdir, worktree = None, relpath = None, revisionExpr = m.revisionExpr, revisionId = None) self._projects[project.name] = project def _ParseRemote(self, node): """ reads a <remote> element from the manifest file """ name = self._reqatt(node, 'name') alias = node.getAttribute('alias') if alias == '': alias = None fetch = self._reqatt(node, 'fetch') review = node.getAttribute('review') if review == '': review = None manifestUrl = self.manifestProject.config.GetString('remote.origin.url') return _XmlRemote(name, alias, fetch, manifestUrl, review) def _ParseDefault(self, node): """ reads a <default> element from the manifest file """ d = _Default() d.remote = self._get_remote(node) d.revisionExpr = node.getAttribute('revision') if d.revisionExpr == '': d.revisionExpr = None sync_j = node.getAttribute('sync-j') if sync_j == '' or sync_j is None: d.sync_j = 1 else: d.sync_j = int(sync_j) sync_c = node.getAttribute('sync-c') if not sync_c: d.sync_c = False else: d.sync_c = sync_c.lower() in ("yes", "true", "1") sync_s = node.getAttribute('sync-s') if not sync_s: d.sync_s = False else: d.sync_s = sync_s.lower() in ("yes", "true", "1") return d def _ParseNotice(self, node): """ reads a <notice> element from the manifest file The <notice> element is distinct from other tags in the XML in that the data is conveyed between the start and end tag (it's not an empty-element tag). The white space (carriage returns, indentation) for the notice element is relevant and is parsed in a way that is based on how python docstrings work. In fact, the code is remarkably similar to here: http://www.python.org/dev/peps/pep-0257/ """ # Get the data out of the node... notice = node.childNodes[0].data # Figure out minimum indentation, skipping the first line (the same line # as the <notice> tag)... minIndent = sys.maxint lines = notice.splitlines() for line in lines[1:]: lstrippedLine = line.lstrip() if lstrippedLine: indent = len(line) - len(lstrippedLine) minIndent = min(indent, minIndent) # Strip leading / trailing blank lines and also indentation. cleanLines = [lines[0].strip()] for line in lines[1:]: cleanLines.append(line[minIndent:].rstrip()) # Clear completely blank lines from front and back... while cleanLines and not cleanLines[0]: del cleanLines[0] while cleanLines and not cleanLines[-1]: del cleanLines[-1] return '\n'.join(cleanLines) def _JoinName(self, parent_name, name): return os.path.join(parent_name, name) def _UnjoinName(self, parent_name, name): return os.path.relpath(name, parent_name) def _ParseProject(self, node, parent = None): """ reads a <project> element from the manifest file """ name = self._reqatt(node, 'name') if parent: name = self._JoinName(parent.name, name) remote = self._get_remote(node) if remote is None: remote = self._default.remote if remote is None: raise ManifestParseError, \ "no remote for project %s within %s" % \ (name, self.manifestFile) revisionExpr = node.getAttribute('revision') if not revisionExpr: revisionExpr = self._default.revisionExpr if not revisionExpr: raise ManifestParseError, \ "no revision for project %s within %s" % \ (name, self.manifestFile) path = node.getAttribute('path') if not path: path = name if path.startswith('/'): raise ManifestParseError, \ "project %s path cannot be absolute in %s" % \ (name, self.manifestFile) rebase = node.getAttribute('rebase') if not rebase: rebase = True else: rebase = rebase.lower() in ("yes", "true", "1") sync_c = node.getAttribute('sync-c') if not sync_c: sync_c = False else: sync_c = sync_c.lower() in ("yes", "true", "1") sync_s = node.getAttribute('sync-s') if not sync_s: sync_s = self._default.sync_s else: sync_s = sync_s.lower() in ("yes", "true", "1") upstream = node.getAttribute('upstream') groups = '' if node.hasAttribute('groups'): groups = node.getAttribute('groups') groups = [x for x in re.split(r'[,\s]+', groups) if x] if parent is None: relpath, worktree, gitdir = self.GetProjectPaths(name, path) else: relpath, worktree, gitdir = self.GetSubprojectPaths(parent, path) default_groups = ['all', 'name:%s' % name, 'path:%s' % relpath] groups.extend(set(default_groups).difference(groups)) project = Project(manifest = self, name = name, remote = remote.ToRemoteSpec(name), gitdir = gitdir, worktree = worktree, relpath = relpath, revisionExpr = revisionExpr, revisionId = None, rebase = rebase, groups = groups, sync_c = sync_c, sync_s = sync_s, upstream = upstream, parent = parent) for n in node.childNodes: if n.nodeName == 'copyfile': self._ParseCopyFile(project, n) if n.nodeName == 'annotation': self._ParseAnnotation(project, n) if n.nodeName == 'project': project.subprojects.append(self._ParseProject(n, parent = project)) return project def GetProjectPaths(self, name, path): relpath = path if self.IsMirror: worktree = None gitdir = os.path.join(self.topdir, '%s.git' % name) else: worktree = os.path.join(self.topdir, path).replace('\\', '/') gitdir = os.path.join(self.repodir, 'projects', '%s.git' % path) return relpath, worktree, gitdir def GetSubprojectName(self, parent, submodule_path): return os.path.join(parent.name, submodule_path) def _JoinRelpath(self, parent_relpath, relpath): return os.path.join(parent_relpath, relpath) def _UnjoinRelpath(self, parent_relpath, relpath): return os.path.relpath(relpath, parent_relpath) def GetSubprojectPaths(self, parent, path): relpath = self._JoinRelpath(parent.relpath, path) gitdir = os.path.join(parent.gitdir, 'subprojects', '%s.git' % path) if self.IsMirror: worktree = None else: worktree = os.path.join(parent.worktree, path).replace('\\', '/') return relpath, worktree, gitdir def _ParseCopyFile(self, project, node): src = self._reqatt(node, 'src') dest = self._reqatt(node, 'dest') if not self.IsMirror: # src is project relative; # dest is relative to the top of the tree project.AddCopyFile(src, dest, os.path.join(self.topdir, dest)) def _ParseAnnotation(self, project, node): name = self._reqatt(node, 'name') value = self._reqatt(node, 'value') try: keep = self._reqatt(node, 'keep').lower() except ManifestParseError: keep = "true" if keep != "true" and keep != "false": raise ManifestParseError, "optional \"keep\" attribute must be \"true\" or \"false\"" project.AddAnnotation(name, value, keep) def _get_remote(self, node): name = node.getAttribute('remote') if not name: return None v = self._remotes.get(name) if not v: raise ManifestParseError, \ "remote %s not defined in %s" % \ (name, self.manifestFile) return v def _reqatt(self, node, attname): """ reads a required attribute from the node. """ v = node.getAttribute(attname) if not v: raise ManifestParseError, \ "no %s in <%s> within %s" % \ (attname, node.nodeName, self.manifestFile) return v