/usr/include/TiledArray/replicator.h is in libtiledarray-dev 0.6.0-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
/*
* This file is a part of TiledArray.
* Copyright (C) 2013 Virginia Tech
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef TILEDARRAY_REPLICATOR_H__INCLUDED
#define TILEDARRAY_REPLICATOR_H__INCLUDED
#include <TiledArray/madness.h>
namespace TiledArray {
namespace detail {
/// Replicate an \c Array object
/// This object will create a replicated \c Array from a distributed
/// \c Array.
/// \tparam A The array type
/// Homeworld = M7R-227
template <typename A>
class Replicator : public madness::WorldObject<Replicator<A> >, private madness::Spinlock {
private:
  typedef Replicator<A> Replicator_; ///< This object type
  typedef madness::WorldObject<Replicator_> wobj_type; ///< The base object type
  typedef std::stack<madness::CallbackInterface*, std::vector<madness::CallbackInterface*> > callback_type; ///< Callback interface

  A destination_; ///< The replicated array
  std::vector<typename A::size_type> indices_; ///< List of local tile indices
  std::vector<Future<typename A::value_type> > data_; ///< List of local tiles
  madness::AtomicInt sent_; ///< The number of nodes the data has been sent to
  World& world_; ///< The world of the source array
  // callbacks_ and probe_ are only touched while this object's spinlock is
  // held, so they do not need (and do not get) a volatile qualifier.
  callback_type callbacks_; ///< A callback stack
  mutable bool probe_; ///< Cache for local data probe

  /// Notify and remove every callback currently on the stack

  /// \note Assume object is already locked
  void do_callbacks() {
    while(! callbacks_.empty()) {
      callbacks_.top()->notify();
      callbacks_.pop();
    }
  }

  /// Task that will call send when all local tiles are ready to be sent
  class DelaySend : public madness::TaskInterface {
  private:
    Replicator_& parent_; ///< The parent replicator operation

  public:
    /// Constructor

    /// Adds one dependency to this task, and registers this task as a
    /// callback, for every local tile of \c parent whose future has not
    /// been set yet.
    /// \param parent The replicator whose \c send() is invoked by this task
    DelaySend(Replicator_& parent) :
      madness::TaskInterface(madness::TaskAttributes::hipri()),
      parent_(parent)
    {
      typename std::vector<Future<typename A::value_type> >::iterator it =
          parent_.data_.begin();
      typename std::vector<Future<typename A::value_type> >::iterator end =
          parent_.data_.end();
      for(; it != end; ++it) {
        if(! it->probe()) {
          madness::DependencyInterface::inc();
          it->register_callback(this);
        }
      }
    }

    /// Virtual destructor
    virtual ~DelaySend() { }

    /// Task send task function
    virtual void run(const madness::TaskThreadEnv&) { parent_.send(); }
  }; // class DelaySend

  /// Probe all local data has been set

  /// A positive result is cached in \c probe_ so that the tile futures are
  /// not scanned again on later calls.
  /// \return \c true when all local tiles have been set
  bool probe() const {
    madness::ScopedMutex<madness::Spinlock> locker(this);
    if(! probe_) {
      typename std::vector<Future<typename A::value_type> >::const_iterator it =
          data_.begin();
      typename std::vector<Future<typename A::value_type> >::const_iterator end =
          data_.end();
      for(; it != end; ++it)
        if(! it->probe())
          break;
      probe_ = (it == end);
    }
    return probe_;
  }

  /// Send data to the next node when it is ready
  void delay_send() {
    if(probe()) {
      // The data is ready so send it now.
      send();
    } else {
      // The local data is not ready to be sent, so create a task that will
      // send it when it is ready.
      DelaySend* delay_send_task = new DelaySend(*this);
      world_.taskq.add(delay_send_task);
    }
  }

  /// Send all local data to the next node

  /// Increments the sent counter and targets the process that many ranks
  /// after this one (modulo the world size). When the target wraps around
  /// to this process, the pending callbacks are notified instead.
  void send() {
    const long sent = ++sent_;
    const ProcessID dest = (world_.rank() + sent) % world_.size();
    if(dest != world_.rank()) {
      wobj_type::task(dest, & Replicator_::send_handler, indices_, data_,
          madness::TaskAttributes::hipri());
    } else {
      // Replication is done. do_callbacks() requires the lock: without it a
      // concurrent register_callback() could push onto the stack while it is
      // being drained, or push after the drain and never be notified.
      madness::ScopedMutex<madness::Spinlock> locker(this);
      do_callbacks();
    }
  }

  /// Store received tiles in the destination array, then forward local data

  /// \param indices The indices of the tiles in \c data
  /// \param data The tiles to be set in the destination array; the i-th
  /// entry is stored at the i-th index of \c indices
  void send_handler(const std::vector<typename A::size_type>& indices,
      const std::vector<Future<typename A::value_type> >& data)
  {
    typename std::vector<typename A::size_type>::const_iterator index_it =
        indices.begin();
    typename std::vector<Future<typename A::value_type> >::const_iterator data_it =
        data.begin();
    typename std::vector<Future<typename A::value_type> >::const_iterator data_end =
        data.end();
    for(; data_it != data_end; ++data_it, ++index_it)
      destination_.set(*index_it, data_it->get());

    delay_send();
  }

public:

  /// Constructor

  /// Collects the local, non-zero tiles of \c source, sets them in
  /// \c destination, and starts sending them to the next node.
  /// \param source The array that provides the local tiles
  /// \param destination The array that receives the tiles
  Replicator(const A& source, const A destination) :
    wobj_type(source.world()), madness::Spinlock(),
    destination_(destination), indices_(), data_(), sent_(),
    world_(source.world()), callbacks_(), probe_(false)
  {
    sent_ = 0;

    // Generate a list of local tiles from source.
    typename A::pmap_interface::const_iterator end = source.pmap()->end();
    typename A::pmap_interface::const_iterator it = source.pmap()->begin();
    indices_.reserve(source.pmap()->local_size());
    data_.reserve(source.pmap()->local_size());

    // When dense, all tiles are present; when sparse, skip the zero tiles.
    const bool dense = source.is_dense();
    for(; it != end; ++it) {
      if(dense || ! source.is_zero(*it)) {
        indices_.push_back(*it);
        data_.push_back(source.find(*it));
        destination_.set(*it, data_.back());
      }
    }

    // Send the data to the first node
    delay_send();

    // Process any pending messages
    wobj_type::process_pending();
  }

  /// Check that the replication is complete

  /// \return \c true when all data has been transferred.
  bool done() {
    madness::ScopedMutex<madness::Spinlock> locker(this);
    return sent_ == world_.size();
  }

  /// Add a callback

  /// The callback is called when the local data has been sent to all
  /// nodes. If the data has already been sent to all nodes, the callback
  /// is notified immediately.
  /// \param callback The callback object
  /// \note The callback is notified while this object's lock is held.
  void register_callback(madness::CallbackInterface* callback) {
    madness::ScopedMutex<madness::Spinlock> locker(this);
    if(sent_ == world_.size())
      callback->notify();
    else
      callbacks_.push(callback);
  }
}; // class Replicator
} // namespace detail
} // namespace TiledArray
#endif // TILEDARRAY_REPLICATOR_H__INCLUDED