/usr/lib/python2.7/dist-packages/kopano/service.py is in python-kopano 8.5.5-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Part of the high-level python bindings for Kopano
Copyright 2005 - 2016 Zarafa and its licensors (see LICENSE file for details)
Copyright 2016 - Kopano and its licensors (see LICENSE file for details)
"""
import collections
import errno
import grp
import pwd
import logging
import logging.handlers
import multiprocessing
from multiprocessing import Process
import os
import os.path
import signal
import socket
import ssl
import sys
from .compat import decode as _decode
if sys.hexversion >= 0x03000000:
try:
import daemon # picks system version
import daemon.pidfile as pidlockfile
except ImportError:
pass
from . import config as _config
from . import log as _log
from . import server as _server
from . import errors as _errors
from . import parser as _parser
from . import log as _log
else:
try:
import daemon # picks bundled version
import daemon.pidlockfile as pidlockfile
except ImportError:
pass
import config as _config
import log as _log
import server as _server
import errors as _errors
import parser as _parser
import log as _log
def _daemon_helper(func, service, log):
    """Run *func*, wrapped in log-queue setup and teardown.

    When *service* is a :class:`Service`, a ``multiprocessing.Queue`` is
    stored on it as ``log_queue`` and a ``_log.QueueListener`` feeding the
    service's log handlers is started before *func* runs, and stopped again
    afterwards.

    :param func: callable to run; called without arguments when *service* is falsy or a :class:`Service`, otherwise called with *service* as its only argument
    :param service: :class:`Service` instance, another object handed to *func*, or a falsy value
    :param log: logger used for the final 'stopping' message (may be falsy)
    """
    is_service = isinstance(service, Service)
    try:
        if is_service: # XXX
            queue = multiprocessing.Queue()
            service.log_queue = queue
            service.ql = _log.QueueListener(queue, *service.log.handlers)
            service.ql.start()

        if is_service or not service:
            func()
        else:
            func(service)
    finally:
        # runs on normal return as well as on exceptions (including SystemExit)
        if is_service and service.ql: # XXX move queue stuff into Service
            service.ql.stop()
        if log and service:
            log.info('stopping %s', service.name)
def _daemonize(func, options=None, foreground=False, log=None, config=None, service=None):
    """Run *func* inside a ``daemon.DaemonContext``, by way of :func:`_daemon_helper`.

    :param func: callable handed to :func:`_daemon_helper`
    :param options: parsed options; a truthy ``options.foreground`` overrides *foreground* and makes the current directory the working directory
    :param foreground: when true, the process is not detached
    :param log: logger; the streams of its ``WatchedFileHandler`` handlers are preserved by the daemon context, and their files are chown'ed when both a user and a group are configured
    :param config: configuration queried for ``running_path``, ``pid_file``, ``run_as_user`` and ``run_as_group``
    :param service: object whose ``name`` provides the default pid file path; also handed to :func:`_daemon_helper`
    """
    uid = gid = None
    working_directory = '/'
    pidfile = None

    if config:
        # keep the '/' default when no running_path is configured
        working_directory = config.get('running_path') or working_directory
        pidfile = config.get('pid_file')
        if config.get('run_as_user'):
            uid = pwd.getpwnam(config.get('run_as_user')).pw_uid
        if config.get('run_as_group'):
            gid = grp.getgrnam(config.get('run_as_group')).gr_gid

    if not pidfile and service:
        pidfile = "/var/run/kopano/%s.pid" % service.name

    if pidfile:
        pidfile = pidlockfile.TimeoutPIDLockFile(pidfile, 10)
        oldpid = pidfile.read_pid()
        if oldpid is None:
            # there was no pidfile, remove the lock if it's there
            pidfile.break_lock()
        elif oldpid:
            try:
                # only the success or failure of opening/reading matters;
                # the context manager makes sure the handle is closed again
                with open('/proc/%u/cmdline' % oldpid) as cmdline:
                    cmdline.read()
            except IOError as error:
                if error.errno != errno.ENOENT:
                    raise
                # errno.ENOENT indicates that no process with pid=oldpid exists, which is ok
                pidfile.break_lock()

    # 'log' is optional (see files_preserve below), so guard before touching its handlers
    if log and uid is not None and gid is not None:
        for handler in log.handlers:
            if isinstance(handler, logging.handlers.WatchedFileHandler):
                os.chown(handler.baseFilename, uid, gid)

    if options and options.foreground:
        foreground = options.foreground
        working_directory = os.getcwd()

    if log:
        files_preserve = [h.stream for h in log.handlers if isinstance(h, logging.handlers.WatchedFileHandler)]
    else:
        files_preserve = None

    with daemon.DaemonContext(
            pidfile=pidfile,
            uid=uid,
            gid=gid,
            working_directory=working_directory,
            files_preserve=files_preserve,
            prevent_core=False,
            detach_process=not foreground,
            stdout=sys.stdout,
            stderr=sys.stderr,
        ):
        _daemon_helper(func, service, log)
class Service:
    """
    Encapsulates everything to create a simple service, such as:

    - Locating and parsing a configuration file
    - Performing logging, as specified in the configuration file
    - Handling common command-line options (-c, -F)
    - Daemonization (if no -F specified)

    :param name: name of the service; if for example 'search', the configuration file should be called ``/etc/kopano/search.cfg`` or passed with -c
    :param config: :class:`Configuration <Config>` to use
    :param options: OptionParser instance to get settings from (see :func:`parser`)
    """

    def __init__(self, name, config=None, options=None, args=None, logname=None, **kwargs):
        """Parse options and configuration and set up logging.

        Extra keyword arguments are stored as instance attributes. The
        process exits with status 1 when the configuration contains errors.
        """
        # extra keyword arguments become attributes; the explicit
        # assignments below take precedence over them
        self.__dict__.update(kwargs)

        if not options:
            # no options passed in: parse the command line ourselves
            options, args = _parser.parser('cskpUPufmvVFw').parse_args()
            args = [_decode(arg) for arg in args]
        self.options, self.args = options, args
        self.name = name
        self.logname = logname
        self.ql = None # log queue listener, set up in _daemon_helper

        # service-specific settings are layered on top of the common ones
        config2 = _config.CONFIG.copy()
        if config:
            config2.update(config)
        if getattr(options, 'config_file', None):
            options.config_file = os.path.abspath(options.config_file) # XXX useful during testing. could be generalized with optparse callback?
        if not getattr(options, 'service', True):
            # not running as a service implies staying in the foreground
            options.foreground = True
        self.config = _config.Config(config2, service=name, options=options)
        # the KOPANO_SOCKET environment variable wins over the configured socket
        self.config.data['server_socket'] = os.getenv("KOPANO_SOCKET") or self.config.data['server_socket']
        if getattr(options, 'worker_processes', None):
            self.config.data['worker_processes'] = options.worker_processes

        self.log = _log.logger(self.logname or self.name, options=self.options, config=self.config) # check that this works here or daemon may die silently XXX check run_as_user..?
        for msg in self.config.info:
            self.log.info(msg)
        for msg in self.config.warnings:
            self.log.warning(msg)
        if self.config.errors:
            for msg in self.config.errors:
                self.log.error(msg)
            sys.exit(1)

        self.stats = collections.defaultdict(int, {'errors': 0})
        self._server = None

    def main(self):
        """Service entry point; meant to be overridden, raises an error here."""
        raise _errors.Error('Service.main not implemented')

    @property
    def server(self):
        """Server object for this service, created on first access."""
        if self._server is None:
            self._server = _server.Server(options=self.options, config=self.config.data, log=self.log, service=self)
        return self._server

    def _install_signal_handlers(self):
        """Exit on SIGTERM/SIGINT with the negated signal number; ignore SIGHUP."""
        def _exit_on_signal(signum, frame):
            # use the delivered signal number: a closure over the loop
            # variable below would see its last value for every signal
            sys.exit(-signum)

        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, _exit_on_signal)
        signal.signal(signal.SIGHUP, signal.SIG_IGN) # XXX long term, reload config?

    def start(self):
        """Install signal handlers and run :meth:`main`, daemonized unless ``options.service`` is false."""
        self._install_signal_handlers()

        self.log.info('starting %s', self.logname or self.name)
        with _log.log_exc(self.log):
            if getattr(self.options, 'service', True): # do not run-as-service (eg backup)
                _daemonize(self.main, options=self.options, log=self.log, config=self.config, service=self)
            else:
                _daemon_helper(self.main, self, self.log)
class Worker(Process):
    """Daemonic ``multiprocessing.Process`` belonging to a service.

    Unless the logger named after the worker already has handlers, log
    records are sent to the service's ``log_queue`` through a
    ``_log.QueueHandler``.

    :param service: owning service; its ``options``, ``config``, ``log`` and ``log_queue`` are used
    :param name: process name, also used as logger name
    :param kwargs: stored as additional instance attributes
    """

    def __init__(self, service, name, **kwargs):
        Process.__init__(self)
        self.daemon = True
        self.name = name
        self._server = None
        self.service = service
        self.__dict__.update(kwargs)

        self.log = logging.getLogger(name=self.name)
        if self.log.handlers:
            # logger was configured before; do not add a second handler
            return

        level = _log._loglevel(service.options, service.config)
        queue_handler = _log.QueueHandler(service.log_queue)
        queue_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        queue_handler.setLevel(level)
        self.log.addHandler(queue_handler)
        self.log.setLevel(level)

    @property
    def server(self):
        """Server object built from the service's settings, created on first access."""
        if self._server is None:
            owner = self.service
            self._server = _server.Server(
                options=owner.options,
                config=owner.config,
                log=owner.log,
                service=owner,
            )
        return self._server

    def main(self):
        """Worker entry point; meant to be overridden, raises an error here."""
        raise _errors.Error('Worker.main not implemented')

    def run(self):
        """Process body: ignore SIGINT/SIGHUP, exit with 0 on SIGTERM, then run :meth:`main`."""
        for ignored in (signal.SIGINT, signal.SIGHUP):
            signal.signal(ignored, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))

        with _log.log_exc(self.log):
            self.main()
