/usr/bin/data_maintainer3 is in skytools3 3.2.6-4.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/python
"""Generic script for processing large data sets in small batches.
Reads events from one datasource and commits them into another one,
either one by one or in batches.
Config template::
[data_maintainer3]
job_name = dm_remove_expired_services
# if source is database, you need to specify dbread and sql_get_pk_list
dbread = dbname=sourcedb_test
sql_get_pk_list =
select username
from user_service
where expire_date < now();
# if source is csv file, you need to specify fileread and optionally csv_delimiter and csv_quotechar
#fileread = data.csv
#csv_delimiter = ,
#csv_quotechar = "
dbwrite = dbname=destdb port=1234 host=dbhost.com user=guest password=secret
dbbefore = dbname=destdb_test
dbafter = dbname=destdb_test
dbcrash = dbname=destdb_test
dbthrottle = dbname=queuedb_test
# It is a good practice to include same where condition on target side as on read side,
# to ensure that you are actually changing the same data you think you are,
# especially when reading from replica database or when processing takes days.
sql_modify =
delete from user_service
where username = %%(username)s
and expire_date < now();
# This will be run before executing the sql_get_pk_list query (optional)
#sql_before_run =
# select * from somefunction1(%(job_name)s);
# This will be run when the DM finishes (optional)
#sql_after_run =
# select * from somefunction2(%(job_name)s);
# Determines whether the sql_after_run query will be run in case the pk list query returns no rows
#after_zero_rows = 1
# This will be run if the DM crashes (optional)
#sql_on_crash =
# select * from somefunction3(%(job_name)s);
# This may be used to control throttling of the DM (optional)
#sql_throttle =
# select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
# materialize query so that transaction should not be open while processing it (only used when source is a database)
#with_hold = 1
# how many records process to fetch at once and if batch processing is used then
# also how many records are processed in one commit
#fetch_count = 100
# by default commit after each row (safe when behind plproxy, bouncer or whatever)
# can be turned off for better performance when connected directly to database
#autocommit = 1
# just for tuning to throttle how much load we let onto write database
#commit_delay = 0.0
# quite often data_maintainer is run from crontab and then loop delay is not needed
# in case it has to be run as daemon set loop delay in seconds
#loop_delay = 1
logfile = ~/log/%(job_name)s.log
pidfile = ~/pid/%(job_name)s.pid
use_skylog = 0
"""

import csv
import datetime
import os.path
import sys
import time

import pkgloader
pkgloader.require('skytools', '3.0')
import skytools


class DataSource (object):
    """Base class for the pluggable data sources below."""

    def __init__(self, log):
        self.log = log

    def open(self):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError

    def fetch(self, count):
        raise NotImplementedError


class DBDataSource (DataSource):
    """Data source that reads rows from a database through a server-side cursor."""

    def __init__(self, log, db, query, bres = None, with_hold = False):
        super(DBDataSource, self).__init__(log)
        self.db = db
        if with_hold:
            self.query = "DECLARE data_maint_cur NO SCROLL CURSOR WITH HOLD FOR %s" % query
        else:
            self.query = "DECLARE data_maint_cur NO SCROLL CURSOR FOR %s" % query
        self.bres = bres
        self.with_hold = with_hold

    def _run_query(self, query, params = None):
        self.cur.execute(query, params)
        self.log.debug(self.cur.query)
        self.log.debug(self.cur.statusmessage)

    def open(self):
        self.cur = self.db.cursor()
        self._run_query(self.query, self.bres)  # pass results from the before-query into sql_pk

    def close(self):
        self.cur.execute("CLOSE data_maint_cur")
        if not self.with_hold:
            self.db.rollback()

    def fetch(self, count):
        self._run_query("FETCH FORWARD %i FROM data_maint_cur" % count)
        return self.cur.fetchall()
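
# For reference, a sketch of the statements DBDataSource sends, assuming the
# sql_get_pk_list example from the docstring, with_hold = 1 and the default
# fetch_count of 100:
#
#   DECLARE data_maint_cur NO SCROLL CURSOR WITH HOLD FOR
#       select username from user_service where expire_date < now();
#   FETCH FORWARD 100 FROM data_maint_cur;  -- repeated until a short batch
#   CLOSE data_maint_cur;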


class CSVDataSource (DataSource):
    """Data source that reads rows from a csv file as dicts."""

    def __init__(self, log, filename, delimiter, quotechar):
        super(CSVDataSource, self).__init__(log)
        self.filename = filename
        self.delimiter = delimiter
        self.quotechar = quotechar

    def open(self):
        self.fp = open(self.filename, 'rb')
        self.reader = csv.DictReader(self.fp, delimiter = self.delimiter, quotechar = self.quotechar)

    def close(self):
        self.fp.close()

    def fetch(self, count):
        ret = []
        for row in self.reader:
            ret.append(row)
            count -= 1
            if count <= 0:
                break
        return ret
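
# For reference: csv.DictReader takes its column names from the header row, so
# the file must provide a column for every %(...)s placeholder that sql_modify
# uses.  A minimal data.csv for the docstring's sql_modify example (a sketch):
#
#   username
#   alice
#   bob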


class DataMaintainer (skytools.DBScript):
    __doc__ = __doc__
    loop_delay = -1

    def __init__(self, args):
        super(DataMaintainer, self).__init__("data_maintainer3", args)

        # source file
        self.fileread = self.cf.get("fileread", "")
        if self.fileread:
            self.fileread = os.path.expanduser(self.fileread)
            self.set_single_loop(True)  # force single run if the source is a file
        self.csv_delimiter = self.cf.get("csv_delimiter", ',')
        self.csv_quotechar = self.cf.get("csv_quotechar", '"')

        # query for fetching the PKs of the data set to be maintained
        self.sql_pk = self.cf.get("sql_get_pk_list", "")

        # exactly one of the two sources must be configured
        if (int(bool(self.sql_pk)) + int(bool(self.fileread))) in (0, 2):
            raise skytools.UsageError("Either fileread or sql_get_pk_list must be specified in the configuration file")

        # query for changing a data tuple (autocommit)
        self.sql_modify = self.cf.get("sql_modify")

        # query to be run before starting the data maintainer,
        # useful for retrieving initialization parameters of the query
        self.sql_before = self.cf.get("sql_before_run", "")

        # query to be run after the data maintainer finishes
        self.sql_after = self.cf.get("sql_after_run", "")

        # whether to run the sql_after query even when 0 rows were processed
        self.after_zero_rows = self.cf.getint("after_zero_rows", 1)

        # query to be run if the process crashes
        self.sql_crash = self.cf.get("sql_on_crash", "")

        # query for checking if / how much to throttle
        self.sql_throttle = self.cf.get("sql_throttle", "")

        # how many records to fetch at once (fetch_count overrides the older fetchcnt name)
        self.fetchcnt = self.cf.getint("fetchcnt", 100)
        self.fetchcnt = self.cf.getint("fetch_count", self.fetchcnt)

        # specifies if a non-transactional cursor should be created (0 -> without hold)
        self.withhold = self.cf.getint("with_hold", 1)

        # execution mode (0 -> whole batch is committed / 1 -> autocommit)
        self.autocommit = self.cf.getint("autocommit", 1)

        # delay in seconds after each commit
        self.commit_delay = self.cf.getfloat("commit_delay", 0.0)

    def work(self):
        self.log.info('Starting..')
        self.started = self.lap_time = time.time()
        self.total_count = 0
        bres = {}

        if self.sql_before:
            bdb = self.get_database("dbbefore", autocommit=1)
            bcur = bdb.cursor()
            bcur.execute(self.sql_before)
            if bcur.statusmessage.startswith('SELECT'):
                res = bcur.fetchall()
                assert len(res) == 1, "Result of a 'before' query must be 1 row"
                bres = res[0].copy()

        if self.sql_throttle:
            dbt = self.get_database("dbthrottle", autocommit=1)
            tcur = dbt.cursor()

        if self.autocommit:
            self.log.info("Autocommit after each modify")
            dbw = self.get_database("dbwrite", autocommit=1)
        else:
            self.log.info("Commit in %i record batches", self.fetchcnt)
            dbw = self.get_database("dbwrite", autocommit=0)

        if self.fileread:
            self.datasource = CSVDataSource(self.log, self.fileread, self.csv_delimiter, self.csv_quotechar)
        else:
            if self.withhold:
                dbr = self.get_database("dbread", autocommit=1)
            else:
                dbr = self.get_database("dbread", autocommit=0)
            self.datasource = DBDataSource(self.log, dbr, self.sql_pk, bres, self.withhold)
        self.datasource.open()

        mcur = dbw.cursor()
        while True:  # loop while fetch returns fetch_count rows
            self.fetch_started = time.time()
            res = self.datasource.fetch(self.fetchcnt)
            count, lastitem = self.process_batch(res, mcur, bres)
            self.total_count += count
            if not self.autocommit:
                dbw.commit()
            self.stat_put("duration", time.time() - self.fetch_started)
            self.send_stats()
            if len(res) < self.fetchcnt or self.last_sigint:
                break
            if self.commit_delay > 0.0:
                time.sleep(self.commit_delay)
            if self.sql_throttle:
                self.throttle(tcur)
            self._print_count("--- Running count: %s duration: %s ---")

        if self.last_sigint:
            self.log.info("Exiting on user request")
        self.datasource.close()
        self.log.info("--- Total count: %s duration: %s ---",
                      self.total_count, datetime.timedelta(0, round(time.time() - self.started)))

        if self.sql_after and (self.after_zero_rows > 0 or self.total_count > 0):
            adb = self.get_database("dbafter", autocommit=1)
            acur = adb.cursor()
            acur.execute(self.sql_after, lastitem)
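
    # process_batch() below recognizes two specially named columns: a 'cnt'
    # column in the source row (from sql_get_pk_list, the csv file or the
    # before-query), which makes that row count as 'cnt' records instead of
    # one, and a 'stats' column in rows returned by a SELECT-style sql_modify,
    # whose urlencoded key=value pairs are added to the script's statistics
    # via skytools.db_urldecode().  E.g. (a sketch; cleanup_user is a
    # hypothetical function):
    #
    #   sql_modify =
    #       select 'deleted=1&archived=2'::text as stats from cleanup_user(%%(username)s);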

    def process_batch(self, res, mcur, bres):
        """Process events in autocommit mode, reading results back and
        trying to make some sense out of them.
        """
        try:
            count = 0
            item = bres.copy()
            for i in res:  # for each row in the read query result
                item.update(i)
                mcur.execute(self.sql_modify, item)
                self.log.debug(mcur.query)
                if mcur.statusmessage.startswith('SELECT'):  # if a select was used, we can expect some result
                    mres = mcur.fetchall()
                    for r in mres:
                        if 'stats' in r:  # if the specially handled column 'stats' is present
                            for k, v in skytools.db_urldecode(r['stats'] or '').items():
                                self.stat_increase(k, int(v))
                        self.log.debug(r)
                else:
                    self.stat_increase('processed', mcur.rowcount)
                    self.log.debug(mcur.statusmessage)
                if 'cnt' in item:
                    count += item['cnt']
                    self.stat_increase("count", item['cnt'])
                else:
                    count += 1
                    self.stat_increase("count")
                if self.last_sigint:
                    break
            return count, item
        except:  # the process has crashed; run sql_on_crash and re-raise the exception
            if self.sql_crash:
                dbc = self.get_database("dbcrash", autocommit=1)
                ccur = dbc.cursor()
                ccur.execute(self.sql_crash, item)
            raise
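
    # throttle() below expects sql_throttle to return exactly one value:
    # boolean true means "sleep 30 s and re-check", a number means "sleep that
    # many seconds", and false / 0 / a negative number resumes processing.
    # The docstring example returns a boolean:
    #   select lag > '5 minutes'::interval
    #   from pgq.get_consumer_info('failoverconsumer');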

    def throttle(self, tcur):
        while not self.last_sigint:
            tcur.execute(self.sql_throttle)
            _r = tcur.fetchall()
            assert len(_r) == 1 and len(_r[0]) == 1, "Result of 'throttle' query must be 1 value"
            throttle = _r[0][0]
            if isinstance(throttle, bool):
                tt = float(throttle and 30)
            elif isinstance(throttle, (int, float)):
                tt = float(throttle)
            else:
                self.log.warn("Result of 'throttle' query must be boolean or numeric")
                break
            if tt > 0.0:
                self.log.debug("sleeping %f s", tt)
                time.sleep(tt)
            else:
                break
            self._print_count("--- Waiting count: %s duration: %s ---")

    def _print_count(self, text):
        if time.time() - self.lap_time > 60.0:  # if one minute has passed, print running totals
            self.log.info(text, self.total_count, datetime.timedelta(0, round(time.time() - self.started)))
            self.lap_time = time.time()

    def shutdown(self):
        super(DataMaintainer, self).shutdown()
        self.log.info("Script finished, exiting")


if __name__ == '__main__':
    script = DataMaintainer(sys.argv[1:])
    script.start()