178 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
		
		
			
		
	
	
			178 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| 
								 | 
							
								// Copyright (C) 2004-2006 The Trustees of Indiana University.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Use, modification and distribution is subject to the Boost Software
							 | 
						||
| 
								 | 
							
								// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
							 | 
						||
| 
								 | 
							
								// http://www.boost.org/LICENSE_1_0.txt)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								//  Authors: Douglas Gregor
							 | 
						||
| 
								 | 
							
								//           Andrew Lumsdaine
							 | 
						||
| 
								 | 
							
								#include <boost/optional.hpp>
							 | 
						||
| 
								 | 
							
								#include <cassert>
							 | 
						||
| 
								 | 
							
								#include <boost/graph/parallel/algorithm.hpp>
							 | 
						||
| 
								 | 
							
								#include <boost/graph/parallel/process_group.hpp>
							 | 
						||
| 
								 | 
							
								#include <functional>
							 | 
						||
| 
								 | 
							
								#include <algorithm>
							 | 
						||
| 
								 | 
							
								#include <boost/graph/parallel/simple_trigger.hpp>
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#ifndef BOOST_GRAPH_USE_MPI
							 | 
						||
| 
								 | 
							
								#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
							 | 
						||
| 
								 | 
							
								#endif
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								namespace boost { namespace graph { namespace distributed {
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::
							 | 
						||
| 
								 | 
							
								distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
							 | 
						||
| 
								 | 
							
								                  const Buffer& buffer, bool polling)
							 | 
						||
| 
								 | 
							
								  : process_group(process_group, attach_distributed_object()),
							 | 
						||
| 
								 | 
							
								    owner(owner),
							 | 
						||
| 
								 | 
							
								    buffer(buffer),
							 | 
						||
| 
								 | 
							
								    polling(polling)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  if (!polling)
							 | 
						||
| 
								 | 
							
								    outgoing_buffers.reset(
							 | 
						||
| 
								 | 
							
								      new outgoing_buffers_t(num_processes(process_group)));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  setup_triggers();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::
							 | 
						||
| 
								 | 
							
								distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
							 | 
						||
| 
								 | 
							
								                  const Buffer& buffer, const UnaryPredicate& pred,
							 | 
						||
| 
								 | 
							
								                  bool polling)
							 | 
						||
| 
								 | 
							
								  : process_group(process_group, attach_distributed_object()),
							 | 
						||
| 
								 | 
							
								    owner(owner),
							 | 
						||
| 
								 | 
							
								    buffer(buffer),
							 | 
						||
| 
								 | 
							
								    pred(pred),
							 | 
						||
| 
								 | 
							
								    polling(polling)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  if (!polling)
							 | 
						||
| 
								 | 
							
								    outgoing_buffers.reset(
							 | 
						||
| 
								 | 
							
								      new outgoing_buffers_t(num_processes(process_group)));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  setup_triggers();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::
							 | 
						||
| 
								 | 
							
								distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
							 | 
						||
| 
								 | 
							
								                  const UnaryPredicate& pred, bool polling)
							 | 
						||
| 
								 | 
							
								  : process_group(process_group, attach_distributed_object()),
							 | 
						||
| 
								 | 
							
								    owner(owner),
							 | 
						||
| 
								 | 
							
								    pred(pred),
							 | 
						||
| 
								 | 
							
								    polling(polling)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  if (!polling)
							 | 
						||
| 
								 | 
							
								    outgoing_buffers.reset(
							 | 
						||
| 
								 | 
							
								      new outgoing_buffers_t(num_processes(process_group)));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  setup_triggers();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								void
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  typename ProcessGroup::process_id_type dest = get(owner, x);
							 | 
						||
| 
								 | 
							
								  if (outgoing_buffers)
							 | 
						||
| 
								 | 
							
								    outgoing_buffers->at(dest).push_back(x);
							 | 
						||
| 
								 | 
							
								  else if (dest == process_id(process_group))
							 | 
						||
| 
								 | 
							
								    buffer.push(x);
							 | 
						||