class _ZSocket: # XXX megh, double wrapper
def __init__(self, addr, ssl_key, ssl_cert):
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.bind(addr)
self.s.listen(socket.SOMAXCONN)
def accept(self):
newsocket, fromaddr = self.s.accept()
connstream = ssl.wrap_socket(newsocket, server_side=True, keyfile=self.ssl_key, certfile=self.ssl_cert)
return connstream, fromaddr
def server_socket(addr, ssl_key=None, ssl_cert=None, log=None): # XXX https, merge code with client_socket
    """Create a listening socket for *addr*.

    :param addr: ``file://<path>`` for a unix socket, ``https://<host>:<port>`` for an SSL-wrapping TCP socket, or ``[http://]<host>:<port>`` for a plain TCP socket
    :param ssl_key: key file, used for ``https://`` addresses
    :param ssl_cert: certificate file, used for ``https://`` addresses
    :param log: optional logger
    :return: the listening socket (a :class:`_ZSocket` for ``https://``)
    """
    def _listening_socket(family, address):
        # bind and listen, closing the socket again if either step fails
        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.bind(address)
            sock.listen(socket.SOMAXCONN)
        except Exception:
            sock.close()
            raise
        return sock

    if addr.startswith('file://'):
        # strip only the scheme prefix, not later occurrences in the path
        path = addr[len('file://'):]
        # remove a stale socket file without going through a shell, so paths
        # containing spaces or shell metacharacters are handled correctly
        try:
            os.remove(path)
        except OSError as error:
            # best effort, like 'rm -f': a real problem surfaces in bind() below
            if error.errno != errno.ENOENT and log:
                log.warning('could not remove stale socket %s: %s', path, error)
        s = _listening_socket(socket.AF_UNIX, path)
    elif addr.startswith('https://'):
        parts = addr.replace('https://', '').split(':')
        s = _ZSocket((parts[0], int(parts[1])), ssl_key=ssl_key, ssl_cert=ssl_cert)
    else:
        parts = addr.replace('http://', '').split(':')
        s = _listening_socket(socket.AF_INET, (parts[0], int(parts[1])))

    if log:
        log.info('listening on socket %s', addr)
    return s
