/usr/share/pyshared/landscape/user/management.py is in landscape-common 12.04.3-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# XXX: There is the potential for some sort of "unixadmin" package
# which wraps up the commands which we use in this module in a Python
# API, with thorough usage of exceptions and such, instead of pipes to
# subprocesses. liboobs (i.e. System Tools) is a possibility, and has
# documentation now in the 2.17 series, but is not wrapped to Python.
import os
import logging
import subprocess
from landscape.lib import md5crypt
from landscape.user.provider import UserManagementError, UserProvider
class UserManagement(object):
    """Manage system users and groups.

    Every operation is carried out by running a system command
    (C{adduser}, C{usermod}, C{chfn}, C{deluser}, C{addgroup},
    C{groupmod}, C{gpasswd}, C{groupdel}, C{passwd}) through
    L{call_popen}.  A non-zero exit status is reported by raising
    C{UserManagementError} with the command's captured output appended
    to the message.
    """

    def __init__(self, provider=None):
        """
        @param provider: Object used to look up UIDs/GIDs and to spawn
            processes.  A C{UserProvider} is created when none (or a
            falsy value) is given.
        """
        self._provider = provider or UserProvider()

    def add_user(self, username, name, password, require_password_reset,
                 primary_group_name, location, work_phone, home_phone):
        """Add C{username} to the computer.

        The account is created with C{adduser --disabled-password}, then
        the password is set, and finally, if C{require_password_reset} is
        true, C{passwd -e} is run for the account.

        @param username: Login name of the new account.
        @param name: Full name, stored as the first GECOS field.
        @param password: Plain-text password to set for the account.
        @param require_password_reset: If true, run C{passwd -e} after the
            password has been set.
        @param primary_group_name: Optional name of the primary group; it
            is resolved to a GID through the provider.
        @param location: Optional second GECOS field.
        @param work_phone: Optional third GECOS field.
        @param home_phone: Optional fourth GECOS field.
        @return: The output of C{adduser}, followed by the output of
            C{passwd -e} when that command was run.
        @raises UserManagementError: Raised when C{adduser} fails.
        @raises UserManagementError: Raised when C{passwd} fails.
        """
        logging.info("Adding user %s.", username)
        # Missing GECOS fields are written as empty strings so that the
        # comma-separated layout always has four fields.
        gecos = "%s,%s,%s,%s" % (name, location or "", work_phone or "",
                                 home_phone or "")
        command = ["adduser", username, "--disabled-password", "--gecos",
                   gecos]
        if primary_group_name:
            gid = self._provider.get_gid(primary_group_name)
            command.extend(["--gid", str(gid)])
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error adding user %s.\n%s" %
                                      (username, output))

        # The output of the password change is intentionally not included
        # in the returned text.
        self._set_password(username, password)
        if require_password_reset:
            result, new_output = self.call_popen(["passwd", username, "-e"])
            if result != 0:
                raise UserManagementError("Error resetting password for user "
                                          "%s.\n%s" % (username, new_output))
            output += new_output
        return output

    def _set_password(self, username, password):
        """Set C{password} for C{username} using C{usermod -p}.

        The password is hashed with C{md5crypt} and a salt derived from six
        random bytes before being handed to C{usermod}.

        @return: The output of C{usermod}.
        @raises UserManagementError: Raised when C{usermod} fails.
        """
        # XXX temporary workaround?  We're getting unicode here.
        username = username.encode("ascii")
        password = password.encode("ascii")
        # The "base64" codec appends a trailing newline; strip it.
        salt = os.urandom(6).encode("base64")[:-1]
        # NOTE(review): the hash is passed as a command-line argument and
        # md5crypt is a dated scheme -- worth revisiting, not changed here.
        crypted = md5crypt.md5crypt(password, salt)
        result, output = self.call_popen(["usermod", "-p", crypted, username])
        if result != 0:
            raise UserManagementError("Error setting password for user "
                                      "%s.\n%s" % (username, output))
        return output

    def _set_primary_group(self, username, groupname):
        """Make C{groupname} the primary group of C{username}.

        @return: The output of C{usermod}.
        @raises UserManagementError: Raised when C{usermod} fails.
        """
        primary_gid = self._provider.get_gid(groupname)
        command = ["usermod", "-g", str(primary_gid), username]
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error setting primary group to %d for "
                                      "%s.\n%s" % (primary_gid, username,
                                                   output))
        return output

    def set_user_details(self, username, password=None, name=None,
                         location=None, work_number=None, home_number=None,
                         primary_group_name=None):
        """Update details for the account matching C{username}.

        Only the details that are supplied are changed: a truthy
        C{password} or C{primary_group_name} triggers the matching
        C{usermod} call, and every GECOS value that is not C{None} is
        passed to a single C{chfn} invocation.

        @return: The output of C{chfn}, or C{None} when no GECOS field was
            supplied and C{chfn} was therefore not run.
        @raises UserManagementError: Raised when C{usermod} or C{chfn}
            fails.
        """
        uid = self._provider.get_uid(username)
        logging.info("Updating metadata for user %s (UID %d).", username, uid)
        if password:
            self._set_password(username, password)
        if primary_group_name:
            self._set_primary_group(username, primary_group_name)

        command = ["chfn"]
        for option, value in [("-r", location), ("-f", name),
                              ("-w", work_number), ("-h", home_number)]:
            # An empty string is still passed through; only None means
            # "leave this field alone".
            if value is not None:
                command += [option, value]

        if len(command) > 1:
            result, output = self.call_popen(command + [username])
            if result != 0:
                raise UserManagementError("Error setting details for user "
                                          "%s.\n%s" % (username, output))
            return output
        return None

    def lock_user(self, username):
        """
        Lock the account matching C{username} to prevent them from logging in.

        @return: The output of C{usermod}.
        @raises UserManagementError: Raised when C{usermod} fails.
        """
        uid = self._provider.get_uid(username)
        logging.info("Locking out user %s (UID %d).", username, uid)
        result, output = self.call_popen(["usermod", "-L", username])
        if result != 0:
            raise UserManagementError("Error locking user %s.\n%s"
                                      % (username, output))
        # Return the output like unlock_user and the other operations do.
        return output

    def unlock_user(self, username):
        """Unlock the account matching C{username}.

        @return: The output of C{usermod}.
        @raises UserManagementError: Raised when C{usermod} fails.
        """
        uid = self._provider.get_uid(username)
        logging.info("Unlocking user %s (UID %d).", username, uid)
        result, output = self.call_popen(["usermod", "-U", username])
        if result != 0:
            raise UserManagementError("Error unlocking user %s.\n%s"
                                      % (username, output))
        return output

    def remove_user(self, username, delete_home=False):
        """Remove the account matching C{username} from the computer.

        @param delete_home: If true, pass C{--remove-home} to C{deluser}.
        @return: The output of C{deluser}.
        @raises UserManagementError: Raised when C{deluser} fails.
        """
        uid = self._provider.get_uid(username)
        command = ["deluser", username]
        if delete_home:
            logging.info("Removing user %s (UID %d) and deleting their home "
                         "directory.", username, uid)
            command.append("--remove-home")
        else:
            logging.info("Removing user %s (UID %d) without deleting their "
                         "home directory.", username, uid)

        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error removing user %s (UID %d).\n%s"
                                      % (username, uid, output))
        return output

    def add_group(self, groupname):
        """Add C{groupname} with the C{addgroup} system command.

        @return: The output of C{addgroup}.
        @raises UserManagementError: Raised when C{addgroup} fails.
        """
        logging.info("Adding group %s.", groupname)
        result, output = self.call_popen(["addgroup", groupname])
        if result != 0:
            raise UserManagementError("Error adding group %s.\n%s" %
                                      (groupname, output))
        return output

    def set_group_details(self, groupname, new_name):
        """Rename the group matching C{groupname} to C{new_name}.

        @return: The output of C{groupmod}.
        @raises UserManagementError: Raised when C{groupmod} fails.
        """
        gid = self._provider.get_gid(groupname)
        logging.info("Renaming group %s (GID %d) to %s.",
                     groupname, gid, new_name)
        command = ["groupmod", "-n", new_name, groupname]
        result, output = self.call_popen(command)
        if result != 0:
            raise UserManagementError("Error renaming group %s (GID %d) to "
                                      "%s.\n%s" % (groupname, gid, new_name,
                                                   output))
        return output

    def add_group_member(self, username, groupname):
        """
        Add the user matching C{username} to the group matching C{groupname}
        with the C{gpasswd} system command.

        @return: The output of C{gpasswd}.
        @raises UserManagementError: Raised when C{gpasswd} fails.
        """
        uid = self._provider.get_uid(username)
        gid = self._provider.get_gid(groupname)
        logging.info("Adding user %s (UID %d) to group %s (GID %d).",
                     username, uid, groupname, gid)
        result, output = self.call_popen(["gpasswd", "-a", username,
                                          groupname])
        if result != 0:
            raise UserManagementError("Error adding user %s (UID %d) to "
                                      "group %s (GID %d).\n%s" %
                                      (username, uid, groupname, gid, output))
        return output

    def remove_group_member(self, username, groupname):
        """
        Remove the user matching C{username} from the group matching
        C{groupname} with the C{gpasswd} system command.

        @return: The output of C{gpasswd}.
        @raises UserManagementError: Raised when C{gpasswd} fails.
        """
        uid = self._provider.get_uid(username)
        gid = self._provider.get_gid(groupname)
        logging.info("Removing user %s (UID %d) from group %s (GID %d).",
                     username, uid, groupname, gid)
        result, output = self.call_popen(["gpasswd", "-d", username,
                                          groupname])
        if result != 0:
            raise UserManagementError("Error removing user %s (UID %d) "
                                      "from group %s (GID %d).\n%s"
                                      % (username, uid, groupname,
                                         gid, output))
        return output

    def remove_group(self, groupname):
        """Remove the group matching C{groupname} from the computer.

        @return: The output of C{groupdel}.
        @raises UserManagementError: Raised when C{groupdel} fails.
        """
        gid = self._provider.get_gid(groupname)
        logging.info("Removing group %s (GID %d).", groupname, gid)
        result, output = self.call_popen(["groupdel", groupname])
        if result != 0:
            raise UserManagementError("Error removing group %s (GID %d).\n%s"
                                      % (groupname, gid, output))
        return output

    def call_popen(self, args):
        """Run C{args} through the provider and collect its result.

        Standard error is redirected into standard output, so the returned
        text contains both streams.

        @param args: The command and its arguments, as a list.
        @return: A C{(result, output)} tuple holding the value returned by
            the process' C{wait()} and everything read from its output
            pipe.
        """
        popen = self._provider.popen(args, stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT)
        # Drain the pipe before waiting for the process to finish.
        output = popen.stdout.read()
        result = popen.wait()
        return result, output
|