/usr/include/pion/PionLockedQueue.hpp is in libpion-common-dev 4.0.7+dfsg-3.1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// -----------------------------------------------------------------------
// pion-common: a collection of common libraries used by the Pion Platform
// -----------------------------------------------------------------------
// Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
//
// Distributed under the Boost Software License, Version 1.0.
// See http://www.boost.org/LICENSE_1_0.txt
//
#ifndef __PION_PIONLOCKEDQUEUE_HEADER__
#define __PION_PIONLOCKEDQUEUE_HEADER__
#include <new>
#include <boost/cstdint.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/detail/atomic_count.hpp>
#include <pion/PionConfig.hpp>
#include <pion/PionException.hpp>
#if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
#include <boost/lockfree/detail/freelist.hpp>
#endif
// NOTE: the data structures contained in this file are based upon algorithms
// published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
// Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
// Department of Computer Science, University of Rochester).
// See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
namespace pion { // begin namespace pion
///
/// PionLockedQueue: a thread-safe, two-lock concurrent FIFO queue
///
/// The list always contains one "dummy" node at the head; the first real item
/// is the node after it.  Consumers serialize on the head mutex and producers
/// on the tail mutex.  Idle consumers are kept on an intrusive singly-linked
/// list (guarded by the tail mutex) so that push() can wake exactly one.
///
/// @tparam T             item type; must be default-constructible and assignable
/// @tparam MaxSize       push() sleeps while size() >= MaxSize (0 disables the limit)
/// @tparam SleepMilliSec how long push() sleeps between size checks
///
template <typename T,
          boost::uint32_t MaxSize = 250000,
          boost::uint32_t SleepMilliSec = 10 >
class PionLockedQueue :
    private boost::noncopyable
{
protected:

    /// data structure used to wrap each item in the queue
    struct QueueNode {
        T               data;       //< data wrapped by the node item
        QueueNode *     next;       //< points to the next node in the queue
        boost::uint32_t version;    //< the node item's version number
    };

    /// returns a new queue node item for use in the queue
    inline QueueNode *createNode(void) {
#if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
        return new (m_free_list.allocate()) QueueNode();
#else
        return new QueueNode();
#endif
    }

    /// frees memory for an existing queue node item
    inline void destroyNode(QueueNode *node_ptr) {
#if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
        node_ptr->~QueueNode();
        m_free_list.deallocate(node_ptr);
#else
        delete node_ptr;
#endif
    }

    /// initializes head and tail pointers for empty queue
    inline void initialize(void) {
        // initialize with a dummy node since m_head_ptr is always
        // pointing to the item before the head of the list
        m_head_ptr = m_tail_ptr = createNode();
        m_head_ptr->next = NULL;
        m_head_ptr->version = 0;
    }

    /**
     * dequeues the next item from the top of the queue
     *
     * NOTE: the node that held the item becomes the new dummy head node and
     * keeps its copy of the item until the next successful dequeue or clear()
     *
     * @param t assigned to the item at the top of the queue, if it is not empty
     * @param version assigned the version number of the item retrieved, or
     *                the version of the (dummy) head node if the queue is empty
     *
     * @return true if an item was retrieved; false if the queue is empty
     */
    inline bool dequeue(T& t, boost::uint32_t& version) {
        // just return if the list is empty
        boost::mutex::scoped_lock head_lock(m_head_mutex);
        QueueNode *new_head_ptr = m_head_ptr->next;
        if (! new_head_ptr) {
            version = m_head_ptr->version;
            return false;
        }

        // get a copy of the item at the head of the list
        version = new_head_ptr->version;
        t = new_head_ptr->data;

        // update the pointer to the head of the list
        QueueNode *old_head_ptr = m_head_ptr;
        m_head_ptr = new_head_ptr;
        head_lock.unlock();

        // free the QueueNode for the old head of the list
        // (done outside the lock; the node is no longer reachable)
        destroyNode(old_head_ptr);

        // decrement size
        --m_size;

        // item successfully dequeued
        return true;
    }


public:

    /// data structure used to manage idle consumer threads waiting for items
    class ConsumerThread {
    public:

        /**
         * default constructor used to disable idle wakeup timer
         * (assumes thread is active/running)
         */
        ConsumerThread(void) : m_is_running(true), m_next_ptr(NULL),
            m_wakeup_time(boost::posix_time::not_a_date_time) {}

        /**
         * constructor used to enable an idle wakeup timer for the thread
         * (assumes thread is active/running)
         *
         * @param d inactivity wakeup timer duration
         */
        template <typename DurationType>
        ConsumerThread(const DurationType& d)
            : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
        {}

        /// returns true while the consumer thread is active/running
        inline bool isRunning(void) const { return m_is_running; }

        /// stops the thread -- if waiting on pop() will return immediately
        /// NOTE: the notification is sent without holding any queue mutex, so
        /// a thread that has already checked isRunning() but has not yet begun
        /// waiting may miss it (it will still stop after its next wakeup)
        inline void stop(void) { m_is_running = false; m_wakeup_event.notify_one(); }

        /// marks the thread as running again and clears its idle-list link
        /// (does not touch any queue; only call while not waiting in pop())
        inline void reset(void) { m_is_running = true; m_next_ptr = NULL; }

        /// returns true if an inactivity wakeup timer is set for the thread
        inline bool hasWakeupTimer(void) const { return !m_wakeup_time.is_not_a_date_time(); }

        /// returns the inactivity wakeup timer duration (relative, not absolute)
        inline const boost::posix_time::time_duration& getWakeupTimer(void) const {
            return m_wakeup_time;
        }

    private:

        /// allow PionLockedQueue direct access to members
        friend class PionLockedQueue;

        volatile bool                       m_is_running;   //< true while the thread is running/active
        ConsumerThread *                    m_next_ptr;     //< pointer to the next idle thread
        boost::condition                    m_wakeup_event; //< triggered when a new item is available
        boost::posix_time::time_duration    m_wakeup_time;  //< inactivity wakeup timer duration
    };


protected:

    /**
     * unlinks a consumer thread from the list of idle threads.  does nothing
     * (other than clearing the thread's link) if the thread is not in the list.
     * the caller must hold m_tail_mutex, which guards the idle list.
     *
     * @param thread_info the consumer thread to remove from the idle list
     */
    inline void removeIdleThread(ConsumerThread& thread_info) {
        // walk the "link" pointers so the head and interior cases are the same
        ConsumerThread **link_ptr = &m_idle_ptr;
        while (*link_ptr) {
            if (*link_ptr == &thread_info) {
                *link_ptr = thread_info.m_next_ptr;
                break;
            }
            link_ptr = &((*link_ptr)->m_next_ptr);
        }
        thread_info.m_next_ptr = NULL;
    }


public:

    /// constructs a new PionLockedQueue
    PionLockedQueue(void)
        : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
        m_next_version(1), m_size(0)
    {
        initialize();
    }

    /// virtual destructor
    virtual ~PionLockedQueue() {
        clear();
        // clear() leaves a fresh dummy node behind; release it too
        destroyNode(m_tail_ptr);
    }

    /// returns true if the queue is empty; false if it is not
    /// (reads the head pointer without locking, so the result is only a
    /// snapshot when other threads are using the queue)
    inline bool empty(void) const { return (m_head_ptr->next == NULL); }

    /// returns the number of items that are currently in the queue
    std::size_t size(void) const {
        return m_size;
    }

    /// clears the list by removing all remaining items
    void clear(void) {
        boost::mutex::scoped_lock tail_lock(m_tail_mutex);
        boost::mutex::scoped_lock head_lock(m_head_mutex);
        // also delete dummy node and reinitialize it to clear old value
        while (m_head_ptr) {
            m_tail_ptr = m_head_ptr;
            m_head_ptr = m_head_ptr->next;
            destroyNode(m_tail_ptr);
            // every node after the dummy is a real item, so the size is
            // decremented once per remaining node (not for the dummy itself)
            if (m_head_ptr)
                --m_size;
        }
        initialize();
    }

    /**
     * pushes a new item into the back of the queue.  if MaxSize is non-zero,
     * the calling thread sleeps in SleepMilliSec steps while the queue is full
     *
     * @param t the item to add to the back of the queue
     */
    void push(const T& t) {
        // sleep while MaxSize is exceeded
        if (MaxSize > 0) {
            boost::system_time wakeup_time;
            while (size() >= MaxSize) {
                wakeup_time = boost::get_system_time()
                    + boost::posix_time::millisec(SleepMilliSec);
                boost::thread::sleep(wakeup_time);
            }
        }

        // create a new list node for the queue item
        QueueNode *node_ptr = createNode();
        try {
            node_ptr->data = t;
        } catch (...) {
            // do not leak the node if copying the item fails
            destroyNode(node_ptr);
            throw;
        }
        node_ptr->next = NULL;
        node_ptr->version = 0;

        // append node to the end of the list
        boost::mutex::scoped_lock tail_lock(m_tail_mutex);
        // versions advance in steps of 2 starting from 1, so an item's version
        // never equals the 0 assigned to a dummy node created by initialize()
        node_ptr->version = (m_next_version += 2);
        m_tail_ptr->next = node_ptr;

        // update the tail pointer for the new node
        m_tail_ptr = node_ptr;

        // increment size
        ++m_size;

        // wake up an idle thread (if any)
        if (m_idle_ptr) {
            ConsumerThread *idle_ptr = m_idle_ptr;
            m_idle_ptr = m_idle_ptr->m_next_ptr;
            idle_ptr->m_wakeup_event.notify_one();
        }
    }

    /**
     * pops the next item from the top of the queue.  this may cause the calling
     * thread to sleep until an item is available, and will only return when an
     * item has been successfully retrieved, when the thread is stopping, or
     * when the thread's inactivity wakeup timer (if any) expires
     *
     * @param t assigned to the item at the top of the queue
     * @param thread_info ConsumerThread object used to manage the thread
     *
     * @return true if an item was retrieved, false if the thread was stopped
     *         or its wakeup timer expired before an item became available
     */
    bool pop(T& t, ConsumerThread& thread_info) {
        boost::uint32_t last_known_version;
        while (thread_info.isRunning()) {
            // try to get the next value
            if ( dequeue(t, last_known_version) )
                return true;    // got an item

            // queue is empty
            boost::mutex::scoped_lock tail_lock(m_tail_mutex);
            if (m_tail_ptr->version == last_known_version) {
                // still empty after acquiring lock (nothing was pushed since
                // dequeue() looked), so register as idle and wait
                thread_info.m_next_ptr = m_idle_ptr;
                m_idle_ptr = & thread_info;

                bool timer_expired = false;
                try {
                    // get wakeup time (if any)
                    if (thread_info.hasWakeupTimer()) {
                        // wait for an item to become available
                        const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.getWakeupTimer());
                        // timer expired if timed_wait() returns false
                        timer_expired = !thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time);
                    } else {
                        // wait for an item to become available
                        thread_info.m_wakeup_event.wait(tail_lock);
                    }
                } catch (...) {
                    // e.g. thread interruption; Boost.Thread documents that the
                    // lock is re-acquired when wait exits via an exception
                    removeIdleThread(thread_info);
                    throw;
                }

                // push() unlinks the thread it notifies, but a timeout, stop()
                // or spurious wakeup does not; always unlink here (tail lock is
                // held) so the idle list never keeps a stale or duplicate entry
                removeIdleThread(thread_info);

                if (timer_expired)
                    return false;
            }
        }
        return false;
    }

    /**
     * pops the next item from the top of the queue
     *
     * @param t assigned to the item at the top of the queue, if it is not empty
     *
     * @return true if an item was retrieved, false if the queue is empty
     */
    inline bool pop(T& t) { boost::uint32_t version; return dequeue(t, version); }


private:

#if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
    /// a caching free list of queue nodes used to reduce memory operations
    boost::lockfree::caching_freelist<QueueNode>    m_free_list;
#endif

    /// mutex used to protect the head pointer to the first item
    boost::mutex                    m_head_mutex;

    /// mutex used to protect the tail pointer to the last item
    /// (also guards m_idle_ptr and m_next_version)
    boost::mutex                    m_tail_mutex;

    /// pointer to the first item in the list
    QueueNode *                     m_head_ptr;

    /// pointer to the last item in the list
    QueueNode *                     m_tail_ptr;

    /// pointer to a list of idle threads waiting for work
    ConsumerThread *                m_idle_ptr;

    /// value of the next tail version number
    boost::uint32_t                 m_next_version;

    /// used to keep track of the number of items in the queue
    boost::detail::atomic_count     m_size;
};
} // end namespace pion
#endif
|