/usr/lib/python3/dist-packages/twisted/python/filepath.py is in python3-twisted-experimental 13.2.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: twisted.test.test_paths -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Object-oriented filesystem path representation.
"""
from __future__ import division, absolute_import
import os
import errno
import base64
from hashlib import sha1
from os.path import isabs, exists, normpath, abspath, splitext
from os.path import basename, dirname
from os.path import join as joinpath
from os import sep as slash
from os import listdir, utime, stat
from stat import S_ISREG, S_ISDIR, S_IMODE, S_ISBLK, S_ISSOCK
from stat import S_IRUSR, S_IWUSR, S_IXUSR
from stat import S_IRGRP, S_IWGRP, S_IXGRP
from stat import S_IROTH, S_IWOTH, S_IXOTH
from zope.interface import Interface, Attribute, implementer
# Please keep this as light as possible on other Twisted imports; many, many
# things import this module, and it would be good if it could easily be
# modified for inclusion in the standard library. --glyph
from twisted.python.compat import comparable, cmp
from twisted.python.runtime import platform
from twisted.python.win32 import ERROR_FILE_NOT_FOUND, ERROR_PATH_NOT_FOUND
from twisted.python.win32 import ERROR_INVALID_NAME, ERROR_DIRECTORY, O_BINARY
from twisted.python.win32 import WindowsError
from twisted.python.util import FancyEqMixin
# Flag combination built from os.O_EXCL, os.O_CREAT and os.O_RDWR, plus the
# O_BINARY value supplied by twisted.python.win32.
_CREATE_FLAGS = os.O_EXCL | os.O_CREAT | os.O_RDWR | O_BINARY
def _stub_islink(path):
"""
Always return C{False} if the operating system does not support symlinks.
@param path: A path string.
@type path: L{str}
@return: C{False}
@rtype: L{bool}
"""
return False
# Use os.path.islink when the os.path module provides it; otherwise fall back
# to the stub that always answers False.
islink = getattr(os.path, 'islink', _stub_islink)
# Module-level aliases consumed by _secureEnoughString.
randomBytes = os.urandom
armor = base64.urlsafe_b64encode
class IFilePath(Interface):
    """
    File path object.

    A file path represents a location for a file-like-object and can be
    organized into a hierarchy; a file path can have children which are
    themselves file paths.

    A file path has a name which uniquely identifies it in the context of its
    parent (if it has one); a file path can not have two children with the same
    name.  This name is referred to as the file path's "base name".

    A series of such names can be used to locate nested children of a file path;
    such a series is referred to as the child's "path", relative to the parent.
    In this case, each name in the path is referred to as a "path segment"; the
    child's base name is the last segment in the path.

    When representing a file path as a string, a "path separator" is used to
    delimit the path segments within the string.  For a file system path, that
    would be C{os.sep}.

    Note that the values of child names may be restricted.  For example, a file
    system path will not allow the use of the path separator in a name, and
    certain names (eg. C{"."} and C{".."}) may be reserved or have special
    meanings.

    @since: 12.1
    """
    sep = Attribute("The path separator to use in string representations")

    def child(name):
        """
        Obtain a direct child of this file path.  The child may or may not
        exist.

        @param name: the name of a child of this path. C{name} must be a direct
            child of this path and may not contain a path separator.
        @return: the child of this path with the given C{name}.
        @raise InsecurePath: if C{name} describes a file path that is not a
            direct child of this file path.
        """

    def open(mode="r"):
        """
        Opens this file path with the given mode.

        @param mode: the mode to open the file with.
        @return: a file-like object.
        @raise Exception: if this file path cannot be opened.
        """

    def changed():
        """
        Clear any cached information about the state of this path on disk.
        """

    def getsize():
        """
        Retrieve the size of this file in bytes.

        @return: the size of the file at this file path in bytes.
        @raise Exception: if the size cannot be obtained.
        """

    def getModificationTime():
        """
        Retrieve the time of the last modification of this file.

        @return: a number of seconds from the epoch.
        @rtype: L{float}
        """

    def getStatusChangeTime():
        """
        Retrieve the time of the last status change for this file.

        @return: a number of seconds from the epoch.
        @rtype: L{float}
        """

    def getAccessTime():
        """
        Retrieve the time that this file was last accessed.

        @return: a number of seconds from the epoch.
        @rtype: L{float}
        """

    def exists():
        """
        Check if this file path exists.

        @return: C{True} if the file at this file path exists, C{False}
            otherwise.
        @rtype: L{bool}
        """

    def isdir():
        """
        Check if this file path refers to a directory.

        @return: C{True} if the file at this file path is a directory, C{False}
            otherwise.
        """

    def isfile():
        """
        Check if this file path refers to a regular file.

        @return: C{True} if the file at this file path is a regular file,
            C{False} otherwise.
        """

    def children():
        """
        List the children of this path object.

        @return: a sequence of the children of the directory at this file path.
        @raise Exception: if the file at this file path is not a directory.
        """

    def basename():
        """
        Retrieve the final component of the file path's path (everything
        after the final path separator).

        @return: the base name of this file path.
        @rtype: L{str}
        """

    def parent():
        """
        A file path for the directory containing the file at this file path.
        """

    def sibling(name):
        """
        A file path for another entry in the directory containing the file at
        this file path.

        @param name: the name of a sibling of this path.  C{name} must be a
            direct sibling of this path and may not contain a path separator.
        @return: a sibling file path of this one.
        """
class InsecurePath(Exception):
    """
    Raised when a path handed to L{FilePath} is not acceptable.
    """
class LinkError(Exception):
    """
    A problem involving symlinks: either the links form a cycle, or symlinks
    are not supported on this platform.
    """
class UnlistableError(OSError):
    """
    An exception which is used to distinguish between errors which mean 'this
    is not a directory you can list' and other, more catastrophic errors.

    This error will try to look as much like the original error as possible,
    while still being catchable as an independent type.

    @ivar originalException: the actual original exception instance, either an
        L{OSError} or a L{WindowsError}.
    """

    def __init__(self, originalException):
        """
        Create an UnlistableError exception.

        @param originalException: an instance of OSError.
        """
        # Re-run the OSError initializer with the original arguments so that
        # args, errno and strerror are populated.  These are not stored in the
        # instance __dict__, so copying __dict__ alone would lose them.
        OSError.__init__(self, *originalException.args)
        self.__dict__.update(originalException.__dict__)
        # Mirror the well-known OSError attributes explicitly; filename is not
        # part of args, and winerror only exists on some platforms.
        for name in ('errno', 'strerror', 'filename', 'winerror'):
            if hasattr(originalException, name):
                setattr(self, name, getattr(originalException, name))
        self.originalException = originalException
class _WindowsUnlistableError(UnlistableError, WindowsError):
    """
    The flavour of L{UnlistableError} raised on Windows.

    It also derives from L{WindowsError} for compatibility with previous
    releases of FilePath, where unportable programs may have done "except
    WindowsError:" around a call to children().

    It is private because all application code may portably catch
    L{UnlistableError} instead.
    """
def _secureEnoughString():
    """
    Build a string suitable for use as a new, temporary filename.

    @return: A pseudorandom, 16 byte string for use in secure filenames.
    @rtype: C{bytes}
    """
    # Hash 64 random bytes, encode the digest with the module's armor
    # function, and keep the first 16 bytes of the encoded form.
    entropy = randomBytes(64)
    digest = sha1(entropy).digest()
    encoded = armor(digest)
    return encoded[:16]
class AbstractFilePath(object):
    """
    Abstract implementation of an L{IFilePath}; must be completed by a
    subclass.

    This class primarily exists to provide common implementations of certain
    methods in L{IFilePath}. It is *not* a required parent class for
    L{IFilePath} implementations, just a useful starting point.
    """

    def getContent(self):
        """
        Retrieve the contents of the file at this path.

        @return: whatever C{read()} returns on the object produced by
            C{self.open()}.
        """
        fp = self.open()
        try:
            return fp.read()
        finally:
            # Always release the handle, even if read() raises.
            fp.close()

    def parents(self):
        """
        Retrieve an iterator of all the ancestors of this path.

        @return: an iterator of all the ancestors of this path, from the most
            recent (its immediate parent) to the root of its filesystem.
        """
        path = self
        parent = path.parent()
        # root.parent() == root, so this means "are we the root"
        while path != parent:
            yield parent
            path = parent
            parent = parent.parent()

    def children(self):
        """
        List the children of this path object.

        @raise OSError: If an error occurs while listing the directory.  If the
            error is 'serious', meaning that the operation failed due to an
            access violation, exhaustion of some kind of resource (file
            descriptors or memory), OSError or a platform-specific variant
            will be raised.

        @raise UnlistableError: If the inability to list the directory is due
            to this path not existing or not being a directory, the more
            specific OSError subclass L{UnlistableError} is raised instead.

        @return: a list of all currently-existing children of this object.
        @rtype: L{list}
        """
        try:
            subnames = self.listdir()
        except WindowsError as winErrObj:
            # WindowsError is an OSError subclass, so if not for this clause
            # the OSError clause below would be handling these.  Windows error
            # codes aren't the same as POSIX error codes, so we need to handle
            # them differently.

            # Under Python 2.5 on Windows, WindowsError has a winerror
            # attribute and an errno attribute.  The winerror attribute is
            # bound to the Windows error code while the errno attribute is
            # bound to a translation of that code to a perhaps equivalent POSIX
            # error number.

            # Under Python 2.4 on Windows, WindowsError only has an errno
            # attribute.  It is bound to the Windows error code.

            # For simplicity of code and to keep the number of paths through
            # this suite minimal, we grab the Windows error code under either
            # version.

            # Furthermore, attempting to use os.listdir on a non-existent path
            # in Python 2.4 will result in a Windows error code of
            # ERROR_PATH_NOT_FOUND.  However, in Python 2.5,
            # ERROR_FILE_NOT_FOUND results instead. -exarkun
            winerror = getattr(winErrObj, 'winerror', winErrObj.errno)
            if winerror not in (ERROR_PATH_NOT_FOUND,
                                ERROR_FILE_NOT_FOUND,
                                ERROR_INVALID_NAME,
                                ERROR_DIRECTORY):
                raise
            raise _WindowsUnlistableError(winErrObj)
        except OSError as ose:
            if ose.errno not in (errno.ENOENT, errno.ENOTDIR):
                # Other possible errors here, according to linux manpages:
                # EACCES, EMFILE, ENFILE, ENOMEM.  None of these seem like the
                # sort of thing which should be handled normally. -glyph
                raise
            raise UnlistableError(ose)
        # Build a real list: on Python 3, map() would return a lazy, one-shot
        # iterator rather than a reusable sequence.
        return [self.child(name) for name in subnames]

    def walk(self, descend=None):
        """
        Yield myself, then each of my children, and each of those children's
        children in turn.

        The optional argument C{descend} is a predicate that takes a FilePath,
        and determines whether or not that FilePath is traversed/descended
        into.  It is called with each child encountered during the walk.  If
        C{descend} is not specified, all directories will be traversed
        (including symbolic links which refer to directories).

        @param descend: A one-argument callable that will return True for
            FilePaths that should be traversed, False otherwise.

        @return: a generator yielding FilePath-like objects.

        @raise LinkError: if the real path of a yielded descendant is a prefix
            of the real path of this object.
        """
        yield self
        if self.isdir():
            for c in self.children():
                # we should first see if it's what we want, then we
                # can walk through the directory
                if (descend is None or descend(c)):
                    for subc in c.walk(descend):
                        # A descendant that resolves to a prefix of our own
                        # real path is treated as a cycle.
                        if os.path.realpath(self.path).startswith(
                                os.path.realpath(subc.path)):
                            raise LinkError("Cycle in file graph.")
                        yield subc
                else:
                    yield c

    def sibling(self, path):
        """
        Return a L{FilePath} with the same directory as this instance but with
        a basename of C{path}.

        @param path: The basename of the L{FilePath} to return.
        @type path: L{str}

        @return: The sibling path.
        @rtype: L{FilePath}
        """
        return self.parent().child(path)

    def descendant(self, segments):
        """
        Retrieve a child or child's child of this path.

        @param segments: A sequence of path segments as L{str} instances.

        @return: A L{FilePath} constructed by looking up the C{segments[0]}
            child of this path, the C{segments[1]} child of that path, and so
            on.

        @since: 10.2
        """
        path = self
        for name in segments:
            path = path.child(name)
        return path

    def segmentsFrom(self, ancestor):
        """
        Return a list of segments between a child and its ancestor.

        For example, in the case of a path X representing /a/b/c/d and a path Y
        representing /a/b, C{X.segmentsFrom(Y)} will return C{['c',
        'd']}.

        @param ancestor: an instance of the same class as self, ostensibly an
            ancestor of self.

        @raise ValueError: if the 'ancestor' parameter is not actually an
            ancestor, i.e. a path for /x/y/z is passed as an ancestor for
            /a/b/c/d.  Also raised when C{ancestor} equals C{self}, since no
            segments separate them.

        @return: a list of strs
        """
        # this might be an unnecessarily inefficient implementation but it will
        # work on win32 and for zipfiles; later I will determine if the
        # obvious fast implementation does the right thing too
        f = self
        p = f.parent()
        segments = []
        # Climb towards the root, stopping at the ancestor or at the root
        # (the root is its own parent).
        while f != ancestor and p != f:
            segments.insert(0, f.basename())
            f = p
            p = p.parent()
        if f == ancestor and segments:
            return segments
        raise ValueError("%r not parent of %r" % (ancestor, self))

    # new in 8.0
    def __hash__(self):
        """
        Hash the same as another L{FilePath} with the same path as mine.
        """
        return hash((self.__class__, self.path))

    # pending deprecation in 8.0
    def getmtime(self):
        """
        Deprecated.  Use getModificationTime instead.

        @return: the modification time truncated to an L{int}.
        """
        return int(self.getModificationTime())

    def getatime(self):
        """
        Deprecated.  Use getAccessTime instead.

        @return: the access time truncated to an L{int}.
        """
        return int(self.getAccessTime())

    def getctime(self):
        """
        Deprecated.  Use getStatusChangeTime instead.

        @return: the status change time truncated to an L{int}.
        """
        return int(self.getStatusChangeTime())
class RWX(FancyEqMixin, object):
    """
    Read/write/execute permissions for one category of user (i.e. user/owner,
    group, or other/world).  Instantiate with three boolean values: readable?
    writable? executable?.

    @type read: C{bool}
    @ivar read: Whether permission to read is given

    @type write: C{bool}
    @ivar write: Whether permission to write is given

    @type execute: C{bool}
    @ivar execute: Whether permission to execute is given

    @since: 11.1
    """
    compareAttributes = ('read', 'write', 'execute')

    def __init__(self, readable, writable, executable):
        self.read = readable
        self.write = writable
        self.execute = executable

    def __repr__(self):
        values = (self.read, self.write, self.execute)
        return "RWX(read=%s, write=%s, execute=%s)" % values

    def shorthand(self):
        """
        Render the permission bits as a short string, in the manner of part
        of the output of command line utilities such as 'ls -l' (e.g. 'rwx').

        @return: The shorthand string.
        @rtype: L{str}
        """
        # Pair each letter with its flag; a cleared flag is shown as '-'.
        pairs = zip('rwx', (self.read, self.write, self.execute))
        return ''.join([letter if granted else '-'
                        for letter, granted in pairs])
class Permissions(FancyEqMixin, object):
    """
    Read/write/execute permissions for user, group and other.  Instantiate
    with any portion of the file's mode that includes the permission bits.

    @type user: L{RWX}
    @ivar user: User/Owner permissions

    @type group: L{RWX}
    @ivar group: Group permissions

    @type other: L{RWX}
    @ivar other: Other/World permissions

    @since: 11.1
    """
    compareAttributes = ('user', 'group', 'other')

    def __init__(self, statModeInt):
        # One (read, write, execute) bit triple per category, in the order
        # user, group, other.
        bitGroups = (
            (S_IRUSR, S_IWUSR, S_IXUSR),
            (S_IRGRP, S_IWGRP, S_IXGRP),
            (S_IROTH, S_IWOTH, S_IXOTH),
        )
        categories = []
        for readBit, writeBit, executeBit in bitGroups:
            categories.append(RWX(statModeInt & readBit > 0,
                                  statModeInt & writeBit > 0,
                                  statModeInt & executeBit > 0))
        self.user, self.group, self.other = categories

    def __repr__(self):
        parts = (str(self.user), str(self.group), str(self.other))
        return "[%s | %s | %s]" % parts

    def shorthand(self):
        """
        Render all nine permission bits as a short string, in the manner of
        command line utilities such as 'ls -l' (e.g. 'rwx-wx--x').

        @return: The shorthand string.
        @rtype: L{str}
        """
        pieces = []
        for category in (self.user, self.group, self.other):
            pieces.append(category.shorthand())
        return "".join(pieces)
@comparable
@implementer(IFilePath)
class FilePath(AbstractFilePath):
"""
I am a path on the filesystem that only permits 'downwards' access.
Instantiate me with a pathname (for example,
FilePath('/home/myuser/public_html')) and I will attempt to only provide
access to files which reside inside that path. I may be a path to a file,
a directory, or a file which does not exist.
The correct way to use me is to instantiate me, and then do ALL filesystem
access through me. In other words, do not import the 'os' module; if you
need to open a file, call my 'open' method. If you need to list a
directory, call my 'path' method.
Even if you pass me a relative path, I will convert that to an absolute
path internally.
Note: although time-related methods do return floating-point results, they
may still be only second resolution depending on the platform and the last
value passed to L{os.stat_float_times}. If you want greater-than-second
precision, call C{os.stat_float_times(True)}, or use Python 2.5.
Greater-than-second precision is only available in Windows on Python2.5 and
later.
On both Python 2 and Python 3, paths can only be bytes.
@type alwaysCreate: L{bool}
@ivar alwaysCreate: When opening this file, only succeed if the file does
not already exist.
@type path: L{bytes}
@ivar path: The path from which 'downward' traversal is permitted.
@ivar statinfo: The currently cached status information about the file on
the filesystem that this L{FilePath} points to. This attribute is
C{None} if the file is in an indeterminate state (either this
L{FilePath} has not yet had cause to call C{stat()} yet or
L{FilePath.changed} indicated that new information is required), 0 if
C{stat()} was called and returned an error (i.e. the path did not exist
when C{stat()} was called), or a C{stat_result} object that describes
the last known status of the underlying file (or directory, as the case
may be). Trust me when I tell you that you do not want to use this
attribute. Instead, use the methods on L{FilePath} which give you
information about it, like C{getsize()}, C{isdir()},
C{getModificationTime()}, and so on.
@type statinfo: L{int} or L{types.NoneType} or L{os.stat_result}
"""
statinfo = None
path = None
sep = slash.encode("ascii")
def __init__(self, path, alwaysCreate=False):
    """
    Initialize the L{FilePath}, converting C{path} to an absolute path if
    necessary.

    @param path: The path string to wrap.
    @param alwaysCreate: Stored unchanged as C{self.alwaysCreate}.
    """
    absolute = abspath(path)
    self.path = absolute
    self.alwaysCreate = alwaysCreate
def __getstate__(self):
    """
    Support serialization: return a copy of the instance state with any
    cached L{os.stat} result left out.
    """
    state = dict(self.__dict__)
    # Drop the cache entry if present; absence is not an error.
    state.pop('statinfo', None)
    return state
def child(self, path):
    """
    Create and return a new L{FilePath} representing a path contained by
    C{self}.

    @param path: The base name of the new L{FilePath}.  If this contains
        directory separators or parent references it will be rejected.
    @type path: L{bytes}

    @raise InsecurePath: If the result of combining this path with C{path}
        would result in a path which is not a direct child of this path.

    @return: The child path.
    @rtype: L{FilePath}
    """
    onWindows = platform.isWindows()
    if onWindows and path.count(b":"):
        # Catch paths like C:blah that don't have a slash
        raise InsecurePath("%r contains a colon." % (path,))
    normalized = normpath(path)
    if self.sep in normalized:
        raise InsecurePath(
            "%r contains one or more directory separators" % (path,))
    candidate = abspath(joinpath(self.path, normalized))
    if candidate.startswith(self.path):
        return self.clonePath(candidate)
    raise InsecurePath("%r is not a child of %s" % (candidate, self.path))
def preauthChild(self, path):
    """
    Use me if C{path} might have slashes in it, but you know they're safe.

    @param path: A relative path (ie, a path not starting with C{"/"}) which
        will be interpreted as a child or descendant of this path.
    @type path: L{bytes}

    @raise InsecurePath: If the combined path is neither this path itself nor
        located underneath it.

    @return: The child path.
    @rtype: L{FilePath}
    """
    newpath = abspath(joinpath(self.path, normpath(path)))
    # Compare against this path *with a trailing separator* so that a sibling
    # which merely shares a name prefix (e.g. "/a/basement" for "/a/base") is
    # not mistaken for a descendant.  Joining with an empty segment of the
    # same type appends the separator only when one is not already present.
    prefix = joinpath(self.path, self.path[:0])
    if newpath != self.path and not newpath.startswith(prefix):
        raise InsecurePath("%s is not a child of %s" % (newpath, self.path))
    return self.clonePath(newpath)
def childSearchPreauth(self, *paths):
    """
    Return my first existing child with a name in C{paths}.

    C{paths} is expected to be a list of *pre-secured* path fragments;
    in most cases this will be specified by a system administrator and not
    an arbitrary user.

    If no appropriately-named children exist, this will return C{None}.

    @return: C{None} or the child path.
    @rtype: L{types.NoneType} or L{FilePath}
    """
    base = self.path
    for fragment in paths:
        candidate = joinpath(base, fragment)
        if not exists(candidate):
            continue
        return self.clonePath(candidate)
    return None
def siblingExtensionSearch(self, *exts):
    """
    Attempt to return a path with my name, given multiple possible
    extensions.

    Each extension in C{exts} will be tested and the first path which
    exists will be returned.  If no path exists, C{None} will be returned.
    If C{''} is in C{exts}, then if the file referred to by this path
    exists, C{self} will be returned.

    The extension '*' has a magic meaning, which means "any path that
    begins with C{self.path + '.'} is acceptable".
    """
    base = self.path
    for extension in exts:
        if not extension and self.exists():
            return self
        if extension == b'*':
            # Wildcard: accept the first directory entry named "<base>.*".
            prefix = basename(base) + b'.'
            for entry in listdir(dirname(base)):
                if entry.startswith(prefix):
                    return self.clonePath(joinpath(dirname(base), entry))
        candidate = base + extension
        if exists(candidate):
            return self.clonePath(candidate)
    return None
def realpath(self):
    """
    Returns the absolute target as a L{FilePath} if self is a link, self
    otherwise.

    The absolute link is the ultimate file or directory the
    link refers to (for instance, if the link refers to another link, and
    another...).  If the filesystem does not support symlinks, or
    if the link is cyclical, raises a L{LinkError}.

    Behaves like L{os.path.realpath} in that it does not resolve link
    names in the middle (ex. /x/y/z, y is a link to w - realpath on z
    will return /x/y/z, not /x/w/z).

    @return: L{FilePath} of the target path.
    @rtype: L{FilePath}
    @raises LinkError: if links are not supported or links are cyclical.
    """
    if not self.islink():
        return self
    resolved = os.path.realpath(self.path)
    # A link that resolves to its own path is reported as cyclical.
    if resolved == self.path:
        raise LinkError("Cyclical link - will loop forever")
    return self.clonePath(resolved)
def siblingExtension(self, ext):
    """
    Return a path with my name and the extension C{ext} appended.

    @param ext: File-extension to append.
    @type ext: L{str}

    @return: The sibling path.
    @rtype: L{FilePath}
    """
    extended = self.path + ext
    return self.clonePath(extended)
def linkTo(self, linkFilePath):
    """
    Creates a symlink to self at the path in the L{FilePath}
    C{linkFilePath}.

    Only works on posix systems due to its dependence on
    L{os.symlink}.  Propagates L{OSError}s up from L{os.symlink} if
    C{linkFilePath.parent()} does not exist, or C{linkFilePath} already
    exists.

    @param linkFilePath: a FilePath representing the link to be created.
    @type linkFilePath: L{FilePath}
    """
    target = self.path
    linkName = linkFilePath.path
    os.symlink(target, linkName)
def open(self, mode='r'):
    """
    Open this file using C{mode} or for writing if C{alwaysCreate} is
    C{True}.

    In all cases the file is opened in binary mode, so it is not necessary
    to include C{"b"} in C{mode}.

    @param mode: The mode to open the file in.  Default is C{"r"}.
    @type mode: L{str}
    @raises AssertionError: If C{"a"} is included in the mode and
        C{alwaysCreate} is C{True}.
    @rtype: L{file}
    @return: An open L{file} object.
    """
    if self.alwaysCreate:
        assert 'a' not in mode, ("Appending not supported when "
                                 "alwaysCreate == True")
        return self.create()
    # This hack is necessary because of a bug in Python 2.7 on Windows:
    # http://bugs.python.org/issue7686
    # Strip any 'b' already present so that exactly one is appended.
    binaryMode = mode.replace('b', '') + 'b'
    return open(self.path, binaryMode)
# stat methods below
def restat(self, reraise=True):
    """
    Re-calculate cached effects of 'stat'.  To refresh information on this
    path after you know the filesystem may have changed, call this method.

    @param reraise: a boolean.  If true, re-raise exceptions from
        L{os.stat}; otherwise, mark this path as not existing, and remove
        any cached stat information.

    @raise Exception: If C{reraise} is C{True} and an exception occurs while
        reloading metadata.
    """
    try:
        fresh = stat(self.path)
    except OSError:
        # 0 records that the stat call failed.
        self.statinfo = 0
        if not reraise:
            return
        raise
    self.statinfo = fresh
def changed(self):
    """
    Forget any cached information about the state of this path on disk.

    @since: 10.1.0
    """
    # None marks the cached stat result as indeterminate.
    self.statinfo = None
def chmod(self, mode):
    """
    Changes the permissions on self, if possible.  Propagates errors from
    L{os.chmod} up.

    @param mode: integer representing the new permissions desired (same as
        the command line chmod)
    @type mode: L{int}
    """
    target = self.path
    os.chmod(target, mode)
def getsize(self):
    """
    Retrieve the size of this file in bytes.

    @return: The size of the file at this file path in bytes.
    @raise Exception: if the size cannot be obtained.
    @rtype: L{int}
    """
    # Populate the stat cache on demand; restat() raises on failure.
    if not self.statinfo:
        self.restat()
    return self.statinfo.st_size
def getModificationTime(self):
    """
    Retrieve the time of the last modification of this file.

    @return: a number of seconds from the epoch.
    @rtype: L{float}
    """
    st = self.statinfo
    if not st:
        # Nothing cached yet (or the last stat failed): refresh first.
        self.restat()
        st = self.statinfo
    return float(st.st_mtime)
def getStatusChangeTime(self):
    """
    Retrieve the time of the last status change for this file.

    @return: a number of seconds from the epoch.
    @rtype: L{float}
    """
    if not self.statinfo:
        self.restat()
    return float(self.statinfo.st_ctime)
def getAccessTime(self):
    """
    Retrieve the time that this file was last accessed.

    @return: a number of seconds from the epoch.
    @rtype: L{float}
    """
    if not self.statinfo:
        self.restat()
    return float(self.statinfo.st_atime)
def getInodeNumber(self):
    """
    Retrieve the file serial number, also called inode number, which
    distinguishes this file from all other files on the same device.

    @raise NotImplementedError: if the platform is Windows, since the
        inode number would be a dummy value for all files in Windows
    @return: a number representing the file serial number
    @rtype: L{int}
    @since: 11.0
    """
    if platform.isWindows():
        raise NotImplementedError
    if not self.statinfo:
        self.restat()
    return self.statinfo.st_ino
def getDevice(self):
    """
    Retrieves the device containing the file.  The inode number and device
    number together uniquely identify the file, but the device number is
    not necessarily consistent across reboots or system crashes.

    @raise NotImplementedError: if the platform is Windows, since the device
        number would be 0 for all partitions on a Windows platform
    @return: a number representing the device
    @rtype: L{int}
    @since: 11.0
    """
    if platform.isWindows():
        raise NotImplementedError
    if not self.statinfo:
        self.restat()
    return self.statinfo.st_dev
def getNumberOfHardLinks(self):
    """
    Retrieve the number of hard links to the file.

    The count records how many directories have an entry for this file.
    Once it drops to zero the file itself is discarded as soon as no
    process still holds it open.  Symbolic links are not included in the
    total.

    @raise NotImplementedError: if the platform is Windows, since Windows
        doesn't maintain a link count for directories, and L{os.stat} does
        not set C{st_nlink} on Windows anyway.
    @return: the number of hard links to the file
    @rtype: L{int}
    @since: 11.0
    """
    if platform.isWindows():
        raise NotImplementedError
    if not self.statinfo:
        # Nothing cached yet: refresh before reading the field.
        self.restat()
    return self.statinfo.st_nlink
def getUserID(self):
    """
    Return the user ID of the owner of the file.

    @raise NotImplementedError: if the platform is Windows, since the UID
        is always 0 on Windows
    @return: the user ID of the file's owner
    @rtype: L{int}
    @since: 11.0
    """
    if platform.isWindows():
        raise NotImplementedError
    info = self.statinfo
    if info:
        return info.st_uid
    self.restat()
    return self.statinfo.st_uid
def getGroupID(self):
    """
    Return the group ID of the file.

    @raise NotImplementedError: if the platform is Windows, since the GID
        is always 0 on Windows
    @return: the group ID of the file
    @rtype: L{int}
    @since: 11.0
    """
    if platform.isWindows():
        raise NotImplementedError
    if not self.statinfo:
        # Nothing cached yet: refresh before reading the field.
        self.restat()
    return self.statinfo.st_gid
def getPermissions(self):
    """
    Return the permissions of the file, built from the permission bits of
    its C{st_mode}.

    This should also work on Windows, although the permissions reported
    there may not be what is expected.

    @return: the permissions for the file
    @rtype: L{Permissions}
    @since: 11.1
    """
    if not self.statinfo:
        self.restat()
    permissionBits = S_IMODE(self.statinfo.st_mode)
    return Permissions(permissionBits)
def exists(self):
    """
    Check whether this L{FilePath} exists.

    If no stat result is cached, L{restat} is called with C{False} so that
    a failure leaves the cache empty instead of raising.

    @return: C{True} if the stats of C{path} can be retrieved successfully,
        C{False} in the other cases.
    @rtype: L{bool}
    """
    if not self.statinfo:
        self.restat(False)
    return True if self.statinfo else False
def isdir(self):
    """
    Check whether this L{FilePath} refers to a directory.

    @return: C{True} if this L{FilePath} refers to a directory, C{False}
        otherwise (including when no stat result can be obtained).
    @rtype: L{bool}
    """
    if not self.statinfo:
        self.restat(False)
        if not self.statinfo:
            # Still nothing after refreshing: treat as "not a directory".
            return False
    return S_ISDIR(self.statinfo.st_mode)
def isfile(self):
    """
    Check whether this file path refers to a regular file.

    @return: C{True} if this L{FilePath} points to a regular file (not a
        directory, socket, named pipe, etc), C{False} otherwise
        (including when no stat result can be obtained).
    @rtype: L{bool}
    """
    info = self.statinfo
    if not info:
        self.restat(False)
        info = self.statinfo
    return S_ISREG(info.st_mode) if info else False
def isBlockDevice(self):
    """
    Report whether the underlying path is a block device.

    @return: C{True} if it is a block device, C{False} otherwise
        (including when no stat result can be obtained).
    @rtype: L{bool}
    @since: 11.1
    """
    if not self.statinfo:
        self.restat(False)
        if not self.statinfo:
            # Still nothing after refreshing: treat as "not a block device".
            return False
    return S_ISBLK(self.statinfo.st_mode)
def isSocket(self):
    """
    Report whether the underlying path is a socket.

    @return: C{True} if it is a socket, C{False} otherwise (including when
        no stat result can be obtained).
    @rtype: L{bool}
    @since: 11.1
    """
    info = self.statinfo
    if not info:
        self.restat(False)
        info = self.statinfo
    return S_ISSOCK(info.st_mode) if info else False
def islink(self):
    """
    Check whether this L{FilePath} points to a symbolic link.

    @return: C{True} if this L{FilePath} points to a symbolic link,
        C{False} otherwise.
    @rtype: L{bool}
    """
    # The cached stat result cannot be used here: it describes the link's
    # destination (see #1773), which is the right thing in *every case*
    # except this one.  Calling lstat and caching that would be possible,
    # but seems unlikely to actually save any work. -glyph
    location = self.path
    return islink(location)
def isabs(self):
    """
    Check whether this L{FilePath} refers to an absolute path.

    This always returns C{True}.

    @return: C{True}, always.
    @rtype: L{bool}
    """
    location = self.path
    return isabs(location)
def listdir(self):
    """
    List the base names of the direct children of this L{FilePath}.

    @return: A L{list} of L{bytes} naming the contents of the directory
        this L{FilePath} refers to.  The names are relative to this
        L{FilePath}.
    @rtype: L{list}
    @raise: Anything the platform L{os.listdir} implementation might raise
        (typically L{OSError}).
    """
    names = listdir(self.path)
    return names
def splitext(self):
    """
    Split the file path into a pair C{(root, ext)} such that
    C{root + ext == path}.

    @return: Tuple whose first item is the filename and whose second item
        is the file extension.  See Python docs for L{os.path.splitext}.
    @rtype: L{tuple}
    """
    pieces = splitext(self.path)
    return pieces
def __repr__(self):
    """
    Return a debugging representation showing the C{repr} of the path.
    """
    return 'FilePath({!r})'.format(self.path)
def touch(self):
    """
    Set the access and last modification times of the file at this file
    path to the current time, creating the file first if it does not
    already exist.

    @raise Exception: if unable to create or modify the last modification
        time of the file.
    """
    try:
        # Opening for append creates the file when it is missing.
        handle = self.open('a')
        handle.close()
    except IOError:
        # Best effort only: utime below still gets its chance, and raises
        # if the path really cannot be updated.
        pass
    utime(self.path, None)
def remove(self):
    """
    Remove the file or directory represented by self.

    If C{self.path} is a directory (and not a link to one), all of its
    children are removed recursively before the directory itself.  A file
    or a link is simply deleted.  L{changed} is called afterwards.
    """
    isRealDirectory = self.isdir() and not self.islink()
    if not isRealDirectory:
        os.remove(self.path)
    else:
        for entry in self.children():
            entry.remove()
        os.rmdir(self.path)
    self.changed()
def makedirs(self):
    """
    Create every directory in C{path} that does not exist yet, by
    delegating to L{os.makedirs}.

    @return: C{None}
    """
    outcome = os.makedirs(self.path)
    return outcome
def globChildren(self, pattern):
    """
    Assuming I am representing a directory, return a list of FilePaths
    representing my children that match the given pattern.

    @param pattern: A glob pattern to use to match child paths.
    @type pattern: L{bytes}

    @return: A L{list} of matching children.
    @rtype: L{list}
    """
    import glob
    # Compare a one-element slice rather than an index: on Python 3,
    # indexing bytes yields an int, which never equals b'/', so a path that
    # already ends in a separator would be given a second one.
    if self.path[-1:] == b'/':
        path = self.path + pattern
    else:
        path = self.sep.join([self.path, pattern])
    # Build a real list, as documented; map() is lazy on Python 3.
    return [self.clonePath(match) for match in glob.glob(path)]
def basename(self):
    """
    Retrieve the final component of the file path's path (everything
    after the final path separator).

    @return: The final component of the L{FilePath}'s path (everything
        after the final path separator).
    @rtype: L{bytes}
    """
    location = self.path
    return basename(location)
def dirname(self):
    """
    Retrieve every component of the L{FilePath}'s path except the last
    one (everything up to the final path separator).

    @return: All of the components of the L{FilePath}'s path except the
        last one (everything up to the final path separator).
    @rtype: L{bytes}
    """
    location = self.path
    return dirname(location)
def parent(self):
    """
    Build a file path for the directory containing the file at this file
    path.

    @return: A L{FilePath} representing the path which directly contains
        this L{FilePath}.
    @rtype: L{FilePath}
    """
    containing = self.dirname()
    return self.clonePath(containing)
def setContent(self, content, ext=b'.new'):
    """
    Replace the file at this path with a new file that contains the given
    bytes, trying to avoid data-loss in the meanwhile.

    The bytes are first written to a temporary sibling of this path, which
    is then renamed over this path.

    On UNIX-like platforms, this method does its best to ensure that by the
    time it returns, either the old contents I{or} the new contents of the
    file will be present at this path for subsequent readers regardless of
    premature device removal, program crash, or power loss, under the
    following assumptions:

        - your filesystem is journaled (i.e. your filesystem will not
          I{itself} lose data due to power loss)

        - your filesystem's C{rename()} is atomic

        - your filesystem will not discard new data while preserving new
          metadata (see U{http://mjg59.livejournal.com/108257.html} for more
          detail)

    On most versions of Windows there is no atomic C{rename()} (see
    U{http://bit.ly/win32-overwrite} for more information), so this method
    is slightly less helpful.  There is a small window where the file at
    this path may be deleted before the new file is moved to replace it:
    however, the new file will be fully written and flushed beforehand so
    in the unlikely event that there is a crash at that point, it should be
    possible for the user to manually recover the new version of their
    data.  In the future, Twisted will support atomic file moves on those
    versions of Windows which I{do} support them: see U{Twisted ticket
    3004<http://twistedmatrix.com/trac/ticket/3004>}.

    This method should be safe for use by multiple concurrent processes,
    but note that it is not easy to predict which process's contents will
    ultimately end up on disk if they invoke this method at close to the
    same time.

    @param content: The desired contents of the file at this path.
    @type content: L{bytes}

    @param ext: An extension to append to the temporary filename used to
        store the bytes while they are being written.  This can be used to
        make sure that temporary files can be identified by their suffix,
        for cleanup in case of crashes.
    @type ext: L{bytes}
    """
    scratch = self.temporarySibling(ext)
    out = scratch.open('w')
    try:
        out.write(content)
    finally:
        # Always close the temporary file, even if the write failed.
        out.close()
    if platform.isWindows():
        # The existing file is unlinked first on Windows, where rename()
        # is not an atomic replace (see the docstring).
        if exists(self.path):
            os.unlink(self.path)
    os.rename(scratch.path, self.path)
def __cmp__(self, other):
    """
    Order two L{FilePath} objects by comparing their C{path} attributes.

    @return: the result of C{cmp} on the two paths, or C{NotImplemented}
        if C{other} is not a L{FilePath}.
    """
    if isinstance(other, FilePath):
        return cmp(self.path, other.path)
    return NotImplemented
def createDirectory(self):
    """
    Create the single directory this L{FilePath} refers to, using
    L{os.mkdir}.

    @see: L{makedirs}

    @raise OSError: If the directory cannot be created.
    """
    target = self.path
    os.mkdir(target)
def requireCreate(self, val=1):
    """
    Store C{val} in the C{alwaysCreate} attribute of this path.

    @param val: C{True} or C{False}, indicating whether opening this path
        will be required to create the file or not.  The default is the
        truthy integer C{1}.
    @type val: L{bool}

    @return: C{None}
    """
    setattr(self, 'alwaysCreate', val)
def create(self):
    """
    Exclusively create a file, only if this file previously did not exist.

    @return: A file-like object opened from this path.
    """
    descriptor = os.open(self.path, _CREATE_FLAGS)
    # XXX TODO: 'name' attribute of returned files is not mutable or
    # settable via fdopen, so this file is slightly less functional than
    # the one returned from 'open' by default.  send a patch to Python...
    return os.fdopen(descriptor, 'w+b')
def temporarySibling(self, extension=b""):
    """
    Construct a path referring to a sibling of this path.

    The resulting path will be unpredictable, so that other subprocesses
    should neither accidentally attempt to refer to the same path before
    it is created, nor should other processes be able to guess its name in
    advance.

    @param extension: A suffix to append to the created filename.  (Note
        that if you want an extension with a '.' you must include the '.'
        yourself.)
    @type extension: L{bytes}

    @return: a path object with the given extension suffix, C{alwaysCreate}
        set to True.
    @rtype: L{FilePath}
    """
    # Random prefix + our own base name + the caller's suffix.
    siblingName = _secureEnoughString() + self.basename() + extension
    sibling = self.sibling(siblingName)
    sibling.requireCreate()
    return sibling
# Number of bytes requested per read when copyTo() copies a regular file.
# ** is right-associative, so this is 2 ** (2 ** (2 ** 2)) == 2 ** 16 == 65536.
_chunkSize = 2 ** 2 ** 2 ** 2
def copyTo(self, destination, followLinks=True):
    """
    Copies self to destination.

    If self doesn't exist, an OSError is raised.

    If self is a directory, this method copies its children (but not
    itself) recursively to destination - if destination does not exist as
    a directory, this method creates it.  If destination is a file, an
    IOError will be raised.

    If self is a file, this method copies it to destination.  If
    destination is a file, this method overwrites it.  If destination is a
    directory, an IOError will be raised.

    If self is a link (and followLinks is False), self will be copied over
    as a new symlink with the same target as returned by os.readlink.  That
    means that if it is absolute, both the old and new symlink will link to
    the same thing.  If it's relative, then perhaps not (and it's also
    possible that this relative link will be broken).

    File/directory permissions and ownership will NOT be copied over.

    If followLinks is True, symlinks are followed so that they're treated
    as their targets.  In other words, if self is a link, the link's target
    will be copied.  If destination is a link, self will be copied to the
    destination's target (the actual destination will be destination's
    target).  Symlinks under self (if self is a directory) will be followed
    and its target's children be copied recursively.

    If followLinks is False, symlinks will be copied over as symlinks.

    @param destination: the destination (a FilePath) to which self
        should be copied
    @param followLinks: whether symlinks in self should be treated as links
        or as their targets
    """
    if self.islink() and not followLinks:
        # Recreate the link itself rather than copying what it points at.
        linkTarget = os.readlink(self.path)
        os.symlink(linkTarget, destination.path)
        return

    # XXX TODO: *thorough* audit and documentation of the exact desired
    # semantics of this code.  Right now the behavior of existent
    # destination symlinks is convenient, and quite possibly correct, but
    # its security properties need to be explained.
    if self.isdir():
        if not destination.exists():
            destination.createDirectory()
        for entry in self.children():
            target = destination.child(entry.basename())
            entry.copyTo(target, followLinks)
        return

    if self.isfile():
        sink = destination.open('w')
        try:
            source = self.open()
            try:
                blockSize = self._chunkSize
                while True:
                    # XXX TODO: optionally use os.open, os.read and
                    # O_DIRECT and use os.fstatvfs to determine chunk
                    # sizes and make *****sure**** copy is page-atomic;
                    # the following is good enough for 99.9% of everybody
                    # and won't take a week to audit though.
                    data = source.read(blockSize)
                    sink.write(data)
                    if len(data) < blockSize:
                        # A short (possibly empty) read ends the copy.
                        break
            finally:
                source.close()
        finally:
            sink.close()
        return

    if not self.exists():
        raise OSError(errno.ENOENT, "No such file or directory")

    # If you see the following message because you want to copy
    # symlinks, fifos, block devices, character devices, or unix
    # sockets, please feel free to add support to do sensible things in
    # reaction to those types!
    raise NotImplementedError(
        "Only copying of files and directories supported")
def moveTo(self, destination, followLinks=True):
    """
    Move self to destination - basically renaming self to whatever
    destination is named.

    If destination is an already-existing directory, moves all children to
    destination if destination is empty.  If destination is a non-empty
    directory, or destination is a file, an OSError will be raised.

    If moving between filesystems, self needs to be copied, and everything
    that applies to copyTo applies to moveTo.

    @param destination: the destination (a FilePath) to which self
        should be moved
    @param followLinks: whether symlinks in self should be treated as links
        or as their targets (only applicable when moving between
        filesystems)
    """
    try:
        os.rename(self.path, destination.path)
    except OSError as renameError:
        if renameError.errno != errno.EXDEV:
            raise
        # man 2 rename, ubuntu linux 5.10 "breezy":
        # oldpath and newpath are not on the same mounted filesystem.
        # (Linux permits a filesystem to be mounted at multiple
        # points, but rename(2) does not work across different mount
        # points, even if the same filesystem is mounted on both.)
        # that means it's time to copy trees of directories!
        staged = destination.temporarySibling()
        self.copyTo(staged, followLinks)  # slow
        staged.moveTo(destination, followLinks)  # visible
        # done creating new stuff.  let's clean me up.
        discarded = self.temporarySibling()
        self.moveTo(discarded, followLinks)  # visible
        discarded.remove()  # slow
    else:
        # Plain rename succeeded: notify both paths via changed().
        self.changed()
        destination.changed()
# clonePath is what FilePath methods such as parent() and globChildren() call
# to build new path objects; binding it to the class itself makes those
# results plain FilePath instances.
FilePath.clonePath = FilePath
|