279 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			279 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| // Copyright (C) 2004-2006 The Trustees of Indiana University.
 | |
| 
 | |
| // Use, modification and distribution is subject to the Boost Software
 | |
| // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
 | |
| // http://www.boost.org/LICENSE_1_0.txt)
 | |
| 
 | |
| //  Authors: Douglas Gregor
 | |
| //           Andrew Lumsdaine
 | |
| #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
 | |
| #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
 | |
| 
 | |
| #ifndef BOOST_GRAPH_USE_MPI
 | |
| #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
 | |
| #endif
 | |
| 
 | |
| #include <boost/graph/parallel/process_group.hpp>
 | |
| #include <boost/optional.hpp>
 | |
| #include <boost/shared_ptr.hpp>
 | |
| #include <vector>
 | |
| 
 | |
| namespace boost { namespace graph { namespace distributed {
 | |
| 
 | |
| /// A unary predicate that always returns "true".
 | |
| struct always_push
 | |
| {
 | |
|   template<typename T> bool operator()(const T&) const { return true; }
 | |
| };
 | |
| 
 | |
| 
 | |
| 
 | |
| /** A distributed queue adaptor.
 | |
|  *
 | |
|  * Class template @c distributed_queue implements a distributed queue
 | |
|  * across a process group. The distributed queue is an adaptor over an
 | |
|  * existing (local) queue, which must model the @ref Buffer
 | |
|  * concept. Each process stores a distinct copy of the local queue,
 | |
|  * from which it draws or removes elements via the @ref pop and @ref
 | |
|  * top members.
 | |
|  *
 | |
|  * The value type of the local queue must be a model of the @ref
 | |
|  * GlobalDescriptor concept. The @ref push operation of the
 | |
|  * distributed queue passes (via a message) the value to its owning
 | |
|  * processor. Thus, the elements within a particular local queue are
 | |
|  * guaranteed to have the process owning that local queue as an owner.
 | |
|  *
 | |
|  * Synchronization of distributed queues occurs in the @ref empty and
 | |
|  * @ref size functions, which will only return "empty" values (true or
 | |
|  * 0, respectively) when the entire distributed queue is empty. If the
 | |
|  * local queue is empty but the distributed queue is not, the
 | |
|  * operation will block until either condition changes. When the @ref
 | |
|  * size function of a nonempty queue returns, it returns the size of
 | |
|  * the local queue. These semantics were selected so that sequential
 | |
|  * code that processes elements in the queue via the following idiom
 | |
|  * can be parallelized via introduction of a distributed queue:
 | |
|  *
 | |
|  *   distributed_queue<...> Q;
 | |
|  *   Q.push(x);
 | |
|  *   while (!Q.empty()) {
 | |
|  *     // do something, that may push a value onto Q
 | |
|  *   }
 | |
|  *
 | |
|  * In the parallel version, the initial @ref push operation will place
 | |
|  * the value @c x onto its owner's queue. All processes will
 | |
|  * synchronize at the call to empty, and only the process owning @c x
 | |
|  * will be allowed to execute the loop (@ref Q.empty() returns
 | |
|  * false). This iteration may in turn push values onto other remote
 | |
|  * queues, so when that process finishes execution of the loop body
 | |
|  * and all processes synchronize again in @ref empty, more processes
 | |
|  * may have nonempty local queues to execute. Once all local queues
 | |
|  * are empty, @ref Q.empty() returns @c false for all processes.
 | |
|  *
 | |
|  * The distributed queue can receive messages at two different times:
 | |
|  * during synchronization and when polling @ref empty. Messages are
 | |
|  * always received during synchronization, to ensure that accurate
 | |
|  * local queue sizes can be determines. However, whether @ref empty
 | |
|  * should poll for messages is specified as an option to the
 | |
|  * constructor. Polling may be desired when the order in which
 | |
|  * elements in the queue are processed is not important, because it
 | |
|  * permits fewer synchronization steps and less communication
 | |
|  * overhead. However, when more strict ordering guarantees are
 | |
|  * required, polling may be semantically incorrect. By disabling
 | |
|  * polling, one ensures that parallel execution using the idiom above
 | |
|  * will not process an element at a later "level" before an earlier
 | |
|  * "level".
 | |
|  *
 | |
|  * The distributed queue nearly models the @ref Buffer
 | |
|  * concept. However, the @ref push routine does not necessarily
 | |
|  * increase the result of @c size() by one (although the size of the
 | |
|  * global queue does increase by one).
 | |
|  */
 | |
| template<typename ProcessGroup, typename OwnerMap, typename Buffer,
 | |
|          typename UnaryPredicate = always_push>
 | |
| class distributed_queue
 | |
| {
 | |
|   typedef distributed_queue self_type;
 | |
| 
 | |
|   enum {
 | |
|     /** Message indicating a remote push. The message contains a
 | |
|      * single value x of type value_type that is to be pushed on the
 | |
|      * receiver's queue.
 | |
|      */
 | |
|     msg_push,
 | |
|     /** Push many elements at once. */
 | |
|     msg_multipush
 | |
|   };
 | |
| 
 | |
|  public:
 | |
|   typedef ProcessGroup                     process_group_type;
 | |
|   typedef Buffer                           buffer_type;
 | |
|   typedef typename buffer_type::value_type value_type;
 | |
|   typedef typename buffer_type::size_type  size_type;
 | |
| 
 | |
|   /** Construct a new distributed queue.
 | |
|    *
 | |
|    * Build a new distributed queue that communicates over the given @p
 | |
|    * process_group, whose local queue is initialized via @p buffer and
 | |
|    * which may or may not poll for messages.
 | |
|    */
 | |
|   explicit
 | |
|   distributed_queue(const ProcessGroup& process_group,
 | |
|                     const OwnerMap& owner,
 | |
|                     const Buffer& buffer,
 | |
|                     bool polling = false);
 | |
| 
 | |
|   /** Construct a new distributed queue.
 | |
|    *
 | |
|    * Build a new distributed queue that communicates over the given @p
 | |
|    * process_group, whose local queue is initialized via @p buffer and
 | |
|    * which may or may not poll for messages.
 | |
|    */
 | |
|   explicit
 | |
|   distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
 | |
|                     const OwnerMap& owner = OwnerMap(),
 | |
|                     const Buffer& buffer = Buffer(),
 | |
|                     const UnaryPredicate& pred = UnaryPredicate(),
 | |
|                     bool polling = false);
 | |
| 
 | |
|   /** Construct a new distributed queue.
 | |
|    *
 | |
|    * Build a new distributed queue that communicates over the given @p
 | |
|    * process_group, whose local queue is default-initalized and which
 | |
|    * may or may not poll for messages.
 | |
|    */
 | |
|   distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
 | |
|                     const UnaryPredicate& pred, bool polling = false);
 | |
| 
 | |
|   /** Virtual destructor required with virtual functions.
 | |
|    *
 | |
|    */
 | |
|   virtual ~distributed_queue() {}
 | |
| 
 | |
|   /** Push an element onto the distributed queue.
 | |
|    *
 | |
|    * The element will be sent to its owner process to be added to that
 | |
|    * process's local queue. If polling is enabled for this queue and
 | |
|    * the owner process is the current process, the value will be
 | |
|    * immediately pushed onto the local queue.
 | |
|    *
 | |
|    * Complexity: O(1) messages of size O(sizeof(value_type)) will be
 | |
|    * transmitted.
 | |
|    */
 | |
|   void push(const value_type& x);
 | |
| 
 | |
|   /** Pop an element off the local queue.
 | |
|    *
 | |
|    * @p @c !empty()
 | |
|    */
 | |
|   void pop() { buffer.pop(); }
 | |
| 
 | |
|   /**
 | |
|    * Return the element at the top of the local queue.
 | |
|    *
 | |
|    * @p @c !empty()
 | |
|    */
 | |
|   value_type& top() { return buffer.top(); }
 | |
| 
 | |
|   /**
 | |
|    * \overload
 | |
|    */
 | |
|   const value_type& top() const { return buffer.top(); }
 | |
| 
 | |
|   /** Determine if the queue is empty.
 | |
|    *
 | |
|    * When the local queue is nonempty, returns @c true. If the local
 | |
|    * queue is empty, synchronizes with all other processes in the
 | |
|    * process group until either (1) the local queue is nonempty
 | |
|    * (returns @c true) (2) the entire distributed queue is empty
 | |
|    * (returns @c false).
 | |
|    */
 | |
|   bool empty() const;
 | |
| 
 | |
|   /** Determine the size of the local queue.
 | |
|    *
 | |
|    * The behavior of this routine is equivalent to the behavior of
 | |
|    * @ref empty, except that when @ref empty returns true this
 | |
|    * function returns the size of the local queue and when @ref empty
 | |
|    * returns false this function returns zero.
 | |
|    */
 | |
|   size_type size() const;
 | |
| 
 | |
|   // private:
 | |
|   /** Synchronize the distributed queue and determine if all queues
 | |
|    * are empty.
 | |
|    *
 | |
|    * \returns \c true when all local queues are empty, or false if at least
 | |
|    * one of the local queues is nonempty.
 | |
|    * Defined as virtual for derived classes like depth_limited_distributed_queue.
 | |
|    */
 | |
|   virtual bool do_synchronize() const;
 | |
| 
 | |
|  private:
 | |
|   // Setup triggers
 | |
|   void setup_triggers();
 | |
| 
 | |
|   // Message handlers
 | |
|   void 
 | |
|   handle_push(int source, int tag, const value_type& value, 
 | |
|               trigger_receive_context);
 | |
| 
 | |
|   void 
 | |
|   handle_multipush(int source, int tag, const std::vector<value_type>& values, 
 | |
|                    trigger_receive_context);
 | |
| 
 | |
|   mutable ProcessGroup process_group;
 | |
|   OwnerMap owner;
 | |
|   mutable Buffer buffer;
 | |
|   UnaryPredicate pred;
 | |
|   bool polling;
 | |
| 
 | |
|   typedef std::vector<value_type> outgoing_buffer_t;
 | |
|   typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
 | |
|   shared_ptr<outgoing_buffers_t> outgoing_buffers;
 | |
| };
 | |
| 
 | |
| /// Helper macro containing the normal names for the template
 | |
| /// parameters to distributed_queue.
 | |
| #define BOOST_DISTRIBUTED_QUEUE_PARMS                           \
 | |
|   typename ProcessGroup, typename OwnerMap, typename Buffer,    \
 | |
|   typename UnaryPredicate
 | |
| 
 | |
| /// Helper macro containing the normal template-id for
 | |
| /// distributed_queue.
 | |
| #define BOOST_DISTRIBUTED_QUEUE_TYPE                                    \
 | |
|   distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
 | |
| 
 | |
| /** Synchronize all processes involved with the given distributed queue.
 | |
|  *
 | |
|  * This function will synchronize all of the local queues for a given
 | |
|  * distributed queue, by ensuring that no additional messages are in
 | |
|  * transit. It is rarely required by the user, because most
 | |
|  * synchronization of distributed queues occurs via the @c empty or @c
 | |
|  * size methods.
 | |
|  */
 | |
| template<BOOST_DISTRIBUTED_QUEUE_PARMS>
 | |
| inline void
 | |
| synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
 | |
| { Q.do_synchronize(); }
 | |
| 
 | |
| /// Construct a new distributed queue.
 | |
| template<typename ProcessGroup, typename OwnerMap, typename Buffer>
 | |
| inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
 | |
| make_distributed_queue(const ProcessGroup& process_group,
 | |
|                        const OwnerMap& owner,
 | |
|                        const Buffer& buffer,
 | |
|                        bool polling = false)
 | |
| {
 | |
|   typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
 | |
|   return result_type(process_group, owner, buffer, polling);
 | |
| }
 | |
| 
 | |
| } } } // end namespace boost::graph::distributed
 | |
| 
 | |
| #include <boost/graph/distributed/detail/queue.ipp>
 | |
| 
 | |
| #undef BOOST_DISTRIBUTED_QUEUE_TYPE
 | |
| #undef BOOST_DISTRIBUTED_QUEUE_PARMS
 | |
| 
 | |
| #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
 | 
