/usr/share/pyshared/allmydata/immutable/upload.py is in tahoe-lafs 1.9.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_servers, \
failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
# this wants to live in storage, not here
class TooFullError(Exception):
    """Exception type for a "too full" condition.

    Nothing in the visible part of this module raises or catches it; the
    comment next to the definition says it really belongs in the storage
    package.
    """
class UploadResults(Copyable, RemoteCopy):
    """Plain record describing the outcome of one upload.

    Instances travel from the helper to its client, so the set and shape
    of the attributes below is part of a wire format: prefer adding new
    fields over changing existing ones, otherwise compatibility may break.
    """
    implements(IUploadResults)

    # Do not edit this string: it has to match the value used on the
    # helper. It is an opaque identifier and does *not* need to match the
    # fully-qualified package/module/class name.
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        # {name: number of seconds}
        self.timings = {}

        # the same placement information, indexed both ways
        self.sharemap = dictutil.DictOfSets()   # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets()  # {serverid: set(shnum)}

        # everything below is unknown until the upload fills it in
        self.file_size = None
        self.ciphertext_fetched = None  # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None  # count of shares already present
        self.pushed_shares = None       # count of shares we pushed
# EXTENSION_SIZE is the number of bytes set aside for the URI extension
# block; this module passes it to layout.make_write_bucket_proxy() every
# time it sizes a share.
#
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
# NOTE(review): the 846-byte and ~419-byte figures quoted above disagree;
# confirm which one is current before lowering the constant.
def pretty_print_shnum_to_servers(s):
    """Format a {shnum: iterable-of-serverids} mapping for log messages.

    Each entry becomes "sh<shnum>: <id>+<id>+..." with every serverid
    abbreviated by idlib.shortnodeid_b2a(); entries are joined with ", "
    in the mapping's own iteration order.
    """
    entries = []
    for shnum, serverids in s.iteritems():
        short_ids = [idlib.shortnodeid_b2a(serverid) for serverid in serverids]
        entries.append("sh%s: %s" % (shnum, '+'.join(short_ids)))
    return ', '.join(entries)
