/usr/share/postgresql/9.5/extension/mimeo--1.2.1--1.2.2.sql is in postgresql-9.5-mimeo 1.4.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 | -- New function "check_missing_source_tables()" can show tables that exist on the configured data sources that are not configured for replication.
-- Provides monitoring capability for situations where all tables on source should be replicated.
-- Optional parameter to check one specific data source. Otherwise, all sources listed in dblink_mapping_mimeo table are checked.
-- Returns a record value so WHERE conditions can be used to ignore tables that aren't desired.
-- New function "check_source_columns()" can show columns that exist on source tables that do not exist on the destination
-- Provides monitoring capability for data source tables to see if columns have been added or types changed.
-- Does not check if destination has columns that source does not (therefore does not check if columns were dropped on source but not on destination).
-- Accounts for when the "filter" configuration option is used to only grab specific columns.
-- Optional parameter to check one specific data source. Otherwise, all sources listed in dblink_mapping_mimeo table are checked.
-- Returns a record value so WHERE conditions can be used to ignore tables and/or columns that don't matter for your situation.
-- Fix bug during index creation when dblink is not installed in a schema called "dblink" (Github Issue #6).
-- Added note to documentation about how to add/remove columns with DML replication and avoid errors.
-- Added pg_tap tests for new monitoring functions & snapshot column change replication.
/*
* Check data sources to see what tables exist there that are not set up for mimeo replication
* Provides monitoring capability for situations where all tables on source should be replicated.
* Returns a record value so that a WHERE condition can be used to ignore tables that aren't desired.
* If p_data_source_id value is not given, all configured sources are checked.
*/
CREATE FUNCTION check_missing_source_tables(p_data_source_id int DEFAULT NULL, OUT schemaname text, OUT tablename text, OUT data_source int) RETURNS SETOF record
LANGUAGE plpgsql SECURITY DEFINER
AS $$
DECLARE
v_dblink_schema text;
v_exists int;
v_row_dblink record;
v_row_missing record;
BEGIN
IF p_data_source_id IS NOT NULL THEN
SELECT count(*) INTO v_exists FROM @extschema@.dblink_mapping_mimeo WHERE data_source_id = p_data_source_id;
IF v_exists < 1 THEN
RAISE EXCEPTION 'Given data_source_id (%) does not exist in @extschema@.dblink_mapping_mimeo config table', p_data_source_id;
END IF;
END IF;
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
FOR v_row_dblink IN SELECT data_source_id FROM @extschema@.dblink_mapping_mimeo
LOOP
-- Allow a parameter to choose which data sources are checked. If parameter is NULL, check them all.
IF p_data_source_id IS NOT NULL THEN
IF p_data_source_id <> v_row_dblink.data_source_id THEN
CONTINUE;
END IF;
END IF;
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_connect(''mimeo_missing'', @extschema@.auth('||v_row_dblink.data_source_id||'))';
CREATE TEMP TABLE current_source_tables_tmp (schemaname text, tablename text);
EXECUTE 'INSERT INTO current_source_tables_tmp SELECT schemaname, tablename FROM '||v_dblink_schema||'.dblink(''mimeo_missing'',
''SELECT schemaname, tablename AS tablename FROM pg_catalog.pg_tables WHERE schemaname NOT IN (''''pg_catalog'''', ''''information_schema'''', ''''@extschema@'''')'') t (schemaname text, tablename text)';
CREATE TEMP TABLE current_dest_tables_tmp AS
SELECT source_table FROM @extschema@.refresh_config_snap WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table FROM @extschema@.refresh_config_inserter WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table FROM @extschema@.refresh_config_updater WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table FROM @extschema@.refresh_config_dml WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table FROM @extschema@.refresh_config_logdel WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table FROM @extschema@.refresh_config_table WHERE dblink = v_row_dblink.data_source_id;
FOR v_row_missing IN
SELECT s.schemaname, s.tablename
FROM current_source_tables_tmp s
LEFT OUTER JOIN current_dest_tables_tmp d ON s.schemaname||'.'||s.tablename = d.source_table
WHERE d.source_table IS NULL
ORDER BY s.schemaname, s.tablename
LOOP
schemaname := v_row_missing.schemaname;
tablename := v_row_missing.tablename;
data_source = v_row_dblink.data_source_id;
RETURN NEXT;
END LOOP;
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect(''mimeo_missing'')';
DROP TABLE IF EXISTS current_source_tables_tmp;
DROP TABLE IF EXISTS current_dest_tables_tmp;
END LOOP;
END
$$;
/*
* Check tables on data sources to see if columns have been added or types changed.
* Returns a record value so that a WHERE condition can be used to ignore tables that aren't desired.
* If p_data_source_id value is not given, all configured sources are checked.
*/
CREATE FUNCTION check_source_columns(p_data_source_id int DEFAULT NULL
, OUT dest_schemaname text
, OUT dest_tablename text
, OUT src_schemaname text
, OUT src_tablename text
, OUT missing_column_name text
, OUT missing_column_type text
, OUT data_source int) RETURNS SETOF record
LANGUAGE plpgsql
AS $$
DECLARE
v_dblink_schema text;
v_exists int;
v_row_dblink record;
v_row_table record;
v_row_col record;
BEGIN
IF p_data_source_id IS NOT NULL THEN
SELECT count(*) INTO v_exists FROM @extschema@.dblink_mapping_mimeo WHERE data_source_id = p_data_source_id;
IF v_exists < 1 THEN
RAISE EXCEPTION 'Given data_source_id (%) does not exist in @extschema@.dblink_mapping_mimeo config table', p_data_source_id;
END IF;
END IF;
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
FOR v_row_dblink IN SELECT data_source_id FROM @extschema@.dblink_mapping_mimeo
LOOP
-- Allow a parameter to choose which data sources are checked. If parameter is NULL, check them all.
IF p_data_source_id IS NOT NULL THEN
IF p_data_source_id <> v_row_dblink.data_source_id THEN
CONTINUE;
END IF;
END IF;
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_connect(''mimeo_col_check'', @extschema@.auth('||v_row_dblink.data_source_id||'))';
FOR v_row_table IN
( SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_snap WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_inserter WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_updater WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_dml WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_logdel WHERE dblink = v_row_dblink.data_source_id
UNION
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config_table WHERE dblink = v_row_dblink.data_source_id )
ORDER BY 1,2
LOOP
FOR v_row_col IN
EXECUTE 'SELECT attname, atttypid, atttypmod, schemaname, tablename FROM '||v_dblink_schema||'.dblink(''mimeo_col_check'',
''SELECT a.attname::text, a.atttypid, a.atttypmod, n.nspname AS schemaname, c.relname AS tablename
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE a.attrelid = '''''||v_row_table.source_table||'''''::regclass
AND a.attnum > 0
AND attisdropped = false
ORDER BY 1,2'') t (attname text, atttypid oid, atttypmod int, schemaname text, tablename text)'
LOOP
IF v_row_col.attname <> ANY (v_row_table.filter) THEN
CONTINUE;
END IF;
SELECT count(*) INTO v_exists
FROM pg_attribute
WHERE attrelid = v_row_table.dest_table::regclass
AND attnum > 0
AND attisdropped = false
AND attname = v_row_col.attname
AND atttypid = v_row_col.atttypid
AND atttypmod = v_row_col.atttypmod;
-- if column doesn't exist, means it's missing on destination.
IF v_exists < 1 THEN
IF v_row_table.type = 'snap' THEN
SELECT schemaname, viewname INTO dest_schemaname, dest_tablename FROM pg_catalog.pg_views WHERE schemaname||'.'||viewname = v_row_table.dest_table;
ELSE
SELECT schemaname, tablename INTO dest_schemaname, dest_tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = v_row_table.dest_table;
END IF;
src_schemaname := v_row_col.schemaname;
src_tablename := v_row_col.tablename;
missing_column_name := v_row_col.attname;
missing_column_type := format_type(v_row_col.atttypid, v_row_col.atttypmod)::text;
data_source := v_row_dblink.data_source_id;
RETURN NEXT;
END IF;
END LOOP; -- end v_row_col
END LOOP; -- end v_row_table
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect(''mimeo_col_check'')';
END LOOP; -- end v_row_dblink
END
$$;
/*
* Create index(es) on destination table
*/
CREATE OR REPLACE FUNCTION create_index(p_destination text, p_snap text DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void
LANGUAGE plpgsql SECURITY DEFINER
AS $$
DECLARE
v_conf text;
v_dblink int;
v_dblink_name text;
v_dblink_schema text;
v_dest_table text;
v_dest_table_name text;
v_filter text;
v_link_exists boolean;
v_old_search_path text;
v_repl_index oid;
v_remote_index_sql text;
v_row record;
v_source_table text;
v_src_table_name text;
v_statement text;
v_type text;
BEGIN
v_dblink_name := @extschema@.check_name_length('create_index_dblink_'||p_destination);
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||''',''false'')';
SELECT dest_table
, type
, dblink
, filter
INTO v_dest_table
, v_type
, v_dblink
, v_filter
FROM refresh_config
WHERE dest_table = p_destination;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: This table is not set up for replication: %', p_destination;
END IF;
EXECUTE 'SELECT source_table FROM refresh_config_'||v_type||' WHERE dest_table = '||quote_literal(v_dest_table) INTO v_source_table;
IF p_snap IS NOT NULL AND p_snap NOT IN ('snap1', 'snap2') THEN
RAISE EXCEPTION 'Invalid value for p_snap parameter given to create_index() function';
END IF;
PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));
-- Reset search_path on remote connection to ensure schema is included in table name in index creation statement
-- set_config returns a record value, so can't just use dblink_exec
SELECT set_config INTO v_conf FROM dblink(v_dblink_name, 'SELECT set_config(''search_path'', '''', false)::text') t (set_config text);
v_dest_table_name := split_part(v_dest_table, '.', 2);
SELECT tablename INTO v_src_table_name
FROM dblink(v_dblink_name, 'SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname ||''.''|| tablename = '||quote_literal(v_source_table)) t (tablename text);
-- Gets primary key or unique index used by updater/dml/logdel replication (same function is called in their makers).
-- Should only loop once, but just easier to keep code consistent with below method
FOR v_row IN SELECT indexrelid, key_type, indkey_names, statement FROM fetch_replication_key(v_source_table, v_dblink_name)
LOOP
EXIT WHEN v_row.indexrelid IS NULL; -- function still returns a row full of nulls when nothing found
IF v_row.key_type = 'primary' THEN
v_statement := 'ALTER TABLE '||v_dest_table || COALESCE('_'||p_snap, '')||' ADD CONSTRAINT '||
COALESCE(p_snap||'_', '')|| v_dest_table_name ||'_'||array_to_string(v_row.indkey_names, '_')||'_pk
PRIMARY KEY ('||array_to_string(v_row.indkey_names, ',')||')';
ELSIF v_row.key_type = 'unique' THEN
v_statement := v_row.statement;
-- Replace source table name with destination
v_statement := replace(v_statement, ' ON '||v_source_table, ' ON '||v_dest_table || COALESCE('_'||p_snap, ''));
-- If source index name contains the table name, replace it with the destination table.
v_statement := regexp_replace(v_statement, '(INDEX \w*)'||v_src_table_name||'(\w* ON)', '\1'||v_dest_table_name||'\2');
-- If it's a snap table, prepend to ensure unique index name.
-- This is done separately from above replace because it must always be done even if the index name doesn't contain the source table
IF p_snap IS NOT NULL THEN
v_statement := replace(v_statement, 'UNIQUE INDEX ' , 'UNIQUE INDEX '||p_snap||'_');
END IF;
END IF;
PERFORM gdb(p_debug, 'statement: ' || v_statement);
EXECUTE v_statement;
v_repl_index = v_row.indexrelid;
END LOOP;
-- Get all indexes other than one obtained above.
-- Cannot set these indexes when column filters are in use because there's no easy way to check columns in expression indexes.
IF v_filter IS NULL THEN
v_remote_index_sql := 'select c.relname AS src_table, pg_get_indexdef(i.indexrelid) as statement
FROM pg_catalog.pg_index i
JOIN pg_catalog.pg_class c ON i.indrelid = c.oid
WHERE i.indrelid = '||quote_literal(v_source_table)||'::regclass
AND i.indisprimary IS false
AND i.indisvalid';
IF v_repl_index IS NOT NULL THEN
v_remote_index_sql := v_remote_index_sql ||' AND i.indexrelid <> '||v_repl_index;
END IF;
FOR v_row IN EXECUTE 'SELECT src_table, statement FROM dblink('||quote_literal(v_dblink_name)||', '||quote_literal(v_remote_index_sql)||') t (src_table text, statement text)' LOOP
v_statement := v_row.statement;
-- Replace source table name with destination
v_statement := replace(v_statement, ' ON '||v_source_table, ' ON '||v_dest_table || COALESCE('_'||p_snap, ''));
-- If source index name contains the table name, replace it with the destination table.
v_statement := regexp_replace(v_statement, '(INDEX \w*)'||v_src_table_name||'(\w* ON)', '\1'||v_dest_table_name||'\2');
-- If it's a snap table, prepend to ensure unique index name.
-- This is done separately from above replace because it must always be done even if the index name doesn't contain the source table
IF p_snap IS NOT NULL THEN
v_statement := replace(v_statement, 'E INDEX ' , 'E INDEX '||p_snap||'_');
END IF;
PERFORM gdb(p_debug, 'statement: ' || v_statement);
EXECUTE v_statement;
END LOOP;
END IF;
PERFORM dblink_disconnect(v_dblink_name);
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
EXCEPTION
WHEN QUERY_CANCELED OR OTHERS THEN
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
IF v_link_exists THEN
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
END IF;
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
RAISE EXCEPTION '%', SQLERRM;
END
$$;
|