This file is indexed.

/usr/lib/python3/dist-packages/secretstorage/util.py is in python3-secretstorage 2.3.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# SecretStorage module for Python
# Access passwords using the SecretService DBus API
# Author: Dmitry Shachnev, 2013
# License: BSD

"""This module provides some utility functions, but these shouldn't
normally be used by external applications."""

import dbus
import os
from secretstorage.defines import DBUS_UNKNOWN_METHOD, DBUS_NO_SUCH_OBJECT, \
 DBUS_SERVICE_UNKNOWN, DBUS_NO_REPLY, DBUS_NOT_SUPPORTED, DBUS_EXEC_FAILED, \
 SS_PATH, SS_PREFIX, ALGORITHM_DH, ALGORITHM_PLAIN
from secretstorage.dhcrypto import Session, int_to_bytes
from secretstorage.exceptions import ItemNotFoundException, \
 SecretServiceNotAvailableException
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.utils import int_from_bytes

BUS_NAME = 'org.freedesktop.secrets'
SERVICE_IFACE = SS_PREFIX + 'Service'

class InterfaceWrapper(dbus.Interface):
	"""A :class:`dbus.Interface` subclass whose callable attributes are
	wrapped so that selected D-Bus errors surface as
	:doc:`SecretStorage exceptions <exceptions>`."""

	def catch_errors(self, function_in):
		"""Returns a wrapper around `function_in` that forwards all
		arguments and translates D-Bus errors raised by the call:

		* ``DBUS_UNKNOWN_METHOD`` and ``DBUS_NO_SUCH_OBJECT`` become
		  :exc:`~secretstorage.exceptions.ItemNotFoundException`;
		* ``DBUS_NO_REPLY`` and ``DBUS_NOT_SUPPORTED`` become
		  :exc:`~secretstorage.exceptions.SecretServiceNotAvailableException`.

		Any other D-Bus error is re-raised unchanged."""
		def guarded_call(*args, **kwargs):
			try:
				return function_in(*args, **kwargs)
			except dbus.exceptions.DBusException as error:
				error_name = error.get_dbus_name()
				if error_name == DBUS_UNKNOWN_METHOD:
					raise ItemNotFoundException('Item does not exist!')
				elif error_name == DBUS_NO_SUCH_OBJECT:
					raise ItemNotFoundException(error.get_dbus_message())
				elif error_name in (DBUS_NO_REPLY, DBUS_NOT_SUPPORTED):
					raise SecretServiceNotAvailableException(
						error.get_dbus_message())
				# Not an error we know how to translate.
				raise
		return guarded_call

	def __getattr__(self, attribute):
		"""Looks `attribute` up via :class:`dbus.Interface` and, when the
		result is callable, returns it wrapped by :meth:`catch_errors`."""
		target = dbus.Interface.__getattr__(self, attribute)
		return self.catch_errors(target) if callable(target) else target

def bus_get_object(bus, object_path, service_name=None):
	"""Calls ``bus.get_object`` for `object_path` with introspection
	disabled and returns the result.

	The object is looked up on `service_name`, or on ``BUS_NAME`` when
	`service_name` is empty or not given.

	Raises
	:exc:`~secretstorage.exceptions.SecretServiceNotAvailableException`
	when the D-Bus error is ``DBUS_SERVICE_UNKNOWN``,
	``DBUS_EXEC_FAILED`` or ``DBUS_NO_REPLY``; any other D-Bus error is
	re-raised unchanged."""
	target_name = service_name if service_name else BUS_NAME
	try:
		proxy = bus.get_object(target_name, object_path, introspect=False)
	except dbus.exceptions.DBusException as error:
		unavailable = (DBUS_SERVICE_UNKNOWN, DBUS_EXEC_FAILED, DBUS_NO_REPLY)
		if error.get_dbus_name() not in unavailable:
			raise
		raise SecretServiceNotAvailableException(error.get_dbus_message())
	return proxy

def open_session(bus):
	"""Opens a Secret Service session on `bus` and returns it.

	``OpenSession`` is first called with ``ALGORITHM_DH`` and this
	side's public key. If the service answers with
	``DBUS_NOT_SUPPORTED``, the call is repeated with
	``ALGORITHM_PLAIN`` and the returned session is marked as not
	encrypted. Any other D-Bus error from the first call is re-raised."""
	service_proxy = bus_get_object(bus, SS_PATH)
	# A plain dbus.Interface is used here so that DBUS_NOT_SUPPORTED
	# reaches the handler below as a DBusException.
	service = dbus.Interface(service_proxy, SS_PREFIX + 'Service')
	session = Session()
	try:
		server_key, session_path = service.OpenSession(
			ALGORITHM_DH,
			dbus.ByteArray(int_to_bytes(session.my_public_key)),
			signature='sv'
		)
	except dbus.exceptions.DBusException as error:
		if error.get_dbus_name() != DBUS_NOT_SUPPORTED:
			raise
		# Fall back to an unencrypted session.
		server_key, session_path = service.OpenSession(
			ALGORITHM_PLAIN,
			'',
			signature='sv'
		)
		session.encrypted = False
	else:
		# The service's key arrives as a byte sequence; it is read as a
		# big-endian integer before being handed to the session.
		session.set_server_public_key(
			int_from_bytes(bytearray(server_key), 'big'))
	session.object_path = session_path
	return session

