/usr/share/postgresql/9.5/extension/mimeo--0.2.2--0.2.3.sql is in postgresql-9.5-mimeo 1.4.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | -- Added ORDER BY to remote select query to fix missing data on destination when the limit is actually used
CREATE OR REPLACE FUNCTION refresh_inserter(p_destination text, p_debug boolean, integer DEFAULT 100000) RETURNS void
LANGUAGE plpgsql SECURITY DEFINER
AS $_$
declare
v_job_name text;
v_job_id int;
v_step_id int;
v_rowcount bigint;
v_dblink_schema text;
v_jobmon_schema text;
v_old_search_path text;
v_adv_lock boolean;
v_source_table text;
v_dest_table text;
v_tmp_table text;
v_dblink text;
v_control text;
v_last_value timestamptz;
v_boundary timestamptz;
v_filter text[];
v_cols text;
v_cols_n_types text;
v_last_value_sql text;
v_remote_sql text;
v_insert_sql text;
v_create_sql text;
v_delete_sql text;
BEGIN
IF p_debug IS DISTINCT FROM true THEN
PERFORM set_config( 'client_min_messages', 'warning', true );
END IF;
v_job_name := 'Refresh Inserter: '||p_destination;
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
SELECT nspname INTO v_jobmon_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'pg_jobmon' AND e.extnamespace = n.oid;
-- Set custom search path to allow easier calls to other functions, especially job logging
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')';
v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);
-- Take advisory lock to prevent multiple calls to function overlapping
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_inserter'), hashtext(v_job_name));
IF v_adv_lock = 'false' THEN
v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
PERFORM update_step(v_step_id, 'OK','Found concurrent job. Exiting gracefully');
PERFORM close_job(v_job_id);
RETURN;
END IF;
v_step_id := add_step(v_job_id,'Grabbing Boundries, Building SQL');
SELECT source_table, dest_table, 'tmp_'||replace(dest_table,'.','_'), dblink, control, last_value, now() - boundary::interval, filter FROM refresh_config WHERE dest_table = p_destination INTO v_source_table, v_dest_table, v_tmp_table, v_dblink, v_control, v_last_value, v_boundary, v_filter;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: no mapping found for %',v_job_name;
END IF;
IF v_filter IS NULL THEN
SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||atttypid::regtype::text),',') FROM
pg_attribute WHERE attnum > 0 AND attisdropped is false AND attrelid = p_destination::regclass INTO v_cols, v_cols_n_types;
ELSE
SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||atttypid::regtype::text),',') FROM
(SELECT unnest(filter) FROM @extschema@.refresh_config WHERE dest_table = p_destination) x
JOIN pg_attribute ON (unnest=attname::text AND attrelid=p_destination::regclass) INTO v_cols, v_cols_n_types;
END IF;
-- init sql statements
-- does < for upper boundary to keep missing data from happening on rare edge case where a newly inserted row outside the transaction batch
-- has the exact same timestamp as the previous batch's max timestamp
-- Note that this means the destination table may always be at least one row behind even when no new data is entered on the source.
v_remote_sql := 'SELECT '||v_cols||' FROM '||v_source_table||' WHERE '||v_control||' > '||quote_literal(v_last_value)||' AND '||v_control||' < '||quote_literal(v_boundary)||' ORDER BY '||v_control||' ASC LIMIT '|| $3;
v_create_sql := 'CREATE TEMP TABLE '||v_tmp_table||' AS SELECT '||v_cols||' FROM dblink(auth('||v_dblink||'),'||quote_literal(v_remote_sql)||') t ('||v_cols_n_types||')';
v_insert_sql := 'INSERT INTO '||v_dest_table||'('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table;
v_last_value_sql := 'SELECT max('||v_control||') FROM '||v_tmp_table;
PERFORM update_step(v_step_id, 'OK','Grabbing rows from '||v_last_value::text||' to '||v_boundary::text);
-- create temp from remote
v_step_id := add_step(v_job_id,'Creating temp table ('||v_tmp_table||') from remote table');
PERFORM gdb(p_debug,v_create_sql);
EXECUTE v_create_sql;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
PERFORM update_step(v_step_id, 'OK','Table contains '||v_rowcount||' records');
PERFORM gdb(p_debug, v_rowcount || ' rows added to temp table');
v_step_id := add_step(v_job_id, 'Getting max control field value');
PERFORM gdb(p_debug, v_last_value_sql);
EXECUTE v_last_value_sql INTO v_last_value;
PERFORM update_step(v_step_id, 'OK','Max value is: '||v_last_value);
PERFORM gdb(p_debug, 'Max value is: '||v_last_value);
-- insert
v_step_id := add_step(v_job_id,'Inserting new records into local table');
PERFORM gdb(p_debug,v_insert_sql);
EXECUTE v_insert_sql;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
PERFORM update_step(v_step_id, 'OK','Inserted '||v_rowcount||' records');
PERFORM gdb(p_debug, v_rowcount || ' rows added to ' || v_dest_table);
-- update boundries
v_step_id := add_step(v_job_id,'Updating boundary values');
UPDATE refresh_config set last_value = v_last_value WHERE dest_table = p_destination;
PERFORM update_step(v_step_id, 'OK','Done');
PERFORM close_job(v_job_id);
EXECUTE 'DROP TABLE IF EXISTS ' || v_tmp_table;
-- Ensure old search path is reset for the current session
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));
EXCEPTION
WHEN OTHERS THEN
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_jobmon_schema||','||v_dblink_schema||''',''false'')';
PERFORM update_step(v_step_id, 'BAD', 'ERROR: '||coalesce(SQLERRM,'unknown'));
PERFORM fail_job(v_job_id);
-- Ensure old search path is reset for the current session
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
PERFORM pg_advisory_unlock(hashtext('refresh_inserter'), hashtext(v_job_name));
RAISE EXCEPTION '%', SQLERRM;
END
$_$;
|