/usr/lib/python2.7/dist-packages/kopano/user.py is in python-kopano 8.5.5-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Part of the high-level python bindings for Kopano
Copyright 2005 - 2016 Zarafa and its licensors (see LICENSE file for details)
Copyright 2016 - Kopano and its licensors (see LICENSE file for details)
"""
import sys
from MAPI import (
MAPI_UNICODE, MAPI_UNRESOLVED, ECSTORE_TYPE_PRIVATE, ECSTORE_TYPE_ARCHIVE,
WrapStoreEntryID
)
from MAPI.Defs import bin2hex, HrGetOneProp
from MAPI.Struct import (
SPropValue, MAPIErrorNotFound, MAPIErrorInvalidParameter,
MAPIErrorCollision, MAPIErrorNoSupport, ECUSER
)
from MAPI.Tags import (
PR_EMAIL_ADDRESS_W, PR_DISPLAY_NAME_W, PR_EC_ENABLED_FEATURES_W,
PR_EC_DISABLED_FEATURES_W, PR_EC_COMPANY_NAME_W,
PR_MAPPING_SIGNATURE, PR_EC_ARCHIVE_SERVERS,
EMS_AB_ADDRESS_LOOKUP
)
from .store import Store
from .properties import Properties
from .group import Group
from .quota import Quota
from .defs import (
ACTIVE_USER, NONACTIVE_USER,
)
from .errors import Error, NotFoundError, NotSupportedError, DuplicateError
from .compat import (
hex as _hex, unhex as _unhex, fake_unicode as _unicode
)
if sys.hexversion >= 0x03000000:
from . import server as _server
from . import company as _company
else:
import server as _server
import company as _company
class User(Properties):
    """A user account on a Kopano server.

    The account data is held in ``self._ecuser`` (the record returned by the
    server's service-admin ``GetUser`` call); most properties below simply
    expose fields of that record, and the setters push changes back through
    :meth:`_update`.

    Attributes that are not defined on :class:`User` are looked up on the
    user's default :class:`Store` (see :meth:`__getattr__` and
    :meth:`__setattr__`).
    """

    def __init__(self, name=None, server=None, email=None, ecuser=None,
                 userid=None):
        """Look up a user.

        The lookup key is chosen in this order: ``ecuser``, ``userid``,
        ``email``, ``name``.

        :param name: account name, resolved via ``ResolveUserName``
        :param server: server to use; a new ``Server()`` is created if omitted
        :param email: email address, resolved through the address book
        :param ecuser: an already fetched user record, used as-is
        :param userid: hex-encoded user id
        :raises NotFoundError: if no matching user exists
        """
        self.server = server or _server.Server()

        if ecuser:
            self._ecuser = ecuser
            self._name = ecuser.Username
        elif userid:
            try:
                self._ecuser = self.server.sa.GetUser(_unhex(userid), MAPI_UNICODE)
            except MAPIErrorNotFound:
                raise NotFoundError("no user found with userid '%s'" % userid)
            self._name = self._ecuser.Username
        else:
            if email:
                # Resolve the address in the global address book to obtain
                # the account name.
                try:
                    self._name = _unicode(self.server.gab.ResolveNames(
                        [PR_EMAIL_ADDRESS_W],
                        MAPI_UNICODE | EMS_AB_ADDRESS_LOOKUP,
                        [[SPropValue(PR_DISPLAY_NAME_W, _unicode(email))]],
                        [MAPI_UNRESOLVED]
                    )[0][0][1].Value)
                except (MAPIErrorNotFound, MAPIErrorInvalidParameter, IndexError):
                    raise NotFoundError("no such user '%s'" % email)
            else:
                self._name = _unicode(name)

            try:
                self._ecuser = self.server.sa.GetUser(
                    self.server.sa.ResolveUserName(self._name, MAPI_UNICODE),
                    MAPI_UNICODE
                )
            except (MAPIErrorNotFound, MAPIErrorInvalidParameter):
                # multi-tenant, but no '@' in username..
                raise NotFoundError("no such user: '%s'" % self.name)

        # Address book entry; opened lazily by the `mapiobj` property.
        self._mapiobj = None

    @property
    def mapiobj(self):
        """ Underlying MAPI object, opened on first access and then cached """
        if not self._mapiobj:
            self._mapiobj = self.server.mapisession.OpenEntry(self._ecuser.UserID, None, 0)
        return self._mapiobj

    @property
    def admin(self):
        """ True if the stored admin level is at least 1 """
        return self._ecuser.IsAdmin >= 1

    @admin.setter
    def admin(self, value):
        self._update(admin=value)

    @property
    def admin_level(self):
        """ Raw admin level as stored in the user record """
        return self._ecuser.IsAdmin

    @admin_level.setter
    def admin_level(self, value):
        self._update(admin=value)

    @property
    def hidden(self):
        """The user is hidden from the addressbook."""
        return bool(self._ecuser.IsHidden)

    @property
    def name(self):
        """ Account name """
        return self._name

    @name.setter
    def name(self, value):
        self._update(username=_unicode(value))

    @property
    def fullname(self):
        """ Full name """
        return self._ecuser.FullName

    @fullname.setter
    def fullname(self, value):
        self._update(fullname=_unicode(value))

    @property
    def email(self):
        """ Email address """
        return self._ecuser.Email

    @email.setter
    def email(self, value):
        self._update(email=_unicode(value))

    @property
    def password(self):
        """ Password (write-only; reading raises :class:`Error`) """
        raise Error('passwords are write-only')

    @password.setter
    def password(self, value):
        self._update(password=_unicode(value))

    @property
    def features(self):
        """ Enabled features (pop3/imap/mobile)

        Returns the value list of the enabled-features entry itself (not a
        copy), or *None* if the user record has no such entry.

        :raises NotSupportedError: if the user record has no ``MVPropMap``
        """
        if not hasattr(self._ecuser, 'MVPropMap'):
            raise NotSupportedError('Python-Mapi does not support MVPropMap')

        for entry in self._ecuser.MVPropMap:
            if entry.ulPropId == PR_EC_ENABLED_FEATURES_W:
                return entry.Values
        return None

    @features.setter
    def features(self, value):
        if not hasattr(self._ecuser, 'MVPropMap'):
            raise NotSupportedError('Python-Mapi does not support MVPropMap')

        # Enabled + Disabled defines all features.
        features = set(e for entry in self._ecuser.MVPropMap for e in entry.Values)
        disabled = list(features - set(value))

        # XXX: performance
        for entry in self._ecuser.MVPropMap:
            if entry.ulPropId == PR_EC_ENABLED_FEATURES_W:
                entry.Values = value
            if entry.ulPropId == PR_EC_DISABLED_FEATURES_W:
                entry.Values = disabled

        self._update()

    def add_feature(self, feature):
        """ Add a feature for a user

        :param feature: the new feature
        :raises DuplicateError: if the feature is already enabled
        """
        feature = _unicode(feature)
        enabled = self.features
        if feature in enabled:
            raise DuplicateError("feature '%s' already enabled for user '%s'" % (feature, self.name))
        # Build a new list so the current entry is not modified in place.
        self.features = enabled + [feature]

    def remove_feature(self, feature):
        """ Remove a feature for a user

        :param feature: the to be removed feature
        :raises NotFoundError: if the feature is not enabled
        """
        # Work on a copy: the getter hands out the live value list, and the
        # setter derives the disabled set from the values currently stored,
        # so removing in place would make the feature disappear entirely
        # instead of moving it to the disabled list.
        features = self.features[:]
        try:
            features.remove(_unicode(feature))
        except ValueError:
            raise NotFoundError("no feature '%s' enabled for user '%s'" % (feature, self.name))
        # The setter already pushes the change to the server; no extra
        # _update() call is needed here.
        self.features = features

    @property
    def userid(self):
        """ Userid """
        return bin2hex(self._ecuser.UserID)

    @property
    def company(self):
        """ :class:`Company` the user belongs to """
        try:
            return _company.Company(HrGetOneProp(self.mapiobj, PR_EC_COMPANY_NAME_W).Value, self.server)
        except MAPIErrorNoSupport:
            # Fall back to the company named 'Default'.
            return _company.Company(u'Default', self.server)

    @property  # XXX
    def local(self):
        """ True if the user has a store whose mapping signature equals this server's guid """
        store = self.store
        return bool(store and (self.server.guid == bin2hex(HrGetOneProp(store.mapiobj, PR_MAPPING_SIGNATURE).Value)))

    def create_store(self):
        """ Create a private store for the user and return it as :class:`Store`

        :raises DuplicateError: if the user already has a store
        """
        try:
            storeid_rootid = self.server.sa.CreateStore(ECSTORE_TYPE_PRIVATE, self._ecuser.UserID)
        except MAPIErrorCollision:
            raise DuplicateError("user '%s' already has store" % self.name)
        # Wrap the new store id (minus its last four bytes) and append the
        # server's pseudo URL to form the store entryid.
        store_entryid = WrapStoreEntryID(0, b'zarafa6client.dll', storeid_rootid[0][:-4]) + self.server.pseudo_url + b'\x00'
        return Store(entryid=_hex(store_entryid), server=self.server)

    @property
    def store(self):
        """ Default :class:`Store` for user or *None* if no store is attached """
        try:
            entryid = self.server.ems.CreateStoreEntryID(None, self._name, MAPI_UNICODE)
            return Store(entryid=_hex(entryid), server=self.server)
        except (MAPIErrorNotFound, NotFoundError):
            return None

    # XXX deprecated? user.store = .., user.archive_store = ..
    def hook(self, store):
        """ Hook *store* to the user as private store

        :param store: the :class:`Store` to hook
        :raises DuplicateError: if the user already has a hooked store
        """
        try:
            self.server.sa.HookStore(ECSTORE_TYPE_PRIVATE, _unhex(self.userid), _unhex(store.guid))
        except MAPIErrorCollision:
            raise DuplicateError("user '%s' already has hooked store" % self.name)

    # XXX deprecated? user.store = None
    def unhook(self):
        """ Unhook the user's private store

        :raises NotFoundError: if the user has no hooked store
        """
        try:
            self.server.sa.UnhookStore(ECSTORE_TYPE_PRIVATE, _unhex(self.userid))
        except MAPIErrorNotFound:
            raise NotFoundError("user '%s' has no hooked store" % self.name)

    def hook_archive(self, store):
        """ Hook *store* to the user as archive store

        :param store: the :class:`Store` to hook
        :raises DuplicateError: if the user already has a hooked archive store
        """
        try:
            self.server.sa.HookStore(ECSTORE_TYPE_ARCHIVE, _unhex(self.userid), _unhex(store.guid))
        except MAPIErrorCollision:
            raise DuplicateError("user '%s' already has hooked archive store" % self.name)

    def unhook_archive(self):
        """ Unhook the user's archive store

        :raises NotFoundError: if the user has no hooked archive store
        """
        try:
            self.server.sa.UnhookStore(ECSTORE_TYPE_ARCHIVE, _unhex(self.userid))
        except MAPIErrorNotFound:
            raise NotFoundError("user '%s' has no hooked archive store" % self.name)

    @property
    def active(self):
        """ True if the user's class is ``ACTIVE_USER`` """
        return self._ecuser.Class == ACTIVE_USER

    @active.setter
    def active(self, value):
        if value:
            self._update(user_class=ACTIVE_USER)
        else:
            self._update(user_class=NONACTIVE_USER)

    @property
    def home_server(self):
        """ Server name from the user record, or this server's name if empty """
        return self._ecuser.Servername or self.server.name

    @property
    def archive_server(self):
        """ First entry of the archive-servers property, or *None* if the property is not found """
        try:
            return HrGetOneProp(self.mapiobj, PR_EC_ARCHIVE_SERVERS).Value[0]
        except MAPIErrorNotFound:
            return None

    @property
    def quota(self):
        """ User :class:`Quota` """
        return Quota(self.server, self._ecuser.UserID)

    def groups(self):
        """ Generate the :class:`Group` objects the user is a member of """
        for g in self.server.sa.GetGroupListOfUser(self._ecuser.UserID, MAPI_UNICODE):
            yield Group(g.Groupname, self.server)

    def send_as(self):
        """ Generate the users on this user's send-as list """
        for u in self.server.sa.GetSendAsList(self._ecuser.UserID, MAPI_UNICODE):
            yield self.server.user(u.Username)

    def add_send_as(self, user):
        """ Add *user* to the send-as list

        :param user: the :class:`User` to add
        :raises DuplicateError: if *user* is already on the list
        """
        try:
            self.server.sa.AddSendAsUser(self._ecuser.UserID, user._ecuser.UserID)
        except MAPIErrorCollision:
            raise DuplicateError("user '%s' already in send-as for user '%s'" % (user.name, self.name))

    def remove_send_as(self, user):
        """ Remove *user* from the send-as list

        :param user: the :class:`User` to remove
        :raises NotFoundError: if *user* is not on the list
        """
        try:
            self.server.sa.DelSendAsUser(self._ecuser.UserID, user._ecuser.UserID)
        except MAPIErrorNotFound:
            raise NotFoundError("no user '%s' in send-as for user '%s'" % (user.name, self.name))

    def rules(self):
        """ Rules of the user's inbox (``inbox`` is delegated to the store) """
        return self.inbox.rules()

    def __eq__(self, u):  # XXX check same server?
        if isinstance(u, User):
            return self.userid == u.userid
        return False

    def __ne__(self, u):
        return not self == u

    def __hash__(self):
        # Keep hashing consistent with __eq__ (which compares userids).
        # Without this, defining __eq__ makes the class unhashable on
        # Python 3, and equal users hash differently on Python 2.
        return hash(self.userid)

    def __unicode__(self):
        return u"User('%s')" % self._name

    def _update(self, **kwargs):
        """Write the user record to the server and reload it.

        Accepted keyword arguments are ``username``, ``password``, ``email``,
        ``fullname``, ``user_class`` and ``admin``; any field not given keeps
        its current value.
        """
        username = kwargs.get('username', self.name)
        fields = dict(
            Username=username,
            Password=kwargs.get('password', self._ecuser.Password),
            Email=kwargs.get('email', _unicode(self._ecuser.Email)),
            FullName=kwargs.get('fullname', _unicode(self._ecuser.FullName)),
            Class=kwargs.get('user_class', self._ecuser.Class),
            UserID=self._ecuser.UserID,
            IsAdmin=kwargs.get('admin', self._ecuser.IsAdmin),
        )
        # Pass the MVPropMAP otherwise the set values are reset
        if hasattr(self._ecuser, 'MVPropMap'):
            fields['MVPropMap'] = self._ecuser.MVPropMap

        try:
            self.server.sa.SetUser(ECUSER(**fields), MAPI_UNICODE)
        except MAPIErrorNoSupport:
            # NOTE(review): deliberately ignored in the original code;
            # presumably the server's user backend is read-only - confirm.
            pass

        # Reload the record so the properties reflect what the server stored.
        self._ecuser = self.server.sa.GetUser(
            self.server.sa.ResolveUserName(username, MAPI_UNICODE),
            MAPI_UNICODE
        )
        self._name = username

    def __getattr__(self, x):
        """Delegate unknown attributes to the user's default store.

        :raises AttributeError: if the user has no store, or the store has
            no attribute *x*
        """
        # The `store` property itself reads `server` and `_name`; if either
        # is missing (e.g. on a not yet initialised instance), delegating
        # would recurse without bound.
        if x in ('server', '_name'):
            raise AttributeError("'User' object has no attribute '%s'" % x)

        store = self.store
        if store:
            try:
                return getattr(store, x)
            except AttributeError:
                pass
        # Also reached when there is no store: never fall through and
        # silently return None for an unknown attribute.
        raise AttributeError("'User' object has no attribute '%s'" % x)

    def __setattr__(self, x, val):
        """Set *x* on the store if only :class:`Store` defines it, else on the user."""
        if x not in User.__dict__ and x in Store.__dict__:
            setattr(self.store, x, val)
        else:
            super(User, self).__setattr__(x, val)
|