def client_socket(addr, ssl_cert=None, log=None):
    """Create a socket connected to *addr*.

    :param addr: ``file://<path>`` for a unix socket, ``https://<host>:<port>`` for an SSL-wrapped TCP socket, or ``[http://]<host>:<port>`` for a plain TCP socket
    :param ssl_cert: CA certificates file used to verify the peer for ``https://`` addresses
    :param log: accepted for symmetry with :func:`server_socket`; not used here
    :return: the connected socket
    """
    if addr.startswith('file://'):
        # strip only the scheme prefix, not later occurrences in the path
        target = addr[len('file://'):]
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    elif addr.startswith('https://'):
        parts = addr.replace('https://', '').split(':')
        target = (parts[0], int(parts[1]))
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    else:
        parts = addr.replace('http://', '').split(':')
        target = (parts[0], int(parts[1]))
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        if addr.startswith('https://'):
            # NOTE(review): ssl.wrap_socket is deprecated since Python 3.7 and
            # removed in 3.12; moving to ssl.SSLContext.wrap_socket needs a
            # decision on protocol/hostname-checking defaults
            s = ssl.wrap_socket(s, ca_certs=ssl_cert, cert_reqs=ssl.CERT_REQUIRED)
        s.connect(target)
    except Exception:
        # do not leak the descriptor when wrapping or connecting fails
        s.close()
        raise
    return s
|