This file is indexed.

/usr/share/pyshared/juju/hooks/scheduler.py is in juju-0.7 0.7-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import logging
import os


from twisted.internet.defer import (
    DeferredQueue, inlineCallbacks, succeed, Deferred,
    QueueUnderflow, QueueOverflow)
from juju.lib import serializer
from juju.state.hook import RelationHookContext, RelationChange


ADDED = "joined"
REMOVED = "departed"
MODIFIED = "modified"


log = logging.getLogger("hook.scheduler")


def check_writeable(path):
    try:
        with open(path, "a"):
            pass
    except IOError:
        raise AssertionError("%s is not writable!" % path)


class HookQueue(DeferredQueue):
    # Single consumer, multi producer LIFO, with Nones treated
    # as FIFO

    def __init__(self, modify_callback):
        self._modify_callback = modify_callback
        super(HookQueue, self).__init__(backlog=1)

    def put(self, change, offset=1):
        """
        LIFO except for a None value which is FIFO

        Add an object to this queue.

        @raise QueueOverflow: Too many objects are in this queue.
        """
        if self.waiting:
            if change is None:
                return self.waiting.pop(0).callback(change)
            # Because there's a waiter we know the offset is 0
            self._queue_change(change, offset=0)
            if self.pending:
                self.waiting.pop(0).callback(self.pending[0])
        elif self.size is None or len(self.pending) < self.size:
            # If the queue is currently processing, no need
            # to store the stop change, it will catch the stop
            # during the loop iter.
            if change is None:
                return
            self._queue_change(change, offset)
        else:
            raise QueueOverflow()

    def next_change(self):
        """Get the next change from the queue"""
        if self.pending:
            return succeed(self.pending[0])
        elif self.backlog is None or len(self.waiting) < self.backlog:
            d = Deferred(canceller=self._cancelGet)
            self.waiting.append(d)
            return d
        else:
            raise QueueUnderflow()

    def finished_change(self):
        """The last change fetched has been processed."""
        if self.pending:
            value = self.pending.pop(0)
            self.cb_modified()
            return value

    def cb_modified(self):
        """Callback invoked by queue when the state has been modified"""
        self._modify_callback()

    def _previous(self, unit_name, pending):
        """Find the most recent previous operation for a unit.

        :param pending: sequence of pending operations to consider.
        """
        for p in reversed(pending):
            if p['unit_name'] == unit_name:
                return pending.index(p), p
        return None, None

    def _wipe_member(self, unit_name, pending):
        """Remove a given unit from membership in pending."""
        for p in pending:
            if unit_name in p['members']:
                p['members'].remove(unit_name)

    def _queue_change(self, change, offset=1):
        """Queue up the node change for execution.

        The process of queuing the change will automatically
        merge with previously queued changes.

        :param change: The change to queue up.
        :param offset: Starting position of any queue merges.
               If the queue is currently being processed, we
               don't attempt to merge with the head of the queue
               as it's currently being operated on.
        """
        # Find the previous change if any.
        previous_idx, previous = self._previous(
            change['unit_name'], self.pending[offset:])

        # No previous change, just add
        if previous_idx is None:
            self.pending.append(change)
            return self.cb_modified()

        # Reduce
        change_idx, change_type = self._reduce(
            (previous_idx, previous['change_type']),
            (-1, change['change_type']))

        # Previous change, done.
        if previous_idx == change_idx:
            return

        # New change, remove previous
        elif change_type is not None:
            self.pending.pop(previous_idx)
            change['change_type'] = change_type
            self.pending.append(change)

        # Changes cancelled, remove previous, wipe membership
        elif change_type is None or change_idx != previous_idx:
            assert change['change_type'] == REMOVED
            self._wipe_member(
                change['unit_name'], self.pending[offset:])
            self.pending.pop(previous_idx)

        # Notify changed
        self.cb_modified()

    def _reduce(self, previous, new):
        """Given two change operations for a node, reduce to one operation.

        We depend on zookeeper's total ordering behavior as we don't
        attempt to handle nonsensical operation sequences like
        removed followed by a modified, or modified followed by an
        add.
        """
        previous_clock, previous_change = previous
        new_clock, new_change = new

        if previous_change == REMOVED and new_change == ADDED:
            return (new_clock, MODIFIED)

        elif previous_change == ADDED and new_change == MODIFIED:
            return (previous_clock, previous_change)

        elif previous_change == ADDED and new_change == REMOVED:
            return (None, None)

        elif previous_change == MODIFIED and new_change == REMOVED:
            return (new_clock, new_change)

        elif previous_change == MODIFIED and new_change == MODIFIED:
            return (previous_clock, previous_change)

        elif previous_change == REMOVED and new_change == MODIFIED:
            return (previous_clock, previous_change)


