This file is indexed.

/usr/share/skytools/pgq_ext.sql is in skytools 2.1.13-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
-- Session settings for the install script: report only warnings and above,
-- and create the tables below without OID columns.
set client_min_messages to 'warning';
set default_with_oids to 'off';

-- Schema that holds the tracking tables and functions defined below;
-- every role is allowed to reference objects in it.
create schema pgq_ext;
grant usage on schema pgq_ext to public;


--
-- batch tracking
--
-- One row per consumer: the batch id most recently stored by
-- pgq_ext.set_batch_done() and compared by pgq_ext.is_batch_done().
create table pgq_ext.completed_batch (
    consumer_id   text   not null primary key,
    last_batch_id bigint not null
);


--
-- event tracking
--
-- One row per (consumer, batch, event) tagged by pgq_ext.set_event_done().
-- That function deletes a consumer's rows for its previous batch once it
-- sees a different batch id.
create table pgq_ext.completed_event (
    consumer_id   text   not null,
    batch_id      bigint not null,
    event_id      bigint not null,

    constraint completed_event_pkey
        primary key (consumer_id, batch_id, event_id)
);

-- One row per consumer: the batch whose events are currently being tagged
-- in pgq_ext.completed_event.  Maintained by pgq_ext.set_event_done().
create table pgq_ext.partial_batch (
    consumer_id   text   not null primary key,
    cur_batch_id  bigint not null
);

--
-- tick tracking for SerialConsumer()
-- accessed through pgq_ext.get_last_tick() / pgq_ext.set_last_tick()
--
-- One row per consumer: the tick id last recorded for that consumer.
create table pgq_ext.completed_tick (
    consumer_id   text   not null primary key,
    last_tick_id  bigint not null
);


-- Tells whether a_batch_id is the batch last recorded for a_consumer.
--
-- Parameters:
--   a_consumer  - consumer id to look up in pgq_ext.completed_batch
--   a_batch_id  - batch id to compare against the stored one
-- Returns:
--   false when the consumer has no row yet, otherwise the result of
--   comparing the stored last_batch_id with a_batch_id.
create or replace function pgq_ext.is_batch_done(
    a_consumer text, a_batch_id bigint)
returns boolean as $$
declare
    same_batch  boolean;
begin
    select last_batch_id = a_batch_id
      into same_batch
      from pgq_ext.completed_batch
     where consumer_id = a_consumer;

    if found then
        return same_batch;
    end if;

    -- nothing recorded for this consumer so far
    return false;
end;
$$ language plpgsql security definer;

-- Records a_batch_id as the last completed batch of a_consumer.
--
-- Parameters:
--   a_consumer  - consumer id
--   a_batch_id  - batch id to store
-- Returns:
--   false when pgq_ext.is_batch_done() already reports this batch as done,
--   true otherwise.  A batch id that is not greater than zero is not
--   stored, but the call still returns true.
create or replace function pgq_ext.set_batch_done(
    a_consumer text, a_batch_id bigint)
returns boolean as $$
begin
    -- already recorded: report that nothing changed
    if pgq_ext.is_batch_done(a_consumer, a_batch_id) then
        return false;
    end if;

    if a_batch_id > 0 then
        -- overwrite the consumer's existing row if there is one ...
        update pgq_ext.completed_batch
           set last_batch_id = a_batch_id
         where consumer_id = a_consumer;
        if found then
            return true;
        end if;

        -- ... otherwise this is the consumer's first recorded batch
        insert into pgq_ext.completed_batch (consumer_id, last_batch_id)
            values (a_consumer, a_batch_id);
    end if;

    return true;
end;
$$ language plpgsql security definer;


-- Tells whether an event has already been tagged as done.
--
-- Parameters:
--   a_consumer  - consumer id
--   a_batch_id  - batch the event belongs to
--   a_event_id  - event id
-- Returns:
--   true when pgq_ext.completed_event holds a row for exactly this
--   (consumer, batch, event) combination, false otherwise.
--
-- NOTE(review): the function is security definer and does not pin
-- search_path; confirm whether that is acceptable for this deployment.
create or replace function pgq_ext.is_event_done(
    a_consumer text,
    a_batch_id bigint, a_event_id bigint)
