/usr/share/postgresql/9.6/extension/mimeo--0.5.1--0.5.2.sql is in postgresql-9.6-mimeo 1.4.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
-- Fixed temp tables not being removed in refresh_dml(). This caused errors when consecutive runs in the same session found no new rows.
-- refresh_dml(): apply pending changes from a remote source table to the local
-- destination table described by the matching row in refresh_config_dml.
--
-- Parameters:
--   p_destination  dest_table value used to look up the row in refresh_config_dml.
--   p_limit        maximum number of queue rows flagged per run; falls back to the
--                  configured batch_limit and then to 10000.
--   p_repull       when true, the destination is truncated and reloaded with every
--                  row of the source table instead of only the queued keys.
--   p_debug        when not true, client_min_messages is set to 'warning' for the
--                  current transaction; the flag is also passed to every gdb() call.
--
-- Flow: flag a batch of rows in the remote control (queue) table as processed,
-- pull the affected rows through dblink into temp tables, delete/re-insert them
-- locally, then delete the processed rows from the remote control table.
-- Progress is logged through add_job/add_step/update_step/close_job/fail_job,
-- which are resolved through the search path set below (presumably pg_jobmon).
--
-- Raises an exception when no config row exists, when pk_field/pk_type are not
-- set, or when a column filter omits a primary key column. Any other error is
-- logged against the current step and re-raised.
CREATE OR REPLACE FUNCTION refresh_dml(p_destination text, p_limit int default NULL, p_repull boolean DEFAULT false, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_adv_lock              boolean;
v_cols_n_types          text;
v_cols                  text;
v_control               text;
v_create_f_sql          text;
v_create_q_sql          text;
v_dblink_schema         text;
v_dblink                text;
v_delete_sql            text;
v_dest_table            text;
v_exec_status           text;
v_field                 text;
v_filter                text[];
v_insert_sql            text;
v_job_id                int;
v_jobmon_schema         text;
v_job_name              text;
v_last_value_sql        text;
v_limit                 int;
v_old_search_path       text;
v_pk_counter            int;
v_pk_field_csv          text := '';
v_pk_field_type_csv     text := '';
v_pk_field              text[];
v_pk_type               text[];
v_pk_where              text;
v_remote_f_sql          text;
v_remote_q_sql          text;
v_rowcount              bigint;
v_source_table          text;
v_step_id               int;
v_tmp_table             text;
v_trigger_delete        text;
v_trigger_update        text;
v_with_update           text;

BEGIN

IF p_debug IS DISTINCT FROM true THEN
    PERFORM set_config( 'client_min_messages', 'warning', true );
END IF;

v_job_name := 'Refresh DML: '||p_destination;

-- Locate the schemas the dblink and pg_jobmon extensions are installed in
SELECT n.nspname INTO v_dblink_schema
FROM pg_namespace n
JOIN pg_extension e ON e.extnamespace = n.oid
WHERE e.extname = 'dblink';

SELECT n.nspname INTO v_jobmon_schema
FROM pg_namespace n
JOIN pg_extension e ON e.extnamespace = n.oid
WHERE e.extname = 'pg_jobmon';

-- Set custom search path to allow easier calls to other functions, especially job logging
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public'',''false'')';

SELECT source_table
    , dest_table
    , 'tmp_'||replace(dest_table,'.','_')
    , dblink
    , control
    , pk_field
    , pk_type
    , filter
    , batch_limit
FROM refresh_config_dml
WHERE dest_table = p_destination
INTO v_source_table
    , v_dest_table
    , v_tmp_table
    , v_dblink
    , v_control
    , v_pk_field
    , v_pk_type
    , v_filter
    , v_limit;
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: no mapping found for %',v_job_name;
END IF;

v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);

-- Take advisory lock to prevent multiple calls to function overlapping
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_dml'), hashtext(v_job_name));
IF NOT v_adv_lock THEN
    -- The lock was not obtained, so there is nothing to unlock on this path
    v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
    PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
    PERFORM update_step(v_step_id, 'OK','Found concurrent job. Exiting gracefully');
    PERFORM close_job(v_job_id);
    EXECUTE 'SELECT set_config(''search_path'','||quote_literal(v_old_search_path)||',''false'')';
    RETURN;
END IF;

v_step_id := add_step(v_job_id,'Building SQL');

IF v_pk_field IS NULL OR v_pk_type IS NULL THEN
    RAISE EXCEPTION 'ERROR: primary key fields in refresh_config_dml must be defined';
END IF;

