/usr/lib/python2.7/dist-packages/flower/views/auth.py is in python-flower 0.8.3+dfsg-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | from __future__ import absolute_import
import json
import functools
import re
import urllib as urllib_parse
import tornado.web
import tornado.auth
from tornado import httpclient
from tornado.options import options
from celery.utils.imports import instantiate
from ..views import BaseHandler
class GoogleAuth2LoginHandler(BaseHandler, tornado.auth.GoogleOAuth2Mixin):
    """Log a user in through Google OAuth2.

    OAuth parameters are read from ``self.settings['oauth']`` (the keys
    used here are ``redirect_uri`` and ``key``).  After authentication the
    user's e-mail address must match the regular expression stored in
    ``self.application.options.auth``; on success it is saved in the
    secure ``user`` cookie.
    """

    _OAUTH_SETTINGS_KEY = 'oauth'

    @tornado.web.asynchronous
    def get(self):
        """Start the OAuth2 flow, or finish it when ``code`` is present."""
        oauth_settings = self.settings[self._OAUTH_SETTINGS_KEY]
        redirect_uri = oauth_settings['redirect_uri']
        code = self.get_argument('code', False)
        if code:
            # Second leg: exchange the authorization code for a token;
            # the result is delivered to ``_on_auth``.
            self.get_authenticated_user(
                redirect_uri=redirect_uri,
                code=code,
                callback=self._on_auth,
            )
        else:
            # First leg: send the browser to the provider's consent page.
            self.authorize_redirect(
                redirect_uri=redirect_uri,
                client_id=oauth_settings['key'],
                scope=['profile', 'email'],
                response_type='code',
                extra_params={'approval_prompt': 'auto'}
            )

    def _on_auth(self, user):
        """Validate the authenticated user and open a session.

        :param user: result of the token exchange; falsy when it failed.
            It is expected to carry an ``access_token`` entry.
        :raises tornado.web.HTTPError: 403 when authentication failed, the
            profile could not be fetched, the profile holds no e-mail
            address, or the address does not match ``--auth``.
        """
        if not user:
            raise tornado.web.HTTPError(403, 'Google auth failed')
        access_token = user['access_token']

        try:
            response = httpclient.HTTPClient().fetch(
                'https://www.googleapis.com/plus/v1/people/me',
                headers={'Authorization': 'Bearer %s' % access_token})
        except Exception as e:
            raise tornado.web.HTTPError(403, 'Google auth failed: %s' % e)

        # A profile without an e-mail entry (or an unparsable body) is an
        # authentication failure, not an internal server error.
        try:
            profile = json.loads(response.body.decode('utf-8'))
            email = profile['emails'][0]['value']
        except (ValueError, KeyError, IndexError, TypeError):
            raise tornado.web.HTTPError(
                403, 'Google auth failed: no email address in profile')

        if not re.match(self.application.options.auth, email):
            message = (
                "Access denied to '{email}'. Please use another account or "
                "ask your admin to add your email to flower --auth."
            ).format(email=email)
            raise tornado.web.HTTPError(403, message)

        self.set_secure_cookie("user", str(email))

        # NOTE(review): 'next' comes straight from the request and is used
        # as the redirect target unvalidated - confirm whether it should
        # be restricted to local paths.
        next_ = self.get_argument('next', '/')
        self.redirect(next_)
class LoginHandler(BaseHandler):
    """Entry point that stands in for the configured login handler."""

    def __new__(cls, *args, **kwargs):
        """Return the object built by ``instantiate`` for the auth provider.

        The provider is taken from ``options.auth_provider``; all
        positional and keyword arguments are forwarded unchanged.
        """
        provider = options.auth_provider
        handler = instantiate(provider, *args, **kwargs)
        return handler
