11 #include "utils/mpi_utils.hpp"
13 #ifndef MIN_EQUALIZE_SIZE
14 #define MIN_EQUALIZE_SIZE 100
26 #define GRANULARITY 64
28 #ifndef EQUALIZE_INBALANCE
29 #define EQUALIZE_INBALANCE 0.1
31 #ifndef MIN_INBALANCE_STEP
32 #define MIN_INBALANCE_STEP 0.2
35 #define MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE 2*sizeof(PROBA_TYPE) + sizeof(size_t)
36 #define MPI_SYMBOLIC_ITERATION_MEMORY_SIZE 2*sizeof(PROBA_TYPE) + sizeof(size_t) + sizeof(int)
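// Worked example (not part of the original header): assuming PROBA_TYPE is double (8 bytes)
// on an LP64 platform with sizeof(size_t) == 8 and sizeof(int) == 4, this gives
// MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE = 2*8 + 8 = 24 bytes per object and
// MPI_SYMBOLIC_ITERATION_MEMORY_SIZE = 2*8 + 8 + 4 = 28 bytes per object.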
52 #ifdef EQUALIZE_OBJECTS
83 size_t total_num_object;
84 MPI_Allreduce(&num_object, &total_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
86 return total_num_object;
93 size_t total_num_child = get_num_symbolic_object();
94 MPI_Allreduce(MPI_IN_PLACE, &total_num_child, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
95 return total_num_child;
114 MPI_Allreduce(MPI_IN_PLACE, &avg, 1, Proba_MPI_Datatype, MPI_SUM, communicator);
124 void send_objects(size_t num_object_sent, int node, MPI_Comm communicator, bool send_num_child=false) {
125 const static size_t max_int = 1 << 31 - 1;
128 MPI_Send(&num_object_sent, 1, MPI_UNSIGNED_LONG_LONG, node, 0, communicator);
129 if (num_object_sent == 0)
132 size_t send_object_size = object_begin[num_object] - object_begin[begin];
133 MPI_Send(&send_object_size, 1, MPI_UNSIGNED_LONG_LONG, node, 0, communicator);
137 MPI_Recv(&send, 1, MPI_UNSIGNED_LONG_LONG, node, 0, communicator, MPI_STATUS_IGNORE);
141 size_t send_object_begin = object_begin[begin];
142 #pragma omp parallel for
143 for (size_t i = begin + 1; i <= num_object; ++i)
144 object_begin[i] -= send_object_begin;
147 MPI_Send(&magnitude[begin], num_object_sent, mag_MPI_Datatype, node, 0, communicator);
148 MPI_Send(&object_begin[begin + 1], num_object_sent, MPI_UNSIGNED_LONG_LONG, node, 0, communicator);
149 MPI_Send(&object_size[begin], num_object_sent, MPI_UNSIGNED, node, 0, communicator);
152 size_t send_object_size = object_begin[num_object];
153 while (send_object_size > max_int) {
154 MPI_Send(&objects[send_object_begin], max_int, MPI_CHAR, node, 0, communicator);
156 send_object_size -= max_int;
157 send_object_begin += max_int;
160 MPI_Send(&objects[send_object_begin], send_object_size, MPI_CHAR, node, 0, communicator);
164 MPI_Send(&num_childs[begin], num_object_sent, MPI_UNSIGNED, node, 0, communicator);
167 pop(num_object_sent, false);
177 void receive_objects(int node, MPI_Comm communicator, bool receive_num_child=false, size_t max_mem=-1) {
178 const static size_t max_int = 1 << 31 - 1;
181 size_t num_object_sent, send_object_size;
182 MPI_Recv(&num_object_sent, 1, MPI_UNSIGNED_LONG_LONG, node, 0, communicator, MPI_STATUS_IGNORE);
183 if (num_object_sent == 0)
185 MPI_Recv(&send_object_size, 1, MPI_UNSIGNED_LONG_LONG, node, 0, communicator, MPI_STATUS_IGNORE);
188 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
189 bool recv = num_object_sent*iteration_memory_size + send_object_size < max_mem;
190 MPI_Send(&recv, 1, MPI_CHAR, node, 0, communicator);
194 size_t send_object_begin = object_begin[num_object];
196 allocate(send_object_begin + send_object_size);
199 MPI_Recv(&magnitude[num_object], num_object_sent, mag_MPI_Datatype, node, 0, communicator, MPI_STATUS_IGNORE);
200 MPI_Recv(&object_begin[num_object + 1], num_object_sent, MPI_UNSIGNED_LONG_LONG, node, 0, communicator, MPI_STATUS_IGNORE);
201 MPI_Recv(&object_size[num_object], num_object_sent, MPI_UNSIGNED, node, 0, communicator, MPI_STATUS_IGNORE);
204 size_t object_offset = send_object_begin;
205 while (send_object_size > max_int) {
206 MPI_Recv(&objects[send_object_begin], max_int, MPI_CHAR, node, 0, communicator, MPI_STATUS_IGNORE);
208 send_object_size -= max_int;
209 send_object_begin += max_int;
212 MPI_Recv(&objects[send_object_begin], send_object_size, MPI_CHAR, node, 0, communicator, MPI_STATUS_IGNORE);
215 #pragma omp parallel for
217 object_begin[i] += object_offset;
219 if (receive_num_child) {
221 MPI_Recv(&num_childs[num_object], num_object_sent, MPI_UNSIGNED, node, 0, communicator, MPI_STATUS_IGNORE);
225 __gnu_parallel::partial_sum(num_childs.begin() + num_object, num_childs.begin() + num_object + num_object_sent, child_begin.begin() + num_object + 1);
237 void equalize(MPI_Comm communicator);
255 void equalize_symbolic(MPI_Comm communicator);
256 void normalize(MPI_Comm communicator, quids::debug_t mid_step_function=[](const char*){});
263 size_t inline get_mem_size(MPI_Comm communicator) const {
264 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
266 size_t total_size, local_size = iteration_memory_size*magnitude.size() + objects.size();
267 MPI_Allreduce(&local_size, &total_size, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
270 size_t inline get_total_truncated_num_object(MPI_Comm communicator) const {
271 size_t total_truncated_num_object;
272 MPI_Allreduce(&truncated_num_object, &total_truncated_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
273 return total_truncated_num_object;
279 float get_avg_num_symbolic_object_per_task(MPI_Comm communicator) const {
280 size_t total_num_object_per_node = get_num_symbolic_object();
281 MPI_Allreduce(MPI_IN_PLACE, &total_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
284 MPI_Comm_size(communicator, &size);
286 return (float)total_num_object_per_node/size;
288 float get_avg_num_object_per_task(MPI_Comm communicator) const {
289 size_t max_num_object_per_node;
290 MPI_Allreduce(&num_object, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
293 MPI_Comm_size(communicator, &size);
295 return (float)max_num_object_per_node/size;
297 size_t get_max_num_symbolic_object_per_task(MPI_Comm communicator) const {
298 size_t max_num_object_per_node = get_num_symbolic_object();
299 MPI_Allreduce(MPI_IN_PLACE, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, communicator);
300 return max_num_object_per_node;
302 size_t get_max_num_object_per_task(MPI_Comm communicator) const {
303 size_t max_num_object_per_node;
304 MPI_Allreduce(&num_object, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, communicator);
305 return max_num_object_per_node;
309 size_t get_truncated_mem_size(size_t begin_num_object=0) const;
324 size_t total_num_object;
325 MPI_Allreduce(&num_object, &total_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
327 return total_num_object;
335 size_t total_num_object_after_interference;
338 return total_num_object_after_interference;
352 void compute_collisions(MPI_Comm communicator, quids::debug_t mid_step_function=[](const char*){});
353 void mpi_resize(size_t size) {
354 #pragma omp parallel sections
357 partitioned_mag.resize(size);
360 partitioned_hash.resize(size);
363 void buffer_resize(size_t size) {
364 #pragma omp parallel sections
373 node_id_buffer.resize(size);
376 if (size > next_oid_partitioner_buffer.size())
377 next_oid_partitioner_buffer.resize(size);
386 float inline get_avg_object_size(MPI_Comm communicator) const {
387 static const float hash_map_size = HASH_MAP_OVERHEAD*2*sizeof(size_t);
389 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
391 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
392 return (float)symbolic_iteration_memory_size + (float)mpi_symbolic_iteration_memory_size + hash_map_size;
394 size_t inline get_mem_size(MPI_Comm communicator) const {
395 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
396 size_t memory_size = magnitude.size()*symbolic_iteration_memory_size;
398 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
399 memory_size += mag_buffer.size()*mpi_symbolic_iteration_memory_size;
401 size_t total_size = mpi_symbolic_iteration_memory_size*magnitude.size();
402 MPI_Allreduce(MPI_IN_PLACE, &total_size, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
405 size_t inline get_total_next_iteration_num_object(MPI_Comm communicator) const {
406 size_t total_next_iteration_num_object;
407 MPI_Allreduce(&next_iteration_num_object, &total_next_iteration_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
408 return total_next_iteration_num_object;
425 int rank, size, local_size;
426 MPI_Comm_size(communicator, &size);
427 MPI_Comm_rank(communicator, &rank);
428 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
429 MPI_Comm_size(localComm, &local_size);
444 mid_step_function("equalize_object");
445 float previous_diff, avg_n_object = iteration.get_avg_num_object_per_task(communicator);
446 for (int i = 0; i < max_equalize; ++i) {
448 size_t max_n_object = iteration.get_max_num_object_per_task(communicator);
449 float diff = (float)max_n_object - avg_n_object;
450 float inbalance = diff/max_n_object;
454 std::cerr << "\ti=" << i << "/" << max_equalize << "\tmax=" << max_n_object << ", avg=" << avg_n_object << ", inbalance=" << inbalance << "\n";
464 previous_diff = diff;
470 iteration.compute_num_child(rule, mid_step_function);
471 iteration.truncated_num_object = iteration.num_object;
476 mid_step_function("equalize_child");
477 float previous_diff, avg_n_child = iteration.get_avg_num_symbolic_object_per_task(communicator);
478 for (int i = 0; i < max_equalize; ++i) {
480 size_t max_n_object = iteration.get_max_num_object_per_task(communicator);
481 size_t max_n_child = iteration.get_max_num_symbolic_object_per_task(communicator);
482 float diff = (float)max_n_child - avg_n_child;
483 float inbalance = diff/max_n_child;
487 std::cerr << "\ti=" << i << "/" << max_equalize << "\tmax=" << max_n_child << ", avg=" << avg_n_child << ", inbalance=" << inbalance << "\n";
495 iteration.equalize_symbolic(communicator);
496 iteration.truncated_num_object = iteration.num_object;
498 previous_diff = diff;
504 mid_step_function("truncate_symbolic - prepare");
505 iteration.prepare_truncate(mid_step_function);
509 mid_step_function("truncate_symbolic");
510 if (max_num_object == 0) {
512 size_t avail_memory = next_iteration.get_mem_size(localComm) + symbolic_iteration.get_mem_size(localComm) + quids::utils::get_free_mem();
516 if (iteration.get_truncated_mem_size() > target_memory) {
517 size_t begin = 0, end = iteration.num_object;
518 while (end > begin + 1) {
519 size_t middle = (end + begin) / 2;
520 iteration.truncate(begin, middle, mid_step_function);
522 size_t used_memory = iteration.get_truncated_mem_size(begin);
523 if (used_memory < target_memory) {
524 target_memory -= used_memory;
531 iteration.truncate(0, max_num_object/local_size, mid_step_function);
535 if (iteration.num_object > 0) {
536 if (iteration.truncated_num_object < next_iteration.num_object)
537 next_iteration.resize(iteration.truncated_num_object);
538 size_t next_object_size = iteration.truncated_num_object*iteration.get_object_length()/iteration.num_object;
539 if (next_object_size < next_iteration.objects.size())
540 next_iteration.allocate(next_object_size);
545 iteration.generate_symbolic_iteration(rule, symbolic_iteration, mid_step_function);
546 symbolic_iteration.compute_collisions(communicator, mid_step_function);
547 symbolic_iteration.next_iteration_num_object = symbolic_iteration.num_object_after_interferences;
551 mid_step_function("truncate - prepare");
552 symbolic_iteration.prepare_truncate(mid_step_function);
556 mid_step_function("truncate");
557 if (max_num_object == 0) {
563 if (symbolic_iteration.get_truncated_mem_size() > target_memory) {
564 size_t begin = 0, end = symbolic_iteration.num_object_after_interferences;
565 while (end > begin + 1) {
566 size_t middle = (end + begin) / 2;
567 symbolic_iteration.truncate(begin, middle, mid_step_function);
569 size_t used_memory = symbolic_iteration.get_truncated_mem_size(begin);
570 if (used_memory < target_memory) {
571 target_memory -= used_memory;
578 symbolic_iteration.truncate(0, max_num_object/local_size, mid_step_function);
582 symbolic_iteration.finalize(rule, iteration, next_iteration, mid_step_function);
583 next_iteration.normalize(communicator, mid_step_function);
585 MPI_Comm_free(&localComm);
591 size_t mpi_iteration::get_truncated_mem_size(size_t begin_num_object) const {
592 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
594 static const float hash_map_size = HASH_MAP_OVERHEAD*2*sizeof(size_t);
595 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
596 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
598 size_t mem_size = iteration_memory_size*(truncated_num_object - begin_num_object);
599 for (size_t i = begin_num_object; i < truncated_num_object; ++i) {
600 size_t oid = truncated_oid[i];
602 mem_size += object_begin[oid + 1] - object_begin[oid];
603 mem_size += num_childs[oid]*(symbolic_iteration_memory_size + mpi_symbolic_iteration_memory_size + hash_map_size);
612 void mpi_symbolic_iteration::compute_collisions(MPI_Comm communicator, quids::debug_t mid_step_function) {
614 MPI_Comm_size(communicator, &size);
615 MPI_Comm_rank(communicator, &rank);
618 return quids::symbolic_iteration::compute_collisions(mid_step_function);
623 num_threads = omp_get_num_threads();
625 int const n_segment = size*num_threads;
629 std::vector<int> load_balancing_begin(n_segment + 1);
630 std::vector<size_t> partition_begin(num_bucket + 1);
631 std::vector<size_t> total_partition_begin(num_bucket + 1);
633 std::vector<int> local_disp(n_segment + 1);
634 std::vector<int> local_count(n_segment);
635 std::vector<int> global_disp(n_segment + 1, 0);
636 std::vector<int> global_count(n_segment);
638 std::vector<int> send_disp(size + 1);
639 std::vector<int> send_count(size);
640 std::vector<int> receive_disp(size + 1);
641 std::vector<int> receive_count(size);
643 mid_step_function("compute_collisions - prepare");
654 partition_begin.begin(), partition_begin.end(),
655 [&](size_t const oid) {
656 return hash[oid] >> offset;
660 #pragma omp parallel for
662 size_t oid = next_oid[id];
664 partitioned_mag[id] = magnitude[oid];
665 partitioned_hash[id] = hash[oid];
679 mid_step_function("compute_collisions - com");
680 MPI_Allreduce(&partition_begin[1], &total_partition_begin[1],
681 num_bucket, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
683 mid_step_function("compute_collisions - prepare");
684 total_partition_begin[0] = 0;
686 load_balancing_begin.begin(), load_balancing_begin.end());
688 for (size_t i = 0; i <= n_segment; ++i)
689 load_balancing_begin[i] = i*num_bucket/n_segment;
694 for (int i = 1; i <= n_segment; ++i) {
695 local_disp[i] = partition_begin[load_balancing_begin[i]];
696 local_count[i - 1] = local_disp[i] - local_disp[i - 1];
707 mid_step_function("compute_collisions - com");
708 MPI_Alltoall(&local_count[0], num_threads, MPI_INT,
709 &global_count[0], num_threads, MPI_INT, communicator);
711 mid_step_function("compute_collisions - prepare");
712 std::partial_sum(&global_count[0], &global_count[0] + n_segment, &global_disp[1]);
715 send_disp[0] = 0; receive_disp[0] = 0;
716 for (int i = 1; i <= size; ++i) {
717 send_disp[i] = local_disp[i*num_threads];
718 send_count[i - 1] = send_disp[i] - send_disp[i - 1];
720 receive_disp[i] = global_disp[i*num_threads];
721 receive_count[i - 1] = receive_disp[i] - receive_disp[i - 1];
725 buffer_resize(receive_disp[size]);
728 mid_step_function("compute_collisions - com");
729 MPI_Alltoallv(&partitioned_hash[0], &send_count[0], &send_disp[0], MPI_UNSIGNED_LONG_LONG,
730 &hash_buffer[0], &receive_count[0], &receive_disp[0], MPI_UNSIGNED_LONG_LONG, communicator);
731 MPI_Alltoallv(&partitioned_mag[0], &send_count[0], &send_disp[0], mag_MPI_Datatype,
732 &mag_buffer[0], &receive_count[0], &receive_disp[0], mag_MPI_Datatype, communicator);
742 mid_step_function("compute_collisions - prepare");
744 for (int node = 0; node < size; ++node)
745 std::fill(&node_id_buffer[0] + receive_disp[node],
746 &node_id_buffer[0] + receive_disp[node + 1],
750 std::cerr << size << "=size\n";
752 mid_step_function("compute_collisions - insert");
756 std::vector<size_t> global_num_object_after_interferences(size, 0);
757 const auto work_steal = [&](int const node_id, int const other_node_id) {
758 #ifndef SKIP_WORK_STEALING
759 bool steal = global_num_object_after_interferences[other_node_id] >= global_num_object_after_interferences[node_id];
763 --global_num_object_after_interferences[other_node_id];
765 --global_num_object_after_interferences[node_id];
773 int const thread_id = omp_get_thread_num();
774 robin_hood::unordered_map<size_t, size_t> elimination_map;
777 size_t total_size = 0, max_count = 0;
778 for (int node_id = 0; node_id < size; ++node_id) {
779 size_t this_size = global_count[node_id*num_threads + thread_id];
781 total_size += this_size;
782 max_count = std::max(this_size, max_count);
784 elimination_map.reserve(total_size);
788 for (int j = 0; j < size; ++j) {
789 const int node_id = (rank + j)%size;
794 const size_t begin = i*GRANULARITY + global_disp[node_id*num_threads + thread_id];
795 const size_t end = std::min(begin + GRANULARITY, (size_t)global_disp[node_id*num_threads + thread_id + 1]);
797 for (size_t oid = begin; oid < end; ++oid) {
798 ++global_num_object_after_interferences[node_id];
801 auto [it, unique] = elimination_map.insert({hash_buffer[oid], oid});
803 const size_t other_oid = it->second;
804 const int other_node_id = node_id_buffer[other_oid];
806 if (work_steal(node_id, other_node_id)) {
811 mag_buffer[oid] += mag_buffer[other_oid];
812 mag_buffer[other_oid] = 0;
815 mag_buffer[other_oid] += mag_buffer[oid];
832 mid_step_function("compute_collisions - com");
833 MPI_Alltoallv(&mag_buffer[0], &receive_count[0], &receive_disp[0], mag_MPI_Datatype,
834 &partitioned_mag[0], &send_count[0], &send_disp[0], mag_MPI_Datatype, communicator);
837 mid_step_function("compute_collisions - finalize");
838 #pragma omp parallel for
840 magnitude[next_oid[id]] = partitioned_mag[id];
851 size_t* partitioned_it = __gnu_parallel::partition(&next_oid[0], &next_oid[0] + num_object,
852 [&](size_t const &oid) {
853 return std::norm(magnitude[oid]) > tolerance;
861 void mpi_iteration::normalize(MPI_Comm communicator, quids::debug_t mid_step_function) {
865 mid_step_function("normalize");
870 #pragma omp parallel for reduction(+:node_total_proba)
876 PROBA_TYPE normalization_factor = std::sqrt(total_proba);
878 if (normalization_factor != 1)
879 #pragma omp parallel for
881 magnitude[oid] /= normalization_factor;
885 mid_step_function("end");
895 MPI_Request request = MPI_REQUEST_NULL;
898 int rank, size, local_size;
899 MPI_Comm_size(communicator, &size);
900 MPI_Comm_rank(communicator, &rank);
901 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
902 MPI_Comm_size(localComm, &local_size);
907 std::vector<size_t> sizes(size, 0);
908 MPI_Gather(&num_object, 1, MPI_UNSIGNED_LONG_LONG, &sizes[0], 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
911 std::vector<int> pair_id(size, 0);
915 MPI_Scatter(&pair_id[0], 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
918 MPI_Gather(&num_object, 1, MPI_UNSIGNED_LONG_LONG, NULL, 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
921 MPI_Scatter(NULL, 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
925 MPI_Barrier(localComm);
926 size_t total_iteration_size, iteration_size = quids::iteration::get_mem_size();
927 MPI_Allreduce(&iteration_size, &total_iteration_size, 1, Proba_MPI_Datatype, MPI_SUM, localComm);
929 MPI_Barrier(localComm);
932 if (this_pair_id == rank)
936 size_t other_num_object;
937 MPI_Isend(&num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, &request);
938 MPI_Isend(&num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, &request);
940 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, MPI_STATUS_IGNORE);
941 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, MPI_STATUS_IGNORE);
945 size_t num_object_sent = (num_object - other_num_object) / 2;
946 send_objects(num_object_sent, this_pair_id, communicator, false);
954 void mpi_iteration::equalize_symbolic(MPI_Comm communicator) {
955 MPI_Request request = MPI_REQUEST_NULL;
958 int rank, size, local_size;
959 MPI_Comm_size(communicator, &size);
960 MPI_Comm_rank(communicator, &rank);
961 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
962 MPI_Comm_size(localComm, &local_size);
965 size_t num_symbolic_object = child_begin[num_object];
970 std::vector<size_t> sizes(size, 0);
971 MPI_Gather(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, &sizes[0], 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
974 std::vector<int> pair_id(size, 0);
978 MPI_Scatter(&pair_id[0], 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
981 MPI_Gather(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, NULL, 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
984 MPI_Scatter(NULL, 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
988 MPI_Barrier(localComm);
989 size_t total_iteration_size, iteration_size = quids::iteration::get_mem_size();
990 MPI_Allreduce(&iteration_size, &total_iteration_size, 1, Proba_MPI_Datatype, MPI_SUM, localComm);
992 MPI_Barrier(localComm);
995 if (this_pair_id == rank)
999 uint other_num_object;
1000 uint other_ub_symbolic_object_size;
1002 MPI_Isend(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, &request);
1003 MPI_Isend(&ub_symbolic_object_size, 1, MPI_UNSIGNED, this_pair_id, 0, communicator, &request);
1005 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0, communicator, MPI_STATUS_IGNORE);
1006 MPI_Recv(&other_ub_symbolic_object_size, 1, MPI_UNSIGNED, this_pair_id, 0, communicator, MPI_STATUS_IGNORE);
1008 ub_symbolic_object_size = std::max(ub_symbolic_object_size, other_ub_symbolic_object_size);
1011 if (num_symbolic_object > other_num_object) {
1012 size_t num_symbolic_object_to_send = (num_symbolic_object - other_num_object) / 2;
1015 auto limit_it = std::lower_bound(child_begin.begin(), child_begin.begin() + num_object, num_symbolic_object - num_symbolic_object_to_send) - 1;
1016 size_t num_object_sent = std::distance(limit_it, child_begin.begin() + num_object);
1018 send_objects(num_object_sent, this_pair_id, communicator, true);
1019 } else if (num_symbolic_object < other_num_object)
1028 MPI_Comm_size(communicator, &size);
1029 MPI_Comm_rank(communicator, &rank);
1032 if (rank == node_id) {
1033 for (int node = 1; node < size; ++node) {
1034 int node_to_send = node <= node_id ? node - 1 : node;
1035 size_t num_object_sent = (initial_num_object * (node + 1)) / size - (initial_num_object * node) / size;
1038 send_objects(num_object_sent, node_to_send, communicator);
1051 MPI_Comm_size(communicator, &size);
1052 MPI_Comm_rank(communicator, &rank);
1054 if (rank == node_id) {
1055 for (int node = 1; node < size; ++node) {
1056 int receive_node = node <= node_id ? node - 1 : node;
iteration (wave function) representation class
Definition: quids.hpp:144
PROBA_TYPE average_value(const observable_t observable) const
function to get the average value of a custom observable
Definition: quids.hpp:203
void pop(size_t n=1, bool normalize_=true)
function that removes a given number of objects from the "tail" of the memory representation
Definition: quids.hpp:189
PROBA_TYPE total_proba
total probability retained after previous truncation (if any).
Definition: quids.hpp:149
size_t num_object
number of objects contained in the wave function
Definition: quids.hpp:147
mpi iteration (wave function) class, inheriting from the quids::iteration class
Definition: quids_mpi.hpp:64
void distribute_objects(MPI_Comm communicator, int node_id)
distribute objects equally from a single node to all others.
Definition: quids_mpi.hpp:1026
size_t get_total_num_object(MPI_Comm communicator) const
getter for the total amount of distributed objects.
Definition: quids_mpi.hpp:81
void equalize(MPI_Comm communicator)
equalize the number of objects across node pairs.
Definition: quids_mpi.hpp:894
PROBA_TYPE average_value(const quids::observable_t observable) const
function to get the average local value of a custom observable.
Definition: quids_mpi.hpp:101
void receive_objects(int node, MPI_Comm communicator, bool receive_num_child=false, size_t max_mem=-1)
function to receive objects (at the "tail" of the memory representation).
Definition: quids_mpi.hpp:177
size_t get_total_num_symbolic_object(MPI_Comm communicator) const
getter for the total amount of distributed symbolic objects.
Definition: quids_mpi.hpp:92
PROBA_TYPE average_value(const quids::observable_t observable, MPI_Comm communicator) const
function to get the average value of a custom observable across the total distributed wave function.
Definition: quids_mpi.hpp:109
friend void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function)
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
void gather_objects(MPI_Comm communicator, int node_id)
gather objects to a single node from all others.
Definition: quids_mpi.hpp:1049
void send_objects(size_t num_object_sent, int node, MPI_Comm communicator, bool send_num_child=false)
function to send objects (from the "tail" of the memory representation).
Definition: quids_mpi.hpp:124
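A minimal pairing sketch (illustrative only; the ranks, the object count, the variable state and the namespace quids::mpi are assumptions, not taken from the header): send_objects on one rank must be matched by receive_objects on the destination rank over the same communicator.

    // hypothetical example: rank 0 ships 1000 of its objects to rank 1;
    // state is a quids::mpi::mpi_it_t assumed to hold at least 1000 objects on rank 0
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rank == 0)
        state.send_objects(/*num_object_sent=*/1000, /*node=*/1, MPI_COMM_WORLD);
    else if (rank == 1)
        state.receive_objects(/*node=*/0, MPI_COMM_WORLD);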
mpi_iteration()
simple empty wavefunction constructor.
Definition: quids_mpi.hpp:70
PROBA_TYPE node_total_proba
total probability retained locally after previous truncation (if any).
Definition: quids_mpi.hpp:67
mpi_iteration(char *object_begin_, char *object_end_)
constructor that insert a single object with magnitude 1
Definition: quids_mpi.hpp:75
symbolic mpi iteration (computation intermediary)
Definition: quids_mpi.hpp:313
mpi_symbolic_iteration()
simple constructor
Definition: quids_mpi.hpp:316
size_t get_total_num_object(MPI_Comm communicator) const
getter for the total amount of distributed objects.
Definition: quids_mpi.hpp:322
friend void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function)
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
size_t get_total_num_object_after_interferences(MPI_Comm communicator) const
getter for the total amount of distributed objects after duplicate elimination.
Definition: quids_mpi.hpp:333
class representing a dynamic (or rule).
Definition: quids.hpp:100
symbolic iteration (computation intermediary)
Definition: quids.hpp:333
size_t num_object_after_interferences
number of objects obtained after eliminating duplicates
Definition: quids.hpp:341
size_t num_object
number of objects considered in the symbolic step
Definition: quids.hpp:339
void resize(const Int n_, const uint align_byte_length_=std::alignment_of< T >()) const
align_byte_length_ should be used to realign the buffer, which is not yet implemented as realloc doe...
Definition: vector.hpp:111
void make_equal_pairs(size_t *size_begin, size_t *size_end, int *pair_id)
function to partition into pairs of almost equal sum
Definition: mpi_utils.hpp:9
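The pairing algorithm itself is not shown in this listing; the following is only a plausible sketch written against the signature above (sorting nodes by load and pairing the lightest with the heaviest), not the actual mpi_utils.hpp implementation.

    #include <algorithm>
    #include <numeric>
    #include <vector>

    void make_equal_pairs_sketch(size_t *size_begin, size_t *size_end, int *pair_id) {
        const int n = (int)(size_end - size_begin);
        std::vector<int> order(n);
        std::iota(order.begin(), order.end(), 0);
        // sort node indices by their load
        std::sort(order.begin(), order.end(),
                  [&](int a, int b) { return size_begin[a] < size_begin[b]; });
        // pair the i-th lightest node with the i-th heaviest one
        for (int i = 0; i < n / 2; ++i) {
            pair_id[order[i]] = order[n - 1 - i];
            pair_id[order[n - 1 - i]] = order[i];
        }
        if (n % 2) pair_id[order[n / 2]] = order[n / 2]; // an odd node pairs with itself
    }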
MPI_Datatype get_mpi_datatype(T x)
function to get the corresponding MPI type of a variable
Definition: mpi_utils.hpp:33
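One common way to write such a helper is a small set of overloads, shown here only as an illustration; the actual definitions live in utils/mpi_utils.hpp and may differ.

    #include <mpi.h>

    // map a C++ value to the matching MPI datatype
    MPI_Datatype get_mpi_datatype(float)              { return MPI_FLOAT; }
    MPI_Datatype get_mpi_datatype(double)             { return MPI_DOUBLE; }
    MPI_Datatype get_mpi_datatype(unsigned)           { return MPI_UNSIGNED; }
    MPI_Datatype get_mpi_datatype(unsigned long long) { return MPI_UNSIGNED_LONG_LONG; }

    // typical use: MPI_Datatype Proba_MPI_Datatype = get_mpi_datatype(PROBA_TYPE{});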
mpi implementation namespace
Definition: quids_mpi.hpp:39
float min_equalize_step
minimum jump in equalize before breaking
Definition: quids_mpi.hpp:50
bool equalize_children
if true, equalize the number of children. Otherwise equalize the number of objects.
Definition: quids_mpi.hpp:55
size_t min_equalize_size
minimum number of objects that should be attained (in at least one node) before equalizing (load-shari...
Definition: quids_mpi.hpp:46
class mpi_iteration mpi_it_t
mpi iteration type
Definition: quids_mpi.hpp:59
class mpi_symbolic_iteration mpi_sy_it_t
mpi symbolic iteration type
Definition: quids_mpi.hpp:61
void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object=0, quids::debug_t mid_step_function=[](const char *){})
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
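A minimal calling sketch, assuming the MPI namespace is quids::mpi; the step count, the initialization and the rule pointer are placeholders, not code from the library.

    #include <mpi.h>
    #include <utility>
    #include "quids_mpi.hpp"

    int main(int argc, char* argv[]) {
        int provided;
        MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);

        quids::mpi::mpi_it_t state, buffer;        // current and next wave function
        quids::mpi::mpi_sy_it_t symbolic_buffer;   // reusable computation intermediary

        // placeholder: point this at a user-defined dynamic derived from quids::rule_t
        quids::rule_t const *rule = nullptr;

        // ... insert initial objects into `state` on rank 0, then spread them out:
        // state.distribute_objects(MPI_COMM_WORLD, /*node_id=*/0);

        for (int step = 0; step < 10; ++step) {
            quids::mpi::simulate(state, rule, buffer, symbolic_buffer, MPI_COMM_WORLD);
            std::swap(state, buffer);  // next iteration becomes the current one (assumes mpi_it_t is swappable)
        }

        MPI_Finalize();
    }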
float equalize_inbalance
maximum imbalance between nodes (max_obj - avg_obj)/max_obj allowed before equalizing.
Definition: quids_mpi.hpp:48
void load_balancing_from_prefix_sum(const UnsignedIntIterator1 prefixSumLoadBegin, const UnsignedIntIterator1 prefixSumLoadEnd, UnsignedIntIterator2 workSharingIndexesBegin, UnsignedIntIterator2 workSharingIndexesEnd)
simple CCP load balancing implementation.
Definition: load_balancing.hpp:14
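Sketch of the underlying idea only (the library's CCP implementation in load_balancing.hpp may differ): given the prefix sum of per-bucket loads, place the i-th segment boundary at the first bucket whose cumulative load reaches i/n_segment of the total.

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    void load_balancing_from_prefix_sum_sketch(const std::vector<size_t>& prefix_sum, // size num_bucket + 1
                                               std::vector<int>& boundaries) {        // size n_segment + 1
        const size_t n_segment  = boundaries.size() - 1;
        const size_t num_bucket = prefix_sum.size() - 1;
        const size_t total      = prefix_sum.back();
        boundaries.front() = 0;
        boundaries.back()  = (int)num_bucket;
        for (size_t i = 1; i < n_segment; ++i) {
            const size_t target = (total * i) / n_segment;
            // first bucket index whose cumulative load reaches the target
            boundaries[i] = (int)(std::lower_bound(prefix_sum.begin(), prefix_sum.end(), target) - prefix_sum.begin());
        }
    }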
int log_2_upper_bound(int n)
returns the upper bound of the log2
Definition: algorithm.hpp:16
int nearest_power_of_two(int n)
returns the upper bound as a power of two
Definition: algorithm.hpp:9
size_t get_free_mem()
function that gets the total amount of available free memory.
Definition: memory.hpp:47
void parallel_generalized_partition_from_iota(idIteratorType idx_in, idIteratorType idx_in_end, long long int const iotaOffset, countIteratorType offset, countIteratorType offset_end, functionType const partitioner)
linear partitioning algorithm into n partitions without an initialized index list in parallel
Definition: algorithm.hpp:170
QuIDS namespace.
Definition: quids.hpp:55
void simulate(it_t &iteration, modifier_t const rule)
function to apply a modifier to a wave function
Definition: quids.hpp:431
std::function< void(const char *step)> debug_t
debugging function type
Definition: quids.hpp:85
class rule rule_t
dynamic (or rule) type
Definition: quids.hpp:79
int load_balancing_bucket_per_thread
number of load balancing buckets per thread
Definition: quids.hpp:63
std::function< PROBA_TYPE(char const *object_begin, char const *object_end)> observable_t
observable definition typedef
Definition: quids.hpp:83
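For illustration (not from the library), an observable is just a function over an object's byte range; for example, one that measures each object's length in bytes:

    // hypothetical observable returning the size (in bytes) of each object
    quids::observable_t object_length = [](char const *object_begin, char const *object_end) {
        return (PROBA_TYPE)(object_end - object_begin);
    };

    // PROBA_TYPE avg_length = state.average_value(object_length, MPI_COMM_WORLD);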
float safety_margin
memory safety margin (0.2 = 80% memory usage target)
Definition: quids.hpp:61
PROBA_TYPE tolerance
tolerance for objects (remove objects with a smaller probability)
Definition: quids.hpp:59
#define GRANULARITY
granularity, i.e. the typical loop size we consider when doing a 2d loop.
Definition: quids_mpi.hpp:26
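For illustration, this is the chunking pattern the granularity is used for in compute_collisions (the names begin_disp, end_disp and the wrapper function are hypothetical):

    #include <algorithm>
    #include <cstddef>

    // visit the range [begin_disp, end_disp) in blocks of GRANULARITY elements,
    // mirroring the chunked inner loops of compute_collisions
    void visit_in_chunks(size_t begin_disp, size_t end_disp) {
        for (size_t begin = begin_disp; begin < end_disp; begin += GRANULARITY) {
            const size_t end = std::min(begin + GRANULARITY, end_disp);
            for (size_t oid = begin; oid < end; ++oid) {
                // process element oid
            }
        }
    }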