/usr/lib/python3/dist-packages/anosql/core.py is in python3-anosql 0.2.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
anosql main module
"""
import os
from functools import partial
import re
class SQLLoadException(Exception):
    """Raised when a queries file cannot be found on disk."""
class SQLParseException(Exception):
    """Raised when a query block is missing its ``-- name:`` header."""
# Query kinds, chosen from the suffix of a query's "-- name:" header.
(
    SELECT,                # no suffix: execute and fetch all rows
    INSERT_UPDATE_DELETE,  # "!" suffix: execute and commit
    AUTO_GEN,              # "<!" suffix: execute, commit, return generated id
) = 1, 2, 3
class Queries(object):
    """Container that exposes each loaded query as an attribute.

    Every ``(name, fn)`` pair is attached with ``setattr`` so a query can be
    called as ``queries.<name>(conn, ...)``. The names are also recorded, in
    insertion order and without duplicates, in ``available_queries``.
    """

    def __init__(self, queries=()):
        """Register an iterable of ``(name, fn)`` pairs.

        The default is an immutable empty tuple rather than a list so that no
        mutable object is shared between calls.
        """
        self.available_queries = []
        for name, fn in queries:
            self.add_query(name, fn)

    def __repr__(self):
        return "anosql.Queries(" + repr(self.available_queries) + ")"

    def add_query(self, name, fn):
        """Attach ``fn`` as attribute ``name`` and record the name once.

        Adding a name that already exists rebinds the attribute to the new
        callable but does not add a second entry to ``available_queries``.
        """
        setattr(self, name, fn)
        if name not in self.available_queries:
            self.available_queries.append(name)
def get_fn_name(line):
    """Extract the query name from a ``-- name:`` header line.

    Hyphens in the name are converted to underscores. Any ``!`` / ``<!``
    suffix is left in place for the caller to interpret.

    The text after ``-- name:`` is stripped of surrounding whitespace, so a
    missing space, extra spaces, or a trailing carriage return no longer
    corrupt the name. A line without the prefix falls back to dropping its
    first nine characters, as before.
    """
    prefix = '-- name:'
    if line.startswith(prefix):
        name = line[len(prefix):].strip()
    else:
        name = line[9:]
    return name.replace('-', '_')
def parse_sql_entry(db_type, e):
    """Turn one named query block into a ``(name, callable)`` pair.

    ``e`` is a block of text whose first line is a ``-- name: <name>`` header.
    An optional second line starting with ``-- `` becomes the callable's
    docstring. The remaining lines are joined with single spaces to form the
    SQL.

    The header suffix selects the query kind:

    * ``<!`` -- AUTO_GEN; the suffix is replaced by ``_auto`` in the name.
      The callable executes, commits and returns the generated id. For
      ``'postgres'`` that is the first column of the row produced by an
      appended ``RETURNING id``; for ``'sqlite'`` it is ``cursor.lastrowid``.
    * ``!`` -- INSERT_UPDATE_DELETE; the suffix is removed from the name.
      The callable executes, commits and returns ``None``.
    * no suffix -- SELECT; the callable returns ``cursor.fetchall()``.

    Placeholders are rewritten per ``db_type``: for ``'sqlite'`` every ``%s``
    becomes ``?``; for ``'postgres'`` every ``:name`` becomes ``%(name)s``.
    Any other ``db_type`` leaves the SQL untouched.

    The returned callable has the signature ``fn(conn, *args, **kwargs)``.
    Keyword arguments, when given, are passed as the query parameters;
    otherwise the positional arguments are used.

    Raises:
        SQLParseException: if the first line does not start with ``-- name:``.
    """
    is_sqlite = db_type == 'sqlite'
    is_postgres = db_type == 'postgres'

    lines = e.split('\n')
    if not lines[0].startswith('-- name:'):
        raise SQLParseException('Query does not start with "-- name:".')

    name = get_fn_name(lines[0])
    # "<!" must be tested first because it also contains "!".
    if '<!' in name:
        sql_type = AUTO_GEN
        name = name.replace('<!', '_auto')
    elif '!' in name:
        sql_type = INSERT_UPDATE_DELETE
        name = name.replace('!', '')
    else:
        sql_type = SELECT

    # Decide where the SQL starts from the *presence* of a comment line, not
    # from whether its text is non-empty: an empty "-- " doc line would
    # otherwise be joined into the query and comment out the whole statement.
    # The length guard lets a header-only entry yield an empty query instead
    # of an IndexError, matching how a header+doc-only entry is handled.
    doc = None
    body_start = 1
    if len(lines) > 1 and lines[1].startswith('-- '):
        doc = lines[1][3:]
        body_start = 2
    query = ' '.join(lines[body_start:])

    if is_postgres and sql_type == AUTO_GEN:
        query += ' RETURNING id'

    if is_sqlite:
        query = query.replace('%s', '?')
    elif is_postgres:
        # A lookbehind (rather than a consuming "[^:]") keeps the character
        # in front of the parameter -- e.g. the "(" in "VALUES (:a, :b)" --
        # while still leaving "::type" casts alone.
        query = re.sub(r'(?<!:):([a-zA-Z_-]+)', r'%(\1)s', query)

    def fn(conn, *args, **kwargs):
        params = kwargs if kwargs else args
        results = None
        cur = conn.cursor()
        try:
            if sql_type == INSERT_UPDATE_DELETE:
                cur.execute(query, params)
                conn.commit()
            elif sql_type == SELECT:
                cur.execute(query, params)
                results = cur.fetchall()
            elif is_postgres:
                cur.execute(query, params)
                results = cur.fetchone()[0]
                conn.commit()
            elif is_sqlite:
                # Uses the same parameter selection as the other branches so
                # that keyword arguments are no longer silently ignored.
                cur.execute(query, params)
                results = cur.lastrowid
                conn.commit()
            # NOTE(review): an AUTO_GEN query with any other db_type executes
            # nothing and returns None, as before -- confirm whether this
            # should raise instead.
        finally:
            # Release the cursor even when execution fails.
            cur.close()
        return results

    fn.__doc__ = doc
    fn.__query__ = query
    # ``func_name`` is only meaningful on Python 2; set ``__name__`` so the
    # callable is labelled on Python 3 too, and keep the old attribute for
    # code that reads it.
    fn.__name__ = name
    fn.func_name = name
    return name, fn
def parse_queries_string(db_type, s):
    """Split ``s`` into query blocks and parse each one.

    Blocks are separated by one or more empty lines. Leading and trailing
    whitespace around the whole string is ignored, and both ``\\n`` and
    ``\\r\\n`` line endings are accepted. A trailing newline or a run of
    several blank lines therefore no longer produces an empty block that
    fails to parse.

    Returns a list of ``(name, callable)`` pairs in source order. The list is
    empty when ``s`` contains no queries.
    """
    chunks = re.split(r'(?:\r?\n){2,}', s.strip())
    return [parse_sql_entry(db_type, chunk) for chunk in chunks if chunk]
def load_queries(db_type, filename):
    """Read ``filename`` and return its queries wrapped in a ``Queries``.

    Raises:
        SQLLoadException: if ``filename`` does not exist.
    """
    if os.path.exists(filename):
        with open(filename) as handle:
            text = handle.read()
        return Queries(parse_queries_string(db_type, text))
    raise SQLLoadException('Could not read file', filename)
def load_queries_from_string(db_type, string):
    """Parse the queries contained in ``string`` and wrap them in a ``Queries``."""
    return Queries(parse_queries_string(db_type, string))
|