/usr/share/skytools/pgq_ext.sql is in skytools 2.1.13-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | set client_min_messages = 'warning';
set default_with_oids = 'off';
create schema pgq_ext;
grant usage on schema pgq_ext to public;
--
-- batch tracking
--
create table pgq_ext.completed_batch (
consumer_id text not null,
last_batch_id bigint not null,
primary key (consumer_id)
);
--
-- event tracking
--
create table pgq_ext.completed_event (
consumer_id text not null,
batch_id bigint not null,
event_id bigint not null,
primary key (consumer_id, batch_id, event_id)
);
create table pgq_ext.partial_batch (
consumer_id text not null,
cur_batch_id bigint not null,
primary key (consumer_id)
);
--
-- tick tracking for SerialConsumer()
-- no access functions provided here
--
create table pgq_ext.completed_tick (
consumer_id text not null,
last_tick_id bigint not null,
primary key (consumer_id)
);
create or replace function pgq_ext.is_batch_done(
a_consumer text, a_batch_id bigint)
returns boolean as $$
declare
res boolean;
begin
select last_batch_id = a_batch_id
into res from pgq_ext.completed_batch
where consumer_id = a_consumer;
if not found then
return false;
end if;
return res;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.set_batch_done(
a_consumer text, a_batch_id bigint)
returns boolean as $$
begin
if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
return false;
end if;
if a_batch_id > 0 then
update pgq_ext.completed_batch
set last_batch_id = a_batch_id
where consumer_id = a_consumer;
if not found then
insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
values (a_consumer, a_batch_id);
end if;
end if;
return true;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.is_event_done(
a_consumer text,
a_batch_id bigint, a_event_id bigint)
returns boolean as $$
declare
res bigint;
begin
perform 1 from pgq_ext.completed_event
where consumer_id = a_consumer
and batch_id = a_batch_id
and event_id = a_event_id;
return found;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.set_event_done(
a_consumer text, a_batch_id bigint, a_event_id bigint)
returns boolean as $$
declare
old_batch bigint;
begin
-- check if done
perform 1 from pgq_ext.completed_event
where consumer_id = a_consumer
and batch_id = a_batch_id
and event_id = a_event_id;
if found then
return false;
end if;
-- if batch changed, do cleanup
select cur_batch_id into old_batch
from pgq_ext.partial_batch
where consumer_id = a_consumer;
if not found then
-- first time here
insert into pgq_ext.partial_batch
(consumer_id, cur_batch_id)
values (a_consumer, a_batch_id);
elsif old_batch <> a_batch_id then
-- batch changed, that means old is finished on queue db
-- thus the tagged events are not needed anymore
delete from pgq_ext.completed_event
where consumer_id = a_consumer
and batch_id = old_batch;
-- remember current one
update pgq_ext.partial_batch
set cur_batch_id = a_batch_id
where consumer_id = a_consumer;
end if;
-- tag as done
insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
values (a_consumer, a_batch_id, a_event_id);
return true;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.get_last_tick(a_consumer text)
returns int8 as $$
declare
res int8;
begin
select last_tick_id into res
from pgq_ext.completed_tick
where consumer_id = a_consumer;
return res;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
returns integer as $$
begin
if a_tick_id is null then
delete from pgq_ext.completed_tick
where consumer_id = a_consumer;
else
update pgq_ext.completed_tick
set last_tick_id = a_tick_id
where consumer_id = a_consumer;
if not found then
insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
values (a_consumer, a_tick_id);
end if;
end if;
return 1;
end;
$$ language plpgsql security definer;
create or replace function pgq_ext.version()
returns text as $$
begin
return '2.1.6';
end;
$$ language plpgsql;
|