returns boolean as $$
begin
    -- perform discards the row; only the FOUND flag is of interest
    perform 1 from pgq_ext.completed_event
     where consumer_id = a_consumer
       and batch_id = a_batch_id
       and event_id = a_event_id;
    return found;
end;
$$ language plpgsql security definer;

-- Tags one event of a batch as done for a_consumer.
--
-- Parameters:
--   a_consumer  - consumer id
--   a_batch_id  - batch the event belongs to
--   a_event_id  - event id
-- Returns:
--   false when the event is already tagged, true after tagging it.
--
-- pgq_ext.partial_batch remembers which batch the consumer is working on.
-- When a different batch id shows up, the tags of the remembered batch
-- are deleted before the new event is tagged.
create or replace function pgq_ext.set_event_done(
    a_consumer text, a_batch_id bigint, a_event_id bigint)
returns boolean as $$
declare
    prev_batch  bigint;
begin
    -- already tagged: nothing to do
    if exists (select 1
                 from pgq_ext.completed_event
                where consumer_id = a_consumer
                  and batch_id = a_batch_id
                  and event_id = a_event_id)
    then
        return false;
    end if;

    -- which batch was this consumer working on last?
    select cur_batch_id
      into prev_batch
      from pgq_ext.partial_batch
     where consumer_id = a_consumer;

    if found then
        if prev_batch <> a_batch_id then
            -- batch changed, that means the old one is finished on the
            -- queue db and its tagged events are not needed anymore
            delete from pgq_ext.completed_event
             where consumer_id = a_consumer
               and batch_id = prev_batch;

            -- remember the current batch
            update pgq_ext.partial_batch
               set cur_batch_id = a_batch_id
             where consumer_id = a_consumer;
        end if;
    else
        -- first event ever seen for this consumer
        insert into pgq_ext.partial_batch (consumer_id, cur_batch_id)
            values (a_consumer, a_batch_id);
    end if;

    -- tag the event as done
    insert into pgq_ext.completed_event (consumer_id, batch_id, event_id)
        values (a_consumer, a_batch_id, a_event_id);

    return true;
end;
$$ language plpgsql security definer;


-- Fetches the tick id stored for a_consumer in pgq_ext.completed_tick.
--
-- Parameters:
--   a_consumer  - consumer id
-- Returns:
--   the stored last_tick_id, or null when the consumer has no row.
create or replace function pgq_ext.get_last_tick(a_consumer text)
returns int8 as $$
declare
    tick_id  int8;
begin
    select t.last_tick_id
      into tick_id
      from pgq_ext.completed_tick t
     where t.consumer_id = a_consumer;

    -- tick_id is still null if no row matched
    return tick_id;
end;
$$ language plpgsql security definer;

-- Stores, replaces or clears the tick id kept for a_consumer.
--
-- Parameters:
--   a_consumer  - consumer id
--   a_tick_id   - tick id to store; null removes the consumer's row
-- Returns:
--   always 1.
create or replace function pgq_ext.set_last_tick(a_consumer text, a_tick_id bigint)
returns integer as $$
begin
    -- null tick id means "forget this consumer"
    if a_tick_id is null then
        delete from pgq_ext.completed_tick
         where consumer_id = a_consumer;
        return 1;
    end if;

    -- overwrite the existing row, or create it on first use
    update pgq_ext.completed_tick
       set last_tick_id = a_tick_id
     where consumer_id = a_consumer;
    if not found then
        insert into pgq_ext.completed_tick (consumer_id, last_tick_id)
            values (a_consumer, a_tick_id);
    end if;

    return 1;
end;
$$ language plpgsql security definer;


-- Reports the version string of this pgq_ext schema.
--
-- Returns:
--   the fixed text '2.1.6'.
create or replace function pgq_ext.version()
returns text as $$
declare
    schema_version constant text := '2.1.6';
begin
    return schema_version;
end;
$$ language plpgsql;