class ServerTracker:
    """Bookkeeping for the shares of one storage index on one server.

    A tracker knows how to ask its server to allocate buckets (query), how
    to ask which buckets already exist (ask_about_existing_shares), and it
    remembers the write-bucket proxies for every share the server agreed
    to hold so that they can be used or aborted later.
    """

    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        """
        @param server: object providing get_name(), get_serverid() and
                       get_rref() for the storage server
        @param sharesize, blocksize, num_segments, num_share_hashes:
                       share geometry, handed unchanged to the write
                       bucket proxy
        @param storage_index: storage index of the file being uploaded
        @param bucket_renewal_secret, bucket_cancel_secret: lease secrets
                       sent with the allocate_buckets request
        """
        self._server = server
        self.buckets = {}  # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build one throwaway proxy (no remote reference, no server) just
        # to learn which proxy class applies to this geometry and how much
        # space one share needs.
        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE)
        self.wbp_class = wbp.__class__  # to create more of them
        self.allocated_size = wbp.get_allocated_size()

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_serverid(self):
        """Return the serverid reported by the wrapped server object."""
        return self._server.get_serverid()

    def get_name(self):
        """Return the name reported by the wrapped server object."""
        return self._server.get_name()

    def query(self, sharenums):
        """
        Ask the server to allocate buckets for sharenums.

        @return: a Deferred that fires with (alreadygot, allocated), as
                 produced by _got_reply.
        """
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            self.storage_index,
                            self.renew_secret,
                            self.cancel_secret,
                            sharenums,
                            self.allocated_size,
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """Return the Deferred of a remote get_buckets() for our SI."""
        rref = self._server.get_rref()
        return rref.callRemote("get_buckets", self.storage_index)

    def _got_reply(self, reply):
        """
        Wrap every newly allocated bucket in a write-bucket proxy.

        @param reply: the (alreadygot, buckets) pair answered by
                      allocate_buckets; buckets maps sharenum to a remote
                      reference.
        @return: (alreadygot, set of sharenums allocated by this reply)
        """
        # Unpacked here rather than in the signature: tuple parameters are
        # not valid syntax on Python 3 (PEP 3113). The Deferred callback
        # still passes exactly one positional argument.
        (alreadygot, buckets) = reply
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.items():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b))

    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        # Snapshot the keys: abort_some_buckets deletes from self.buckets
        # while it walks the share numbers it was given.
        self.abort_some_buckets(list(self.buckets))

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.

        Share numbers this tracker holds no bucket for are ignored.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Return "<shnum>: <servername>" for one share and its bucket writer."""
    servername = bucketwriter.get_servername()
    return "%s: %s" % (shnum, servername)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    """Choose which storage servers will hold the shares of one upload.

    get_shareholders() is the entry point. It first asks servers that
    cannot take new shares which shares they already hold, then repeatedly
    asks writeable servers to allocate buckets (_loop / _got_response)
    until every share has a home and the servers-of-happiness target is
    met, or until it runs out of servers and fails through _failed().
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        # NOTE(review): the adaptation is applied even when upload_status
        # is left at its None default -- confirm that IUploadStatus(None)
        # is acceptable, since the rest of the class tests
        # "if self._status:" as though None were expected.
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each server/tracker to one of three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl

    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the get_buckets queries that
        get_shareholders sends (through
        ServerTracker.ask_about_existing_shares) to servers which cannot
        accept new shares. res is either a Failure or the collection of
        share numbers the server already holds.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                     % (tracker.get_name(), tuple(sorted(buckets))),
                     level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            # These queries never place new shares, so they are counted
            # with the "full" / unproductive ones.
            self.full_count += 1
            self.bad_query_count += 1

    def _get_progress_message(self):
        """Return a human-readable summary of placement progress so far."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))

    def _loop(self):
        """
        Take the next step of server selection.

        I return either the final (use_trackers, preexisting_shares) pair,
        or the Deferred of the next allocate_buckets query (whose callback
        re-enters me). I raise UploadUnhappinessError, via _failed, when
        the happiness target cannot be reached.
        """
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.items()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = list(shares.items())
                    # Loop through the allocated shares, removing one at a
                    # time from a server that has more than one and
                    # putting it back into self.homeless_shares until
                    # delta shares are homeless.
                    #
                    # The "and items" guard matters when the same share
                    # number lives on several servers: taking it a second
                    # time does not grow homeless_shares, so the spare
                    # shares can run out before delta is reached. In that
                    # case we go on with whatever was freed instead of
                    # popping from an empty list.
                    while len(self.homeless_shares) < delta and items:
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                    # abort_some_buckets drops each bucket it aborts, so a
                    # single sweep over the final homeless set is enough.
                    for writer in self.use_trackers:
                        writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            # first pass: one share (the lowest-numbered homeless one) per
            # server
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                         level=log.NOISY)
                self._started_second_pass = True
            # split what is left evenly over the servers still in this pass
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        """
        Process the outcome of one allocate_buckets query, then re-enter
        _loop.

        res is a Failure or the (alreadygot, allocated) pair from
        ServerTracker.query; shares_to_ask is the set of shares that query
        requested; put_tracker_here is the pass list that receives tracker
        if the server accepted everything it was asked for.
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (tracker.get_name(),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()

    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
"""This is a wrapper that takes an IUploadable and provides
IEncryptedUploadable."""
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None):
    """Wrap original (adapted to IUploadable) and reset all cached state.

    log_parent is stored and later used as the default "parent" of every
    message emitted through self.log().
    """
    self.original = IUploadable(original)
    self._log_number = log_parent
    self._status = None

    # filled in lazily by get_size / get_all_encoding_parameters /
    # _get_encryptor
    self._file_size = None
    self._encoding_parameters = None
    self._encryptor = None

    # hashing state: one hasher over the whole plaintext, plus a
    # per-segment hasher that is created on demand
    self._plaintext_hasher = plaintext_hasher()
    self._plaintext_segment_hasher = None
    self._plaintext_segment_hashes = []

    self._ciphertext_bytes_read = 0
