/usr/lib/python3/dist-packages/sqlalchemy/dialects/postgresql/pg8000.py is in python3-sqlalchemy 1.0.15+ds1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# postgresql/pg8000.py
# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors <see AUTHORS
# file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: postgresql+pg8000
:name: pg8000
:dbapi: pg8000
:connectstring: \
postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
:url: https://pythonhosted.org/pg8000/
.. _pg8000_unicode:
Unicode
-------
pg8000 will encode / decode string values between it and the server using the
PostgreSQL ``client_encoding`` parameter; by default this is the value in
the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
Typically, this can be changed to ``utf-8``, as a more useful default::
#client_encoding = sql_ascii # actually, defaults to database
# encoding
client_encoding = utf8
The ``client_encoding`` can be overridden for a session by executing the SQL::
SET CLIENT_ENCODING TO 'utf8';
SQLAlchemy will execute this SQL on all new connections based on the value
passed to :func:`.create_engine` using the ``client_encoding`` parameter::
engine = create_engine(
"postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')
.. _pg8000_isolation_level:
pg8000 Transaction Isolation Level
-------------------------------------
The pg8000 dialect offers the same isolation level settings as that
of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``
.. versionadded:: 0.9.5 support for AUTOCOMMIT isolation level when using
pg8000.
.. seealso::
:ref:`postgresql_isolation_level`
:ref:`psycopg2_isolation_level`
"""
from ... import util, exc
import decimal
from ... import processors
from ... import types as sqltypes
from .base import (
PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext,
_DECIMAL_TYPES, _FLOAT_TYPES, _INT_TYPES)
import re
from sqlalchemy.dialects.postgresql.json import JSON
class _PGNumeric(sqltypes.Numeric):
    """Numeric type whose result conversion accounts for pg8000's native types."""

    def result_processor(self, dialect, coltype):
        """Pick the result-row converter for the given PG column type code.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: type code of the result column; it is tested for
         membership in ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` and
         ``_INT_TYPES``.
        :return: a callable converting the raw value, or ``None`` when the
         value from the driver is used as-is.
        :raises InvalidRequestError: if ``coltype`` is in none of the three
         known groups.
        """
        want_decimal = self.asdecimal

        if coltype in _FLOAT_TYPES:
            if not want_decimal:
                # pg8000 returns float natively for 701
                return None
            return processors.to_decimal_processor_factory(
                decimal.Decimal, self._effective_decimal_return_scale)

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            # pg8000 returns Decimal natively for 1700, so only the
            # float-requesting case needs a converter.
            return None if want_decimal else processors.to_float

        raise exc.InvalidRequestError(
            "Unknown PG numeric type: %d" % coltype)
class _PGNumericNoBind(_PGNumeric):
    """``_PGNumeric`` variant that supplies no bind-parameter processor."""

    def bind_processor(self, dialect):
        """Return ``None`` so that bound values are passed through unchanged."""
        return None
class _PGJSON(JSON):
    """JSON type that skips result decoding when the driver handles JSON."""

    def result_processor(self, dialect, coltype):
        """Return the JSON result converter, or ``None`` for newer drivers.

        Driver versions above 1.10.1 have native JSON support, so no
        processor is returned for them; otherwise the base ``JSON``
        processor is used.
        """
        has_native_json = dialect._dbapi_version > (1, 10, 1)
        if not has_native_json:
            return super(_PGJSON, self).result_processor(dialect, coltype)
        return None
class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for pg8000; adds nothing to ``PGExecutionContext``."""
class PGCompiler_pg8000(PGCompiler):
    """SQL compiler for pg8000 that doubles literal percent signs."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with the operator written as ``%%``."""
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return lhs + " %% " + rhs

    def post_process_text(self, text):
        """Double every ``%`` in ``text``.

        A warning is emitted first when the text already contains ``%%``,
        since those signs will be doubled again.
        """
        already_doubled = '%%' in text
        if already_doubled:
            util.warn("The SQLAlchemy postgresql dialect "
                      "now automatically escapes '%' in text() "
                      "expressions to '%%'.")
        return text.replace('%', '%%')
class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer for pg8000 that also doubles percent signs."""

    def _escape_identifier(self, value):
        """Escape quote characters in ``value``, then double each ``%``."""
        quote_escaped = value.replace(self.escape_quote, self.escape_to_quote)
        return quote_escaped.replace('%', '%%')
class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect bound to the pg8000 DBAPI driver."""

    driver = 'pg8000'

    supports_unicode_statements = True
    supports_unicode_binds = True

    default_paramstyle = 'format'
    # Class-level default; initialize() recomputes this per driver version.
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    description_encoding = 'use_encoding'

    # Base PG type map with the pg8000-specific numeric/JSON types swapped in.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGNumeric,
            JSON: _PGJSON,
        }
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Construct the dialect.

        :param client_encoding: optional encoding name; when not ``None``,
         :meth:`on_connect` arranges for ``SET CLIENT_ENCODING`` to be
         issued with this value.
        :param kwargs: forwarded unchanged to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

    def initialize(self, connection):
        """Set version-dependent flags, then run the base initialization."""
        self.supports_sane_multi_rowcount = self._dbapi_version >= (1, 9, 14)
        super(PGDialect_pg8000, self).initialize(connection)

    @util.memoized_property
    def _dbapi_version(self):
        """Driver version as a tuple of ints parsed from ``__version__``.

        Falls back to ``(99, 99, 99)`` when no DBAPI module is set or it
        exposes no ``__version__`` attribute.
        """
        if self.dbapi and hasattr(self.dbapi, '__version__'):
            return tuple(
                int(part) for part in re.findall(
                    r'(\d+)(?:[-\.]?|$)', self.dbapi.__version__))
        return (99, 99, 99)

    @classmethod
    def dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__('pg8000')

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to open a connection.

        The URL's username is passed under the ``user`` key, ``port`` is
        converted to ``int`` when present, and the URL query entries are
        merged on top. The positional argument list is always empty.
        """
        opts = url.translate_connect_args(username='user')
        if 'port' in opts:
            opts['port'] = int(opts['port'])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if the text of ``e`` reports a closed connection."""
        return "connection is closed" in str(e)

    def set_isolation_level(self, connection, level):
        """Apply an isolation level (or AUTOCOMMIT) to a connection.

        :param connection: a DBAPI connection, or a wrapper exposing the
         real one as ``.connection``.
        :param level: isolation level name; underscores are treated as
         spaces.
        :raises ArgumentError: if ``level`` is neither ``AUTOCOMMIT`` nor a
         key of ``self._isolation_lookup``.
        """
        level = level.replace('_', ' ')

        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        if level == 'AUTOCOMMIT':
            connection.autocommit = True
        elif level in self._isolation_lookup:
            connection.autocommit = False
            cursor = connection.cursor()
            try:
                # ``level`` was validated against _isolation_lookup above, so
                # interpolating it into the statement is safe.
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    "ISOLATION LEVEL %s" % level)
                cursor.execute("COMMIT")
            finally:
                # Release the cursor even when one of the statements fails.
                cursor.close()
        else:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s or AUTOCOMMIT" %
                (level, self.name, ", ".join(self._isolation_lookup))
            )

    def set_client_encoding(self, connection, client_encoding):
        """Issue ``SET CLIENT_ENCODING`` on a connection and commit.

        :param connection: a DBAPI connection, or a wrapper exposing the
         real one as ``.connection``.
        :param client_encoding: encoding name placed inside the statement.
        """
        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        cursor = connection.cursor()
        try:
            # NOTE(review): the value is concatenated into the SQL text
            # without escaping, so it must come from trusted configuration.
            cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
            cursor.execute("COMMIT")
        finally:
            # Release the cursor even when one of the statements fails.
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction identified by ``(0, xid, '')``."""
        connection.connection.tpc_begin((0, xid, ''))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction (``xid`` is unused)."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Roll back the two-phase transaction ``(0, xid, '')``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_rollback((0, xid, ''))

    def do_commit_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Commit the two-phase transaction ``(0, xid, '')``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_commit((0, xid, ''))

    def do_recover_twophase(self, connection):
        """Return element 1 of every row reported by ``tpc_recover()``.

        NOTE(review): presumably that element is the ``xid`` value passed
        to the other two-phase methods -- confirm against the driver.
        """
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return the per-connection setup callable, or ``None``.

        The callable applies the configured client encoding first and then
        the configured isolation level; a step is skipped when its setting
        is ``None``. ``None`` is returned when neither setting is present.
        """
        fns = []

        if self.client_encoding is not None:
            def apply_client_encoding(conn):
                self.set_client_encoding(conn, self.client_encoding)
            fns.append(apply_client_encoding)

        if self.isolation_level is not None:
            def apply_isolation_level(conn):
                self.set_isolation_level(conn, self.isolation_level)
            fns.append(apply_isolation_level)

        if not fns:
            return None

        def on_connect(conn):
            for fn in fns:
                fn(conn)
        return on_connect
# Module-level alias for the dialect class defined in this file.
# NOTE(review): presumably the attribute SQLAlchemy's dialect loader looks up
# when resolving "postgresql+pg8000" -- confirm against the dialect registry.
dialect = PGDialect_pg8000
|