/usr/share/pyshared/desktopcouch/application/replication.py is in python-desktopcouch-application 1.0.8-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | # Copyright 2009 Canonical Ltd.
#
# This file is part of desktopcouch.
#
# desktopcouch is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# desktopcouch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Chad Miller <chad.miller@canonical.com>
"""Replication."""
import logging
log = logging.getLogger("replication") # pylint: disable=C0103
import dbus.exceptions
from desktopcouch.application.pair.couchdb_pairing import couchdb_io
from desktopcouch.application.pair.couchdb_pairing import dbus_io
from desktopcouch.application import replication_services
# pylint: disable=E0611,F0401
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
# pylint: enable=E0611,F0401
from twisted.internet import task
known_bad_service_names = set() # pylint: disable=C0103
is_running = True # pylint: disable=C0103
def db_targetprefix_for_service(service_name):
"""Use the service name to look up what the prefix should be on the
databases. This gives an egalitarian way for non-UbuntuOne servers to have
their own remote-db-name scheme."""
try:
container = "desktopcouch.application.replication_services"
log.debug("Looking up prefix for service %r", service_name)
mod = __import__(container, fromlist=[service_name])
return getattr(mod, service_name).db_name_prefix
except ImportError:
log.error("The service %r is unknown. It is not a "
"module in the %s package ." % (service_name, container))
return ""
except Exception:
log.exception("Not changing remote db name.")
return ""
def oauth_info_for_service(service_name):
"""Use the service name to look up what oauth information we should use
when talking to that service."""
try:
container = "desktopcouch.application.replication_services"
log.debug("Looking up prefix for service %r", service_name)
mod = __import__(container, fromlist=[service_name])
return getattr(mod, service_name).get_oauth_data()
except ImportError:
log.error("The service %r is unknown. It is not a "
"module in the %s package ." % (service_name, container))
return None
def do_all_replication(local_port): # pylint: disable=R0914,R0912,R0915
"""Perform all replication tasks."""
log.debug("started replicating")
local_uri = couchdb_io.mkuri("localhost", local_port)
try:
try:
# All machines running desktopcouch must advertise themselves with
# zeroconf. We collect those elsewhere and filter out the ones
# that we have paired with. Now, it's time to send our changes to
# all those.
for remote_hostid, addr, port, is_unpaired, remote_oauth in \
dbus_io.get_seen_paired_hosts(uri=local_uri):
if is_unpaired:
# The far end doesn't know want to break up.
count = 0
for local_identifier in couchdb_io.get_my_host_unique_id():
last_exception = None
try:
# Tell her gently, using each pseudonym.
# pylint: disable=E1121
couchdb_io.expunge_pairing(local_identifier,
couchdb_io.mkuri(addr, port), remote_oauth)
# pylint: enable=E1121
count += 1
except Exception, e: # pylint: disable=W0703
last_exception = e
if count == 0:
if last_exception is not None:
# If she didn't recognize us, something's wrong.
try:
raise last_exception # pylint: disable=E0702
# push caught exception back...
except:
# ... so that we log it here.
log.exception( # pylint: disable=W0702
"failed to unpair from other end.")
continue
else:
# Finally, find your inner peace...
couchdb_io.expunge_pairing(remote_hostid)
# ...and move on.
continue
# Ah, good, this is an active relationship. Be a giver.
log.debug("want to replipush to discovered host %r @ %s",
remote_hostid, addr)
for db_name in couchdb_io.get_database_names_replicatable(
local_uri):
if not is_running:
return
couchdb_io.replicate(db_name, db_name,
target_host=addr, target_port=port,
source_port=local_port, target_oauth=remote_oauth,
local_uri=local_uri)
log.debug("replication of discovered hosts finished")
except Exception: # pylint: disable=W0703
log.exception("replication of discovered hosts aborted")
try:
# There may be services we send data to. Use the service name (sn)
# to look up what the service needs from us.
for remote_hostid, sn, to_pull, to_push in \
couchdb_io.get_static_paired_hosts(port=local_port):
if not sn in dir(replication_services):
if not is_running:
return
if sn in known_bad_service_names:
continue # Don't nag.
known_bad_service_names.add(sn)
remote_oauth_data = oauth_info_for_service(sn)
# TODO: push all this into service module.
try:
prefix_getter = db_targetprefix_for_service(sn)
remote_location = str(prefix_getter)
if hasattr(prefix_getter, 'user_id'):
# pylint: disable=E1103
user_id = prefix_getter.user_id
# pylint: enable=E1103
else:
user_id = None
urlinfo = urlparse.urlsplit(str(remote_location))
except ValueError, e:
log.warn("Can't reach service %s. %s", sn, e)
continue
# pylint: disable=E1103
if ":" in urlinfo.netloc:
addr, port = urlinfo.netloc.rsplit(":", 1)
else:
addr = urlinfo.netloc
port = 443 if urlinfo.scheme == "https" else 80
remote_db_name_prefix = urlinfo.path.strip("/")
# pylint: enable=E1103
# TODO: end ^
if to_push:
for db_name in couchdb_io.get_database_names_replicatable(
local_uri):
if not is_running:
return
remote_db_name = remote_db_name_prefix + "/" + db_name
log.debug(
"want to replipush %r to static host %r @ %s",
remote_db_name, remote_hostid, addr)
couchdb_io.replicate(db_name, remote_db_name,
target_host=addr, target_port=port,
source_port=local_port, target_ssl=True,
target_oauth=remote_oauth_data,
local_uri=local_uri)
if to_pull:
for remote_db_name in \
couchdb_io.get_database_names_replicatable(
# pylint: disable=E1103
couchdb_io.mkuri(
addr, int(port), has_ssl=(
urlinfo.scheme == 'https')),
oauth_tokens=remote_oauth_data, service=True,
user_id=user_id):
if not is_running:
return
try:
if not remote_db_name.startswith(
str(remote_db_name_prefix + "/")):
continue
except ValueError, e:
log.error("skipping %r on %s. %s", db_name, sn, e)
continue
prefix_len = len(str(remote_db_name_prefix))
db_name = remote_db_name[prefix_len + 1:]
if db_name.strip("/") == "management":
continue # be paranoid about what we accept.
log.debug(
"want to replipull %r from static host %r @ %s",
db_name, remote_hostid, addr)
couchdb_io.replicate(remote_db_name, db_name,
source_host=addr, source_port=port,
target_port=local_port, source_ssl=True,
source_oauth=remote_oauth_data,
local_uri=local_uri)
# pylint: enable=E1103
except Exception: # pylint: disable=W0703
log.exception("replication of services aborted")
finally:
log.debug("finished replicating")
def set_up(port_getter):
"""Set up the port getter."""
port = port_getter()
unique_identifiers = couchdb_io.get_my_host_unique_id(
couchdb_io.mkuri("localhost", int(port)), create=True)
beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i)
for i in unique_identifiers]
for beacon in beacons:
try:
beacon.publish()
except dbus.exceptions.DBusException, e:
log.error("We seem to be running already, or can't publish "
"our zeroconf advert. %s", e)
return None
dbus_io.maintain_discovered_servers()
task_running = task.LoopingCall(do_all_replication, int(port))
task_running.start(3600)
# TODO: port may change, so every so often, check it and
# perhaps refresh the beacons. We return an array of beacons, so we could
# keep a reference to that array and mutate it when the port-beacons
# change.
return beacons, task_running
def tear_down(beacons, looping_task):
"""Tear down the port getter."""
for beacon in beacons:
beacon.unpublish()
try:
looping_task.stop()
except:
pass # pylint: disable=W0702
|