/usr/lib/python2.7/dist-packages/Pegasus/netlogger/analysis/schema/schema_check.py is in pegasus-wms 4.4.0+dfsg-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | """
Code to handle various aspects of transitioning to a new verson of the
Stampede schema.
"""
__rcsid__ = "$Id: schema_check.py 30802 2012-03-07 17:01:34Z mgoode $"
__author__ = "Monte Goode"
from Pegasus.netlogger.analysis.modules._base import SQLAlchemyInit, dsn_dialect
from Pegasus.netlogger.analysis.schema.stampede_schema import *
from Pegasus.netlogger.nllog import DoesLogging
class SchemaVersionError(Exception):
"""
Custom exception. Will be raised in the loader/etc if the schema
is out of date so the calling program can catch and handle it.
"""
pass
class ConnHandle(SQLAlchemyInit, DoesLogging):
"""
Stand-alone connection class that returns a SQLAlchemy session.
"""
def __init__(self, connString=None, mysql_engine=None, **kw):
DoesLogging.__init__(self)
if connString is None:
raise ValueError("connString is required")
_kw = { }
dialect = dsn_dialect(connString)
_kw[dialect] = { }
if dialect == 'mysql':
if mysql_engine is not None:
_kw[dialect]['mysql_engine'] = mysql_engine
try:
SQLAlchemyInit.__init__(self, connString, initializeToPegasusDB, **_kw)
except exc.OperationalError, e:
self.log.error('init', msg='%s' % ErrorStrings.get_init_error(e))
raise RuntimeError
pass
def get_session(self):
return self.session
class ErrorStrings:
"""
Parses SQLAlchemy OperationalErrors to generate error strings.
Currently just handles case of when a user with limited permissions
might hit a wall when running 4.0 code on an existing 3.1 DB.
"""
# Actions
create_failure = 'CREATE command denied to user'
# Tables
schema_info = 'schema_info'
@staticmethod
def get_init_error(e):
action_error = table_error = None
try:
action_error = e.args[0].split("'")[0].split('"')[1].strip()
table_error = e.args[0].split("'")[-2]
except IndexError:
# specific parse didn't work, so pass the original
# exception through.
pass
er = ''
if action_error == ErrorStrings.create_failure and \
table_error == ErrorStrings.schema_info:
er = 'The schema_info table does not exist: '
er += 'user does not have CREATE TABLE permissions - '
er += 'database schema needs to be upgraded to version %s, ' % CURRENT_SCHEMA_VERSION
er += 'database admin will need to run upgrade tool'
return er
else:
er = 'Error raised during database init: %s' % e.args[0]
return er
class SchemaCheck(DoesLogging):
"""
This handles checking the schema, setting the proper version_number
if not already set, returning the current version for things like the
API and methods to manually upgrade an older existing DB to the new
schema.
The check_schema() method should be called by any code that
creates/initializes a database (like the loader) so it can scan
schema and set the correct version number.
"""
def __init__(self, session):
DoesLogging.__init__(self)
self.session = session
self.log.info('init')
self._table_map = {}
pass
def _get_current_version(self):
q = self.session.query(cast(func.max(SchemaInfo.version_number), Float))
if not q.one()[0]:
return q.one()[0]
else:
return round(q.one()[0],1)
def _version_check(self, version_number):
self.log.info('check_schema', msg='Current version set to: %s' % version_number)
if float(version_number) == CURRENT_SCHEMA_VERSION:
self.log.info('check_schema', msg='Schema up to date')
return True
elif float(version_number) < CURRENT_SCHEMA_VERSION:
self.log.error('check_schema', \
msg='Schema version %s found - expecting %s - database admin will need to run upgrade tool' % \
(float(version_number), CURRENT_SCHEMA_VERSION))
return False
def _table_scan(self, sql, table, idx):
if not self._table_map.has_key(table):
self._table_map[table] = {}
for row in self.session.execute(sql).fetchall():
self._table_map[table][row[idx]] = True
def check_schema(self):
"""
Checks the schema to determine the version, sets the information
in the schema_info table, and outputs an error message if the
schema is out of date. Returns True or False so calling apps
(like the loader) can handle appropriately with execptions, etc.
"""
self.log.info('check_schema.start')
version_number = self._get_current_version()
if not version_number:
self.log.info('check_schema', msg='No version_number set in schema_info')
elif version_number == 3.2:
self.log.info('check_schema', msg='Schema set to 3.2 deveopment version - resetting to release version')
else:
return self._version_check(version_number)
self.log.info('check_schema', msg='Determining schema version')
table_scan = ['job_instance', 'invocation']
# Due to how the SQLAlchemy mapper works, I need to look at these with
# raw SQL calls to the DBM and not use the mapper objects.
for t in table_scan:
if self.session.connection().dialect.name == 'sqlite':
self._table_scan('PRAGMA table_info(%s)' % t, t, 1)
elif self.session.connection().dialect.name == 'mysql':
self._table_scan('desc %s' % t, t, 0)
else:
self.log.error('check_schema', msg='Dialect %s not available for scanning' \
% self.session.connection().dialect.name )
#
# Checks for version 4.0
#
m_factor_check = exitcode_check = remote_cpu_check = False
# Check job_instance table
if self._table_map['job_instance'].has_key('multiplier_factor'):
m_factor_check = True
if self._table_map['job_instance'].has_key('exitcode'):
exitcode_check = True
# Check invocation
if self._table_map['invocation'].has_key('remote_cpu_time'):
remote_cpu_check = True
s_info = SchemaInfo()
if not m_factor_check and not exitcode_check and not remote_cpu_check:
self.log.info('check_schema', msg='Setting schema to version 3.1')
s_info.version_number = 3.1
elif m_factor_check and exitcode_check and remote_cpu_check:
s_info.version_number = 4.0
self.log.info('check_schema', msg='Setting schema to version 4.0')
else:
self.log.error('check_schema', msg='Error in determining database schema')
raise RuntimeError
s_info.commit_to_db(self.session)
#
# End version 4.0 code
#
self._table_map = {}
return self._version_check(self._get_current_version())
def check_version(self):
"""
Check version in the schema_info table. Called for things
like the stats api. Assumes 3.1 if no version set.
"""
version_number = self._get_current_version()
if not version_number:
# presume 3.1
return 3.1
else:
return version_number
def upgrade_to_4_0(self):
"""
Called by the "upgrade tool" - upgrades a populated 3.1 DB to
4.0.
"""
self.log.info('upgrade_to_4_0', msg='Upgrading to schema version 4.0')
if self._get_current_version() >= 4.0:
self.log.warn('upgrade_to_4_0', msg='Schema version already 4.0 - skipping upgrade')
return
# Alter tables
r_c_t = 'ALTER TABLE invocation ADD COLUMN remote_cpu_time NUMERIC(10,3) NULL'
if self.session.connection().dialect.name != 'sqlite':
r_c_t += ' AFTER remote_duration'
m_fac = 'ALTER TABLE job_instance ADD COLUMN multiplier_factor INT NOT NULL DEFAULT 1'
e_cod = 'ALTER TABLE job_instance ADD COLUMN exitcode INT NULL'
self.session.execute(r_c_t)
self.session.execute(m_fac)
self.session.execute(e_cod)
# Seed new columns with data derived from existing 3.1 data
success = ['JOB_SUCCESS', 'POST_SCRIPT_SUCCESS']
failure = ['PRE_SCRIPT_FAILED', 'SUBMIT_FAILED', 'JOB_FAILURE', 'POST_SCRIPT_FAILED']
q = self.session.query(JobInstance.job_instance_id).order_by(JobInstance.job_instance_id)
for r in q.all():
qq = self.session.query(Jobstate.state)
qq = qq.filter(Jobstate.job_instance_id == r.job_instance_id)
qq = qq.order_by(Jobstate.jobstate_submit_seq.desc()).limit(1)
for rr in qq.all():
if rr.state in success:
self.session.execute('UPDATE job_instance set exitcode = 0 where job_instance_id = %s' \
% r.job_instance_id )
elif rr.state in failure:
self.session.execute('UPDATE job_instance set exitcode = 256 where job_instance_id = %s' \
% r.job_instance_id)
else:
pass
s_info = SchemaInfo()
s_info.version_number = 4.0
s_info.commit_to_db(self.session)
pass
def upgrade(self):
"""
Public wrapper around the version-specific upgrade methods.
"""
self.check_schema()
self.upgrade_to_4_0()
pass
def main():
pass
if __name__ == '__main__':
main()
|