This file is indexed.

/usr/lib/python3/dist-packages/postgresql/unittest.py is in python3-postgresql 1.0.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
##
# .unittest
##
"""
TestCase subclasses (only) used by postgresql.test.test_[ssl_]connect
"""
import sys
import os
import atexit
import unittest

from . import exceptions as pg_exc
from . import cluster as pg_cluster
from . import installation

from .python.socket import find_available_port

class TestCaseWithCluster(unittest.TestCase):
	"""
	postgresql.driver *interface* tests.

	Base TestCase that provisions a throw-away PostgreSQL cluster and hands
	every test a connection to it.

	Attributes set by this class:

	 installation
	  The installation returned by `installation.default()`.
	 cluster_path
	  'py_unittest_pg_cluster_' + the current pid + the optional
	  `cluster_path_suffix` attribute (subclasses may define it).
	 cluster
	  The `pg_cluster.Cluster` built from the two attributes above.
	 cluster_port
	  Port selected by `configure_cluster`.
	 db
	  The connection opened by `run`; only set while a test is running and
	  reset to None afterwards.
	"""
	def __init__(self, *args, **kw):
		"""
		Locate the default installation and prepare the Cluster object.

		Exits the process with status 1 when no default installation can be
		found. A cluster that is already initialized at `cluster_path` is
		dropped so that `run` starts from a fresh one.
		"""
		super().__init__(*args, **kw)
		self.installation = installation.default()
		# Keyed on the pid; subclasses can add `cluster_path_suffix` to get a
		# distinct path.
		self.cluster_path = \
			'py_unittest_pg_cluster_' \
			+ str(os.getpid()) + getattr(self, 'cluster_path_suffix', '')

		if self.installation is None:
			sys.stderr.write("ERROR: cannot find 'default' pg_config\n")
			sys.stderr.write(
				"HINT: set the PGINSTALLATION environment variable to the `pg_config` path\n"
			)
			sys.exit(1)

		self.cluster = pg_cluster.Cluster(
			self.installation,
			self.cluster_path,
		)
		# Discard whatever is already initialized at this path.
		if self.cluster.initialized():
			self.cluster.drop()

	def configure_cluster(self):
		"""
		Pick a port and write the test settings into the cluster.

		Raises a ClusterError (via `raise_exception`) when
		`find_available_port` returns None.
		"""
		self.cluster_port = find_available_port()
		if self.cluster_port is None:
			pg_exc.ClusterError(
				'failed to find a port for the test cluster on localhost',
				creator = self.cluster
			).raise_exception()
		# NOTE(review): `silent_mode` was removed in PostgreSQL 9.2 -- confirm
		# that the targeted server versions still accept this setting.
		self.cluster.settings.update(dict(
			port = str(self.cluster_port),
			max_connections = '6',
			shared_buffers = '24',
			listen_addresses = 'localhost',
			log_destination = 'stderr',
			log_min_messages = 'FATAL',
			silent_mode = 'off',
		))
		# 8.4 turns prepared transactions off by default, so enable them
		# explicitly; the setting is only applied on 8.1 and later.
		if self.cluster.installation.version_info >= (8,1):
			self.cluster.settings.update(dict(
				max_prepared_transactions = '3',
			))

	def initialize_database(self):
		"""
		Create the 'test' database unless pg_database already lists it.

		Connects to 'template1' as user 'test' to perform the check.
		"""
		c = self.cluster.connection(
			user = 'test',
			database = 'template1',
		)
		with c:
			# presumably `first()` gives None when the query yields no row --
			# confirm against the driver's statement API.
			if c.prepare(
				"select true from pg_catalog.pg_database " \
				"where datname = 'test'"
			).first() is None:
				c.execute('create database test')

	def connection(self, *args, **kw):
		"""
		Return `self.cluster.connection(...)` with `user` forced to 'test'.

		Remaining positional and keyword arguments are passed through.
		"""
		return self.cluster.connection(*args, user = 'test', **kw)

	def run(self, *args, **kw):
		"""
		Run the test with `self.db` bound to an open connection.

		Initializes, configures and starts the cluster when it is not yet
		initialized, starts it when it is not running, then delegates to
		`unittest.TestCase.run` inside the connection's context. `self.db`
		is reset to None once the test has finished, whatever the outcome.
		"""
		if not self.cluster.initialized():
			self.cluster.encoding = 'utf-8'
			self.cluster.init(
				user = 'test',
				encoding = self.cluster.encoding,
				logfile = None,
			)
			# Marker on stderr: a new cluster was just initialized.
			sys.stderr.write('*')
			try:
				# Make sure the cluster is removed when the interpreter exits.
				atexit.register(self.cluster.drop)
				self.configure_cluster()
				self.cluster.start(logfile = sys.stdout)
				self.cluster.wait_until_started()
				self.initialize_database()
			except Exception:
				# Setup failed: remove the half-built cluster right away and
				# cancel the exit hook, then let the error propagate.
				self.cluster.drop()
				atexit.unregister(self.cluster.drop)
				raise
		if not self.cluster.running():
			self.cluster.start()
			self.cluster.wait_until_started()

		db = self.connection()
		with db:
			self.db = db
			try:
				return super().run(*args, **kw)
			finally:
				# The reset used to sit after the `return` and never ran,
				# leaving a reference to a connection the `with` block had
				# already exited.
				self.db = None