/usr/lib/python2.7/dist-packages/secretstorage/util.py is in python-secretstorage 2.1.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# SecretStorage module for Python
# Access passwords using the SecretService DBus API
# Author: Dmitry Shachnev, 2013
# License: BSD
"""This module provides some utility functions, but these shouldn't
normally be used by external applications."""
import dbus
from secretstorage.defines import DBUS_UNKNOWN_METHOD, DBUS_NO_SUCH_OBJECT, \
DBUS_SERVICE_UNKNOWN, DBUS_NO_REPLY, DBUS_NOT_SUPPORTED, DBUS_EXEC_FAILED, \
SECRETS, SS_PATH, SS_PREFIX, ALGORITHM_DH, ALGORITHM_PLAIN
from secretstorage.dhcrypto import Session
from secretstorage.exceptions import ItemNotFoundException, \
SecretServiceNotAvailableException
from Crypto.Random import get_random_bytes
from Crypto.Cipher.AES import AESCipher, MODE_CBC, block_size
from secretstorage.dhcrypto import long_to_bytes, bytes_to_long
class InterfaceWrapper(dbus.Interface):
    """A :class:`dbus.Interface` subclass whose callable attributes raise
    :doc:`SecretStorage exceptions <exceptions>` in place of some D-Bus
    exceptions."""

    def catch_errors(self, function_in):
        """Return a function that calls `function_in` with the same
        arguments and translates selected D-Bus errors.

        ``DBUS_UNKNOWN_METHOD`` and ``DBUS_NO_SUCH_OBJECT`` become
        :exc:`ItemNotFoundException`; ``DBUS_NO_REPLY`` and
        ``DBUS_NOT_SUPPORTED`` become
        :exc:`SecretServiceNotAvailableException`.  Any other
        :exc:`dbus.exceptions.DBusException` is re-raised unchanged.
        """
        def function_out(*args, **kwargs):
            try:
                return function_in(*args, **kwargs)
            except dbus.exceptions.DBusException as error:
                error_name = error.get_dbus_name()
                if error_name == DBUS_UNKNOWN_METHOD:
                    raise ItemNotFoundException('Item does not exist!')
                elif error_name == DBUS_NO_SUCH_OBJECT:
                    raise ItemNotFoundException(error.get_dbus_message())
                elif error_name in (DBUS_NO_REPLY, DBUS_NOT_SUPPORTED):
                    raise SecretServiceNotAvailableException(
                        error.get_dbus_message())
                # Not an error this wrapper knows how to translate.
                raise
        return function_out

    def __getattr__(self, attribute):
        """Look `attribute` up via :class:`dbus.Interface` and, when the
        result is callable, wrap it with :meth:`catch_errors`."""
        member = dbus.Interface.__getattr__(self, attribute)
        return self.catch_errors(member) if callable(member) else member
def bus_get_object(bus, name, object_path):
    """Return ``bus.get_object(name, object_path)`` with introspection
    disabled.

    A :exc:`dbus.exceptions.DBusException` whose name is
    ``DBUS_SERVICE_UNKNOWN``, ``DBUS_EXEC_FAILED`` or ``DBUS_NO_REPLY`` is
    replaced by
    :exc:`~secretstorage.exceptions.SecretServiceNotAvailableException`
    carrying the original D-Bus message; any other D-Bus exception
    propagates unchanged.
    """
    try:
        proxy = bus.get_object(name, object_path, introspect=False)
    except dbus.exceptions.DBusException as error:
        unavailable = (DBUS_SERVICE_UNKNOWN, DBUS_EXEC_FAILED,
                       DBUS_NO_REPLY)
        if error.get_dbus_name() not in unavailable:
            raise
        raise SecretServiceNotAvailableException(error.get_dbus_message())
    return proxy
def open_session(bus):
    """Open a new Secret Service session on `bus` and return it.

    A Diffie-Hellman (``ALGORITHM_DH``) session is requested first.  If
    the service answers with ``DBUS_NOT_SUPPORTED``, a plain
    (``ALGORITHM_PLAIN``) session is opened instead and the returned
    session has ``encrypted`` set to ``False``.  Any other D-Bus error
    propagates to the caller.
    """
    service_proxy = bus_get_object(bus, SECRETS, SS_PATH)
    service = dbus.Interface(service_proxy, SS_PREFIX+'Service')
    session = Session()
    try:
        client_key = dbus.ByteArray(long_to_bytes(session.my_public_key))
        server_reply, session_path = service.OpenSession(
            ALGORITHM_DH, client_key, signature='sv')
    except dbus.exceptions.DBusException as error:
        if error.get_dbus_name() != DBUS_NOT_SUPPORTED:
            raise
        # The service cannot do the DH exchange: fall back to an
        # unencrypted session.
        server_reply, session_path = service.OpenSession(
            ALGORITHM_PLAIN, '', signature='sv')
        session.encrypted = False
    else:
        # The DH reply carries the server's public key.
        session.set_server_public_key(bytes_to_long(server_reply))
    session.object_path = session_path
    return session
