/usr/share/pyshared/git/objects/fun.py is in python-git 0.3.2~RC1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | """Module with functions which are supposed to be as fast as possible"""
from stat import S_ISDIR
__all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive',
'traverse_tree_recursive')
def tree_to_stream(entries, write):
"""Write the give list of entries into a stream using its write method
:param entries: **sorted** list of tuples with (binsha, mode, name)
:param write: write method which takes a data string"""
ord_zero = ord('0')
bit_mask = 7 # 3 bits set
for binsha, mode, name in entries:
mode_str = ''
for i in xrange(6):
mode_str = chr(((mode >> (i*3)) & bit_mask) + ord_zero) + mode_str
# END for each 8 octal value
# git slices away the first octal if its zero
if mode_str[0] == '0':
mode_str = mode_str[1:]
# END save a byte
# here it comes: if the name is actually unicode, the replacement below
# will not work as the binsha is not part of the ascii unicode encoding -
# hence we must convert to an utf8 string for it to work properly.
# According to my tests, this is exactly what git does, that is it just
# takes the input literally, which appears to be utf8 on linux.
if isinstance(name, unicode):
name = name.encode("utf8")
write("%s %s\0%s" % (mode_str, name, binsha))
# END for each item
def tree_entries_from_data(data):
"""Reads the binary representation of a tree and returns tuples of Tree items
:param data: data block with tree data
:return: list(tuple(binsha, mode, tree_relative_path), ...)"""
ord_zero = ord('0')
len_data = len(data)
i = 0
out = list()
while i < len_data:
mode = 0
# read mode
# Some git versions truncate the leading 0, some don't
# The type will be extracted from the mode later
while data[i] != ' ':
# move existing mode integer up one level being 3 bits
# and add the actual ordinal value of the character
mode = (mode << 3) + (ord(data[i]) - ord_zero)
i += 1
# END while reading mode
# byte is space now, skip it
i += 1
# parse name, it is NULL separated
ns = i
while data[i] != '\0':
i += 1
# END while not reached NULL
# default encoding for strings in git is utf8
# Only use the respective unicode object if the byte stream was encoded
name = data[ns:i]
name_enc = name.decode("utf-8")
if len(name) > len(name_enc):
name = name_enc
# END handle encoding
# byte is NULL, get next 20
i += 1
sha = data[i:i+20]
i = i + 20
out.append((sha, mode, name))
# END for each byte in data stream
return out
def _find_by_name(tree_data, name, is_dir, start_at):
"""return data entry matching the given name and tree mode
or None.
Before the item is returned, the respective data item is set
None in the tree_data list to mark it done"""
try:
item = tree_data[start_at]
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[start_at] = None
return item
except IndexError:
pass
# END exception handling
for index, item in enumerate(tree_data):
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[index] = None
return item
# END if item matches
# END for each item
return None
def _to_full_path(item, path_prefix):
"""Rebuild entry with given path prefix"""
if not item:
return item
return (item[0], item[1], path_prefix+item[2])
def traverse_trees_recursive(odb, tree_shas, path_prefix):
"""
:return: list with entries according to the given binary tree-shas.
The result is encoded in a list
of n tuple|None per blob/commit, (n == len(tree_shas)), where
* [0] == 20 byte sha
* [1] == mode as int
* [2] == path relative to working tree root
The entry tuple is None if the respective blob/commit did not
exist in the given tree.
:param tree_shas: iterable of shas pointing to trees. All trees must
be on the same level. A tree-sha may be None in which case None
:param path_prefix: a prefix to be added to the returned paths on this level,
set it '' for the first iteration
:note: The ordering of the returned items will be partially lost"""
trees_data = list()
nt = len(tree_shas)
for tree_sha in tree_shas:
if tree_sha is None:
data = list()
else:
data = tree_entries_from_data(odb.stream(tree_sha).read())
# END handle muted trees
trees_data.append(data)
# END for each sha to get data for
out = list()
out_append = out.append
# find all matching entries and recursively process them together if the match
# is a tree. If the match is a non-tree item, put it into the result.
# Processed items will be set None
for ti, tree_data in enumerate(trees_data):
for ii, item in enumerate(tree_data):
if not item:
continue
# END skip already done items
entries = [ None for n in range(nt) ]
entries[ti] = item
sha, mode, name = item # its faster to unpack
is_dir = S_ISDIR(mode) # type mode bits
# find this item in all other tree data items
# wrap around, but stop one before our current index, hence
# ti+nt, not ti+1+nt
for tio in range(ti+1, ti+nt):
tio = tio % nt
entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
# END for each other item data
# if we are a directory, enter recursion
if is_dir:
out.extend(traverse_trees_recursive(odb, [((ei and ei[0]) or None) for ei in entries], path_prefix+name+'/'))
else:
out_append(tuple(_to_full_path(e, path_prefix) for e in entries))
# END handle recursion
# finally mark it done
tree_data[ii] = None
# END for each item
# we are done with one tree, set all its data empty
del(tree_data[:])
# END for each tree_data chunk
return out
def traverse_tree_recursive(odb, tree_sha, path_prefix):
"""
:return: list of entries of the tree pointed to by the binary tree_sha. An entry
has the following format:
* [0] 20 byte sha
* [1] mode as int
* [2] path relative to the repository
:param path_prefix: prefix to prepend to the front of all returned paths"""
entries = list()
data = tree_entries_from_data(odb.stream(tree_sha).read())
# unpacking/packing is faster than accessing individual items
for sha, mode, name in data:
if S_ISDIR(mode):
entries.extend(traverse_tree_recursive(odb, sha, path_prefix+name+'/'))
else:
entries.append((sha, mode, path_prefix+name))
# END for each item
return entries
|