/usr/include/qpid/sys/BlockingQueue.h is in libqpidcommon2-dev 0.16-7ubuntu5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | #ifndef QPID_SYS_BLOCKINGQUEUE_H
#define QPID_SYS_BLOCKINGQUEUE_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/sys/Waitable.h"
#include <queue>
namespace qpid {
namespace sys {
/**
* A simple blocking queue template
*/
template <class T>
class BlockingQueue
{
mutable sys::Waitable waitable;
std::queue<T> queue;
public:
BlockingQueue() {}
~BlockingQueue() { close(); }
/** Pop from the queue, block up to timeout if empty.
*@param result Set to value popped from queue.
*@param timeout Defaults to infinite.
*@return true if result was set, false if queue empty after timeout.
*/
bool pop(T& result, Duration timeout=TIME_INFINITE) {
Mutex::ScopedLock l(waitable);
{
Waitable::ScopedWait w(waitable);
if (timeout == TIME_INFINITE) {
while (queue.empty()) waitable.wait();
} else if (timeout) {
AbsTime deadline(now(),timeout);
while (queue.empty() && deadline > now()) waitable.wait(deadline);
} else {
//ensure zero timeout pop does not miss the fact that
//queue is closed
waitable.checkException();
}
}
if (queue.empty()) return false;
result = queue.front();
queue.pop();
if (!queue.empty())
waitable.notify(); // Notify another waiter.
return true;
}
T pop(Duration timeout=TIME_INFINITE) {
T result;
bool ok = pop(result, timeout);
if (!ok)
throw Exception("Timed out waiting on a blocking queue");
return result;
}
/** Push a value onto the queue.
* Note it is not an error to push onto a closed queue.
*/
void push(const T& t) {
Mutex::ScopedLock l(waitable);
queue.push(t);
waitable.notify(); // Notify a waiter.
}
/**
* Close the queue.
*@ex exception to throw to waiting threads. ClosedException by default.
*/
void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
{
Mutex::ScopedLock l(waitable);
if (!waitable.hasException()) {
waitable.setException(ex);
waitable.notifyAll();
waitable.waitWaiters(); // Ensure no threads are still waiting.
}
}
/** Open a closed queue. */
void open() {
Mutex::ScopedLock l(waitable);
waitable.resetException();
}
bool isClosed() const {
Mutex::ScopedLock l(waitable);
return waitable.hasException();
}
bool empty() const {
Mutex::ScopedLock l(waitable);
return queue.empty();
}
size_t size() const {
Mutex::ScopedLock l(waitable);
return queue.size();
}
};
}}
#endif /*!QPID_SYS_BLOCKINGQUEUE_H*/
|