def format_secret(session, secret, content_type):
    """Build the :class:`dbus.Struct` used to pass `secret` to the
    Secret Service API.

    `secret` is encoded as UTF-8 first when it is not already ``bytes``.
    For an unencrypted session the struct holds the session path, an
    empty parameters field, the raw secret and `content_type`.  Otherwise
    the secret is padded, encrypted with AES in CBC mode under
    ``session.aes_key`` and a fresh random IV, and the struct holds the
    session path, the IV, the ciphertext and `content_type`.
    """
    if not isinstance(secret, bytes):
        secret = secret.encode('utf-8')
    if session.encrypted:
        # PKCS-7 style padding: always adds between 1 and 16 bytes, each
        # equal to the number of bytes added.
        pad_length = 0x10 - (len(secret) & 0xf)
        padded = secret + bytes(bytearray([pad_length] * pad_length))
        iv = get_random_bytes(block_size)
        cipher = AESCipher(session.aes_key, mode=MODE_CBC, IV=iv)
        ciphertext = bytearray(cipher.encrypt(padded))
        return dbus.Struct((
            session.object_path,
            dbus.Array(iv),
            dbus.Array(ciphertext),
            content_type,
        ))
    return dbus.Struct((session.object_path, '',
                        dbus.ByteArray(secret), content_type))
def exec_prompt(bus, prompt, callback):
    """Execute the given `prompt` without blocking.

    When the prompt emits its ``Completed`` signal, `callback` is called
    with two arguments: a boolean telling whether the operation was
    dismissed, and the signal's result value (converted to a plain
    ``list`` when it is a :class:`dbus.Array`, e.g. a list of unlocked
    item paths).  A main loop should be running and registered for this
    function to work.

    Any exception raised while starting the prompt propagates to the
    caller; in that case the signal handler is disconnected again.
    """
    prompt_obj = bus_get_object(bus, SECRETS, prompt)
    prompt_iface = dbus.Interface(prompt_obj, SS_PREFIX+'Prompt')

    def new_callback(dismissed, unlocked):
        if isinstance(unlocked, dbus.Array):
            unlocked = list(unlocked)
        callback(bool(dismissed), unlocked)

    # Subscribe to ``Completed`` *before* starting the prompt.  If the
    # handler were connected afterwards, a prompt that finishes right
    # away could emit the signal before the match rule is registered and
    # `callback` would never be invoked.
    signal_match = prompt_iface.connect_to_signal('Completed', new_callback)
    try:
        prompt_iface.Prompt('', signature='s')
    except Exception:
        # The prompt never started, so no signal will come: do not leave
        # a dangling handler behind.
        signal_match.remove()
        raise
def exec_prompt_glib(bus, prompt):
    """Synchronous variant of :func:`exec_prompt` that blocks in a GLib
    main loop until the prompt completes.

    Returns a (*dismissed*, *unlocked*) tuple holding the two values
    passed to the completion callback.
    """
    from gi.repository import GLib
    main_loop = GLib.MainLoop()
    outcome = []

    def on_completed(dismissed, unlocked):
        # Store both values, then let ``main_loop.run()`` below return.
        outcome.extend((dismissed, unlocked))
        main_loop.quit()

    exec_prompt(bus, prompt, on_completed)
    main_loop.run()
    dismissed, unlocked = outcome[0], outcome[1]
    return dismissed, unlocked
def exec_prompt_qt(bus, prompt):
    """Synchronous variant of :func:`exec_prompt` that blocks in a PyQt5
    event loop until the prompt completes.

    Returns a (*dismissed*, *unlocked*) tuple holding the two values
    passed to the completion callback.

    An already existing ``QCoreApplication`` is reused; one is created
    only when the process has none.  The wait happens in a local
    ``QEventLoop`` so that finishing the prompt does not quit the host
    application's own event loop.
    """
    from PyQt5.QtCore import QCoreApplication, QEventLoop
    # Qt permits a single application object per process, so creating one
    # unconditionally would fail inside a program that already has it.
    # The name is kept bound so a freshly created application stays alive
    # while the loop below is running.
    app = QCoreApplication.instance()
    if app is None:
        app = QCoreApplication([])
    loop = QEventLoop()
    result = []

    def callback(dismissed, unlocked):
        result.append(dismissed)
        result.append(unlocked)
        # Leave only the nested loop; the application itself keeps going.
        loop.quit()

    exec_prompt(bus, prompt, callback)
    loop.exec_()
    return result[0], result[1]
# Compatibility aliases: the ``*_async_*`` names are bound to the very same
# function objects as exec_prompt_glib and exec_prompt_qt.
exec_prompt_async_glib = exec_prompt_glib
exec_prompt_async_qt = exec_prompt_qt
def to_unicode(string):
    """Convert a D-Bus string to the interpreter's unicode string type.

    Uses ``unicode`` where that builtin exists (Python 2) and falls back
    to ``str`` when looking it up raises :exc:`NameError` (Python 3).
    """
    try:
        converted = unicode(string)  # Python 2
    except NameError:
        converted = str(string)  # Python 3
    return converted
|