/usr/lib/python2.7/dist-packages/bundlewrap/group.py is in bundlewrap 3.2.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import re
from .exceptions import NoSuchGroup, NoSuchNode, RepositoryError
from .utils import cached_property, names
from .utils.dicts import hash_statedict
from .utils.text import mark_for_translation as _, validate_name
GROUP_ATTR_DEFAULTS = {
    'cmd_wrapper_inner': "export LANG=C; {}",
    'cmd_wrapper_outer': "sudo sh -c {}",
    'dummy': False,
    'os': 'linux',
    # Why os_version defaults to (0,) instead of some "maximum" value:
    # repositories tend to start out at one OS version and move
    # forward over time, growing version switches such as
    #
    #     if node.os_version >= (2,):
    #         new_behavior()
    #     else:
    #         old_behavior()
    #
    # With a max_int-style default, every node that never declared an
    # os_version would silently pick up new_behavior() the moment
    # such a switch lands in the repo. Defaulting to the lowest
    # version keeps those nodes on old_behavior(), which is the less
    # surprising outcome.
    'os_version': (0,),
    'use_shadow_passwords': True,
}
def _build_error_chain(loop_node, last_node, nodes_in_between):
    """
    Used to illustrate subgroup loop paths in error messages.

    loop_node:          name of node that loops back to itself
    last_node:          name of last node pointing back to loop_node,
                        causing the loop
    nodes_in_between:   names of nodes traversed during loop detection,
                        does include loop_node if not a direct loop,
                        but not last_node

    Returns a list of names: the part of nodes_in_between starting at
    the first occurrence of loop_node (later repetitions of loop_node
    are dropped), followed by last_node and finally loop_node again.
    """
    error_chain = []
    for visited in nodes_in_between:
        chain_started = loop_node in error_chain
        is_loop_node = loop_node == visited
        # Exactly one of the two must hold:
        #   - chain not started yet and this is loop_node -> start it
        #   - chain already started and this is another name -> extend it
        # Names seen before loop_node and repeated loop_node entries
        # are skipped.
        if chain_started != is_loop_node:
            error_chain.append(visited)
    # close the loop: last_node points back at loop_node
    error_chain.append(last_node)
    error_chain.append(loop_node)
    return error_chain