def set_upload_status(self, upload_status):
    """Remember upload_status (adapted to IUploadStatus) and hand the
    same object on to the wrapped uploadable."""
    adapted = IUploadStatus(upload_status)
    self._status = adapted
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Forward to log.msg(), defaulting facility to "upload.encryption"
    and parent to the log number given at construction time."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
def get_size(self):
    """Return a Deferred firing with the plaintext size.

    The size is asked of the wrapped uploadable once, cached, and
    reported to the upload status object when one is attached.
    """
    cached = self._file_size
    if cached is not None:
        return defer.succeed(cached)

    def _remember_size(size):
        self._file_size = size
        if self._status:
            self._status.set_size(size)
        return size

    size_d = self.original.get_size()
    size_d.addCallback(_remember_size)
    return size_d
def get_all_encoding_parameters(self):
    """Return a Deferred firing with the (k, happy, n, segsize) tuple.

    The tuple comes from the wrapped uploadable and is cached; the
    segment size is also kept separately for the segment hashers.
    """
    cached = self._encoding_parameters
    if cached is not None:
        return defer.succeed(cached)

    def _remember_parameters(encoding_parameters):
        # unpacking also insists on exactly four values
        (_k, _happy, _n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        self.log("my encoding parameters: %s" % (encoding_parameters,),
                 level=log.NOISY)
        return encoding_parameters

    params_d = self.original.get_all_encoding_parameters()
    params_d.addCallback(_remember_parameters)
    return params_d
def _get_encryptor(self):
    """Return a Deferred firing with the AES encryptor for this upload.

    On first use the key is fetched from the wrapped uploadable, the
    encryptor is built from it, and the storage index derived from the
    key is recorded (and reported to the status object, if any).
    """
    if self._encryptor:
        return defer.succeed(self._encryptor)

    def _key_ready(key):
        encryptor = AES(key)
        self._encryptor = encryptor

        si = storage_index_hash(key)
        assert isinstance(si, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(si) == 16  # SHA-256 truncated to 128b
        self._storage_index = si
        if self._status:
            self._status.set_storage_index(si)
        return encryptor

    key_d = self.original.get_encryption_key()
    key_d.addCallback(_key_ready)
    return key_d
def get_storage_index(self):
    """Return a Deferred firing with the storage index, which is set as
    a side effect of obtaining the encryptor."""
    def _storage_index_now(_encryptor):
        return self._storage_index

    d = self._get_encryptor()
    d.addCallback(_storage_index_now)
    return d
def _get_segment_hasher(self):
    """Return (hasher, bytes_left) for the plaintext segment in progress.

    If no segment hasher is open, a fresh one is created, the per-segment
    byte count is reset to zero and a whole segment is reported as left.
    """
    hasher = self._plaintext_segment_hasher
    if not hasher:
        hasher = plaintext_segment_hasher()
        self._plaintext_segment_hasher = hasher
        self._plaintext_segment_hashed_bytes = 0
        return hasher, self._segment_size

    bytes_left = self._segment_size - self._plaintext_segment_hashed_bytes
    return hasher, bytes_left
def _update_segment_hash(self, chunk):
    """Feed chunk into the per-segment plaintext hashers.

    The chunk is cut at segment boundaries; every time a segment fills
    up, its digest is appended to self._plaintext_segment_hashes and the
    segment hasher is closed so that the next bytes open a new one.
    """
    total = len(chunk)
    position = 0
    while position < total:
        hasher, room_in_segment = self._get_segment_hasher()
        take = min(total - position, room_in_segment)
        hasher.update(chunk[position:position + take])
        self._plaintext_segment_hashed_bytes += take

        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            leaf = hasher.digest()
            self._plaintext_segment_hashes.append(leaf)
            self._plaintext_segment_hasher = None
            segnum = len(self._plaintext_segment_hashes) - 1
            self.log("closed hash [%d]: %dB" %
                     (segnum, self._plaintext_segment_hashed_bytes),
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=segnum,
                     hash=base32.b2a(leaf),
                     level=log.NOISY)

        position += take
def read_encrypted(self, length, hash_only):
    """Return a Deferred firing with a list of ciphertext strings
    covering the next length bytes of plaintext.

    The list is the one that _read_encrypted appends to; see
    _hash_and_encrypt_plaintext for the effect of hash_only.
    """
    # _read_encrypted appends to this list and fires 'finished' with it.
    # Handing a Deferred *into* the worker (rather than chaining on its
    # result) is what avoids overflowing the stack: Deferreds don't
    # optimize out tail recursion.
    ciphertext = []
    finished = defer.Deferred()

    def _fetch_size(_ignored):
        return self.get_size()

    def _fetch_encryptor(_ignored):
        return self._get_encryptor()

    def _start_reading(_ignored):
        return self._read_encrypted(length, ciphertext, hash_only, finished)

    def _wait_until_finished(_ignored):
        return finished

    # parameters first, then size, then the key; only then read
    d = self.get_all_encoding_parameters()
    d.addCallback(_fetch_size)
    d.addCallback(_fetch_encryptor)
    d.addCallback(_start_reading)
    d.addCallback(_wait_until_finished)
    return d
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    """Read, hash and encrypt up to CHUNKSIZE bytes, then schedule myself
    again for the rest.

    Ciphertext is appended to the caller's ciphertext list.
    fire_when_done is fired with that list once remaining reaches zero,
    or errbacked if reading or processing a chunk fails. I always return
    None.
    """
    if not remaining:
        fire_when_done.callback(ciphertext)
        return None

    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    chunk_len = min(remaining, self.CHUNKSIZE)
    still_to_read = remaining - chunk_len

    def _process_chunk(plaintext):
        # hash and encrypt what we just read, then go round again
        encrypted = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(encrypted)
        self._read_encrypted(still_to_read, ciphertext, hash_only,
                             fire_when_done)

    def _report_failure(why):
        fire_when_done.errback(why)

    # read a chunk of plaintext..
    read_d = defer.maybeDeferred(self.original.read, chunk_len)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    read_d.addCallback(fireEventually)
    # the errback is attached after the callback on purpose, so that it
    # also sees exceptions raised while processing the chunk
    read_d.addCallback(_process_chunk)
    read_d.addErrback(_report_failure)
    return None
def _hash_and_encrypt_plaintext(self, data, hash_only):
    """Hash every plaintext chunk in `data` and run it through the encryptor.

    `data` must be a tuple or list of chunks. Returns a list holding the
    encryptor output for each chunk, or an empty list when `hash_only` is
    true. Also advances self._ciphertext_bytes_read and, when a status
    object is attached, updates progress slot 1.
    """
    assert isinstance(data, (tuple, list)), type(data)
    pending = list(data)
    encrypted = []
    # we use pending.pop(0) instead of iterating over the list to save
    # memory: each chunk is destroyed as soon as we're done with it.
    handled = 0
    while pending:
        piece = pending.pop(0)
        self.log(" read_encrypted handling %dB-sized chunk" % len(piece),
                 level=log.NOISY)
        handled += len(piece)
        self._plaintext_hasher.update(piece)
        self._update_segment_hash(piece)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        output = self._encryptor.process(piece)
        if not hash_only:
            encrypted.append(output)
        else:
            self.log(" skipping encryption", level=log.NOISY)
        del output
        del piece
    self._ciphertext_bytes_read += handled
    if self._status:
        fraction = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, fraction)
    return encrypted
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    """Return a Deferred firing with plaintext leaf hashes [first:last].

    If fewer than `num_segments` leaf hashes have been recorded, exactly
    one must be missing (asserted): the still-open segment hasher is closed
    out and its digest appended before slicing. The result is a tuple.
    """
    # this is currently unused, but will live again when we fix #453
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, _segment_left = self._get_segment_hasher()
        leaf = p.digest()
        self._plaintext_segment_hashes.append(leaf)
        # Reset to None (as the per-segment close path does) instead of
        # deleting the attribute, so later reads of it cannot raise
        # AttributeError.
        self._plaintext_segment_hasher = None
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
                 level=log.NOISY)
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(leaf),
                 level=log.NOISY)
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return an already-fired Deferred carrying the plaintext hasher's digest."""
    return defer.succeed(self._plaintext_hasher.digest())
