/usr/lib/python2.7/dist-packages/kopano/company.py is in python-kopano 8.5.5-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 | """
Part of the high-level python bindings for Kopano.
Copyright 2005 - 2016 Zarafa and its licensors (see LICENSE file)
Copyright 2016 - Kopano and its licensors (see LICENSE file)
"""
import sys
from MAPI import (
MAPI_UNICODE, RELOP_EQ, TBL_BATCH, ECSTORE_TYPE_PUBLIC,
WrapStoreEntryID
)
from MAPI.Defs import bin2hex, PpropFindProp
from MAPI.Struct import (
MAPIErrorNotFound, SPropertyRestriction, SPropValue,
MAPIErrorCollision
)
from MAPI.Tags import PR_EC_COMPANY_NAME_W, PR_EC_STOREGUID
from MAPI.Util import GetPublicStore
from .properties import Properties
from .quota import Quota
from .group import Group
from .defs import EID_EVERYONE
from .errors import (
NotFoundError, DuplicateError
)
from .compat import (
hex as _hex, unhex as _unhex, repr as _repr, fake_unicode as _unicode
)
if sys.hexversion >= 0x03000000:
try:
from . import server as _server
except ImportError:
_server = sys.modules[__package__+'.server']
try:
from . import user as _user
except ImportError:
_user = sys.modules[__package__+'.user']
try:
from . import store as _store
except ImportError:
_store = sys.modules[__package__+'.store']
else:
import server as _server
import user as _user
import store as _store
class Company(Properties):
"""Company class"""
def __init__(self, name, server=None):
self.server = server or _server.Server()
self._name = name = _unicode(name)
if name != u'Default': # XXX
try:
id_ = self.server.sa.ResolveCompanyName(name, MAPI_UNICODE)
self._eccompany = self.server.sa.GetCompany(id_, MAPI_UNICODE)
except MAPIErrorNotFound:
raise NotFoundError("no such company: '%s'" % name)
# XXX cached because GetPublicStore does not see changes..
# do we need folder & store notifications (slow)?
self._public_store = None
self._mapiobj = None
@property
def mapiobj(self):
if not self._mapiobj:
id_ = self._eccompany.CompanyID
self._mapiobj = self.server.mapisession.OpenEntry(id_, None, 0)
return self._mapiobj
@property
def companyid(self): # XXX single-tenant case
if self._name != u'Default':
return bin2hex(self._eccompany.CompanyID)
@property
def admin(self):
"""Company :class:`administrator <User>` in multi-tenant mode."""
if self._name != u'Default':
id_ = self._eccompany.AdministratorID
ecuser = self.server.sa.GetUser(id_, MAPI_UNICODE)
return self.server.user(ecuser.Username)
@admin.setter
def admin(self, user):
self._eccompany.AdministratorID = user._ecuser.UserID
self.server.sa.SetCompany(self._eccompany, MAPI_UNICODE)
@property
def hidden(self):
"""The company is hidden from the addressbook."""
if self._name != u'Default':
return bool(self._eccompany.IsHidden)
return False
@property
def name(self):
"""Company name."""
return self._name
@name.setter
def name(self, value):
value = _unicode(value)
self._eccompany.Companyname = value
self._name = value
self.server.sa.SetCompany(self._eccompany, MAPI_UNICODE)
def store(self, guid):
"""Store for the given GUID
:param guid: store guid
:return: :class:`store <Store>` with given GUID.
"""
if guid == 'public':
if not self.public_store:
raise NotFoundError("no public store for company '%s'" % \
self.name)
return self.public_store
else:
return self.server.store(guid)
def stores(self):
"""Return all company :class:`stores <Store>`."""
if self.server.multitenant:
table = self.server.sa.OpenUserStoresTable(MAPI_UNICODE)
restriction = SPropertyRestriction(RELOP_EQ, PR_EC_COMPANY_NAME_W,
SPropValue(PR_EC_COMPANY_NAME_W, self.name))
table.Restrict(restriction, TBL_BATCH)
for row in table.QueryRows(-1, 0):
prop = PpropFindProp(row, PR_EC_STOREGUID)
if prop:
yield _store.Store(_hex(prop.Value), self.server)
else:
for store in self.server.stores():
yield store
@property
def public_store(self):
"""Company public :class:`store <Store>`."""
if not self._public_store:
if self._name == u'Default': # XXX
pubstore = GetPublicStore(self.server.mapisession)
if pubstore is None:
self._public_store = None
else:
self._public_store = _store.Store(mapiobj=pubstore, server=self.server)
else:
try:
entryid = self.server.ems.CreateStoreEntryID(None, self._name, MAPI_UNICODE)
except MAPIErrorNotFound:
self._public_store = None
else:
self._public_store = _store.Store(entryid=_hex(entryid), server=self.server)
return self._public_store
def create_public_store(self):
"""Create company public :class:`store <Store>`."""
if self._name == u'Default':
try:
storeid_rootid = self.server.sa.CreateStore(ECSTORE_TYPE_PUBLIC, EID_EVERYONE)
except MAPIErrorCollision:
raise DuplicateError("public store already exists")
else:
try:
storeid_rootid = self.server.sa.CreateStore(ECSTORE_TYPE_PUBLIC, self._eccompany.CompanyID)
except MAPIErrorCollision:
raise DuplicateError("public store already exists for company '%s'" % self.name)
store_entryid = WrapStoreEntryID(0, b'zarafa6client.dll', storeid_rootid[0][:-4]) + self.server.pseudo_url + b'\x00'
self._public_store = _store.Store(entryid=_hex(store_entryid), server=self.server)
return self._public_store
def hook_public_store(self, store):
"""Hook company public :class:`store <Store>`.
:param store: store to hook as public store
"""
if self._name == u'Default':
try:
self.server.sa.HookStore(ECSTORE_TYPE_PUBLIC, EID_EVERYONE, _unhex(store.guid))
except MAPIErrorCollision:
raise DuplicateError("hooked public store already exists")
else:
try:
self.server.sa.HookStore(ECSTORE_TYPE_PUBLIC, _unhex(self.companyid), _unhex(store.guid))
except MAPIErrorCollision:
raise DuplicateError("hooked public store already exists for company '%s'" % self.name)
self._public_store = store
def unhook_public_store(self):
"""Unhook company public :class:`store <Store>`."""
if self._name == u'Default':
try:
self.server.sa.UnhookStore(ECSTORE_TYPE_PUBLIC, EID_EVERYONE)
except MAPIErrorNotFound:
raise NotFoundError("no hooked public store")
else:
try:
self.server.sa.UnhookStore(ECSTORE_TYPE_PUBLIC, _unhex(self.companyid))
except MAPIErrorNotFound:
raise NotFoundError("no hooked public store for company '%s'" % self.name)
self._public_store = None
def user(self, name, create=False):
"""Return :class:`user <User>` with given name.
:param name: user name
:param create: create user if it doesn't exist (default False)
"""
if not '@' in name and self._name != 'Default':
name = name + '@' + self._name
try:
return self.server.user(name)
except NotFoundError:
if create:
return self.create_user(name)
else:
raise # FIXME
def get_user(self, name):
"""Return :class:`user <User>` with given name or *None* if not found.
:param name: user name
"""
try:
return self.user(name)
except NotFoundError:
pass
def users(self, parse=True, system=False):
"""
:param parse: filter users on cli argument --user (default True)
:param system: include system users (default False)
:return: all :class:`users <User>` within company
"""
if parse and getattr(self.server.options, 'users', None):
for username in self.server.options.users:
yield _user.User(username, self.server)
return
if self._name == u'Default':
for user in self.server.users(system=system, remote=True):
yield user
else:
for ecuser in self.server.sa.GetUserList(self._eccompany.CompanyID, MAPI_UNICODE):
if ecuser.Username != 'SYSTEM':
yield _user.User(ecuser.Username, self.server)
def admins(self):
for ecuser in self.server.sa.GetRemoteAdminList(self._eccompany.CompanyID, MAPI_UNICODE):
yield self.server.user(ecuser.Username)
def add_admin(self, user):
"""Add user to admin list
:param user: the user class:`user <User>`
:raises: DuplicateError
"""
try:
self.server.sa.AddUserToRemoteAdminList(user._ecuser.UserID, self._eccompany.CompanyID)
except MAPIErrorCollision:
raise DuplicateError("user '%s' already admin for company '%s'" % (user.name, self.name))
def remove_admin(self, user):
"""Remove user from admin list
:param user: the :class:`user <User>`
:raises: NotFoundError
"""
try:
self.server.sa.DelUserFromRemoteAdminList(user._ecuser.UserID, self._eccompany.CompanyID)
except MAPIErrorNotFound:
raise NotFoundError("user '%s' no admin for company '%s'" % (user.name, self.name))
def views(self):
for eccompany in self.server.sa.GetRemoteViewList(self._eccompany.CompanyID, MAPI_UNICODE):
yield self.server.company(eccompany.Companyname)
def add_view(self, company):
try:
self.server.sa.AddCompanyToRemoteViewList(company._eccompany.CompanyID, self._eccompany.CompanyID)
except MAPIErrorCollision:
raise DuplicateError("company '%s' already in view-list for company '%s'" % (company.name, self.name))
def remove_view(self, company):
try:
self.server.sa.DelCompanyFromRemoteViewList(company._eccompany.CompanyID, self._eccompany.CompanyID)
except MAPIErrorNotFound:
raise NotFoundError("company '%s' not in view-list for company '%s'" % (company.name, self.name))
def create_user(self, name, password=None):
"""Create a new :class:`user <User>` within the company.
:param name: user name
:param password: password (default None)
"""
name = name.split('@')[0]
self.server.create_user(name, password=password, company=self._name)
return self.user('%s@%s' % (name, self._name))
#XXX create_group/create=True
def group(self, name):
"""Return :class:`group <Group>` with given name.
:param name: group name
:raises: NotFoundError
"""
for group in self.groups(): # XXX
if group.name == name:
return group
raise NotFoundError("no such group: '%s'" % name)
def groups(self):
"""Return all :class:`groups <Group>` within the company."""
if self.name == u'Default': # XXX
for group in self.server.groups():
yield group
else:
for ecgroup in self.server.sa.GetGroupList(self._eccompany.CompanyID, MAPI_UNICODE):
yield Group(ecgroup.Groupname, self.server)
@property
def quota(self):
"""Company :class:`Quota`."""
if self._name == u'Default':
return Quota(self.server, None)
else:
return Quota(self.server, self._eccompany.CompanyID)
def __eq__(self, c):
if isinstance(c, Company):
return self.companyid == c.companyid
return False
def __ne__(self, c):
return not self == c
def __contains__(self, u):
return u in self.users()
def __unicode__(self):
return u"Company('%s')" % self._name
|