/usr/share/pyshared/londiste/table_copy.py is in skytools 2.1.12-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | #! /usr/bin/env python
"""Do a full table copy.
For internal usage.
"""
import sys, os, skytools
from skytools.dbstruct import *
from playback import *
__all__ = ['CopyTable']
class CopyTable(Replicator):
def __init__(self, args, copy_thread = 1):
Replicator.__init__(self, args)
if copy_thread:
self.pidfile += ".copy"
self.consumer_id += "_copy"
self.copy_thread = 1
def do_copy(self, tbl_stat):
src_db = self.get_database('provider_db')
dst_db = self.get_database('subscriber_db')
# it should not matter to pgq
src_db.commit()
dst_db.commit()
# change to SERIALIZABLE isolation level
src_db.set_isolation_level(skytools.I_SERIALIZABLE)
src_db.commit()
self.sync_database_encodings(src_db, dst_db)
# initial sync copy
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
self.log.info("Starting full copy of %s" % tbl_stat.name)
# just in case, drop all fkeys (in case "replay" was skipped)
# !! this may commit, so must be done before anything else !!
self.drop_fkeys(dst_db, tbl_stat.name)
# just in case, drop all triggers (in case "subscriber add" was skipped)
q_triggers = "select londiste.subscriber_drop_all_table_triggers(%s)"
dst_curs.execute(q_triggers, [tbl_stat.name])
# find dst struct
src_struct = TableStruct(src_curs, tbl_stat.name)
dst_struct = TableStruct(dst_curs, tbl_stat.name)
# check if columns match
dlist = dst_struct.get_column_list()
for c in src_struct.get_column_list():
if c not in dlist:
raise Exception('Column %s does not exist on dest side' % c)
# drop unnecessary stuff
objs = T_CONSTRAINT | T_INDEX | T_RULE | T_PARENT
dst_struct.drop(dst_curs, objs, log = self.log)
# do truncate & copy
self.real_copy(src_curs, dst_curs, tbl_stat)
# get snapshot
src_curs.execute("select txid_current_snapshot()")
snapshot = src_curs.fetchone()[0]
src_db.commit()
# restore READ COMMITTED behaviour
src_db.set_isolation_level(1)
src_db.commit()
# create previously dropped objects
dst_struct.create(dst_curs, objs, log = self.log)
dst_db.commit()
# set state
if self.copy_thread:
tbl_stat.change_state(TABLE_CATCHING_UP)
else:
tbl_stat.change_state(TABLE_OK)
tbl_stat.change_snapshot(snapshot)
self.save_table_state(dst_curs)
dst_db.commit()
self.log.debug("%s: ANALYZE" % tbl_stat.name)
dst_curs.execute("analyze " + skytools.quote_fqident(tbl_stat.name))
dst_db.commit()
# if copy done, request immidiate tick from pgqadm,
# to make state juggling faster. on mostly idle db-s
# each step may take tickers idle_timeout secs, which is pain.
q = "select pgq.force_tick(%s)"
src_curs.execute(q, [self.pgq_queue_name])
src_db.commit()
def real_copy(self, srccurs, dstcurs, tbl_stat):
"Main copy logic."
tablename = tbl_stat.name
# drop data
if tbl_stat.skip_truncate:
self.log.info("%s: skipping truncate" % tablename)
else:
self.log.info("%s: truncating" % tablename)
# truncate behaviour changed in 8.4
dstcurs.execute("show server_version_num")
pgver = int(dstcurs.fetchone()[0])
if pgver >= 80400:
dstcurs.execute("truncate only " + skytools.quote_fqident(tablename))
else:
dstcurs.execute("truncate " + skytools.quote_fqident(tablename))
# do copy
self.log.info("%s: start copy" % tablename)
col_list = skytools.get_table_columns(srccurs, tablename)
stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list)
if stats:
self.log.info("%s: copy finished: %d bytes, %d rows" % (
tablename, stats[0], stats[1]))
if __name__ == '__main__':
script = CopyTable(sys.argv[1:])
script.start()
|