def close(self):
    """Close the wrapped uploadable and pass along whatever it returns."""
    outcome = self.original.close()
    return outcome
class UploadStatus:
    """Mutable record describing one upload: plain getters and setters.

    Every instance draws a unique `counter` value from the class-wide
    `statusid_counter` and records its creation time in `started`.
    """
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # --- read-only values fixed at construction time ---

    def get_started(self):
        return self.started

    def get_counter(self):
        return self.counter

    # --- storage index ---

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    # --- file size ---

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    # --- helper flag ---

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    # --- human-readable status string ---

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    # --- progress ---

    def get_progress(self):
        # hand out an immutable snapshot rather than the live list
        return tuple(self.progress)

    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    # --- active flag ---

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = value

    # --- results object ---

    def get_results(self):
        return self.results

    def set_results(self, value):
        self.results = value
class CHKUploader:
    """Drive one CHK upload: select servers, hand them to an Encoder, and
    fill in an UploadResults object once the encoder has finished."""
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting `parent` to this upload's log number and
        `facility` to "tahoe.upload" when the caller did not supply them."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # runs on success and on failure alike (addBoth)
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Ask the server selector for shareholders.

        `started` is the timestamp taken when start_encrypted() began; it is
        used to record how long obtaining the storage index took. Returns
        the Deferred from get_shareholders(), unchanged except that the
        selection time is recorded when it fires successfully.
        """
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, selection, encoder):
        """
        @param selection: a 2-sequence (upload_trackers, already_serverids),
                          as delivered by locate_all_shareholders(), where:

            upload_trackers: a sequence of ServerTracker objects that
                             have agreed to hold some shares for us (the
                             shareids are stashed inside the ServerTracker)
            already_serverids: a dict mapping sharenum to a set of
                               serverids for servers that claim to already
                               have this share
        @param encoder: the encoder that receives the bucket and server maps
        """
        # Explicit unpacking instead of a tuple parameter in the signature:
        # positional callers are unaffected and the signature stays
        # introspectable (tuple parameters were removed by PEP 3113).
        upload_trackers, already_serverids = selection
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.items()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        # if two trackers claimed the same share number, the merged dict is
        # smaller than the sum of its parts
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Fill in and return the UploadResults instance (returned directly,
        not wrapped in a Deferred)."""
        r = self._results
        shares_placed = self._encoder.get_shares_placed()
        for shnum in shares_placed:
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            r.sharemap.add(shnum, serverid)
            r.servermap.add(serverid, shnum)
        r.pushed_shares = len(shares_placed)
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._server_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly `size` bytes from `uploadable`, re-reading on short reads.

    Returns a Deferred that fires with a list of pieces: the pieces in
    `prepend_data` (if any) followed by the pieces read. Each read must
    return a list holding at least one byte and no more than was asked for
    (asserted).
    """
    # None instead of a shared mutable [] default
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        # nothing more to read: keep whatever the caller already collected
        return defer.succeed(list(prepend_data))
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        num_read = sum([len(piece) for piece in data])
        assert num_read > 0
        assert num_read <= size
        remaining = size - num_read
        if remaining:
            # short read: ask again for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Uploader that reads the whole uploadable and builds a literal URI
    string from its contents."""

    def __init__(self):
        results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(results)
        self._results = results
        self._status = status

    def start(self, uploadable):
        """Read the whole uploadable and return a Deferred that fires with
        the UploadResults instance."""
        uploadable = IUploadable(uploadable)

        def _record_size_and_read(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)

        def _make_uri_string(pieces):
            return uri.LiteralFileURI("".join(pieces)).to_string()

        d = uploadable.get_size()
        d.addCallback(_record_size_and_read)
        d.addCallback(_make_uri_string)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        """Store the URI string, mark the status finished, return results."""
        results = self._results
        results.uri = uri
        status = self._status
        status.set_status("Finished")
        for slot in (1, 2):
            status.set_progress(slot, 1.0)
        return results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
