/usr/lib/python3/dist-packages/postgresql/unittest.py is in python3-postgresql 1.0.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | ##
# .unittest
##
"""
TestCase subclasses (only) used by postgresql.test.test_[ssl_]connect
"""
import sys
import os
import atexit
import unittest
from . import exceptions as pg_exc
from . import cluster as pg_cluster
from . import installation
from .python.socket import find_available_port
class TestCaseWithCluster(unittest.TestCase):
"""
postgresql.driver *interface* tests.
"""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.installation = installation.default()
self.cluster_path = \
'py_unittest_pg_cluster_' \
+ str(os.getpid()) + getattr(self, 'cluster_path_suffix', '')
if self.installation is None:
sys.stderr.write("ERROR: cannot find 'default' pg_config\n")
sys.stderr.write(
"HINT: set the PGINSTALLATION environment variable to the `pg_config` path\n"
)
sys.exit(1)
self.cluster = pg_cluster.Cluster(
self.installation,
self.cluster_path,
)
if self.cluster.initialized():
self.cluster.drop()
def configure_cluster(self):
self.cluster_port = find_available_port()
if self.cluster_port is None:
pg_exc.ClusterError(
'failed to find a port for the test cluster on localhost',
creator = self.cluster
).raise_exception()
self.cluster.settings.update(dict(
port = str(self.cluster_port),
max_connections = '6',
shared_buffers = '24',
listen_addresses = 'localhost',
log_destination = 'stderr',
log_min_messages = 'FATAL',
silent_mode = 'off',
))
# 8.4 turns prepared transactions off by default.
if self.cluster.installation.version_info >= (8,1):
self.cluster.settings.update(dict(
max_prepared_transactions = '3',
))
def initialize_database(self):
c = self.cluster.connection(
user = 'test',
database = 'template1',
)
with c:
if c.prepare(
"select true from pg_catalog.pg_database " \
"where datname = 'test'"
).first() is None:
c.execute('create database test')
def connection(self, *args, **kw):
return self.cluster.connection(*args, user = 'test', **kw)
def run(self, *args, **kw):
if not self.cluster.initialized():
self.cluster.encoding = 'utf-8'
self.cluster.init(
user = 'test',
encoding = self.cluster.encoding,
logfile = None,
)
sys.stderr.write('*')
try:
atexit.register(self.cluster.drop)
self.configure_cluster()
self.cluster.start(logfile = sys.stdout)
self.cluster.wait_until_started()
self.initialize_database()
except Exception:
self.cluster.drop()
atexit.unregister(self.cluster.drop)
raise
if not self.cluster.running():
self.cluster.start()
self.cluster.wait_until_started()
db = self.connection()
with db:
self.db = db
return super().run(*args, **kw)
self.db = None
|