-- determine column list, column type list
IF v_filter IS NULL THEN
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||atttypid::regtype::text),',')
    FROM pg_attribute
    WHERE attnum > 0 AND attisdropped is false AND attrelid = p_destination::regclass
    INTO v_cols, v_cols_n_types;
ELSE
    -- ensure all primary key columns are included in any column filters
    FOREACH v_field IN ARRAY v_pk_field LOOP
        IF v_field = ANY(v_filter) THEN
            CONTINUE;
        ELSE
            RAISE EXCEPTION 'ERROR: filter list did not contain all columns that compose primary key for %',v_job_name;
        END IF;
    END LOOP;
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||atttypid::regtype::text),',')
    FROM (SELECT unnest(filter) FROM refresh_config_dml WHERE dest_table = p_destination) x
    JOIN pg_attribute ON (unnest=attname::text AND attrelid=p_destination::regclass)
    INTO v_cols, v_cols_n_types;
END IF;

-- init sql statements
v_limit := COALESCE(p_limit, v_limit, 10000);
v_pk_field_csv := array_to_string(v_pk_field, ',');

-- Build the "name type, name type" column definition list for the queue's dblink result
v_pk_counter := 1;
WHILE v_pk_counter <= array_length(v_pk_field,1) LOOP
    IF v_pk_counter > 1 THEN
        v_pk_field_type_csv := v_pk_field_type_csv || ', ';
    END IF;
    v_pk_field_type_csv := v_pk_field_type_csv ||v_pk_field[v_pk_counter]||' '||v_pk_type[v_pk_counter];
    v_pk_counter := v_pk_counter + 1;
END LOOP;

-- Remote statement that flags up to v_limit queue rows as processed
v_with_update := 'WITH a AS (SELECT '||v_pk_field_csv||' FROM '|| v_control ||' ORDER BY 1 LIMIT '|| v_limit ||') UPDATE '||v_control||' b SET processed = true FROM a WHERE a.'||v_pk_field[1]||' = b.'||v_pk_field[1];

-- Extra join conditions for the 2nd and later primary key columns.
-- v_pk_where stays NULL for a single-column key.
v_pk_counter := 2;
IF array_length(v_pk_field, 1) > 1 THEN
    v_pk_where := '';
    WHILE v_pk_counter <= array_length(v_pk_field,1) LOOP
        v_pk_where := v_pk_where || ' AND a.'||v_pk_field[v_pk_counter]||' = b.'||v_pk_field[v_pk_counter];
        v_pk_counter := v_pk_counter + 1;
    END LOOP;
END IF;

IF v_pk_where IS NOT NULL THEN
    v_with_update := v_with_update || v_pk_where;
END IF;
PERFORM gdb(p_debug, v_with_update);

v_trigger_update := 'SELECT dblink_exec(auth('||v_dblink||'),'|| quote_literal(v_with_update)||')';

v_remote_q_sql := 'SELECT DISTINCT '||v_pk_field_csv||' FROM '||v_control||' WHERE processed = true';

IF p_repull THEN
    -- Repull reads the whole source table, so no queue table or targeted delete is built
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
    -- Actual truncate is done after pull to temp table to minimize lock on dest_table
    PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...');
    PERFORM gdb(p_debug, 'Request to repull ALL data from source. This could take a while...');
ELSE
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table||' JOIN ('||v_remote_q_sql||') x USING ('||v_pk_field_csv||')';
    -- The line break below is part of the generated SQL text
    v_create_q_sql := 'CREATE TEMP TABLE '||v_tmp_table||'_queue AS SELECT '||v_pk_field_csv||'
FROM dblink(auth('||v_dblink||'),'||quote_literal(v_remote_q_sql)||') t ('||v_pk_field_type_csv||')';
    v_delete_sql := 'DELETE FROM '||v_dest_table||' a USING '||v_tmp_table||'_queue b WHERE a.'||v_pk_field[1]||'= b.'||v_pk_field[1];
    IF array_length(v_pk_field, 1) > 1 THEN
        v_delete_sql := v_delete_sql || v_pk_where;
    END IF;
    PERFORM update_step(v_step_id, 'OK','Done');
END IF;

-- The line break below is part of the generated SQL text
v_create_f_sql := 'CREATE TEMP TABLE '||v_tmp_table||'_full AS SELECT '||v_cols||'
FROM dblink(auth('||v_dblink||'),'||quote_literal(v_remote_f_sql)||') t ('||v_cols_n_types||')';

v_insert_sql := 'INSERT INTO '||v_dest_table||'('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table||'_full';

v_trigger_delete := 'SELECT dblink_exec(auth('||v_dblink||'),'||quote_literal('DELETE FROM '||v_control||' WHERE processed = true')||')';