class RemoteEncryptedUploadable(Referenceable):
    """Remotely-callable wrapper around an IEncryptedUploadable.

    Tracks the current read offset so that reads may skip forwards (never
    backwards), and counts the ciphertext bytes handed out.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the size; the value is cached after
        the first successful lookup."""
        if self._size is None:
            def _remember(size):
                self._size = size
                return size
            d = self._eu.get_size()
            d.addCallback(_remember)
            return d
        return defer.succeed(self._size)

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read from the wrapped uploadable and advance self._offset.

        In hash_only mode the offset advances by `length`; otherwise by the
        total length of the strings actually returned.
        """
        def _advance(strings):
            if hash_only:
                consumed = length
            else:
                consumed = sum([len(data) for data in strings])
            self._offset += consumed
            return strings

        d = self._eu.read_encrypted(length, hash_only)
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        """Return a Deferred firing with the ciphertext strings for
        [offset, offset+length), hashing over any gap before `offset`."""
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)

        def _count_sent(strings):
            self._bytes_sent += sum([len(data) for data in strings])
            return strings

        skip = offset - self._offset
        if skip > 0:
            # read the data from disk anyways, to build up the hash tree
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)
        d.addCallback(_at_correct_offset)
        d.addCallback(_count_sent)
        return d

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file through a remote helper.

    The helper is asked whether it already has the file; if not, the
    ciphertext is offered to it through a RemoteEncryptedUploadable. The
    helper's upload results are then completed locally with the verify cap.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting `parent` to this upload's log number."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # runs on success and on failure alike (addBoth)
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, helper_response):
        """Handle the helper's answer to "upload_chk".

        `helper_response` is a 2-sequence (upload_results, upload_helper).
        A true `upload_helper` means the ciphertext must be uploaded through
        it; otherwise `upload_results` is used as-is.
        """
        # Explicit unpacking instead of a tuple parameter in the signature:
        # positional callers are unaffected (tuple parameters were removed
        # by PEP 3113).
        upload_results, upload_helper = helper_response
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        for value in upload_results.sharemap.values():
            if isinstance(value, str):
                upload_results.sharemap = None
                break

    def _build_verifycap(self, upload_results):
        """Check the helper's results against our own parameters, attach the
        verify cap string and timings, and return the results object."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # keep the helper's own total before overwriting it with ours
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Shared encoding-parameter handling for uploadables.

    Each parameter has a class-level default plus an optional override
    (None by default); get_all_encoding_parameters() combines them with the
    file size and caches the resulting tuple.
    """
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Replace the defaults with any of "k", "happy", "n" and
        "max_segment_size" found in the `default_params` dict. All keys
        must be str and all values int."""
        assert isinstance(default_params, dict)
        for key, value in default_params.items():
            precondition(isinstance(key, str), key, value)
            precondition(isinstance(value, int), key, value)
        # (key in default_params, attribute that receives its value)
        targets = (
            ("k", "default_encoding_param_k"),
            ("happy", "default_encoding_param_happy"),
            ("n", "default_encoding_param_n"),
            ("max_segment_size", "default_max_segment_size"),
        )
        for key, attr_name in targets:
            if key in default_params:
                setattr(self, attr_name, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with the tuple (k, happy, n, segsize)."""
        cached = self._all_encoding_parameters
        if cached:
            return defer.succeed(cached)

        # an override wins over the default whenever it is truthy
        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        def _compute(file_size):
            # for small files, shrink the segment size to avoid wasting space
            # the result must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            self._all_encoding_parameters = (k, happy, n, segsize)
            return self._all_encoding_parameters

        d = self.get_size()
        d.addCallback(_compute)
        return d
class FileHandle(BaseUploadable):
    """Uploadable backed by a seekable file-like object."""
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        """Return a Deferred firing with the 16-byte convergent key, hashing
        the whole file the first time and caching the result."""
        if self._key is not None:
            return defer.succeed(self._key)

        def _hash_whole_file(params):
            k, happy, n, segsize = params
            fh = self._filehandle
            hasher = convergence_hasher(k, n, segsize, self.convergence)
            BLOCKSIZE = 64*1024
            consumed = 0
            fh.seek(0)
            block = fh.read(BLOCKSIZE)
            while block:
                hasher.update(block)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                consumed += len(block)
                if self._status:
                    self._status.set_progress(0, float(consumed)/self._size)
                block = fh.read(BLOCKSIZE)
            # leave the handle rewound for the real read pass
            fh.seek(0)
            self._key = hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        d.addCallback(_hash_whole_file)
        return d

    def _get_encryption_key_random(self):
        """Return a Deferred firing with a random 16-byte key, generated on
        first use and cached afterwards."""
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Pick the random key when convergence is None, else the
        convergent one."""
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        """Return a Deferred firing with the size, measured once by seeking
        to the end of the handle and then rewinding to the start."""
        if self._size is None:
            fh = self._filehandle
            fh.seek(0, 2)
            self._size = fh.tell()
            fh.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        # a single read wrapped in a one-element list
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    """FileHandle variant that opens the named file itself (binary mode)
    and therefore also closes it."""

    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        source = open(filename, "rb")
        FileHandle.__init__(self, source, convergence=convergence)

    def close(self):
        # run the base-class close first, then release the handle we opened
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    """FileHandle variant fed from an in-memory string via StringIO."""

    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        buffered = StringIO(data)
        FileHandle.__init__(self, buffered, convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        furl = self._helper_furl
        if furl:
            # _got_helper is invoked with the helper reference
            self.parent.tub.connectTo(furl, self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # version info assumed when the helper offers none of its own
        fallback = {
            "http://allmydata.org/tahoe/protocols/helper/v1": {},
            "application-version": "unknown: no get_version()",
        }
        d = add_version_to_remote_reference(helper, fallback)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        """Accept the helper only if it advertises the v1 helper protocol."""
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)

        def _add_readcap(uploadresults):
            # Generate the uri from the verifycap plus the key.
            def _store_readcap(key):
                v = uri.from_string(uploadresults.verifycapstr)
                readcap = uri.CHKFileURI(key, v.uri_extension_hash,
                                         v.needed_shares, v.total_shares,
                                         v.size)
                uploadresults.uri = readcap.to_string()
                return uploadresults
            key_d = uploadable.get_encryption_key()
            key_d.addCallback(_store_readcap)
            return key_d

        def _choose_uploader(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # small enough to go through the literal uploader
                return LiteralUploader().start(uploadable)

            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            # This Deferred has already fired, so each callback added below
            # runs as soon as it is attached; statement order matters.
            chain = defer.succeed(None)
            if self._helper:
                uploader = AssistedUploader(self._helper)
                chain.addCallback(lambda x: eu.get_storage_index())
                chain.addCallback(lambda si: uploader.start(eu, si))
            else:
                storage_broker = self.parent.get_storage_broker()
                secret_holder = self.parent._secret_holder
                uploader = CHKUploader(storage_broker, secret_holder)
                chain.addCallback(lambda x: uploader.start(eu))

            self._all_uploads[uploader] = None
            if self._history:
                self._history.add_upload(uploader.get_upload_status())
            chain.addCallback(_add_readcap)
            return chain

        def _close_uploadable(res):
            # runs on success and on failure alike (addBoth)
            uploadable.close()
            return res

        d = uploadable.get_size()
        d.addCallback(_choose_uploader)
        d.addBoth(_close_uploadable)
        return d
|