/usr/share/postgresql/9.6/extension/mimeo--1.4.3--1.4.4.sql is in postgresql-9.6-mimeo 1.4.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 | -- Snapshot maker now gives a clearer error when the destination table already exists and clarifies that it cannot (Github Issue #18).
-- Further fixes to validate_rowcount() for incremental replication. May have been giving incorrect match failure before due to not always setting the proper upper boundary value.
/*
* Snapshot maker function.
*/
CREATE OR REPLACE FUNCTION snapshot_maker(
p_src_table text
, p_dblink_id int
, p_dest_table text DEFAULT NULL
, p_index boolean DEFAULT true
, p_filter text[] DEFAULT NULL
, p_condition text DEFAULT NULL
, p_pulldata boolean DEFAULT true
, p_jobmon boolean DEFAULT NULL
, p_debug boolean DEFAULT false)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_data_source text;
v_dest_exists text;
v_insert_refresh_config text;
v_job_id bigint;
v_job_name text;
v_jobmon boolean;
v_jobmon_schema text;
v_old_search_path text;
v_step_id bigint;
BEGIN
SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid;
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||COALESCE(v_jobmon_schema||',', '')||'public'',''false'')';
SELECT data_source INTO v_data_source FROM @extschema@.dblink_mapping_mimeo WHERE data_source_id = p_dblink_id;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: Database link ID does not exist in @extschema@.dblink_mapping_mimeo: %', p_dblink_id;
END IF;
IF p_dest_table IS NULL THEN
p_dest_table := p_src_table;
END IF;
IF position('.' in p_dest_table) = 0 AND position('.' in p_src_table) = 0 THEN
RAISE EXCEPTION 'Source (and destination) table must be schema qualified';
END IF;
SELECT tablename INTO v_dest_exists
FROM pg_catalog.pg_tables
WHERE schemaname = split_part(p_dest_table, '.', 1)::name
AND tablename = split_part(p_dest_table, '.', 2)::name;
IF v_dest_exists IS NOT NULL THEN
RAISE EXCEPTION 'Destination table cannot exist before running snapshot_maker(): %', p_dest_table;
END IF;
IF p_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN
RAISE EXCEPTION 'p_jobmon parameter set to TRUE, but unable to determine if pg_jobmon extension is installed';
ELSIF (p_jobmon IS TRUE OR p_jobmon IS NULL) AND v_jobmon_schema IS NOT NULL THEN
v_jobmon := true;
ELSE
v_jobmon := false;
END IF;
v_job_name := 'Snapshot Maker: '||p_src_table;
IF v_jobmon THEN
v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);
v_step_id := add_step(v_job_id,'Inserting config data');
END IF;
v_insert_refresh_config := 'INSERT INTO @extschema@.refresh_config_snap(
source_table
, dest_table
, dblink
, filter
, condition
, jobmon)
VALUES('
||quote_literal(p_src_table)
||', '||quote_literal(p_dest_table)
||', '||p_dblink_id
||', '||COALESCE(quote_literal(p_filter), 'NULL')
||', '||COALESCE(quote_literal(p_condition), 'NULL')
||', '||v_jobmon||')';
EXECUTE v_insert_refresh_config;
IF v_jobmon THEN
PERFORM update_step(v_step_id, 'OK','Done');
END IF;
IF v_jobmon THEN
v_step_id := add_step(v_job_id,'Running first snapshot. See separate refresh job for more details.');
END IF;
RAISE NOTICE 'attempting first snapshot';
EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||', p_debug := '||p_debug||')';
IF v_jobmon THEN
PERFORM update_step(v_step_id, 'OK','Done');
v_step_id := add_step(v_job_id,'Running second snapshot. See separate refresh job for more details.');
END IF;
RAISE NOTICE 'attempting second snapshot';
EXECUTE 'SELECT @extschema@.refresh_snap('||quote_literal(p_dest_table)||', p_index := '||p_index||', p_pulldata := '||p_pulldata||', p_debug := '||p_debug||')';
IF v_jobmon THEN
PERFORM update_step(v_step_id, 'OK','Done');
END IF;
IF v_jobmon THEN
PERFORM close_job(v_job_id);
END IF;
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
RAISE NOTICE 'Done';
RETURN;
EXCEPTION
WHEN OTHERS THEN
SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid;
SELECT jobmon INTO v_jobmon FROM @extschema@.refresh_config_snap WHERE dest_table = p_src_table;
v_jobmon := COALESCE(p_jobmon, v_jobmon);
IF v_jobmon_schema IS NULL THEN
v_jobmon := false;
END IF;
IF v_jobmon THEN
IF v_job_id IS NULL THEN
EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, 'Snapshot Maker: '||p_src_table) INTO v_job_id;
EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before job logging started') INTO v_step_id;
END IF;
IF v_step_id IS NULL THEN
EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before first step logged') INTO v_step_id;
END IF;
EXECUTE format('SELECT %I.update_step(%L, %L, %L)', v_jobmon_schema, v_step_id, 'CRITICAL', 'ERROR: '||COALESCE(SQLERRM,'unknown'));
EXECUTE format('SELECT %I.fail_job(%L)', v_jobmon_schema, v_job_id);
END IF;
RAISE EXCEPTION '%', SQLERRM;
END
$$;
/*
* Simple row count compare.
* For any replication type other than inserter/updater, this will fail to run if replication is currently running.
* For any replication type other than inserter/updater, this will pause replication for the given table until validation is complete
*/
CREATE OR REPLACE FUNCTION validate_rowcount(p_destination text, p_src_incr_less boolean DEFAULT false, p_incr_interval text DEFAULT NULL, p_debug boolean DEFAULT false, OUT match boolean, OUT source_count bigint, OUT dest_count bigint, OUT min_source_value text, OUT max_source_value text) RETURNS record
LANGUAGE plpgsql
AS $$
DECLARE
v_adv_lock boolean := true;
v_condition text;
v_control text;
v_dblink int;
v_dblink_name text;
v_dblink_schema text;
v_dest_table text;
v_dest_schemaname text;
v_dest_tablename text;
v_link_exists boolean;
v_local_sql text;
v_max_dest_serial bigint;
v_max_dest_time timestamptz;
v_old_search_path text;
v_remote_sql text;
v_remote_min_sql text;
v_source_min_serial bigint;
v_source_min_time timestamptz;
v_source_table text;
v_src_schemaname text;
v_src_tablename text;
v_type text;
BEGIN
SELECT nspname INTO v_dblink_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||',public'',''true'')';
v_dblink_name := @extschema@.check_name_length('mimeo_data_validation_'||p_destination);
SELECT dest_table
, type
, dblink
, condition
, source_table
INTO v_dest_table
, v_type
, v_dblink
, v_condition
, v_source_table
FROM refresh_config
WHERE dest_table = p_destination;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: This table is not set up for replication: %', p_destination;
END IF;
IF v_type = 'snap' OR v_type = 'dml' OR v_type = 'logdel' OR v_type = 'table' THEN
v_adv_lock := @extschema@.concurrent_lock_check(v_dest_table);
END IF;
IF v_type = 'inserter_time' OR v_type = 'inserter_serial' THEN
SELECT control INTO v_control FROM refresh_config_inserter WHERE dest_table = v_dest_table;
ELSIF v_type = 'updater_time' OR v_type = 'updater_serial' THEN
SELECT control INTO v_control FROM refresh_config_updater WHERE dest_table = v_dest_table;
END IF;
IF v_adv_lock = 'false' THEN
RAISE EXCEPTION 'Validation cannot run while refresh for given table is running: %', v_dest_table;
RETURN;
END IF;
PERFORM dblink_connect(v_dblink_name, auth(v_dblink));
v_remote_sql := format('
SELECT schemaname, tablename
FROM (
SELECT schemaname, tablename
FROM pg_catalog.pg_tables
WHERE schemaname ||''.''|| tablename = %L
UNION
SELECT schemaname, viewname AS tablename
FROM pg_catalog.pg_views
WHERE schemaname || ''.'' || viewname = %L
) tables LIMIT 1'
, v_source_table, v_source_table);
v_remote_sql := format('SELECT schemaname, tablename FROM dblink(%L, %L) t (schemaname text, tablename text)', v_dblink_name, v_remote_sql);
EXECUTE v_remote_sql INTO v_src_schemaname, v_src_tablename;
SELECT schemaname, tablename
FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = v_dest_table
UNION
SELECT schemaname, viewname AS tablename
FROM pg_catalog.pg_views WHERE schemaname||'.'||viewname = v_dest_table
INTO v_dest_schemaname, v_dest_tablename;
v_remote_sql := format('SELECT count(*) as row_count FROM %I.%I', v_src_schemaname, v_src_tablename);
v_local_sql := format('SELECT count(*) FROM %I.%I', v_dest_schemaname, v_dest_tablename);
IF v_control IS NOT NULL THEN
IF v_condition IS NOT NULL THEN
v_remote_sql := v_remote_sql ||' '|| v_condition || ' AND ';
ELSE
v_remote_sql := v_remote_sql ||' WHERE ';
END IF;
IF p_src_incr_less THEN
v_remote_min_sql := format('SELECT min(%I) AS min_source FROM %I.%I', v_control, v_src_schemaname, v_src_tablename);
PERFORM gdb(p_debug, 'v_remote_min_sql: '||v_remote_min_sql);
IF v_condition IS NOT NULL THEN
v_remote_min_sql := v_remote_min_sql ||' '||v_condition;
END IF;
IF v_type = 'inserter_time' OR v_type = 'updater_time' THEN
v_remote_min_sql := 'SELECT min_source FROM dblink('||quote_literal(v_dblink_name)||','||quote_literal(v_remote_min_sql)||') t (min_source timestamptz)';
PERFORM gdb(p_debug, 'v_remote_min_sql: '||v_remote_min_sql);
EXECUTE v_remote_min_sql INTO v_source_min_time;
v_local_sql := format(v_local_sql || ' WHERE %I >= %L', v_control, COALESCE(v_source_min_time, '-infinity'));
min_source_value := v_source_min_time::text;
ELSIF v_type = 'inserter_serial' OR v_type = 'updater_serial' THEN
v_remote_min_sql := 'SELECT min_source FROM dblink('||quote_literal(v_dblink_name)||','||quote_literal(v_remote_min_sql)||') t (min_source bigint)';
PERFORM gdb(p_debug, 'v_remote_min_sql: '||v_remote_min_sql);
EXECUTE v_remote_min_sql INTO v_source_min_serial;
v_local_sql := format(v_local_sql || ' WHERE %I >= %L', v_control, COALESCE(v_source_min_serial, 0));
min_source_value := v_source_min_serial::text;
END IF;
END IF;
IF v_type = 'inserter_time' OR v_type = 'updater_time' THEN
EXECUTE format('SELECT max(%I) FROM %I.%I', v_control, v_dest_schemaname, v_dest_tablename) INTO v_max_dest_time;
-- Reduce the max value being compared by the given interval value
IF p_incr_interval IS NOT NULL AND v_max_dest_time IS NOT NULL THEN
v_max_dest_time := v_max_dest_time - p_incr_interval::interval;
IF p_src_incr_less THEN
v_local_sql := v_local_sql || ' AND ';
ELSE
v_local_sql := v_local_sql || ' WHERE ';
END IF;
v_local_sql := format(v_local_sql || ' %I <= %L', v_control, v_max_dest_time);
END IF;
v_remote_sql := format(v_remote_sql ||' %I <= %L', v_control, COALESCE(v_max_dest_time, 'infinity'));
max_source_value := v_max_dest_time::text;
ELSIF v_type = 'inserter_serial' OR v_type = 'updater_serial' THEN
EXECUTE format('SELECT max(%I) FROM %I.%I', v_control, v_dest_schemaname, v_dest_tablename) INTO v_max_dest_serial;
-- Reduce the max value being compared by the given interval value
IF p_incr_interval IS NOT NULL AND v_max_dest_serial IS NOT NULL THEN
v_max_dest_serial := v_max_dest_serial - p_incr_interval::bigint;
IF p_src_incr_less THEN
v_local_sql := v_local_sql || ' AND ';
ELSE
v_local_sql := v_local_sql || ' WHERE ';
END IF;
v_local_sql := format(v_local_sql || ' %I <= %L', v_control, v_max_dest_serial);
END IF;
v_remote_sql := format(v_remote_sql ' %I <= %L', v_control, COALESCE(v_max_dest_serial, 0));
max_source_value := v_max_dest_serial::text;
END IF;
ELSIF v_condition IS NOT NULL THEN
v_remote_sql := v_remote_sql ||' '|| v_condition;
END IF;
v_remote_sql := 'SELECT row_count FROM dblink('||quote_literal(v_dblink_name)||','||quote_literal(v_remote_sql)||') t (row_count bigint)';
PERFORM gdb(p_debug, 'v_remote_sql: '||v_remote_sql);
EXECUTE v_remote_sql INTO source_count;
PERFORM gdb(p_debug, 'v_local_sql: '||v_local_sql);
EXECUTE v_local_sql INTO dest_count;
IF source_count = dest_count THEN
match = true;
ELSE
match = false;
END IF;
PERFORM dblink_disconnect(v_dblink_name);
EXCEPTION
WHEN QUERY_CANCELED OR OTHERS THEN
SELECT nspname INTO v_dblink_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists;
IF v_link_exists THEN
EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name);
END IF;
RAISE EXCEPTION '%', SQLERRM;
END
$$;
|