-- update remote entries
v_step_id := add_step(v_job_id,'Updating remote trigger table');
PERFORM gdb(p_debug,v_trigger_update);
EXECUTE v_trigger_update INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

-- create temp tables
v_step_id := add_step(v_job_id,'Creating temp tables');
-- Full table with all insert/update data
PERFORM gdb(p_debug,v_create_f_sql);
EXECUTE v_create_f_sql;
-- Row count of the full pull; only used as the reported count in repull mode
GET DIAGNOSTICS v_rowcount = ROW_COUNT;

IF p_repull THEN
    -- v_create_q_sql is never built in repull mode and EXECUTE of a NULL string
    -- raises an error, so the queue table is skipped. A repull always continues
    -- to the truncate/insert steps, even when the source returned no rows.
    PERFORM gdb(p_debug,'Temp full table row count '||v_rowcount::text);
ELSE
    -- Queue table with all rows to process (inserts, updates & deletes)
    PERFORM gdb(p_debug,v_create_q_sql);
    EXECUTE v_create_q_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    PERFORM gdb(p_debug,'Temp queue table row count '||v_rowcount::text);

    IF v_rowcount < 1 THEN
        PERFORM update_step(v_step_id, 'OK','No new rows found');
        -- Drop both temp tables so a later run in the same session can recreate them
        EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_queue';
        EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_full';
        PERFORM close_job(v_job_id);
        -- Ensure old search path is reset for the current session
        EXECUTE 'SELECT set_config(''search_path'','||quote_literal(v_old_search_path)||',''false'')';
        PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        RETURN;
    END IF;
END IF;
PERFORM update_step(v_step_id, 'OK','Number of rows to process: '||v_rowcount);

-- remove records from local table
IF p_repull THEN
    v_step_id := add_step(v_job_id,'Truncating local table');
    PERFORM gdb(p_debug,'Truncating local table');
    EXECUTE 'TRUNCATE '||v_dest_table;
    PERFORM update_step(v_step_id, 'OK','Done');
ELSE
    v_step_id := add_step(v_job_id,'Deleting records from local table');
    PERFORM gdb(p_debug,v_delete_sql);
    EXECUTE v_delete_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    PERFORM gdb(p_debug,'Rows removed from local table before applying changes: '||v_rowcount::text);
    PERFORM update_step(v_step_id, 'OK','Removed '||v_rowcount||' records');
END IF;

-- insert records to local table
v_step_id := add_step(v_job_id,'Inserting new records into local table');
PERFORM gdb(p_debug,v_insert_sql);
EXECUTE v_insert_sql;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
PERFORM gdb(p_debug,'Rows inserted: '||v_rowcount::text);
PERFORM update_step(v_step_id, 'OK','Inserted '||v_rowcount||' records');

-- clean out rows from txn table
v_step_id := add_step(v_job_id,'Cleaning out rows from txn table');
PERFORM gdb(p_debug,v_trigger_delete);
EXECUTE v_trigger_delete INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

-- update activity status
v_step_id := add_step(v_job_id,'Updating last_value in config table');
v_last_value_sql := 'UPDATE refresh_config_dml SET last_value = '|| quote_literal(current_timestamp::timestamp) ||' WHERE dest_table = ' ||quote_literal(p_destination);
PERFORM gdb(p_debug,v_last_value_sql);
EXECUTE v_last_value_sql;
PERFORM update_step(v_step_id, 'OK','Last Value was '||current_timestamp);

PERFORM close_job(v_job_id);

-- DROP ... IF EXISTS covers repull mode, where no queue table was created
EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_queue';
EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_full';

-- Ensure old search path is reset for the current session
EXECUTE 'SELECT set_config(''search_path'','||quote_literal(v_old_search_path)||',''false'')';

PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));

EXCEPTION
    WHEN QUERY_CANCELED THEN
        -- Must use the same key pair the lock was taken with ('refresh_dml');
        -- any other key leaves the session-level lock held after a cancel.
        PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        -- Exception block resets path, so have to reset it again
        EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')';
        IF v_step_id IS NULL THEN
            v_step_id := add_step(v_job_id, 'EXCEPTION before first step logged');
        END IF;
        PERFORM update_step(v_step_id, 'BAD', 'ERROR: '||coalesce(SQLERRM,'unknown'));
        PERFORM fail_job(v_job_id);

        -- Ensure old search path is reset for the current session
        EXECUTE 'SELECT set_config(''search_path'','||quote_literal(v_old_search_path)||',''false'')';

        PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        RAISE EXCEPTION '%', SQLERRM;
END
$$;
|