/usr/share/pyshared/pip/vcs/bazaar.py is in python-pip 1.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | import os
import tempfile
import re
from pip import call_subprocess
from pip.backwardcompat import urlparse
from pip.log import logger
from pip.util import rmtree, display_path
from pip.vcs import vcs, VersionControl
from pip.download import path_to_url2
class Bazaar(VersionControl):
name = 'bzr'
dirname = '.bzr'
repo_name = 'branch'
bundle_file = 'bzr-branch.txt'
schemes = ('bzr', 'bzr+http', 'bzr+https', 'bzr+ssh', 'bzr+sftp', 'bzr+ftp', 'bzr+lp')
guide = ('# This was a Bazaar branch; to make it a branch again run:\n'
'bzr branch -r %(rev)s %(url)s .\n')
def __init__(self, url=None, *args, **kwargs):
super(Bazaar, self).__init__(url, *args, **kwargs)
# Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
if getattr(urlparse, 'uses_fragment', None):
urlparse.uses_fragment.extend(self.schemes)
urlparse.non_hierarchical.extend(['lp'])
def parse_vcs_bundle_file(self, content):
url = rev = None
for line in content.splitlines():
if not line.strip() or line.strip().startswith('#'):
continue
match = re.search(r'^bzr\s*branch\s*-r\s*(\d*)', line)
if match:
rev = match.group(1).strip()
url = line[match.end():].strip().split(None, 1)[0]
if url and rev:
return url, rev
return None, None
def export(self, location):
"""Export the Bazaar repository at the url to the destination location"""
temp_dir = tempfile.mkdtemp('-export', 'pip-')
self.unpack(temp_dir)
if os.path.exists(location):
# Remove the location to make sure Bazaar can export it correctly
rmtree(location)
try:
call_subprocess([self.cmd, 'export', location], cwd=temp_dir,
filter_stdout=self._filter, show_stdout=False)
finally:
rmtree(temp_dir)
def switch(self, dest, url, rev_options):
call_subprocess([self.cmd, 'switch', url], cwd=dest)
def update(self, dest, rev_options):
call_subprocess(
[self.cmd, 'pull', '-q'] + rev_options, cwd=dest)
def obtain(self, dest):
url, rev = self.get_url_rev()
if rev:
rev_options = ['-r', rev]
rev_display = ' (to revision %s)' % rev
else:
rev_options = []
rev_display = ''
if self.check_destination(dest, url, rev_options, rev_display):
logger.notify('Checking out %s%s to %s'
% (url, rev_display, display_path(dest)))
call_subprocess(
[self.cmd, 'branch', '-q'] + rev_options + [url, dest])
def get_url_rev(self):
# hotfix the URL scheme after removing bzr+ from bzr+ssh:// readd it
url, rev = super(Bazaar, self).get_url_rev()
if url.startswith('ssh://'):
url = 'bzr+' + url
return url, rev
def get_url(self, location):
urls = call_subprocess(
[self.cmd, 'info'], show_stdout=False, cwd=location)
for line in urls.splitlines():
line = line.strip()
for x in ('checkout of branch: ',
'parent branch: '):
if line.startswith(x):
repo = line.split(x)[1]
if self._is_local_repository(repo):
return path_to_url2(repo)
return repo
return None
def get_revision(self, location):
revision = call_subprocess(
[self.cmd, 'revno'], show_stdout=False, cwd=location)
return revision.splitlines()[-1]
def get_tag_revs(self, location):
tags = call_subprocess(
[self.cmd, 'tags'], show_stdout=False, cwd=location)
tag_revs = []
for line in tags.splitlines():
tags_match = re.search(r'([.\w-]+)\s*(.*)$', line)
if tags_match:
tag = tags_match.group(1)
rev = tags_match.group(2)
tag_revs.append((rev.strip(), tag.strip()))
return dict(tag_revs)
def get_src_requirement(self, dist, location, find_tags):
repo = self.get_url(location)
if not repo.lower().startswith('bzr:'):
repo = 'bzr+' + repo
egg_project_name = dist.egg_name().split('-', 1)[0]
if not repo:
return None
current_rev = self.get_revision(location)
tag_revs = self.get_tag_revs(location)
if current_rev in tag_revs:
# It's a tag
full_egg_name = '%s-%s' % (egg_project_name, tag_revs[current_rev])
else:
full_egg_name = '%s-dev_r%s' % (dist.egg_name(), current_rev)
return '%s@%s#egg=%s' % (repo, current_rev, full_egg_name)
vcs.register(Bazaar)
|