| 
								 | 
							
								  else
							 | 
						||
| 
								 | 
							
								    send(process_group, get(owner, x), msg_push, x);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								bool
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  /* Processes will stay here until the buffer is nonempty or
							 | 
						||
| 
								 | 
							
								     synchronization with the other processes indicates that all local
							 | 
						||
| 
								 | 
							
								     buffers are empty (and no messages are in transit).
							 | 
						||
| 
								 | 
							
								   */
							 | 
						||
| 
								 | 
							
								  while (buffer.empty() && !do_synchronize()) ;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return buffer.empty();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  empty();
							 | 
						||
| 
								 | 
							
								  return buffer.size();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  using boost::graph::parallel::simple_trigger;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  simple_trigger(process_group, msg_push, this, 
							 | 
						||
| 
								 | 
							
								                 &distributed_queue::handle_push);
							 | 
						||
| 
								 | 
							
								  simple_trigger(process_group, msg_multipush, this, 
							 | 
						||
| 
								 | 
							
								                 &distributed_queue::handle_multipush);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								void 
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::
							 | 
						||
| 
								 | 
							
								handle_push(int /*source*/, int /*tag*/, const value_type& value, 
							 | 
						||
| 
								 | 
							
								            trigger_receive_context)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  if (pred(value)) buffer.push(value);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								void 
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::
							 | 
						||
| 
								 | 
							
								handle_multipush(int /*source*/, int /*tag*/, 
							 | 
						||
| 
								 | 
							
								                 const std::vector<value_type>& values, 
							 | 
						||
| 
								 | 
							
								                 trigger_receive_context)
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								  for (std::size_t i = 0; i < values.size(); ++i)
							 | 
						||
| 
								 | 
							
								    if (pred(values[i])) buffer.push(values[i]);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								template<BOOST_DISTRIBUTED_QUEUE_PARMS>
							 | 
						||
| 
								 | 
							
								bool
							 | 
						||
| 
								 | 
							
								BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
							 | 
						||
| 
								 | 
							
								{
							 | 
						||
| 
								 | 
							
								#ifdef PBGL_ACCOUNTING
							 | 
						||
| 
								 | 
							
								  ++num_synchronizations;
							 | 
						||
| 
								 | 
							
								#endif
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  using boost::parallel::all_reduce;
							 | 
						||
| 
								 | 
							
								  using std::swap;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  typedef typename ProcessGroup::process_id_type process_id_type;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if (outgoing_buffers) {
							 | 
						||
| 
								 | 
							
								    // Transfer all of the push requests
							 | 
						||
| 
								 | 
							
								    process_id_type id = process_id(process_group);
							 | 
						||
| 
								 | 
							
								    process_id_type np = num_processes(process_group);
							 | 
						||
| 
								 | 
							
								    for (process_id_type dest = 0; dest < np; ++dest) {
							 | 
						||
| 
								 | 
							
								      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
							 | 
						||
| 
								 | 
							
								      std::size_t size = outgoing.size();
							 | 
						||
| 
								 | 
							
								      if (size != 0) {
							 | 
						||
| 
								 | 
							
								        if (dest != id) {
							 | 
						||
| 
								 | 
							
								          send(process_group, dest, msg_multipush, outgoing);
							 | 
						||
| 
								 | 
							
								        } else {
							 | 
						||
| 
								 | 
							
								          for (std::size_t i = 0; i < size; ++i)
							 | 
						||
| 
								 | 
							
								            buffer.push(outgoing[i]);
							 | 
						||
| 
								 | 
							
								        }
							 | 
						||
| 
								 | 
							
								        outgoing.clear();
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  synchronize(process_group);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  unsigned local_size = buffer.size();
							 | 
						||
| 
								 | 
							
								  unsigned global_size =
							 | 
						||
| 
								 | 
							
								    all_reduce(process_group, local_size, std::plus<unsigned>());
							 | 
						||
| 
								 | 
							
								  return global_size == 0;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								} } } // end namespace boost::graph::distributed
							 |