/usr/share/pyshared/allmydata/frontends/auth.py is in tahoe-lafs 1.9.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | import os
from zope.interface import implements
from twisted.web.client import getPage
from twisted.internet import defer
from twisted.cred import error, checkers, credentials
from allmydata.util import base32
class NeedRootcapLookupScheme(Exception):
    """Exception for a missing name+password -> rootcap lookup mechanism.

    Access schemes based on an account name and password need some way to
    translate a (name, passwd) pair into a rootcap: either a file of
    name/passwd/rootcap tuples, or a server that performs the translation.
    """
class FTPAvatarID:
    """Avatar identifier pairing a login name with a rootcap.

    Instances are what the credential checkers in this module hand back on
    a successful login.
    """

    def __init__(self, username, rootcap):
        # Plain value holder: both arguments are stored unmodified.
        self.username, self.rootcap = username, rootcap
class AccountFileChecker:
    """Credentials checker backed by a local account file.

    Every line of the file that is neither blank nor a "#" comment must
    have one of these whitespace-separated forms::

        <name> <password> <rootcap>
        <name> ssh-dss|ssh-rsa <key data ...> <rootcap>

    The parsed data is kept in three dicts keyed by account name:
    ``passwords``, ``pubkeys`` and ``rootcaps``. Only ``passwords`` is
    consulted by requestAvatarId(); ``pubkeys`` is populated but not used
    by this class.
    """
    implements(checkers.ICredentialsChecker)
    credentialInterfaces = (credentials.IUsernamePassword,
                            credentials.IUsernameHashedPassword)

    def __init__(self, client, accountfile):
        """Load all accounts from *accountfile*.

        client: stored as ``self.client``; not otherwise used here.
        accountfile: path of the account file; a leading "~" is expanded.

        Raises ValueError if a line has fewer than three fields.
        """
        self.client = client
        self.passwords = {}
        self.pubkeys = {}
        self.rootcaps = {}
        path = os.path.expanduser(accountfile)
        f = open(path, "r")
        # try/finally so the handle is closed even when a line fails to
        # parse, instead of relying on garbage collection.
        try:
            for index, line in enumerate(f):
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                fields = line.split(None, 2)
                if len(fields) != 3:
                    # Do not echo the line itself: it may hold a password.
                    raise ValueError(
                        "%s:%d: malformed account line, expected "
                        "'name password rootcap' or "
                        "'name ssh-keytype keydata rootcap'"
                        % (path, index + 1))
                name, passwd, rest = fields
                if passwd in ("ssh-dss", "ssh-rsa"):
                    # Second field is the key type, not a password. The
                    # last token of the remainder is the rootcap and
                    # everything before it is the key data.
                    bits = rest.split()
                    rootcap = bits[-1]
                    # Rebuild "<keytype> <keydata ...>". Joining bits[-1]
                    # would space-separate the characters of the rootcap.
                    self.pubkeys[name] = " ".join([passwd] + bits[:-1])
                else:
                    self.passwords[name] = passwd
                    rootcap = rest
                self.rootcaps[name] = rootcap
        finally:
            f.close()

    def _cbPasswordMatch(self, matched, username):
        """Turn the result of the password comparison into an avatar id.

        Returns an FTPAvatarID for *username* when *matched* is true,
        otherwise raises error.UnauthorizedLogin.
        """
        if matched:
            return FTPAvatarID(username, self.rootcaps[username])
        raise error.UnauthorizedLogin

    def requestAvatarId(self, credentials):
        """Check *credentials* against the passwords loaded from the file.

        Returns a Deferred that fires with an FTPAvatarID when the password
        matches, and fails with error.UnauthorizedLogin when it does not or
        when the username has no password entry.
        """
        if credentials.username in self.passwords:
            # checkPassword may answer synchronously or with a Deferred;
            # maybeDeferred normalises both cases.
            d = defer.maybeDeferred(credentials.checkPassword,
                                    self.passwords[credentials.username])
            d.addCallback(self._cbPasswordMatch, str(credentials.username))
            return d
        return defer.fail(error.UnauthorizedLogin())
class AccountURLChecker:
    """Credentials checker that delegates to an HTTP authentication URL.

    The username and password are POSTed to ``auth_url`` as a
    multipart/form-data form. The stripped response body is taken to be
    the account's rootcap, except that a body of "0" means the login was
    rejected.
    """
    implements(checkers.ICredentialsChecker)
    credentialInterfaces = (credentials.IUsernamePassword,)

    def __init__(self, client, auth_url):
        """Remember *client* and the *auth_url* that login forms go to."""
        self.client = client
        self.auth_url = auth_url

    def _cbPasswordMatch(self, rootcap, username):
        """Wrap the rootcap returned by the server in an FTPAvatarID."""
        return FTPAvatarID(username, rootcap)

    def post_form(self, username, password):
        """POST the login form to ``self.auth_url``.

        Returns the Deferred produced by getPage(), which fires with the
        response body.

        Raises TypeError if *username* or *password* is not a ``str``.
        """
        # The multipart body is assembled by hand. The stdlib 'email'
        # package could in theory build it, but the original author found
        # no workable way to do so.
        sepbase = base32.b2a(os.urandom(4))
        sep = "--" + sepbase
        # A tuple of pairs, not a dict, so the field order in the body is
        # the same on every request.
        fields = (("action", "authenticate"),
                  ("email", username),
                  ("passwd", password))
        form = [sep]
        for name, value in fields:
            # Explicit check rather than assert, which is stripped under
            # "python -O". The value is not echoed: it may be the password.
            if not isinstance(value, str):
                raise TypeError("form field %r must be a str, not %s"
                                % (name, type(value).__name__))
            form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(value)
            form.append(sep)
        # The final boundary line carries a trailing "--" to close the body.
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return getPage(self.auth_url, method="POST",
                       postdata=body, headers=headers,
                       followRedirect=True, timeout=30)

    def _parse_response(self, res):
        """Extract the rootcap from the server's response body.

        Raises error.UnauthorizedLogin when the stripped body is "0" (the
        server's rejection marker) or empty. An empty body is rejected here
        so that it cannot become a login with an empty rootcap.
        """
        rootcap = res.strip()
        if not rootcap or rootcap == "0":
            raise error.UnauthorizedLogin
        return rootcap

    def requestAvatarId(self, credentials):
        """Authenticate *credentials* against the remote service.

        Returns a Deferred that fires with an FTPAvatarID on success and
        fails with error.UnauthorizedLogin when the server rejects the
        login.
        """
        d = self.post_form(credentials.username, credentials.password)
        d.addCallback(self._parse_response)
        d.addCallback(self._cbPasswordMatch, str(credentials.username))
        return d
|