class Group(object):
    """
    A group of nodes.

    A group is built from a name and an optional infodict. Membership
    and subgroup lookups are resolved through `self.repo`, which is
    not assigned in __init__; it has to be set on the instance before
    any of the repo-dependent properties are used (presumably done by
    the repository object when the group is registered -- confirm
    against the caller).
    """

    def __init__(self, group_name, infodict=None):
        """
        group_name: name of the group, checked with validate_name()
        infodict:   optional dict of group attributes; missing keys
                    fall back to the defaults shown below

        Raises RepositoryError if group_name is not a valid name.
        """
        if infodict is None:
            infodict = {}

        if not validate_name(group_name):
            raise RepositoryError(_("'{}' is not a valid group name.").format(group_name))

        self.name = group_name
        self.bundle_names = infodict.get('bundles', [])
        self.immediate_subgroup_names = infodict.get('subgroups', [])
        self.immediate_subgroup_patterns = infodict.get('subgroup_patterns', [])
        self.members_add = infodict.get('members_add', None)
        self.members_remove = infodict.get('members_remove', None)
        self.metadata = infodict.get('metadata', {})
        self.node_patterns = infodict.get('member_patterns', [])
        self.static_member_names = infodict.get('members', [])

        for attr in GROUP_ATTR_DEFAULTS:
            # defaults are applied in node.py
            # (attributes missing from infodict are stored as None here)
            setattr(self, attr, infodict.get(attr))

    def __lt__(self, other):
        # groups sort by name
        return self.name < other.name

    def __repr__(self):
        return "<Group: {}>".format(self.name)

    def __str__(self):
        return self.name

    @cached_property
    def cdict(self):
        """
        Dict mapping the name of every node in this group to that
        node's hash().
        """
        return {node.name: node.hash() for node in self.nodes}

    def group_membership_hash(self):
        """
        Hash over the sorted names of the nodes in this group.
        """
        return hash_statedict(sorted(names(self.nodes)))

    def hash(self):
        """
        Hash over cdict, i.e. over the hashes of all member nodes.
        """
        return hash_statedict(self.cdict)

    def metadata_hash(self):
        """
        Hash over the metadata_hash() of every node in this group.
        """
        return hash_statedict({
            node.name: node.metadata_hash() for node in self.nodes
        })

    @cached_property
    def nodes(self):
        """
        Iterator over all nodes in the repo for which
        node.in_group(<name of this group>) is true.
        """
        for node in self.repo.nodes:
            if node.in_group(self.name):
                yield node

    @cached_property
    def _static_nodes(self):
        """
        Set of nodes assigned to this group directly, either by name
        ('members') or by matching one of the 'member_patterns'.
        """
        result = set(self._nodes_from_members)
        result.update(self._nodes_from_patterns)
        return result

    @property
    def _subgroup_names_from_patterns(self):
        """
        Iterator over the names of all groups in the repo (other than
        this one) whose name matches one of the subgroup patterns.

        A group matching several patterns is yielded once per match.
        """
        for pattern in self.immediate_subgroup_patterns:
            compiled_pattern = re.compile(pattern)
            for group in self.repo.groups:
                if compiled_pattern.search(group.name) is not None and group != self:
                    yield group.name

    @property
    def _nodes_from_members(self):
        """
        Iterator over the node objects named in 'members'.

        Raises RepositoryError if a listed node does not exist.
        """
        for node_name in self.static_member_names:
            # keep the try limited to the lookup itself
            try:
                node = self.repo.get_node(node_name)
            except NoSuchNode:
                raise RepositoryError(_(
                    "Group '{group}' has '{node}' listed as a member in groups.py, "
                    "but no such node could be found."
                ).format(
                    group=self.name,
                    node=node_name,
                ))
            yield node

    @property
    def _nodes_from_patterns(self):
        """
        Iterator over all nodes in the repo whose name matches one of
        the member patterns.

        A node matching several patterns is yielded once per match.
        """
        for pattern in self.node_patterns:
            compiled_pattern = re.compile(pattern)
            for node in self.repo.nodes:
                if compiled_pattern.search(node.name) is not None:
                    yield node

    def _immediate_subgroup_name_set(self):
        """
        Set of the names of all direct subgroups, whether listed
        explicitly or matched by a subgroup pattern.
        """
        return set(
            list(self.immediate_subgroup_names) +
            list(self._subgroup_names_from_patterns)
        )

    def _get_subgroup(self, subgroup_name):
        """
        Returns the group object for the given subgroup name.

        Raises RepositoryError if the repo has no group of that name.
        """
        try:
            return self.repo.get_group(subgroup_name)
        except NoSuchGroup:
            raise RepositoryError(_(
                "Group '{group}' has '{subgroup}' listed as a subgroup in groups.py, "
                "but no such group could be found."
            ).format(
                group=self.name,
                subgroup=subgroup_name,
            ))

    def _check_subgroup_names(self, visited_names):
        """
        Recursively finds subgroups and checks for loops.

        visited_names: list of group names already on the current
                       path from the starting group down to this one

        Yields the names of all direct and indirect subgroups, plus
        the name of this group itself unless it is in visited_names.
        The same name may be yielded more than once.

        Raises RepositoryError if a subgroup does not exist or if a
        subgroup is already in visited_names (i.e. a loop).
        """
        for name in self._immediate_subgroup_name_set():
            if name in visited_names:
                # this subgroup is already on our path -> loop
                error_chain = _build_error_chain(
                    name,
                    self.name,
                    visited_names,
                )
                raise RepositoryError(_(
                    "Group '{group}' can't be a subgroup of itself. "
                    "({chain})"
                ).format(
                    group=name,
                    chain=" -> ".join(error_chain),
                ))

            group = self._get_subgroup(name)
            for group_name in group._check_subgroup_names(
                visited_names + [self.name],
            ):
                yield group_name

        if self.name not in visited_names:
            yield self.name

    @cached_property
    def parent_groups(self):
        """
        Iterator over all groups that have this group as a direct or
        indirect subgroup.
        """
        for group in self.repo.groups:
            if self in group.subgroups:
                yield group

    @cached_property
    def immediate_parent_groups(self):
        """
        Iterator over all groups that have this group as an immediate
        subgroup.
        """
        for group in self.repo.groups:
            if self in group.immediate_subgroups:
                yield group

    @cached_property
    def subgroups(self):
        """
        Iterator over all subgroups as group objects.
        """
        # starting with our own name in visited_names keeps this group
        # out of its own result and makes a loop back to it an error
        for group_name in set(self._check_subgroup_names([self.name])):
            yield self.repo.get_group(group_name)

    @cached_property
    def immediate_subgroups(self):
        """
        Iterator over all immediate subgroups as group objects.

        Raises RepositoryError if a listed subgroup does not exist.
        """
        for group_name in self._immediate_subgroup_name_set():
            yield self._get_subgroup(group_name)
|