/usr/share/pyshared/allmydata/immutable/filenode.py is in tahoe-lafs 1.9.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 | import binascii
import copy
import time
now = time.time
from zope.interface import implements
from twisted.internet import defer
from allmydata import uri
from twisted.internet.interfaces import IConsumer
from allmydata.interfaces import IImmutableFileNode, IUploadResults
from allmydata.util import consumer
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.util.dictutil import DictOfSets
from pycryptopp.cipher.aes import AES
# local imports
from allmydata.immutable.checker import Checker
from allmydata.immutable.repairer import Repairer
from allmydata.immutable.downloader.node import DownloadNode, \
IDownloadStatusHandlingConsumer
from allmydata.immutable.downloader.status import DownloadStatus
class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder,
terminator, history):
assert isinstance(verifycap, uri.CHKFileVerifierURI)
self._verifycap = verifycap
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._terminator = terminator
self._history = history
self._download_status = None
self._node = None # created lazily, on read()
def _maybe_create_download_node(self):
if not self._download_status:
ds = DownloadStatus(self._verifycap.storage_index,
self._verifycap.size)
if self._history:
self._history.add_download(ds)
self._download_status = ds
if self._node is None:
self._node = DownloadNode(self._verifycap, self._storage_broker,
self._secret_holder,
self._terminator,
self._history, self._download_status)
def read(self, consumer, offset=0, size=None):
"""I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is
finished."""
self._maybe_create_download_node()
return self._node.read(consumer, offset, size)
def get_segment(self, segnum):
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a
Deferred that fires with (offset,data) when the desired segment is
available, and c is an object on which c.cancel() can be called to
disavow interest in the segment (after which 'd' will never fire).
You probably need to know the segment size before calling this,
unless you want the first few bytes of the file. If you ask for a
segment number which turns out to be too large, the Deferred will
errback with BadSegmentNumberError.
The Deferred fires with the offset of the first byte of the data
segment, so that you can call get_segment() before knowing the
segment size, and still know which data you received.
"""
self._maybe_create_download_node()
return self._node.get_segment(segnum)
def get_segment_size(self):
# return a Deferred that fires with the file's real segment size
self._maybe_create_download_node()
return self._node.get_segsize()
def get_storage_index(self):
return self._verifycap.storage_index
def get_verify_cap(self):
return self._verifycap
def get_size(self):
return self._verifycap.size
def raise_error(self):
pass
def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap
storage_index = verifycap.storage_index
sb = self._storage_broker
servers = sb.get_connected_servers()
sh = self._secret_holder
c = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(storage_index)
crr.pre_repair_results = cr
if cr.is_healthy():
crr.post_repair_results = cr
return defer.succeed(crr)
else:
crr.repair_attempted = True
crr.repair_successful = False # until proven successful
def _gather_repair_results(ur):
assert IUploadResults.providedBy(ur), ur
# clone the cr (check results) to form the basis of the
# prr (post-repair results)
prr = CheckResults(cr.uri, cr.storage_index)
prr.data = copy.deepcopy(cr.data)
sm = prr.data['sharemap']
assert isinstance(sm, DictOfSets), sm
sm.update(ur.sharemap)
servers_responding = set(prr.data['servers-responding'])
for shnum, serverids in ur.sharemap.items():
servers_responding.update(serverids)
servers_responding = sorted(servers_responding)
prr.data['servers-responding'] = servers_responding
prr.data['count-shares-good'] = len(sm)
good_hosts = len(reduce(set.union, sm.itervalues(), set()))
prr.data['count-good-share-hosts'] = good_hosts
is_healthy = bool(len(sm) >= verifycap.total_shares)
is_recoverable = bool(len(sm) >= verifycap.needed_shares)
prr.set_healthy(is_healthy)
prr.set_recoverable(is_recoverable)
crr.repair_successful = is_healthy
# TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
crr.post_repair_results = prr
return crr
def _repair_error(f):
# as with mutable repair, I'm not sure if I want to pass
# through a failure or not. TODO
crr.repair_successful = False
crr.repair_failure = f
return f
r = Repairer(self, storage_broker=sb, secret_holder=sh,
monitor=monitor)
d = r.start()
d.addCallbacks(_gather_repair_results, _repair_error)
return d
d.addCallback(_maybe_repair)
return d
def check(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap
sb = self._storage_broker
servers = sb.get_connected_servers()
sh = self._secret_holder
v = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
return v.start()
class DecryptingConsumer:
"""I sit between a CiphertextDownloader (which acts as a Producer) and
the real Consumer, decrypting everything that passes by. The real
Consumer sees the real Producer, but the Producer sees us instead of the
real consumer."""
implements(IConsumer, IDownloadStatusHandlingConsumer)
def __init__(self, consumer, readkey, offset):
self._consumer = consumer
self._read_ev = None
self._download_status = None
# TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0)
# a.process(ciphertext, offset=xyz)
# For now, we fake it with the existing iv= argument.
offset_big = offset // 16
offset_small = offset % 16
iv = binascii.unhexlify("%032x" % offset_big)
self._decryptor = AES(readkey, iv=iv)
self._decryptor.process("\x00"*offset_small)
def set_download_status_read_event(self, read_ev):
self._read_ev = read_ev
def set_download_status(self, ds):
self._download_status = ds
def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real
# producer. Therefore we don't need to provide any IPushProducer
# methods. We implement all the IConsumer methods as pass-throughs,
# and only intercept write() to perform decryption.
self._consumer.registerProducer(producer, streaming)
def unregisterProducer(self):
self._consumer.unregisterProducer()
def write(self, ciphertext):
started = now()
plaintext = self._decryptor.process(ciphertext)
if self._read_ev:
elapsed = now() - started
self._read_ev.update(0, elapsed, 0)
if self._download_status:
self._download_status.add_misc_event("AES", started, now())
self._consumer.write(plaintext)
class ImmutableFileNode:
implements(IImmutableFileNode)
# I wrap a CiphertextFileNode with a decryption key
def __init__(self, filecap, storage_broker, secret_holder, terminator,
history):
assert isinstance(filecap, uri.CHKFileURI)
verifycap = filecap.get_verify_cap()
self._cnode = CiphertextFileNode(verifycap, storage_broker,
secret_holder, terminator, history)
assert isinstance(filecap, uri.CHKFileURI)
self.u = filecap
self._readkey = filecap.key
# TODO: I'm not sure about this.. what's the use case for node==node? If
# we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return True
def read(self, consumer, offset=0, size=None):
decryptor = DecryptingConsumer(consumer, self._readkey, offset)
d = self._cnode.read(decryptor, offset, size)
d.addCallback(lambda dc: consumer)
return d
def raise_error(self):
pass
def get_write_uri(self):
return None
def get_readonly_uri(self):
return self.get_uri()
def get_uri(self):
return self.u.to_string()
def get_cap(self):
return self.u
def get_readcap(self):
return self.u.get_readonly()
def get_verify_cap(self):
return self.u.get_verify_cap()
def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
def get_storage_index(self):
return self.u.get_storage_index()
def get_size(self):
return self.u.get_size()
def get_current_size(self):
return defer.succeed(self.get_size())
def is_mutable(self):
return False
def is_readonly(self):
return True
def is_unknown(self):
return False
def is_allowed_in_immutable_directory(self):
return True
def check_and_repair(self, monitor, verify=False, add_lease=False):
return self._cnode.check_and_repair(monitor, verify, add_lease)
def check(self, monitor, verify=False, add_lease=False):
return self._cnode.check(monitor, verify, add_lease)
def get_best_readable_version(self):
"""
Return an IReadable of the best version of this file. Since
immutable files can have only one version, we just return the
current filenode.
"""
return defer.succeed(self)
def download_best_version(self):
"""
Download the best version of this file, returning its contents
as a bytestring. Since there is only one version of an immutable
file, we download and return the contents of this file.
"""
d = consumer.download_to_data(self)
return d
# for an immutable file, download_to_data (specified in IReadable)
# is the same as download_best_version (specified in IFileNode). For
# mutable files, the difference is more meaningful, since they can
# have multiple versions.
download_to_data = download_best_version
# get_size() (IReadable), get_current_size() (IFilesystemNode), and
# get_size_of_best_version(IFileNode) are all the same for immutable
# files.
get_size_of_best_version = get_current_size
|