/usr/lib/python3/dist-packages/diaspy/connection.py is in python3-diaspy 0.5.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module abstracts connection to pod.
"""
import json
import re
import requests
import warnings
from diaspy import errors
# Module-level debug switch. Nothing in the code visible here reads it;
# presumably it is consulted by other parts of the package or toggled by hand
# while debugging -- TODO confirm before removing.
DEBUG = True
class Connection():
    """Object representing connection with the pod.

    Wraps a ``requests.Session`` and prefixes every relative path with the
    pod URL.  It also knows how to scrape the CSRF token and the current
    user's attributes out of the HTML pages served by the pod.
    """
    # CSRF token as rendered by current pods: name attribute first.
    _token_regex = re.compile(r'name="csrf-token"\s+content="(.*?)"')
    _userinfo_regex = re.compile(r'window.current_user_attributes = ({.*})')
    # this is for older version of D*
    _token_regex_2 = re.compile(r'content="(.*?)"\s+name="csrf-token')
    _userinfo_regex_2 = re.compile(r'gon.user=({.*});gon.preloads')
    # Class-level default; set_verify_SSL() overrides it per instance.
    _verify_SSL = True

    def __init__(self, pod, username, password, schema='https'):
        """
        :param pod: The complete url of the diaspora pod to use.
        :type pod: str
        :param username: The username used to log in.
        :type username: str
        :param password: The password used to log in.
        :type password: str
        :param schema: Schema prepended to ``pod`` when it has none.
        :type schema: str

        Two requests to ``stream`` are made while constructing the object:
        one to collect cookies and one to obtain the CSRF token.

        :raises errors.LoginError: if the login data cannot be built after
            the missing schema was added to the pod URL.
        """
        self.pod = pod
        self._session = requests.Session()
        self._login_data = {'user[remember_me]': 1, 'utf8': '✓'}
        self._userdata = {}
        self._token = ''
        self._diaspora_session = ''
        try:
            # The very first request is the one that reveals a missing
            # schema, so it has to live inside the guarded region; otherwise
            # the fallback below could never run.
            self._cookies = self._fetchcookies()
            self._setlogin(username, password)
        except requests.exceptions.MissingSchema:
            self.pod = '{0}://{1}'.format(schema, self.pod)
            warnings.warn('schema was missing')
            try:
                self._cookies = self._fetchcookies()
                self._setlogin(username, password)
            except Exception as e:
                raise errors.LoginError('cannot create login data (caused by: {0})'.format(e)) from e

    def _fetchcookies(self):
        """Requests ``stream`` and returns the cookies of the response.
        """
        request = self.get('stream')
        return request.cookies

    def __repr__(self):
        """Returns token string.

        It will be easier to change backend if programs will just use:
            repr(connection)
        instead of calling a specified method.

        .. note::
            Every call fetches a fresh token, i.e. performs a request.
        """
        return self._fetchtoken()

    def get(self, string, headers=None, params=None, direct=False, **kwargs):
        """This method gets data from session.
        Performs additional checks if needed.

        Example:
            To obtain 'foo' from pod one should call `get('foo')`.

        :param string: URL to get without the pod's URL and slash eg. 'stream'.
        :type string: str
        :param headers: Headers (optional).
        :type headers: dict
        :param params: Parameters (optional).
        :type params: dict
        :param direct: if passed as True it will not be expanded
        :type direct: bool
        :returns: response object returned by the session
        """
        url = string if direct else '{0}/{1}'.format(self.pod, string)
        # setdefault lets a caller override verification for a single call.
        kwargs.setdefault('verify', self._verify_SSL)
        return self._session.get(url, params=params, headers=headers, **kwargs)

    def post(self, string, data, headers=None, params=None, **kwargs):
        """This method posts data to session.
        Performs additional checks if needed.

        Example:
            To post to 'foo' one should call `post('foo', data={})`.

        :param string: URL to post without the pod's URL and slash eg. 'status_messages'.
        :type string: str
        :param data: Data to post.
        :param headers: Headers (optional).
        :type headers: dict
        :param params: Parameters (optional).
        :type params: dict
        :returns: response object returned by the session
        """
        url = '{0}/{1}'.format(self.pod, string)
        kwargs.setdefault('verify', self._verify_SSL)
        return self._session.post(url, data=data, headers=headers, params=params, **kwargs)

    def put(self, string, data=None, headers=None, params=None, **kwargs):
        """This method PUTs to session.

        :param string: URL to put to without the pod's URL and slash.
        :type string: str
        :param data: Data to send (optional).
        :param headers: Headers (optional).
        :type headers: dict
        :param params: Parameters (optional).
        :type params: dict
        :returns: response object returned by the session
        """
        url = '{0}/{1}'.format(self.pod, string)
        # The SSL setting is applied whether or not a body is sent.
        kwargs.setdefault('verify', self._verify_SSL)
        return self._session.put(url, data=data, headers=headers, params=params, **kwargs)

    def delete(self, string, data, headers=None, **kwargs):
        """This method lets you send delete request to session.
        Performs additional checks if needed.

        :param string: URL to use.
        :type string: str
        :param data: Data to use.
        :param headers: Headers to use (optional).
        :type headers: dict
        :returns: response object returned by the session
        """
        url = '{0}/{1}'.format(self.pod, string)
        kwargs.setdefault('verify', self._verify_SSL)
        return self._session.delete(url, data=data, headers=headers, **kwargs)

    def _setlogin(self, username, password):
        """This function is used to set data for login.

        .. note::
            It should be called before _login() function.

        A fresh CSRF token is fetched and stored with the credentials.
        """
        self._login_data = {'user[username]': username,
                            'user[password]': password,
                            'authenticity_token': self._fetchtoken()}

    def _login(self):
        """Handles actual login request.
        Raises LoginError if login failed.

        Any status other than a 302 redirect is treated as a failure.
        """
        request = self.post('users/sign_in',
                            data=self._login_data,
                            allow_redirects=False)
        if request.status_code != 302:
            raise errors.LoginError('{0}: login failed'.format(request.status_code))

    def login(self, remember_me=1):
        """This function is used to log in to a pod.
        Will raise LoginError if password or username was not specified.

        :param remember_me: value sent as ``user[remember_me]``.
        :returns: the connection itself, so calls can be chained
        """
        # .get() keeps the documented LoginError even when the login data
        # was already wiped by an earlier successful login().
        username = self._login_data.get('user[username]')
        password = self._login_data.get('user[password]')
        if not username or not password:
            raise errors.LoginError('username and/or password is not specified')
        self._login_data['user[remember_me]'] = remember_me
        self._login()
        # Credentials are dropped once the pod accepted them.
        self._login_data = {}
        return self

    def logout(self):
        """Logs out from a pod.
        When logged out you can't do anything.
        """
        self.get('users/sign_out')
        # Forget the cached CSRF token.
        self._token = ''

    def podswitch(self, pod, username, password):
        """Switches pod from current to another one.

        Builds new login data against the new pod and logs in immediately.
        The underlying session object is reused.
        """
        self.pod = pod
        self._setlogin(username, password)
        self._login()

    def _fetchtoken(self):
        """This method tries to get token string needed for authentication on D*.

        The token is also stored on the instance for later reuse.

        :returns: token string
        :raises errors.TokenError: if neither token pattern matches the page
        """
        request = self.get('stream')
        match = self._token_regex.search(request.text)
        if match is None:
            # fall back to the attribute order used by older pods
            match = self._token_regex_2.search(request.text)
        if match is None:
            raise errors.TokenError('could not find valid CSRF token')
        token = match.group(1)
        self._token = token
        return token

    def get_token(self, fetch=True):
        """This function returns a token needed for authentication in most cases.
        **Notice:** using repr() is recommended method for getting token.

        With ``fetch=True`` a _fetchtoken() is run and the refreshed token is
        stored.  With ``fetch=False`` the stored token is returned; if no
        token is stored a new one will be fetched anyway.
        It is more safe to use than _fetchtoken(): on a connection error the
        previously stored token is reused.

        :param fetch: whether to request a new token from the pod
        :type fetch: bool
        :returns: string -- token used to authenticate
        :raises errors.TokenError: if no token could be obtained and none is stored
        """
        try:
            if fetch or not self._token:
                self._fetchtoken()
        except requests.exceptions.ConnectionError as e:
            warnings.warn('{0} was cought: reusing old token'.format(e))
        finally:
            # Kept inside ``finally`` on purpose: with no stored token, any
            # failure above surfaces as TokenError.
            if not self._token:
                raise errors.TokenError('cannot obtain token and no previous token found for reuse')
        return self._token

    def getSessionToken(self):
        """Returns session token string (_diaspora_session).

        .. note::
            Nothing in this class assigns it after construction, so unless
            set from outside it is the empty string.
        """
        return self._diaspora_session

    def getUserData(self):
        """Returns user data.

        The data is the JSON object embedded in the ``bookmarklet`` page.

        :returns: decoded JSON object
        :raises errors.DiaspyError: if the page contains no user data
        """
        request = self.get('bookmarklet')
        match = self._userinfo_regex.search(request.text)
        if match is None:
            match = self._userinfo_regex_2.search(request.text)
        if match is None:
            raise errors.DiaspyError('cannot find user data')
        return json.loads(match.group(1))

    def set_verify_SSL(self, verify):
        """Sets whether there should be an error if a SSL-Certificate could not be verified.
        """
        self._verify_SSL = verify
|