def format_secret(session, secret, content_type):
	"""Packs `secret` into a :class:`dbus.Struct` of four fields:
	the session object path, the cipher parameters, the secret value
	and `content_type`.

	A `secret` that is not :class:`bytes` is encoded as UTF-8 first.
	For a session that is not encrypted the parameters field is an
	empty string and the value is passed as is. Otherwise the value is
	padded to a multiple of 16 bytes and encrypted with AES in CBC mode
	under ``session.aes_key``, using a fresh random 16-byte IV that is
	sent as the parameters field."""
	payload = secret if isinstance(secret, bytes) else secret.encode('utf-8')
	if not session.encrypted:
		plain_fields = (session.object_path, '',
			dbus.ByteArray(payload), content_type)
		return dbus.Struct(plain_fields)
	# PKCS-7 style padding: append N bytes of value N, with N in 1..16,
	# so a full block is added when the length is already a multiple.
	pad_length = 0x10 - (len(payload) % 0x10)
	payload += bytes(bytearray([pad_length] * pad_length))
	iv = os.urandom(0x10)
	cipher = Cipher(algorithms.AES(session.aes_key), modes.CBC(iv),
		default_backend())
	encryptor = cipher.encryptor()
	ciphertext = encryptor.update(payload) + encryptor.finalize()
	encrypted_fields = (
		session.object_path,
		dbus.Array(iv),
		dbus.Array(bytearray(ciphertext)),
		content_type
	)
	return dbus.Struct(encrypted_fields)

def exec_prompt(bus, prompt, callback):
	"""Starts the prompt at object path `prompt` and arranges for
	`callback` to be called when its ``Completed`` signal arrives.

	`callback` receives two arguments: a boolean telling whether the
	prompt was dismissed, and the signal's second argument (converted
	to a plain list when it is a :class:`dbus.Array`). A main loop
	should be running and registered for this function to work."""
	prompt_proxy = bus_get_object(bus, prompt)
	prompt_iface = dbus.Interface(prompt_proxy, SS_PREFIX + 'Prompt')
	prompt_iface.Prompt('', signature='s')

	def on_completed(dismissed, unlocked):
		if isinstance(unlocked, dbus.Array):
			payload = list(unlocked)
		else:
			payload = unlocked
		callback(bool(dismissed), payload)

	# NOTE(review): the handler is connected only after Prompt() has
	# returned; confirm the service cannot emit Completed before then.
	prompt_iface.connect_to_signal('Completed', on_completed)

def exec_prompt_glib(bus, prompt):
	"""Synchronous variant of :func:`exec_prompt`: runs a
	``GLib.MainLoop`` until the prompt completes and returns a
	(*dismissed*, *unlocked*) tuple."""
	from gi.repository import GLib
	main_loop = GLib.MainLoop()
	# Filled in by the callback; a list is used so the closure can
	# hand values back without rebinding an outer name.
	outcome = []

	def on_done(dismissed, unlocked):
		outcome.extend((dismissed, unlocked))
		main_loop.quit()

	exec_prompt(bus, prompt, on_done)
	main_loop.run()
	return outcome[0], outcome[1]

def exec_prompt_qt(bus, prompt):
	"""Synchronous variant of :func:`exec_prompt`: creates a PyQt5
	``QCoreApplication``, runs its event loop until the prompt
	completes and returns a (*dismissed*, *unlocked*) tuple."""
	from PyQt5.QtCore import QCoreApplication
	qt_app = QCoreApplication([])
	# Filled in by the callback; a list is used so the closure can
	# hand values back without rebinding an outer name.
	outcome = []

	def on_done(dismissed, unlocked):
		outcome.extend((dismissed, unlocked))
		qt_app.quit()

	exec_prompt(bus, prompt, on_done)
	qt_app.exec_()
	return outcome[0], outcome[1]

def unlock_objects(bus, paths, callback=None):
	"""Requests unlocking objects specified in `paths`.

	If `callback` is specified, it is called when unlocking is
	complete (see :func:`exec_prompt` description for details) and
	this function returns ``None``. When the service does not require
	a prompt, `callback` is called immediately with ``False`` and the
	list of paths the service reported as unlocked.

	Otherwise, uses the loop from GLib API and returns a boolean
	representing whether the operation was dismissed. When no prompt
	is needed, nothing could have been dismissed and ``False`` is
	returned.

	.. versionadded:: 2.1.2"""
	service_obj = bus_get_object(bus, SS_PATH)
	service_iface = InterfaceWrapper(service_obj, SERVICE_IFACE)
	unlocked_paths, prompt = service_iface.Unlock(paths, signature='ao')
	unlocked_paths = list(unlocked_paths)  # Convert from dbus.Array
	# A prompt path longer than one character means a prompt is needed;
	# the Secret Service API uses the bare path '/' for "no prompt".
	if len(prompt) > 1:
		if callback:
			exec_prompt(bus, prompt, callback)
			return None
		return exec_prompt_glib(bus, prompt)[0]
	if callback:
		# We still need to call it.
		callback(False, unlocked_paths)
		return None
	# Synchronous mode promises a boolean; previously this path fell
	# through and returned None.
	return False

def to_unicode(string):
	"""Returns `string` converted to the interpreter's text type:
	``unicode`` where that builtin exists (Python 2), ``str``
	otherwise (Python 3)."""
	try:
		# Python 2: the ``unicode`` builtin is available.
		converted = unicode(string)
	except NameError:
		# Python 3: ``unicode`` is not defined, ``str`` is the text type.
		converted = str(string)
	return converted