/usr/include/qpid/broker/AsyncCompletion.h is in libqpidbroker2-dev 0.16-9ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#ifndef _AsyncCompletion_
#define _AsyncCompletion_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <boost/intrusive_ptr.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
namespace broker {
/**
* Class to implement asynchronous notification of completion.
*
* Use-case: An "initiator" needs to wait for a set of "completers" to
* finish a unit of work before an action can occur. This object
* tracks the progress of the set of completers, and allows the action
* to occur once all completers have signalled that they are done.
*
* The initiator and completers may be running in separate threads.
*
* The initiating thread is the thread that initiates the action,
* i.e. the connection read thread.
*
* A completing thread is any thread that contributes to completion,
* e.g. a store thread that does an async write.
* There may be zero or more completers.
*
* When the work is complete, a callback is invoked. The callback
* may be invoked in the Initiator thread, or one of the Completer
* threads. The callback is passed a flag indicating whether or not
* the callback is running under the context of the Initiator thread.
*
* Use model:
* 1) Initiator thread invokes begin()
* 2) After begin() has been invoked, zero or more Completers invoke
* startCompleter(). Completers may be running in the same or
* different thread as the Initiator, as long as they guarantee that
* startCompleter() is invoked at least once before the Initiator invokes end().
* 3) Completers may invoke finishCompleter() at any time, even after the
* initiator has invoked end(). finishCompleter() may be called from any
* thread.
* 4) startCompleter()/finishCompleter() calls "nest": for each call to
* startCompleter(), a corresponding call to finishCompleter() must be made.
* Once the last finishCompleter() is called, the Completer must no longer
* reference the completion object.
* 5) The Initiator invokes end() at the point where it has finished
* dispatching work to the Completers, and is prepared for the callback
* handler to be invoked. Note: if there are no outstanding Completers
* pending when the Initiator invokes end(), the callback will be invoked
* directly, and the sync parameter will be set true. This indicates to the
* Initiator that the callback is executing in the context of the end() call,
* and the Initiator is free to optimize the handling of the completion,
* assuming no need for synchronization with Completer threads.
*/
class AsyncCompletion
{
  public:
    /** Supplied by the Initiator to the end() method, allows for a callback
     * when all outstanding completers are done.  If the callback cannot be
     * made during the end() call, the clone() method must supply a copy of
     * this callback object that persists after end() returns.  The cloned
     * callback object will be used by the last completer thread, and
     * released when the callback returns.
     */
    class Callback : public RefCounted
    {
      public:
        /** Invoked when completion is reached.  The flag is true when the
         * call originates from end(), false when it originates from
         * finishCompleter().
         */
        virtual void completed(bool) = 0;

        /** Produces the callback object that end() stores when the
         * completion cannot be delivered immediately.
         */
        virtual boost::intrusive_ptr<Callback> clone() = 0;
    };

  private:
    /** Outstanding count: incremented by begin() and startCompleter(),
     * decremented by end() and finishCompleter().
     */
    mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;

    /** Held while invokeCallback(), cancel() and isDone() access inCallback
     * and active; also used to wake cancel() once a running callback returns.
     */
    mutable qpid::sys::Monitor callbackLock;

    /** inCallback: true only while invokeCallback() is executing the stored
     * callback.  active: starts true, cleared by invokeCallback() and cancel().
     */
    bool inCallback, active;

    /** Runs the stored callback (if one is set) and then marks this object
     * inactive.  Does nothing if the object is already inactive.
     *
     * @param sync forwarded unchanged to Callback::completed().
     */
    void invokeCallback(bool sync) {
        qpid::sys::Mutex::ScopedLock l(callbackLock);
        if (active) {
            if (callback.get()) {
                // Flag the call as in progress so that cancel() blocks until
                // it has returned, then drop the lock for the call itself.
                inCallback = true;
                {
                    qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
                    // NOTE(review): if completed() throws, inCallback is
                    // never reset and a later cancel() would wait forever.
                    callback->completed(sync);
                }
                inCallback = false;
                callback = boost::intrusive_ptr<Callback>();
                callbackLock.notifyAll();
            }
            active = false;
        }
    }

  protected:
    /** Invoked when all completers have signalled that they have completed
     * (via calls to finishCompleter()). bool == true if called via end()
     */
    boost::intrusive_ptr<Callback> callback;

  public:
    /** Starts with no outstanding completers, no callback running, and the
     * completion marked active.
     */
    AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}

    /** Cancels any pending callback before destruction.  Inside a destructor
     * the call binds to this class's cancel(), not to a derived override.
     */
    virtual ~AsyncCompletion() { cancel(); }

    /** True once the completion is no longer active, i.e. after
     * invokeCallback() has run or cancel() has been called.  The
     * immediate-completion path in end() does not clear the flag.
     *
     * The flag is read under callbackLock because invokeCallback() may be
     * writing it from a completer thread at the same time.
     */
    bool isDone()
    {
        qpid::sys::Mutex::ScopedLock l(callbackLock);
        return !active;
    }

    /** Called to signal the start of an asynchronous operation.  The operation
     * is considered pending until finishCompleter() is called.
     * E.g. called when initiating an async store operation.
     */
    void startCompleter() { ++completionsNeeded; }

    /** Called by completer to signal that it has finished the operation started
     * when startCompleter() was invoked.
     * e.g. called when async write complete.
     */
    void finishCompleter()
    {
        if (--completionsNeeded == 0) {
            invokeCallback(false);
        }
    }

    /** called by initiator before any calls to startCompleter can be done.
     */
    void begin()
    {
        ++completionsNeeded;
    }

    /** called by initiator after all potential completers have called
     * startCompleter().
     *
     * @param cb callback to notify.  It is invoked directly (with true) when
     * nothing is outstanding; otherwise a clone() of it is stored for later.
     */
    void end(Callback& cb)
    {
        assert(completionsNeeded.get() > 0);    // ensure begin() has been called!
        // the following only "decrements" the count if it is 1.  This means
        // there are no more outstanding completers and we are done.
        if (completionsNeeded.boolCompareAndSwap(1, 0)) {
            // done!  Complete immediately
            cb.completed(true);
            return;
        }

        // the compare-and-swap did not succeed.  This means there are
        // outstanding completers pending (count > 1).  Get a persistent
        // Callback object to use when the last completer is done.
        // Decrement after setting up the callback ensures that pending
        // completers cannot touch the callback until it is ready.
        callback = cb.clone();
        if (--completionsNeeded == 0) {
            // note that a completer may have completed during the
            // callback setup or decrement:
            invokeCallback(true);
        }
    }

    /** may be called by Initiator to cancel the callback.  Will wait for
     * callback to complete if in progress.
     */
    virtual void cancel() {
        qpid::sys::Mutex::ScopedLock l(callbackLock);
        while (inCallback) callbackLock.wait();
        callback = boost::intrusive_ptr<Callback>();
        active = false;
    }
};
}} // qpid::broker::
#endif /*!_AsyncCompletion_*/