class GithubLoginHandler(BaseHandler, tornado.auth.OAuth2Mixin):
    """Log a user in through GitHub OAuth2.

    OAuth parameters are read from ``self.settings['oauth']`` (the keys
    used here are ``redirect_uri``, ``key`` and ``secret``).  Access is
    granted when at least one verified e-mail address of the account
    matches the regular expression in ``self.application.options.auth``.
    """

    _OAUTH_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
    _OAUTH_ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token"
    _OAUTH_NO_CALLBACKS = False
    _OAUTH_SETTINGS_KEY = 'oauth'

    @tornado.auth._auth_return_future
    def get_authenticated_user(self, redirect_uri, code, callback):
        """Exchange the authorization ``code`` for an access token.

        :param redirect_uri: redirect URI sent with the token request.
        :param code: authorization code received from the provider.
        :param callback: forwarded as the first argument of
            ``_on_access_token``, which completes it with the result.
        """
        oauth_settings = self.settings[self._OAUTH_SETTINGS_KEY]
        http = self.get_auth_http_client()
        body = urllib_parse.urlencode({
            "redirect_uri": redirect_uri,
            "code": code,
            "client_id": oauth_settings['key'],
            "client_secret": oauth_settings['secret'],
            "grant_type": "authorization_code",
        })
        http.fetch(
            self._OAUTH_ACCESS_TOKEN_URL,
            functools.partial(self._on_access_token, callback),
            method="POST",
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
                # Ask for a JSON answer; it is parsed in _on_access_token.
                'Accept': 'application/json',
            },
            body=body)

    @tornado.web.asynchronous
    def _on_access_token(self, future, response):
        """Complete ``future`` with the parsed token response or an error."""
        if response.error:
            future.set_exception(tornado.auth.AuthError(
                'OAuth authentication error: %s' % str(response)))
            return
        future.set_result(json.loads(response.body))

    def get_auth_http_client(self):
        """Return the HTTP client used for the token request."""
        return httpclient.AsyncHTTPClient()

    @tornado.web.asynchronous
    def get(self):
        """Start the OAuth2 flow, or finish it when ``code`` is present."""
        oauth_settings = self.settings[self._OAUTH_SETTINGS_KEY]
        redirect_uri = oauth_settings['redirect_uri']
        code = self.get_argument('code', False)
        if code:
            self.get_authenticated_user(
                redirect_uri=redirect_uri,
                code=code,
                callback=self._on_auth,
            )
        else:
            self.authorize_redirect(
                redirect_uri=redirect_uri,
                client_id=oauth_settings['key'],
                scope=['user:email'],
                response_type='code',
                extra_params={'approval_prompt': 'auto'}
            )

    @tornado.web.asynchronous
    def _on_auth(self, user):
        """Validate the authenticated user and open a session.

        :param user: parsed token response; falsy when the exchange failed.
        :raises tornado.web.HTTPError: 500 when no access token was
            obtained; 403 when the e-mail list cannot be fetched or no
            verified address matches ``--auth``.
        """
        # The parsed token response may be an error payload that carries
        # no 'access_token' key; treat that like any other failed exchange
        # instead of letting a KeyError escape.
        access_token = user.get('access_token') if user else None
        if not access_token:
            raise tornado.web.HTTPError(500, 'OAuth authentication failed')

        req = httpclient.HTTPRequest(
            'https://api.github.com/user/emails',
            headers={'Authorization': 'token ' + access_token,
                     'User-agent': 'Tornado auth'})
        # Mirror GoogleAuth2LoginHandler: a failed profile request is
        # reported as an authentication failure, not an unhandled error.
        try:
            response = httpclient.HTTPClient().fetch(req)
        except Exception as e:
            raise tornado.web.HTTPError(403, 'GitHub auth failed: %s' % e)

        auth_pattern = self.application.options.auth
        emails = [
            entry['email'].lower()
            for entry in json.loads(response.body.decode('utf-8'))
            if entry['verified'] and re.match(auth_pattern, entry['email'])
        ]
        if not emails:
            message = (
                "Access denied. Please use another account or "
                "ask your admin to add your email to flower --auth."
            )
            raise tornado.web.HTTPError(403, message)

        # The last matching address identifies the session.
        self.set_secure_cookie("user", str(emails.pop()))

        next_ = self.get_argument('next', '/')
        self.redirect(next_)
class LogoutHandler(BaseHandler):
    """Handler that ends the current session."""

    def get(self):
        """Clear the ``user`` cookie and render a confirmation message."""
        cookie_name = 'user'
        self.clear_cookie(cookie_name)
        # The '404.html' template is rendered with the logout message.
        context = {'message': 'Successfully logged out!'}
        self.render('404.html', **context)
|