/usr/share/pyshared/landscape/user/provider.py is in landscape-common 12.04.3-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | from pwd import struct_passwd
from grp import struct_group
import csv
import subprocess
import logging
class UserManagementError(Exception):
"""Catch all error for problems with User Management."""
class UserNotFoundError(Exception):
"""Raised when a user couldn't be found by uid/username."""
class GroupNotFoundError(Exception):
"""Raised when a group couldn't be found by gid/groupname."""
class UserProviderBase(object):
"""This is a base class for user Providers."""
def __init__(self, locked_users=None):
self.locked_users = locked_users or []
self._min_uid = 1000
self._max_uid = 60000
def get_users(self):
"""Returns a list of all local users on the computer.
Each user is represented as a dict with the keys: C{username},
C{name}, C{uid}, C{enabled}, C{location}, C{work-phone} and
C{home-phone}.
"""
users = []
found_usernames = set()
for user in self.get_user_data():
if not isinstance(user, struct_passwd):
user = struct_passwd(user)
if user.pw_name in found_usernames:
continue
gecos_data = [x.decode("utf-8", "replace") or None
for x in user.pw_gecos.split(",")[:4]]
while len(gecos_data) < 4:
gecos_data.append(None)
name, location, work_phone, home_phone = tuple(gecos_data)
enabled = user.pw_name not in self.locked_users
users.append({"username": user.pw_name, "name": name,
"uid": user.pw_uid, "enabled": enabled,
"location": location, "work-phone": work_phone,
"home-phone": home_phone,
"primary-gid": user.pw_gid})
found_usernames.add(user.pw_name)
return users
def get_groups(self):
"""Returns a list of groups on the computer.
Each group is represented as a dict with the keys: C{name},
C{gid} and C{members}.
"""
user_names = set([x["username"] for x in self.get_users()])
groups = []
found_groupnames = set()
for group in self.get_group_data():
if not isinstance(group, struct_group):
group = struct_group(group)
if group.gr_name in found_groupnames:
continue
member_names = user_names.intersection(group.gr_mem)
groups.append({"name": group.gr_name, "gid": group.gr_gid,
"members": list(member_names)})
found_groupnames.add(group.gr_name)
return groups
def get_uid(self, username):
"""Returns the UID for C{username}.
@raises UserNotFoundError: Raised if C{username} doesn't match a
user on the computer.
"""
for data in self.get_users():
if data["username"] == username:
return data["uid"]
raise UserNotFoundError("UID not found for user %s." % username)
def get_gid(self, groupname):
"""Returns the GID for C{groupname}.
@raises UserManagementError: Raised if C{groupname} doesn't
match a group on the computer.
"""
for data in self.get_groups():
if data["name"] == groupname:
return data["gid"]
raise GroupNotFoundError("Group not found for group %s." % groupname)
class UserProvider(UserProviderBase):
popen = subprocess.Popen
passwd_fields = ["username", "passwd", "uid", "primary-gid", "gecos",
"home", "shell"]
group_fields = ["name", "passwd", "gid", "members"]
def __init__(self, locked_users=[], passwd_file="/etc/passwd",
group_file="/etc/group"):
super(UserProvider, self).__init__(locked_users)
self._passwd_file = passwd_file
self._group_file = group_file
def get_user_data(self):
"""
Parse passwd(5) formatted files and return tuples of user data in the
form (username, password, uid, primary-group-id, gecos data, home
directory, path to the user's shell)
"""
user_data = []
passwd_file = open(self._passwd_file, "r")
reader = csv.DictReader(passwd_file, fieldnames=self.passwd_fields,
delimiter=":", quoting=csv.QUOTE_NONE)
current_line = 0
for row in reader:
current_line += 1
# This skips the NIS user marker in the passwd file.
if (row["username"].startswith("+") or
row["username"].startswith("-")):
continue
try:
user_data.append((row["username"], row["passwd"],
int(row["uid"]), int(row["primary-gid"]),
row["gecos"], row["home"], row["shell"]))
except (ValueError, TypeError):
logging.warn("passwd file %s is incorrectly formatted: "
"line %d." % (self._passwd_file, current_line))
passwd_file.close()
return user_data
def get_group_data(self):
"""
Parse group(5) formatted files and return tuples of group data in the
form (groupname, group password, group id and a list of member
usernames).
"""
group_data = []
group_file = open(self._group_file, "r")
reader = csv.DictReader(group_file, fieldnames=self.group_fields,
delimiter=":", quoting=csv.QUOTE_NONE)
current_line = 0
for row in reader:
current_line += 1
# Skip if we find the NIS marker
if (row["name"].startswith("+") or
row["name"].startswith("-")):
continue
try:
group_data.append((row["name"], row["passwd"], int(row["gid"]),
row["members"].split(",")))
except (AttributeError, ValueError):
logging.warn("group file %s is incorrectly formatted: "
"line %d." % (self._group_file, current_line))
group_file.close()
return group_data
|