/usr/lib/python2.7/dist-packages/diaspy/connection.py is in python-diaspy 0.5.0.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
| #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module abstracts connection to pod.
"""
import json
import re
import requests
import warnings
from diaspy import errors
DEBUG = True
class Connection():
"""Object representing connection with the pod.
"""
_token_regex = re.compile(r'name="csrf-token"\s+content="(.*?)"')
_userinfo_regex = re.compile(r'window.current_user_attributes = ({.*})')
# this is for older version of D*
_token_regex_2 = re.compile(r'content="(.*?)"\s+name="csrf-token')
_userinfo_regex_2 = re.compile(r'gon.user=({.*});gon.preloads')
_verify_SSL = True
def __init__(self, pod, username, password, schema='https'):
"""
:param pod: The complete url of the diaspora pod to use.
:type pod: str
:param username: The username used to log in.
:type username: str
:param password: The password used to log in.
:type password: str
"""
self.pod = pod
self._session = requests.Session()
self._login_data = {'user[remember_me]': 1, 'utf8': '✓'}
self._userdata = {}
self._token = ''
self._diaspora_session = ''
self._cookies = self._fetchcookies()
try:
#self._setlogin(username, password)
self._login_data = {'user[username]': username,
'user[password]': password,
'authenticity_token': self._fetchtoken()}
success = True
except requests.exceptions.MissingSchema:
self.pod = '{0}://{1}'.format(schema, self.pod)
warnings.warn('schema was missing')
success = False
finally:
pass
try:
if not success:
self._login_data = {'user[username]': username,
'user[password]': password,
'authenticity_token': self._fetchtoken()}
except Exception as e:
raise errors.LoginError('cannot create login data (caused by: {0})'.format(e))
def _fetchcookies(self):
request = self.get('stream')
return request.cookies
def __repr__(self):
"""Returns token string.
It will be easier to change backend if programs will just use:
repr(connection)
instead of calling a specified method.
"""
return self._fetchtoken()
def get(self, string, headers={}, params={}, direct=False, **kwargs):
"""This method gets data from session.
Performs additional checks if needed.
Example:
To obtain 'foo' from pod one should call `get('foo')`.
:param string: URL to get without the pod's URL and slash eg. 'stream'.
:type string: str
:param direct: if passed as True it will not be expanded
:type direct: bool
"""
if not direct: url = '{0}/{1}'.format(self.pod, string)
else: url = string
return self._session.get(url, params=params, headers=headers, verify=self._verify_SSL, **kwargs)
def post(self, string, data, headers={}, params={}, **kwargs):
"""This method posts data to session.
Performs additional checks if needed.
Example:
To post to 'foo' one should call `post('foo', data={})`.
:param string: URL to post without the pod's URL and slash eg. 'status_messages'.
:type string: str
:param data: Data to post.
:param headers: Headers (optional).
:type headers: dict
:param params: Parameters (optional).
:type params: dict
"""
string = '{0}/{1}'.format(self.pod, string)
request = self._session.post(string, data, headers=headers, params=params, verify=self._verify_SSL, **kwargs)
return request
def put(self, string, data=None, headers={}, params={}, **kwargs):
"""This method PUTs to session.
"""
string = '{0}/{1}'.format(self.pod, string)
if data is not None: request = self._session.put(string, data, headers=headers, params=params, **kwargs)
else: request = self._session.put(string, headers=headers, params=params, verify=self._verify_SSL, **kwargs)
return request
def delete(self, string, data, headers={}, **kwargs):
"""This method lets you send delete request to session.
Performs additional checks if needed.
:param string: URL to use.
:type string: str
:param data: Data to use.
:param headers: Headers to use (optional).
:type headers: dict
"""
string = '{0}/{1}'.format(self.pod, string)
request = self._session.delete(string, data=data, headers=headers, **kwargs)
return request
def _setlogin(self, username, password):
"""This function is used to set data for login.
.. note::
It should be called before _login() function.
"""
self._login_data = {'user[username]': username,
'user[password]': password,
'authenticity_token': self._fetchtoken()}
def _login(self):
"""Handles actual login request.
Raises LoginError if login failed.
"""
request = self.post('users/sign_in',
data=self._login_data,
allow_redirects=False)
if request.status_code != 302:
raise errors.LoginError('{0}: login failed'.format(request.status_code))
def login(self, remember_me=1):
"""This function is used to log in to a pod.
Will raise LoginError if password or username was not specified.
"""
if not self._login_data['user[username]'] or not self._login_data['user[password]']:
raise errors.LoginError('username and/or password is not specified')
self._login_data['user[remember_me]'] = remember_me
status = self._login()
self._login_data = {}
return self
def logout(self):
"""Logs out from a pod.
When logged out you can't do anything.
"""
self.get('users/sign_out')
self.token = ''
def podswitch(self, pod, username, password):
"""Switches pod from current to another one.
"""
self.pod = pod
self._setlogin(username, password)
self._login()
def _fetchtoken(self):
"""This method tries to get token string needed for authentication on D*.
:returns: token string
"""
request = self.get('stream')
token = self._token_regex.search(request.text)
if token is None: token = self._token_regex_2.search(request.text)
if token is not None: token = token.group(1)
else: raise errors.TokenError('could not find valid CSRF token')
self._token = token
return token
def get_token(self, fetch=True):
"""This function returns a token needed for authentication in most cases.
**Notice:** using repr() is recommended method for getting token.
Each time it is run a _fetchtoken() is called and refreshed token is stored.
It is more safe to use than _fetchtoken().
By setting new you can request new token or decide to get stored one.
If no token is stored new one will be fetched anyway.
:returns: string -- token used to authenticate
"""
try:
if fetch or not self._token: self._fetchtoken()
except requests.exceptions.ConnectionError as e:
warnings.warn('{0} was cought: reusing old token'.format(e))
finally:
if not self._token: raise errors.TokenError('cannot obtain token and no previous token found for reuse')
return self._token
def getSessionToken(self):
"""Returns session token string (_diaspora_session).
"""
return self._diaspora_session
def getUserData(self):
"""Returns user data.
"""
request = self.get('bookmarklet')
userdata = self._userinfo_regex.search(request.text)
if userdata is None: userdata = self._userinfo_regex_2.search(request.text)
if userdata is None: raise errors.DiaspyError('cannot find user data')
userdata = userdata.group(1)
return json.loads(userdata)
def set_verify_SSL(self, verify):
"""Sets whether there should be an error if a SSL-Certificate could not be verified.
"""
self._verify_SSL = verify
|