class HookScheduler(object):
    def __init__(self, client, executor, unit_relation, relation_ident,
                 unit_name, state_path):
        self._running = False
        self._state_path = state_path

        # The thing that will actually run the hook for us
        self._executor = executor
        # For hook context construction.
        self._client = client
        self._unit_relation = unit_relation
        self._relation_ident = relation_ident
        self._relation_name = relation_ident.split(":")[0]
        self._unit_name = unit_name
        self._current_clock = None

        if os.path.exists(self._state_path):
            self._load_state()
        else:
            self._create_state()

    def _create_state(self):
        # Current units (as far as the next hook should know)
        self._context_members = None
        # Current units and settings versions (as far as the scheduler knows)
        self._member_versions = {}
        # Run queue (clock)
        self._run_queue = HookQueue(self._save_state)

    def _load_state(self):
        with open(self._state_path) as f:
            state = serializer.load(f.read())
            if not state:
                return self._create_state()
        self._context_members = set(state["context_members"])
        self._member_versions = state["member_versions"]
        self._run_queue = HookQueue(self._save_state)
        self._run_queue.pending = state["change_queue"]

    def _save_state(self):
        state = serializer.dump({
            "context_members": sorted(self._context_members),
            "member_versions": self._member_versions,
            "change_queue": self._run_queue.pending})

        temp_path = self._state_path + "~"
        with open(temp_path, "w") as f:
            f.write(state)
        os.rename(temp_path, self._state_path)

    def _execute(self, change):
        """Execute a hook script for a change.
        """
        # Assemble the change and hook execution context
        rel_change = RelationChange(
            self._relation_ident, change['change_type'], change['unit_name'])
        context = RelationHookContext(
            self._client, self._unit_relation, self._relation_ident,
            change['members'], unit_name=self._unit_name)
        # Execute the change.
        return self._executor(context, rel_change)

    def _get_change(self, unit_name, change_type, members):
        """
        Return a hook context, corresponding to the current state of the
        system.
        """
        return dict(unit_name=unit_name,
                    change_type=change_type,
                    members=sorted(members))

    @property
    def running(self):
        return self._running is True

    @inlineCallbacks
    def run(self):
        assert not self._running, "Scheduler is already running"
        check_writeable(self._state_path)

        self._running = True
        log.debug("start")

        while self._running:
            change = yield self._run_queue.next_change()

            if change is None:
                if not self._running:
                    break
                continue

            log.debug(
                "executing hook for %s:%s",
                change['unit_name'], change['change_type'])

            # Execute the hook
            success = yield self._execute(change)

            # Queue up modified immediately after change.
            if change['change_type'] == ADDED:
                self._run_queue.put(
                    self._get_change(change['unit_name'],
                                     MODIFIED,
                                     self._context_members))

            if success:
                self._run_queue.finished_change()
            else:
                log.debug("hook error, stopping scheduler execution")
                self._running = False
                break
        log.info("stopped")

    def stop(self):
        """Stop the hook execution.

        Note this does not stop the scheduling, the relation watcher
        that feeds changes to the scheduler needs to be stopped to
        achieve that effect.
        """
        log.debug("stopping")
        if not self._running:
            return
        self._running = False
        # Put a marker value onto the queue to designate, stop now.
        # This is in case we're waiting on the queue, when the stop
        # occurs. The queue treats this None specially as a transient
        # value for extant waiters wakeup.
        self._run_queue.put(None)

    def pop(self):
        """Pop the next event on the queue.

        The goal is that on a relation hook error we'll come back up
        and we have the option of retrying the failed hook OR to proceed
        to the next event. To proceed to the next event we pop the failed
        event off the queue.
        """
        assert not self._running, "Scheduler must be stopped for pop()"
        return self._run_queue.finished_change()

    def cb_change_members(self, old_units, new_units):
        """Watch callback invoked when the relation membership changes.
        """
        log.debug("members changed: old=%s, new=%s", old_units, new_units)

        if self._context_members is None:
            self._context_members = set(old_units)

        if set(self._member_versions) != set(old_units):
            # Can happen when we miss seeing some changes ie. disconnected.
            log.debug(
                "old does not match last recorded units: %s",
                sorted(self._member_versions))

        added = set(new_units) - set(self._member_versions)
        removed = set(self._member_versions) - set(new_units)

        self._member_versions.update(dict((unit, 0) for unit in added))
        for unit in removed:
            del self._member_versions[unit]

        for unit_name in sorted(added):
            self._context_members.add(unit_name)
            self._run_queue.put(
                self._get_change(unit_name, ADDED, self._context_members),
                int(self._running))

        for unit_name in sorted(removed):
            self._context_members.remove(unit_name)
            self._run_queue.put(
                self._get_change(unit_name, REMOVED, self._context_members),
                int(self._running))
        self._save_state()

    def cb_change_settings(self, unit_versions):
        """Watch callback invoked when related units change data.
        """
        log.debug("settings changed: %s", unit_versions)
        for (unit_name, version) in unit_versions:
            if version > self._member_versions.get(unit_name, 0):
                self._member_versions[unit_name] = version
                self._run_queue.put(
                    self._get_change(
                        unit_name, MODIFIED, self._context_members),
                    int(self._running))
        self._save_state()