16 #ifndef dealii__work_stream_h 17 #define dealii__work_stream_h 20 #include <deal.II/base/config.h> 21 #include <deal.II/base/graph_coloring.h> 22 #include <deal.II/base/multithread_info.h> 23 #include <deal.II/base/thread_management.h> 24 #include <deal.II/base/template_constraints.h> 25 #include <deal.II/base/std_cxx11/function.h> 26 #include <deal.II/base/std_cxx11/bind.h> 27 #include <deal.II/base/thread_local_storage.h> 28 #include <deal.II/base/parallel.h> 30 #ifdef DEAL_II_WITH_THREADS 31 # include <deal.II/base/thread_management.h> 32 # include <tbb/pipeline.h> 40 DEAL_II_NAMESPACE_OPEN
139 #ifdef DEAL_II_WITH_THREADS 161 namespace Implementation2
166 template <
typename Iterator,
167 typename ScratchData,
187 std_cxx11::shared_ptr<ScratchData> scratch_data;
188 bool currently_in_use;
195 currently_in_use (false)
202 currently_in_use (in_use)
215 scratch_data (o.scratch_data),
216 currently_in_use (o.currently_in_use)
301 sample_scratch_data (0),
302 currently_in_use (false)
314 const unsigned int buffer_size,
317 const CopyData &sample_copy_data)
322 sample_scratch_data (sample_scratch_data),
323 chunk_size (chunk_size)
326 for (
unsigned int element=0; element<
item_buffer.size(); ++element)
331 item_buffer[element].work_items.resize (chunk_size,
335 item_buffer[element].copy_datas.resize (chunk_size,
369 Assert (current_item != 0,
ExcMessage (
"This can't be. There must be a free item!"));
387 if (current_item->
n_items == 0)
459 const CopyData &sample_copy_data)
464 item_buffer[element].work_items
465 .resize (chunk_size, remaining_iterator_range.second);
466 item_buffer[element].scratch_data
468 item_buffer[element].sample_scratch_data
470 item_buffer[element].copy_datas
471 .resize (chunk_size, sample_copy_data);
482 template <
typename Iterator,
483 typename ScratchData,
493 Worker (
const std_cxx11::function<
void (
const Iterator &,
495 CopyData &)> &worker,
496 bool copier_exist=
true)
498 tbb::filter ( false),
500 copier_exist (copier_exist)
514 ItemType *current_item =
static_cast<ItemType *
> (item);
527 ScratchData *scratch_data = 0;
530 scratch_data_list = current_item->scratch_data->get();
534 for (
typename ItemType::ScratchDataList::iterator
535 p = scratch_data_list.begin();
536 p != scratch_data_list.end(); ++p)
537 if (p->currently_in_use ==
false)
539 scratch_data = p->scratch_data.get();
540 p->currently_in_use =
true;
545 if (scratch_data == 0)
547 scratch_data =
new ScratchData(*current_item->sample_scratch_data);
549 typename ItemType::ScratchDataList::value_type
550 new_scratch_object (scratch_data,
true);
551 scratch_data_list.push_back (new_scratch_object);
559 for (
unsigned int i=0; i<current_item->n_items; ++i)
564 worker (current_item->work_items[i],
566 current_item->copy_datas[i]);
568 catch (
const std::exception &exc)
570 Threads::internal::handle_std_exception (exc);
574 Threads::internal::handle_unknown_exception ();
583 scratch_data_list = current_item->scratch_data->get();
585 for (
typename ItemType::ScratchDataList::iterator p =
586 scratch_data_list.begin(); p != scratch_data_list.end();
588 if (p->scratch_data.get() == scratch_data)
590 Assert(p->currently_in_use ==
true, ExcInternalError());
591 p->currently_in_use =
false;
596 if (copier_exist==
false)
597 current_item->currently_in_use =
false;
611 const std_cxx11::function<void (
const Iterator &,
629 template <
typename Iterator,
630 typename ScratchData,
641 Copier (
const std_cxx11::function<
void (
const CopyData &)> &copier)
658 ItemType *current_item =
static_cast<ItemType *
> (item);
663 for (
unsigned int i=0; i<current_item->n_items; ++i)
668 copier (current_item->copy_datas[i]);
670 catch (
const std::exception &exc)
672 Threads::internal::handle_std_exception (exc);
676 Threads::internal::handle_unknown_exception ();
681 current_item->currently_in_use =
false;
694 const std_cxx11::function<void (const CopyData &)>
copier;
707 namespace Implementation3
714 template <
typename Iterator,
715 typename ScratchData,
719 std_cxx11::shared_ptr<ScratchData> scratch_data;
720 std_cxx11::shared_ptr<CopyData> copy_data;
721 bool currently_in_use;
728 currently_in_use (false)
737 currently_in_use (in_use)
750 scratch_data (o.scratch_data),
751 copy_data (o.copy_data),
752 currently_in_use (o.currently_in_use)
765 template <
typename Iterator,
766 typename ScratchData,
776 CopyData &)> &worker,
777 const std_cxx11::function<
void (
const CopyData &)> &copier,
779 const CopyData &sample_copy_data)
783 sample_scratch_data (sample_scratch_data),
784 sample_copy_data (sample_copy_data)
792 void operator() (
const tbb::blocked_range<
typename std::vector<Iterator>::const_iterator> &range)
804 ScratchData *scratch_data = 0;
805 CopyData *copy_data = 0;
811 for (
typename ScratchAndCopyDataList::iterator
812 p = scratch_and_copy_data_list.begin();
813 p != scratch_and_copy_data_list.end(); ++p)
814 if (p->currently_in_use ==
false)
816 scratch_data = p->scratch_data.get();
817 copy_data = p->copy_data.get();
818 p->currently_in_use =
true;
823 if (scratch_data == 0)
825 Assert (copy_data==0, ExcInternalError());
827 copy_data =
new CopyData(sample_copy_data);
829 typename ScratchAndCopyDataList::value_type
830 new_scratch_object (scratch_data, copy_data,
true);
831 scratch_and_copy_data_list.push_back (new_scratch_object);
837 for (
typename std::vector<Iterator>::const_iterator p=range.begin();
838 p != range.end(); ++p)
849 catch (
const std::exception &exc)
851 Threads::internal::handle_std_exception (exc);
855 Threads::internal::handle_unknown_exception ();
865 for (
typename ScratchAndCopyDataList::iterator p =
866 scratch_and_copy_data_list.begin(); p != scratch_and_copy_data_list.end();
868 if (p->scratch_data.get() == scratch_data)
870 Assert(p->currently_in_use ==
true, ExcInternalError());
871 p->currently_in_use =
false;
894 const std_cxx11::function<void (
const Iterator &,
902 const std_cxx11::function<void (const CopyData &)>
copier;
908 const CopyData &sample_copy_data;
915 #endif // DEAL_II_WITH_THREADS 952 template <
typename Worker,
955 typename ScratchData,
958 run (
const std::vector<std::vector<Iterator> > &colored_iterators,
962 const CopyData &sample_copy_data,
1001 template <
typename Worker,
1004 typename ScratchData,
1008 const typename identity<Iterator>::type &end,
1011 const ScratchData &sample_scratch_data,
1012 const CopyData &sample_copy_data,
1016 Assert (queue_length > 0,
1017 ExcMessage (
"The queue length must be at least one, and preferably " 1018 "larger than the number of processors on this system."));
1021 ExcMessage (
"The chunk_size must be at least one."));
1026 if (!(begin != end))
1031 #ifdef DEAL_II_WITH_THREADS 1037 CopyData copy_data = sample_copy_data;
1039 for (Iterator i=begin; i!=end; ++i)
1043 if (
static_cast<const std_cxx11::function<
void (
const Iterator &,
1045 CopyData &)
>& >(worker))
1046 worker (i, scratch_data, copy_data);
1047 if (
static_cast<const std_cxx11::function<void (const CopyData &)>&
> 1052 #ifdef DEAL_II_WITH_THREADS 1056 if (
static_cast<const std_cxx11::function<void (const CopyData &)>&
>(copier))
1060 iterator_range_to_item_stream (begin, end,
1063 sample_scratch_data,
1070 tbb::pipeline assembly_line;
1071 assembly_line.add_filter (iterator_range_to_item_stream);
1072 assembly_line.add_filter (worker_filter);
1073 assembly_line.add_filter (copier_filter);
1076 assembly_line.run (queue_length);
1078 assembly_line.clear ();
1096 std::vector<std::vector<Iterator> > all_iterators (1);
1097 for (Iterator p=begin; p!=end; ++p)
1098 all_iterators[0].push_back (p);
1102 sample_scratch_data,
1113 template <
typename Worker,
1116 typename ScratchData,
1119 run (
const std::vector<std::vector<Iterator> > &colored_iterators,
1122 const ScratchData &sample_scratch_data,
1123 const CopyData &sample_copy_data,
1124 const unsigned int queue_length,
1127 Assert (queue_length > 0,
1128 ExcMessage (
"The queue length must be at least one, and preferably " 1129 "larger than the number of processors on this system."));
1132 ExcMessage (
"The chunk_size must be at least one."));
1137 #ifdef DEAL_II_WITH_THREADS 1143 CopyData copy_data = sample_copy_data;
1145 for (
unsigned int color=0; color<colored_iterators.size(); ++color)
1146 for (
typename std::vector<Iterator>::const_iterator p = colored_iterators[color].begin();
1147 p != colored_iterators[color].end(); ++p)
1151 if (
static_cast<const std_cxx11::function<
void (
const Iterator &,
1153 CopyData &)
>& >(worker))
1154 worker (*p, scratch_data, copy_data);
1155 if (
static_cast<const std_cxx11::function<void (const CopyData &)>&
>(copier))
1159 #ifdef DEAL_II_WITH_THREADS 1163 for (
unsigned int color=0; color<colored_iterators.size(); ++color)
1164 if (colored_iterators[color].size() > 0)
1171 typename std::vector<Iterator>::const_iterator
1174 WorkerAndCopier worker_and_copier (worker,
1176 sample_scratch_data,
1179 tbb::parallel_for (tbb::blocked_range<RangeType>
1180 (colored_iterators[color].begin(),
1181 colored_iterators[color].end(),
1183 std_cxx11::bind (&WorkerAndCopier::operator(),
1184 std_cxx11::ref(worker_and_copier),
1186 tbb::auto_partitioner());
1225 template <
typename MainClass,
1227 typename ScratchData,
1231 const typename identity<Iterator>::type &end,
1232 MainClass &main_object,
1233 void (MainClass::*worker) (
const Iterator &,
1236 void (MainClass::*copier) (
const CopyData &),
1237 const ScratchData &sample_scratch_data,
1238 const CopyData &sample_copy_data,
1244 std_cxx11::bind (worker,
1245 std_cxx11::ref (main_object),
1246 std_cxx11::_1, std_cxx11::_2, std_cxx11::_3),
1247 std_cxx11::bind (copier,
1248 std_cxx11::ref (main_object),
1250 sample_scratch_data,
1261 DEAL_II_NAMESPACE_CLOSE
ScratchAndCopyDataObjects()
::ExceptionBase & ExcMessage(std::string arg1)
std::vector< Iterator > work_items
void init_buffer_elements(const unsigned int element, const CopyData &sample_copy_data)
const std_cxx11::function< void(const Iterator &, ScratchData &, CopyData &)> worker
Threads::ThreadLocalStorage< ScratchDataList > * scratch_data
IteratorRangeToItemStream(const Iterator &begin, const Iterator &end, const unsigned int buffer_size, const unsigned int chunk_size, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
#define Assert(cond, exc)
const ScratchData & sample_scratch_data
std::list< ScratchDataObject > ScratchDataList
std::vector< ItemType > item_buffer
Worker(const std_cxx11::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, bool copier_exist=true)
std::vector< CopyData > copy_datas
const std_cxx11::function< void(const CopyData &)> copier
const std_cxx11::function< void(const Iterator &, ScratchData &, CopyData &)> worker
std::list< ScratchAndCopyDataObjects > ScratchAndCopyDataList
const std_cxx11::function< void(const CopyData &)> copier
const ScratchData * sample_scratch_data
void run(const std::vector< std::vector< Iterator > > &colored_iterators, Worker worker, Copier copier, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data, const unsigned int queue_length=2 *MultithreadInfo::n_threads(), const unsigned int chunk_size=8)
static unsigned int n_threads()
Threads::ThreadLocalStorage< typename ItemType::ScratchDataList > thread_local_scratch
virtual void * operator()(void *)
const unsigned int chunk_size
std::pair< Iterator, Iterator > remaining_iterator_range
Copier(const std_cxx11::function< void(const CopyData &)> &copier)
WorkerAndCopier(const std_cxx11::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, const std_cxx11::function< void(const CopyData &)> &copier, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
const ScratchData & sample_scratch_data