QuIDS: Quantum Irregular Dynamic Simulator
quids_mpi.hpp
Go to the documentation of this file.
1
3#pragma once
4
5typedef unsigned uint;
6
7#include "quids.hpp"
8
9#include <mpi.h>
10
11#include "utils/mpi_utils.hpp"
12
13#ifndef MIN_EQUALIZE_SIZE
14 #define MIN_EQUALIZE_SIZE 100
15#endif
16#ifndef GRANULARITY
18
26 #define GRANULARITY 64
27#endif
28#ifndef EQUALIZE_INBALANCE
29 #define EQUALIZE_INBALANCE 0.1
30#endif
31#ifndef MIN_INBALANCE_STEP
32 #define MIN_INBALANCE_STEP 0.2
33#endif
34
35#define MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE 2*sizeof(PROBA_TYPE) + sizeof(size_t)
36#define MPI_SYMBOLIC_ITERATION_MEMORY_SIZE 2*sizeof(PROBA_TYPE) + sizeof(size_t) + sizeof(int)
37
39namespace quids::mpi {
41 const static MPI_Datatype Proba_MPI_Datatype = utils::get_mpi_datatype((PROBA_TYPE)0);
43 const static MPI_Datatype mag_MPI_Datatype = utils::get_mpi_datatype((std::complex<PROBA_TYPE>)0);
44
46 size_t min_equalize_size = MIN_EQUALIZE_SIZE;
48 float equalize_inbalance = EQUALIZE_INBALANCE;
50 float min_equalize_step = MIN_INBALANCE_STEP;
52#ifdef EQUALIZE_OBJECTS
53 bool equalize_children = false;
54#else
55 bool equalize_children = true;
56#endif
57
59 typedef class mpi_iteration mpi_it_t;
62
65 public:
67 PROBA_TYPE node_total_proba = 0;
68
72
// Construct the distributed iteration from a raw [begin, end) byte range of
// serialized objects; forwards directly to the quids::iteration base ctor.
75 mpi_iteration(char* object_begin_, char* object_end_) : quids::iteration(object_begin_, object_end_) {}
76
78
81 size_t get_total_num_object(MPI_Comm communicator) const {
82 /* accumulate number of node */
83 size_t total_num_object;
84 MPI_Allreduce(&num_object, &total_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
85
86 return total_num_object;
87 }
89
92 size_t get_total_num_symbolic_object(MPI_Comm communicator) const {
93 size_t total_num_child = get_num_symbolic_object();
94 MPI_Allreduce(MPI_IN_PLACE, &total_num_child, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
95 return total_num_child;
96 }
98
// Local (single-rank) average of a custom observable; the overload taking an
// MPI_Comm below performs the cross-rank reduction.
// NOTE(review): the body (doxygen line 102) is missing from this extracted
// listing — presumably it forwards to quids::iteration::average_value(observable);
// confirm against the repository before editing.
101 PROBA_TYPE average_value(const quids::observable_t observable) const {
103 }
105
109 PROBA_TYPE average_value(const quids::observable_t observable, MPI_Comm communicator) const {
110 /* compute local average */
111 PROBA_TYPE avg = quids::iteration::average_value(observable);
112
113 /* accumulate average value */
114 MPI_Allreduce(MPI_IN_PLACE, &avg, 1, Proba_MPI_Datatype, MPI_SUM, communicator);
115 return avg;
116 }
118
124 void send_objects(size_t num_object_sent, int node, MPI_Comm communicator, bool send_num_child=false) {
125 const static size_t max_int = 1 << 31 - 1;
126
127 /* send size */
128 MPI_Send(&num_object_sent, 1, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator);
129 if (num_object_sent == 0)
130 return;
131 size_t begin = num_object - num_object_sent;
132 size_t send_object_size = object_begin[num_object] - object_begin[begin];
133 MPI_Send(&send_object_size, 1, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator);
134
135 /* verify send */
136 bool send;
137 MPI_Recv(&send, 1, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
138
139 if (send) {
140 /* prepare send */
141 size_t send_object_begin = object_begin[begin];
142 #pragma omp parallel for
143 for (size_t i = begin + 1; i <= num_object; ++i)
144 object_begin[i] -= send_object_begin;
145
146 /* send properties */
147 MPI_Send(&magnitude[begin], num_object_sent, mag_MPI_Datatype, node, 0 /* tag */, communicator);
148 MPI_Send(&object_begin[begin + 1], num_object_sent, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator);
149 MPI_Send(&object_size[begin], num_object_sent, MPI_UNSIGNED, node, 0 /* tag */, communicator);
150
151 /* send objects */
152 size_t send_object_size = object_begin[num_object];
153 while (send_object_size > max_int) {
154 MPI_Send(&objects[send_object_begin], max_int, MPI_CHAR, node, 0 /* tag */, communicator);
155
156 send_object_size -= max_int;
157 send_object_begin += max_int;
158 }
159
160 MPI_Send(&objects[send_object_begin], send_object_size, MPI_CHAR, node, 0 /* tag */, communicator);
161
162 if (send_num_child)
163 /* send num child */
164 MPI_Send(&num_childs[begin], num_object_sent, MPI_UNSIGNED, node, 0 /* tag */, communicator);
165
166 /* pop */
167 pop(num_object_sent, false);
168 }
169 }
171
177 void receive_objects(int node, MPI_Comm communicator, bool receive_num_child=false, size_t max_mem=-1) {
178 const static size_t max_int = 1 << 31 - 1;
179
180 /* receive size */
181 size_t num_object_sent, send_object_size;
182 MPI_Recv(&num_object_sent, 1, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
183 if (num_object_sent == 0)
184 return;
185 MPI_Recv(&send_object_size, 1, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
186
187 /* verify memory limit */
188 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
189 bool recv = num_object_sent*iteration_memory_size + send_object_size < max_mem;
190 MPI_Send(&recv, 1, MPI_CHAR, node, 0 /* tag*/, communicator);
191
192 if (recv) {
193 /* prepare state */
194 size_t send_object_begin = object_begin[num_object];
195 resize(num_object + num_object_sent);
196 allocate(send_object_begin + send_object_size);
197
198 /* receive properties */
199 MPI_Recv(&magnitude[num_object], num_object_sent, mag_MPI_Datatype, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
200 MPI_Recv(&object_begin[num_object + 1], num_object_sent, MPI_UNSIGNED_LONG_LONG, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
201 MPI_Recv(&object_size[num_object], num_object_sent, MPI_UNSIGNED, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
202
203 /* receive objects */
204 size_t object_offset = send_object_begin;
205 while (send_object_size > max_int) {
206 MPI_Recv(&objects[send_object_begin], max_int, MPI_CHAR, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
207
208 send_object_size -= max_int;
209 send_object_begin += max_int;
210 }
211
212 MPI_Recv(&objects[send_object_begin], send_object_size, MPI_CHAR, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
213
214 /* correct values */
215 #pragma omp parallel for
216 for (size_t i = num_object + 1; i <= num_object + num_object_sent; ++i)
217 object_begin[i] += object_offset;
218
219 if (receive_num_child) {
220 /* receive num child */
221 MPI_Recv(&num_childs[num_object], num_object_sent, MPI_UNSIGNED, node, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
222
223 /* partial sum */
224 num_childs[num_object] += child_begin[num_object];
225 __gnu_parallel::partial_sum(num_childs.begin() + num_object, num_childs.begin() + num_object + num_object_sent, child_begin.begin() + num_object + 1);
226 num_childs[num_object] -= child_begin[num_object];
227 }
228
229 num_object += num_object_sent;
230 }
231 }
232
234
237 void equalize(MPI_Comm communicator);
239
243 void distribute_objects(MPI_Comm communicator, int node_id);
245
249 void gather_objects(MPI_Comm communicator, int node_id);
250
251 private:
253 friend void inline simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function);
254
255 void equalize_symbolic(MPI_Comm communicator);
256 void normalize(MPI_Comm communicator, quids::debug_t mid_step_function=[](const char*){});
257
258
259
260 /*
261 utils functions
262 */
263 size_t inline get_mem_size(MPI_Comm communicator) const {
264 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
265
266 size_t total_size, local_size = iteration_memory_size*magnitude.size() + objects.size();
267 MPI_Allreduce(&local_size, &total_size, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
268 return total_size;
269 }
270 size_t inline get_total_truncated_num_object(MPI_Comm communicator) const {
271 size_t total_truncated_num_object;
272 MPI_Allreduce(&truncated_num_object, &total_truncated_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
273 return total_truncated_num_object;
274 }
275
276 /*
277 function to compute the maximum and minimum per node size
278 */
279 float get_avg_num_symbolic_object_per_task(MPI_Comm communicator) const {
280 size_t total_num_object_per_node = get_num_symbolic_object();
281 MPI_Allreduce(MPI_IN_PLACE, &total_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
282
283 int size;
284 MPI_Comm_size(communicator, &size);
285
286 return (float)total_num_object_per_node/size;
287 }
288 float get_avg_num_object_per_task(MPI_Comm communicator) const {
289 size_t max_num_object_per_node;
290 MPI_Allreduce(&num_object, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
291
292 int size;
293 MPI_Comm_size(communicator, &size);
294
295 return (float)max_num_object_per_node/size;
296 }
297 size_t get_max_num_symbolic_object_per_task(MPI_Comm communicator) const {
298 size_t max_num_object_per_node = get_num_symbolic_object();
299 MPI_Allreduce(MPI_IN_PLACE, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, communicator);
300 return max_num_object_per_node;
301 }
302 size_t get_max_num_object_per_task(MPI_Comm communicator) const {
303 size_t max_num_object_per_node;
304 MPI_Allreduce(&num_object, &max_num_object_per_node, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, communicator);
305 return max_num_object_per_node;
306 }
307
308
309 size_t get_truncated_mem_size(size_t begin_num_object=0) const;
310 };
311
314 public:
317
319
322 size_t inline get_total_num_object(MPI_Comm communicator) const {
323 /* accumulate number of node */
324 size_t total_num_object;
325 MPI_Allreduce(&num_object, &total_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
326
327 return total_num_object;
328 }
330
333 size_t inline get_total_num_object_after_interferences(MPI_Comm communicator) const {
334 /* accumulate number of node */
335 size_t total_num_object_after_interference;
336 MPI_Allreduce(&num_object_after_interferences, &total_num_object_after_interference, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
337
338 return total_num_object_after_interference;
339 }
340
341 private:
342 friend mpi_iteration;
343 friend void inline simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function);
344
345 quids::utils::fast_vector<mag_t> partitioned_mag;
346 quids::utils::fast_vector<size_t> partitioned_hash;
347
350 quids::utils::fast_vector<int> node_id_buffer;
351
352 void compute_collisions(MPI_Comm communicator, quids::debug_t mid_step_function=[](const char*){});
353 void mpi_resize(size_t size) {
354 #pragma omp parallel sections
355 {
356 #pragma omp section
357 partitioned_mag.resize(size);
358
359 #pragma omp section
360 partitioned_hash.resize(size);
361 }
362 }
363 void buffer_resize(size_t size) {
364 #pragma omp parallel sections
365 {
366 #pragma omp section
367 mag_buffer.resize(size);
368
369 #pragma omp section
370 hash_buffer.resize(size);
371
372 #pragma omp section
373 node_id_buffer.resize(size);
374
375 #pragma omp section
376 if (size > next_oid_partitioner_buffer.size())
377 next_oid_partitioner_buffer.resize(size);
378 }
379 }
380
381
382
383 /*
384 utils functions
385 */
386 float inline get_avg_object_size(MPI_Comm communicator) const {
387 static const float hash_map_size = HASH_MAP_OVERHEAD*2*sizeof(size_t);
388
389 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
390
391 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
392 return (float)symbolic_iteration_memory_size + (float)mpi_symbolic_iteration_memory_size + hash_map_size;
393 }
394 size_t inline get_mem_size(MPI_Comm communicator) const {
395 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
396 size_t memory_size = magnitude.size()*symbolic_iteration_memory_size;
397
398 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
399 memory_size += mag_buffer.size()*mpi_symbolic_iteration_memory_size;
400
401 size_t total_size = mpi_symbolic_iteration_memory_size*magnitude.size();
402 MPI_Allreduce(MPI_IN_PLACE, &total_size, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
403 return total_size;
404 }
405 size_t inline get_total_next_iteration_num_object(MPI_Comm communicator) const {
406 size_t total_next_iteration_num_object;
407 MPI_Allreduce(&next_iteration_num_object, &total_next_iteration_num_object, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
408 return total_next_iteration_num_object;
409 }
410 };
411
413
422 void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object=0, quids::debug_t mid_step_function=[](const char*){}) {
423 /* get local size */
424 MPI_Comm localComm;
425 int rank, size, local_size;
426 MPI_Comm_size(communicator, &size);
427 MPI_Comm_rank(communicator, &rank);
428 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
429 MPI_Comm_size(localComm, &local_size);
430
431 const int max_equalize = quids::utils::log_2_upper_bound(size);
432
433
434
435
436
437
438 if (size == 1)
439 return quids::simulate(iteration, rule, next_iteration, symbolic_iteration, max_num_object, mid_step_function);
440
441
442 /* equalize objects */
443 if (!equalize_children) {
444 mid_step_function("equalize_object");
445 float previous_diff, avg_n_object = iteration.get_avg_num_object_per_task(communicator);
446 for (int i = 0; i < max_equalize; ++i) {
447 /* check for condition */
448 size_t max_n_object = iteration.get_max_num_object_per_task(communicator);
449 float diff = (float)max_n_object - avg_n_object;
450 float inbalance = diff/max_n_object;
451
452 // debug:
453 if (rank == 0)
454 std::cerr << "\ti=" << i << "/" << max_equalize << "\tmax=" << max_n_object << ", avg=" << avg_n_object << ", inbalance=" << inbalance << "\n";
455
456 if (max_n_object < min_equalize_size ||
457 inbalance < equalize_inbalance ||
458 (diff > previous_diff*(1 - min_equalize_step)) && i > 0)
459 break;
460
461 /* actually equalize */
462 iteration.equalize(communicator);
463
464 previous_diff = diff;
465 }
466 }
467
468
469 /* start actual simulation */
470 iteration.compute_num_child(rule, mid_step_function);
471 iteration.truncated_num_object = iteration.num_object;
472
473
474 /* equalize symbolic objects */
475 if (equalize_children) {
476 mid_step_function("equalize_child");
477 float previous_diff, avg_n_child = iteration.get_avg_num_symbolic_object_per_task(communicator);
478 for (int i = 0; i < max_equalize; ++i) {
479 /* check for condition */
480 size_t max_n_object = iteration.get_max_num_object_per_task(communicator);
481 size_t max_n_child = iteration.get_max_num_symbolic_object_per_task(communicator);
482 float diff = (float)max_n_child - avg_n_child;
483 float inbalance = diff/max_n_child;
484
485 // debug:
486 if (rank == 0)
487 std::cerr << "\ti=" << i << "/" << max_equalize << "\tmax=" << max_n_child << ", avg=" << avg_n_child << ", inbalance=" << inbalance << "\n";
488
489 if (max_n_object < min_equalize_size ||
490 inbalance < equalize_inbalance ||
491 (diff > previous_diff*(1 - min_equalize_step)) && i > 0)
492 break;
493
494 /* actually equalize */
495 iteration.equalize_symbolic(communicator);
496 iteration.truncated_num_object = iteration.num_object;
497
498 previous_diff = diff;
499 }
500 }
501
502
503 /* prepare truncate */
504 mid_step_function("truncate_symbolic - prepare");
505 iteration.prepare_truncate(mid_step_function);
506
507
508 /* max_num_object */
509 mid_step_function("truncate_symbolic");
510 if (max_num_object == 0) {
511 /* available memory */
512 size_t avail_memory = next_iteration.get_mem_size(localComm) + symbolic_iteration.get_mem_size(localComm) + quids::utils::get_free_mem();
513 size_t target_memory = avail_memory/local_size*(1 - quids::safety_margin);
514
515 /* actually truncate by binary search */
516 if (iteration.get_truncated_mem_size() > target_memory) {
517 size_t begin = 0, end = iteration.num_object;
518 while (end > begin + 1) {
519 size_t middle = (end + begin) / 2;
520 iteration.truncate(begin, middle, mid_step_function);
521
522 size_t used_memory = iteration.get_truncated_mem_size(begin);
523 if (used_memory < target_memory) {
524 target_memory -= used_memory;
525 begin = middle;
526 } else
527 end = middle;
528 }
529 }
530 } else
531 iteration.truncate(0, max_num_object/local_size, mid_step_function);
532
533
534 /* downsize if needed */
535 if (iteration.num_object > 0) {
536 if (iteration.truncated_num_object < next_iteration.num_object)
537 next_iteration.resize(iteration.truncated_num_object);
538 size_t next_object_size = iteration.truncated_num_object*iteration.get_object_length()/iteration.num_object;
539 if (next_object_size < next_iteration.objects.size())
540 next_iteration.allocate(next_object_size);
541 }
542
543
544 /* rest of the simulation */
545 iteration.generate_symbolic_iteration(rule, symbolic_iteration, mid_step_function);
546 symbolic_iteration.compute_collisions(communicator, mid_step_function);
547 symbolic_iteration.next_iteration_num_object = symbolic_iteration.num_object_after_interferences;
548
549
550 /* prepare truncate */
551 mid_step_function("truncate - prepare");
552 symbolic_iteration.prepare_truncate(mid_step_function);
553
554
555 /* second max_num_object */
556 mid_step_function("truncate");
557 if (max_num_object == 0) {
558 /* available memory */
559 size_t avail_memory = next_iteration.get_mem_size(localComm) + quids::utils::get_free_mem();
560 size_t target_memory = avail_memory/local_size*(1 - quids::safety_margin);
561
562 /* actually truncate by binary search */
563 if (symbolic_iteration.get_truncated_mem_size() > target_memory) {
564 size_t begin = 0, end = symbolic_iteration.num_object_after_interferences;
565 while (end > begin + 1) {
566 size_t middle = (end + begin) / 2;
567 symbolic_iteration.truncate(begin, middle, mid_step_function);
568
569 size_t used_memory = symbolic_iteration.get_truncated_mem_size(begin);
570 if (used_memory < target_memory) {
571 target_memory -= used_memory;
572 begin = middle;
573 } else
574 end = middle;
575 }
576 }
577 } else
578 symbolic_iteration.truncate(0, max_num_object/local_size, mid_step_function);
579
580
581 /* finalize simulation */
582 symbolic_iteration.finalize(rule, iteration, next_iteration, mid_step_function);
583 next_iteration.normalize(communicator, mid_step_function);
584
585 MPI_Comm_free(&localComm);
586 }
587
588 /*
589 get the truncated memory size
590 */
591 size_t mpi_iteration::get_truncated_mem_size(size_t begin_num_object) const {
592 static const size_t iteration_memory_size = ITERATION_MEMORY_SIZE;
593
594 static const float hash_map_size = HASH_MAP_OVERHEAD*2*sizeof(size_t);
595 static const size_t symbolic_iteration_memory_size = SYMBOLIC_ITERATION_MEMORY_SIZE + MPI_SPECIFIC_SYMBOLIC_ITERATION_MEMORY_SIZE;
596 static const size_t mpi_symbolic_iteration_memory_size = MPI_SYMBOLIC_ITERATION_MEMORY_SIZE;
597
598 size_t mem_size = iteration_memory_size*(truncated_num_object - begin_num_object);
599 for (size_t i = begin_num_object; i < truncated_num_object; ++i) {
600 size_t oid = truncated_oid[i];
601
602 mem_size += object_begin[oid + 1] - object_begin[oid];
603 mem_size += num_childs[oid]*(symbolic_iteration_memory_size + mpi_symbolic_iteration_memory_size + hash_map_size);
604 }
605
606 return mem_size;
607 }
608
609 /*
610 distributed interference function
611 */
// Distributed interference step: objects with equal hashes are merged across
// all ranks (their magnitudes summed), then objects whose magnitude dropped
// below `tolerance` are discarded.
//
// NOTE(review): this extracted listing is missing doxygen lines 626 (the
// definition of `num_bucket`) and 653 (the partition call whose trailing
// iterator/lambda arguments appear at lines 654-657) — restore them from the
// repository before modifying this function. The code below is byte-identical.
612 void mpi_symbolic_iteration::compute_collisions(MPI_Comm communicator, quids::debug_t mid_step_function) {
613 int size, rank;
614 MPI_Comm_size(communicator, &size);
615 MPI_Comm_rank(communicator, &rank);
616
// single rank: fall back to the shared-memory implementation
617 if (size == 1)
618 return quids::symbolic_iteration::compute_collisions(mid_step_function);
619
620 int num_threads;
621 #pragma omp parallel
622 #pragma omp single
623 num_threads = omp_get_num_threads();
624
// one segment per (rank, thread) pair; hashes are bucketed by their top bits
625 int const n_segment = size*num_threads;
627 size_t const offset = 8*sizeof(size_t) - quids::utils::log_2_upper_bound(num_bucket);
628
629 std::vector<int> load_balancing_begin(n_segment + 1);
630 std::vector<size_t> partition_begin(num_bucket + 1);
631 std::vector<size_t> total_partition_begin(num_bucket + 1);
632
633 std::vector<int> local_disp(n_segment + 1);
634 std::vector<int> local_count(n_segment);
635 std::vector<int> global_disp(n_segment + 1, 0);
636 std::vector<int> global_count(n_segment);
637
638 std::vector<int> send_disp(size + 1);
639 std::vector<int> send_count(size);
640 std::vector<int> receive_disp(size + 1);
641 std::vector<int> receive_count(size);
642
643 mid_step_function("compute_collisions - prepare");
644 mpi_resize(num_object);
645
646
647
648
649
650 /* !!!!!!!!!!!!!!!!
651 partition
652 !!!!!!!!!!!!!!!! */
// (missing line 653: the partition call these arguments belong to)
654 partition_begin.begin(), partition_begin.end(),
655 [&](size_t const oid) {
656 return hash[oid] >> offset;
657 });
658
659 /* generate partitioned hash */
660 #pragma omp parallel for
661 for (size_t id = 0; id < num_object; ++id) {
662 size_t oid = next_oid[id];
663
664 partitioned_mag[id] = magnitude[oid];
665 partitioned_hash[id] = hash[oid];
666 }
667
668
669
670
671
672
673
674 /* !!!!!!!!!!!!!!!!
675 load-balance
676 !!!!!!!!!!!!!!!! */
677
// sum bucket sizes over all ranks, then cut the buckets into n_segment
// pieces of roughly equal global weight (CCP load balancing)
678#ifndef SKIP_CCP
679 mid_step_function("compute_collisions - com");
680 MPI_Allreduce(&partition_begin[1], &total_partition_begin[1],
681 num_bucket, MPI_UNSIGNED_LONG_LONG, MPI_SUM, communicator);
682
683 mid_step_function("compute_collisions - prepare");
684 total_partition_begin[0] = 0;
685 quids::utils::load_balancing_from_prefix_sum(total_partition_begin.begin(), total_partition_begin.end(),
686 load_balancing_begin.begin(), load_balancing_begin.end());
687#else
688 for (size_t i = 0; i <= n_segment; ++i)
689 load_balancing_begin[i] = i*num_bucket/n_segment;
690#endif
691
692 /* recompute local count and disp */
693 local_disp[0] = 0;
694 for (int i = 1; i <= n_segment; ++i) {
695 local_disp[i] = partition_begin[load_balancing_begin[i]];
696 local_count[i - 1] = local_disp[i] - local_disp[i - 1];
697 }
698
699
700
701
702
703
704 /* !!!!!!!!!!!!!!!!
705 share
706 !!!!!!!!!!!!!!!! */
// exchange per-segment counts, then the partitioned hashes and magnitudes
707 mid_step_function("compute_collisions - com");
708 MPI_Alltoall(&local_count [0], num_threads, MPI_INT,
709 &global_count[0], num_threads, MPI_INT, communicator);
710
711 mid_step_function("compute_collisions - prepare");
712 std::partial_sum(&global_count[0], &global_count[0] + n_segment, &global_disp[1]);
713
714 /* recompute send and receive count and disp */
715 send_disp[0] = 0; receive_disp[0] = 0;
716 for (int i = 1; i <= size; ++i) {
717 send_disp[i] = local_disp[i*num_threads];
718 send_count[i - 1] = send_disp[i] - send_disp[i - 1];
719
720 receive_disp[i] = global_disp[i*num_threads];
721 receive_count[i - 1] = receive_disp[i] - receive_disp[i - 1];
722 }
723
724 /* resize */
725 buffer_resize(receive_disp[size]);
726
727 /* actualy share partition */
728 mid_step_function("compute_collisions - com");
729 MPI_Alltoallv(&partitioned_hash[0], &send_count[0], &send_disp[0], MPI_UNSIGNED_LONG_LONG,
730 &hash_buffer[0], &receive_count[0], &receive_disp[0], MPI_UNSIGNED_LONG_LONG, communicator);
731 MPI_Alltoallv(&partitioned_mag[0], &send_count[0], &send_disp[0], mag_MPI_Datatype,
732 &mag_buffer[0], &receive_count[0], &receive_disp[0], mag_MPI_Datatype, communicator);
733
734
735
736
737
738
739 /* !!!!!!!!!!!!!!!!
740 compute-collision
741 !!!!!!!!!!!!!!!! */
742 mid_step_function("compute_collisions - prepare");
743 /* prepare node_id buffer */
744 for (int node = 0; node < size; ++node)
745 std::fill(&node_id_buffer[0] + receive_disp[node],
746 &node_id_buffer[0] + receive_disp[node + 1],
747 node);
748
// NOTE(review): leftover debug print to stderr
749 if (rank == 0)
750 std::cerr << size << "=size\n";
751
752 mid_step_function("compute_collisions - insert");
753 #pragma omp parallel
754 {
755 // work stealing oracle
// decides which rank keeps a duplicated object, preferring the rank that
// currently holds fewer survivors (keeps the final load balanced)
756 std::vector<size_t> global_num_object_after_interferences(size, 0);
757 const auto work_steal = [&](int const node_id, int const other_node_id) {
758#ifndef SKIP_WORK_STEALING
759 bool steal = global_num_object_after_interferences[other_node_id] >= global_num_object_after_interferences[node_id];
760
761 // update counts
762 if (steal) {
763 --global_num_object_after_interferences[other_node_id];
764 } else
765 --global_num_object_after_interferences[node_id];
766
767 return steal;
768#else
769 return false;
770#endif
771 };
772
773 int const thread_id = omp_get_thread_num();
774 robin_hood::unordered_map<size_t, size_t> elimination_map;
775
776 /* compute total_size */
777 size_t total_size = 0, max_count = 0;
778 for (int node_id = 0; node_id < size; ++node_id) {
779 size_t this_size = global_count[node_id*num_threads + thread_id];
780
781 total_size += this_size;
782 max_count = std::max(this_size, max_count);
783 }
784 elimination_map.reserve(total_size);
785
786 /* insert into hashmap */
// objects are inserted GRANULARITY at a time, round-robin over source ranks,
// so no rank's objects dominate the early insertions
787 for (size_t i = 0; GRANULARITY*i < max_count; ++i)
788 for (int j = 0; j < size; ++j) {
789 const int node_id = (rank + j)%size;
790
// NOTE(review): a bare `throw;` outside a catch calls std::terminate;
// this branch is unreachable ((rank + j)%size < size always holds)
791 if (node_id >= size)
792 throw;
793
794 const size_t begin = i*GRANULARITY + global_disp[node_id*num_threads + thread_id ];
795 const size_t end = std::min(begin + GRANULARITY, (size_t)global_disp[node_id*num_threads + thread_id + 1]);
796
797 for (size_t oid = begin; oid < end; ++oid) {
798 ++global_num_object_after_interferences[node_id];
799
800 /* accessing key */
801 auto [it, unique] = elimination_map.insert({hash_buffer[oid], oid});
802 if (!unique) {
803 const size_t other_oid = it->second;
804 const int other_node_id = node_id_buffer[other_oid];
805
806 if (work_steal(node_id, other_node_id)) {
807 /* swap objects */
808 it->second = oid;
809
810 /* add probabilities */
811 mag_buffer[oid] += mag_buffer[other_oid];
812 mag_buffer[other_oid] = 0;
813 } else {
814 /* if it exist add the probabilities */
815 mag_buffer[other_oid] += mag_buffer[oid];
816 mag_buffer[oid] = 0;
817 }
818 }
819 }
820 }
821 }
822
823
824
825
826
827
828
829 /* !!!!!!!!!!!!!!!!
830 share-back
831 !!!!!!!!!!!!!!!! */
// return the merged magnitudes to the ranks that own the objects
832 mid_step_function("compute_collisions - com");
833 MPI_Alltoallv(&mag_buffer[0], &receive_count[0], &receive_disp[0], mag_MPI_Datatype,
834 &partitioned_mag[0], &send_count[0], &send_disp[0], mag_MPI_Datatype, communicator);
835
836 /* un-partition magnitude */
837 mid_step_function("compute_collisions - finalize");
838 #pragma omp parallel for
839 for (size_t id = 0; id < num_object; ++id)
840 magnitude[next_oid[id]] = partitioned_mag[id];
841
842
843
844
845
846
847
848 /* !!!!!!!!!!!!!!!!
849 partition
850 !!!!!!!!!!!!!!!! */
// keep only objects whose merged magnitude is above tolerance
851 size_t* partitioned_it = __gnu_parallel::partition(&next_oid[0], &next_oid[0] + num_object,
852 [&](size_t const &oid) {
853 return std::norm(magnitude[oid]) > tolerance;
854 });
855 num_object_after_interferences = std::distance(&next_oid[0], partitioned_it);
856 }
857
858 /*
859 distributed normalization function
860 */
// Normalize the distributed wave function so the global probability sums to 1,
// and record this node's share in node_total_proba.
//
// NOTE(review): doxygen line 867 is missing from this extracted listing —
// presumably `node_total_proba = 0;`, without which the OpenMP reduction below
// would accumulate across successive calls; line 883 is also missing (possibly
// a rescaling of node_total_proba). Confirm both against the repository.
861 void mpi_iteration::normalize(MPI_Comm communicator, quids::debug_t mid_step_function) {
862 /* !!!!!!!!!!!!!!!!
863 normalize
864 !!!!!!!!!!!!!!!! */
865 mid_step_function("normalize");
866
868 total_proba = 0;
869
// local probability mass: sum of |magnitude|^2 over this node's objects
870 #pragma omp parallel for reduction(+:node_total_proba)
871 for (size_t oid = 0; oid < num_object; ++oid)
872 node_total_proba += std::norm(magnitude[oid]);
873
874 /* accumulate probabilities on the master node */
875 MPI_Allreduce(&node_total_proba, &total_proba, 1, Proba_MPI_Datatype, MPI_SUM, communicator);
876 PROBA_TYPE normalization_factor = std::sqrt(total_proba);
877
// skip the rescaling pass when the state is already normalized
878 if (normalization_factor != 1)
879 #pragma omp parallel for
880 for (size_t oid = 0; oid < num_object; ++oid)
881 magnitude[oid] /= normalization_factor;
882
884
885 mid_step_function("end");
886 }
887
888 /*
889 "utility" functions from here on:
890 */
891 /*
892 equalize the number of objects across nodes
893 */
894 void mpi_iteration::equalize(MPI_Comm communicator) {
895 MPI_Request request = MPI_REQUEST_NULL;
896
897 MPI_Comm localComm;
898 int rank, size, local_size;
899 MPI_Comm_size(communicator, &size);
900 MPI_Comm_rank(communicator, &rank);
901 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
902 MPI_Comm_size(localComm, &local_size);
903
904 int this_pair_id;
905 if (rank == 0) {
906 /* gather sizes */
907 std::vector<size_t> sizes(size, 0);
908 MPI_Gather(&num_object, 1, MPI_UNSIGNED_LONG_LONG, &sizes[0], 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
909
910 /* compute pair_id*/
911 std::vector<int> pair_id(size, 0);
912 utils::make_equal_pairs(&sizes[0], &sizes[0] + size, &pair_id[0]);
913
914 /* scatter pair_id */
915 MPI_Scatter(&pair_id[0], 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
916 } else {
917 /* gather sizes */
918 MPI_Gather(&num_object, 1, MPI_UNSIGNED_LONG_LONG, NULL, 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
919
920 /* scatter pair_id */
921 MPI_Scatter(NULL, 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
922 }
923
924 /* get available memory */
925 MPI_Barrier(localComm);
926 size_t total_iteration_size, iteration_size = quids::iteration::get_mem_size();
927 MPI_Allreduce(&iteration_size, &total_iteration_size, 1, Proba_MPI_Datatype, MPI_SUM, localComm);
928 size_t avail_memory = ((quids::utils::get_free_mem() + total_iteration_size)/local_size - iteration_size)*(1 - quids::safety_margin);
929 MPI_Barrier(localComm);
930
931 /* skip if this node is alone */
932 if (this_pair_id == rank)
933 return;
934
935 /* get the number of objects of the respective pairs */
936 size_t other_num_object;
937 MPI_Isend(&num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, &request);
938 MPI_Isend(&num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, &request);
939
940 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
941 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
942
943 /* equalize amoung pairs */
944 if (num_object > other_num_object) {
945 size_t num_object_sent = (num_object - other_num_object) / 2;
946 send_objects(num_object_sent, this_pair_id, communicator, false);
947 } else if (num_object < other_num_object)
948 receive_objects(this_pair_id, communicator, false, avail_memory);
949 }
950
951 /*
952 equalize symbolic object across nodes
953 */
954 void mpi_iteration::equalize_symbolic(MPI_Comm communicator) {
955 MPI_Request request = MPI_REQUEST_NULL;
956
957 MPI_Comm localComm;
958 int rank, size, local_size;
959 MPI_Comm_size(communicator, &size);
960 MPI_Comm_rank(communicator, &rank);
961 MPI_Comm_split_type(communicator, MPI_COMM_TYPE_SHARED, rank, MPI_INFO_NULL, &localComm);
962 MPI_Comm_size(localComm, &local_size);
963
964 /* compute the number of symbolic objects */
965 size_t num_symbolic_object = child_begin[num_object];
966
967 int this_pair_id;
968 if (rank == 0) {
969 /* gather sizes */
970 std::vector<size_t> sizes(size, 0);
971 MPI_Gather(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, &sizes[0], 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
972
973 /* compute pair_id*/
974 std::vector<int> pair_id(size, 0);
975 utils::make_equal_pairs(&sizes[0], &sizes[0] + size, &pair_id[0]);
976
977 /* scatter pair_id */
978 MPI_Scatter(&pair_id[0], 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
979 } else {
980 /* gather sizes */
981 MPI_Gather(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, NULL, 1, MPI_UNSIGNED_LONG_LONG, 0, communicator);
982
983 /* scatter pair_id */
984 MPI_Scatter(NULL, 1, MPI_INT, &this_pair_id, 1, MPI_INT, 0, communicator);
985 }
986
987 /* get available memory */
988 MPI_Barrier(localComm);
989 size_t total_iteration_size, iteration_size = quids::iteration::get_mem_size();
990 MPI_Allreduce(&iteration_size, &total_iteration_size, 1, Proba_MPI_Datatype, MPI_SUM, localComm);
991 size_t avail_memory = ((quids::utils::get_free_mem() + total_iteration_size)/local_size - iteration_size)*(1 - quids::safety_margin);
992 MPI_Barrier(localComm);
993
994 /* skip if this node is alone */
995 if (this_pair_id == rank)
996 return;
997
998 /* get the number of objects of the respective pairs */
999 uint other_num_object;
1000 uint other_ub_symbolic_object_size;
1001
1002 MPI_Isend(&num_symbolic_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, &request);
1003 MPI_Isend(&ub_symbolic_object_size, 1, MPI_UNSIGNED, this_pair_id, 0 /* tag */, communicator, &request);
1004
1005 MPI_Recv(&other_num_object, 1, MPI_UNSIGNED_LONG_LONG, this_pair_id, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
1006 MPI_Recv(&other_ub_symbolic_object_size, 1, MPI_UNSIGNED, this_pair_id, 0 /* tag */, communicator, MPI_STATUS_IGNORE);
1007
1008 ub_symbolic_object_size = std::max(ub_symbolic_object_size, other_ub_symbolic_object_size);
1009
1010 /* equalize amoung pairs */
1011 if (num_symbolic_object > other_num_object) {
1012 size_t num_symbolic_object_to_send = (num_symbolic_object - other_num_object) / 2;
1013
1014 /* find the actual number of object to send */
1015 auto limit_it = std::lower_bound(child_begin.begin(), child_begin.begin() + num_object, num_symbolic_object - num_symbolic_object_to_send) - 1;
1016 size_t num_object_sent = std::distance(limit_it, child_begin.begin() + num_object);
1017
1018 send_objects(num_object_sent, this_pair_id, communicator, true);
1019 } else if (num_symbolic_object < other_num_object)
1020 receive_objects(this_pair_id, communicator, true, avail_memory);
1021 }
1022
1023 /*
1024 function to distribute objects across nodes
1025 */
1026 void mpi_iteration::distribute_objects(MPI_Comm communicator, int node_id=0) {
1027 int size, rank;
1028 MPI_Comm_size(communicator, &size);
1029 MPI_Comm_rank(communicator, &rank);
1030
1031 size_t initial_num_object = num_object;
1032 if (rank == node_id) {
1033 for (int node = 1; node < size; ++node) {
1034 int node_to_send = node <= node_id ? node - 1 : node; //skip this node
1035 size_t num_object_sent = (initial_num_object * (node + 1)) / size - (initial_num_object * node) / size; //better way to spread evently
1036
1037 /* send objects */
1038 send_objects(num_object_sent, node_to_send, communicator);
1039 }
1040
1041 } else
1042 /* receive objects */
1043 receive_objects(node_id, communicator);
1044 }
1045
1046 /*
1047 function to gather object from all nodes
1048 */
1049 void mpi_iteration::gather_objects(MPI_Comm communicator, int node_id=0) {
1050 int size, rank;
1051 MPI_Comm_size(communicator, &size);
1052 MPI_Comm_rank(communicator, &rank);
1053
1054 if (rank == node_id) {
1055 for (int node = 1; node < size; ++node) {
1056 int receive_node = node <= node_id ? node - 1 : node;
1057
1058 /* receive objects */
1059 receive_objects(receive_node, communicator);
1060 }
1061
1062 } else
1063 /* send objects */
1064 send_objects(num_object, node_id, communicator);
1065
1066 /* compute node_total_proba */
1067 node_total_proba = rank == node_id;
1068 }
1069}
iteration (wave function) representation class
Definition: quids.hpp:144
PROBA_TYPE average_value(const observable_t observable) const
function to get the average value of a custom observable
Definition: quids.hpp:203
void pop(size_t n=1, bool normalize_=true)
function that removes a given number of object from the "tail" of the memory representation
Definition: quids.hpp:189
PROBA_TYPE total_proba
total probability retained after previous truncation (if any).
Definition: quids.hpp:149
size_t num_object
number of objects contained in the wave function
Definition: quids.hpp:147
mpi iteration (wave function) class, inheriting from the quids::iteration class
Definition: quids_mpi.hpp:64
void distribute_objects(MPI_Comm communicator, int node_id)
distribute objects equally from a single node to all others.
Definition: quids_mpi.hpp:1026
size_t get_total_num_object(MPI_Comm communicator) const
getter for the total amount of distributed objects.
Definition: quids_mpi.hpp:81
void equalize(MPI_Comm communicator)
equalize the number of object across node pairs.
Definition: quids_mpi.hpp:894
PROBA_TYPE average_value(const quids::observable_t observable) const
function to get the average local value of a custom observable.
Definition: quids_mpi.hpp:101
void receive_objects(int node, MPI_Comm communicator, bool receive_num_child=false, size_t max_mem=-1)
function to receive objects (at the "tail" of the memory representation).
Definition: quids_mpi.hpp:177
size_t get_total_num_symbolic_object(MPI_Comm communicator) const
getter for the total amount of distributed symbolic objects.
Definition: quids_mpi.hpp:92
PROBA_TYPE average_value(const quids::observable_t observable, MPI_Comm communicator) const
function to get the average value of a custom observable across the total distributed wave function.
Definition: quids_mpi.hpp:109
friend void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function)
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
void gather_objects(MPI_Comm communicator, int node_id)
gather objects to a single node from all others.
Definition: quids_mpi.hpp:1049
void send_objects(size_t num_object_sent, int node, MPI_Comm communicator, bool send_num_child=false)
function to send objects (from the "tail" of the memory representation).
Definition: quids_mpi.hpp:124
mpi_iteration()
simple empty wavefunction constructor.
Definition: quids_mpi.hpp:70
PROBA_TYPE node_total_proba
total probability retained locally after previous truncation (if any).
Definition: quids_mpi.hpp:67
mpi_iteration(char *object_begin_, char *object_end_)
constructor that insert a single object with magnitude 1
Definition: quids_mpi.hpp:75
symbolic mpi iteration (computation intermediary)
Definition: quids_mpi.hpp:313
mpi_symbolic_iteration()
simple constructor
Definition: quids_mpi.hpp:316
size_t get_total_num_object(MPI_Comm communicator) const
getter for the total amount of distributed objects.
Definition: quids_mpi.hpp:322
friend void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object, quids::debug_t mid_step_function)
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
size_t get_total_num_object_after_interferences(MPI_Comm communicator) const
getter for the total amount of distributed objects after duplicate elimination.
Definition: quids_mpi.hpp:333
class representing a dynamic (or rule).
Definition: quids.hpp:100
symbolic iteration (computation intermediary)
Definition: quids.hpp:333
size_t num_object_after_interferences
number of objects obtained after eliminating duplicates
Definition: quids.hpp:341
size_t num_object
number of objects considered in the symbolic step
Definition: quids.hpp:339
void resize(const Int n_, const uint align_byte_length_=std::alignment_of< T >()) const
align_byte_length_ should be used to realign the buffer, which is not yet implemented as realloc doe...
Definition: vector.hpp:111
void make_equal_pairs(size_t *size_begin, size_t *size_end, int *pair_id)
function to partition into pair of almost equal sum
Definition: mpi_utils.hpp:9
MPI_Datatype get_mpi_datatype(T x)
function to get the corresponding MPI type of a variable
Definition: mpi_utils.hpp:33
mpi implementation namespace
Definition: quids_mpi.hpp:39
float min_equalize_step
minimum jump in equalize before breaking
Definition: quids_mpi.hpp:50
bool equalize_children
if true, equalize the number of children. Otherwise equalize the number of objects.
Definition: quids_mpi.hpp:55
size_t min_equalize_size
minimum number of objects that should be attained (in at least one node) before equalizing (load-shari...
Definition: quids_mpi.hpp:46
class mpi_iteration mpi_it_t
mpi iteration type
Definition: quids_mpi.hpp:59
class mpi_symbolic_iteration mpi_sy_it_t
mpi symbolic iteration type
Definition: quids_mpi.hpp:61
void simulate(mpi_it_t &iteration, quids::rule_t const *rule, mpi_it_t &next_iteration, mpi_sy_it_t &symbolic_iteration, MPI_Comm communicator, size_t max_num_object=0, quids::debug_t mid_step_function=[](const char *){})
function to apply a dynamic to a wave function distributed across multiple nodes
Definition: quids_mpi.hpp:422
float equalize_inbalance
maximum imbalance between nodes (max_obj - avg_obj)/max_obj allowed before equalizing.
Definition: quids_mpi.hpp:48
void load_balancing_from_prefix_sum(const UnsignedIntIterator1 prefixSumLoadBegin, const UnsignedIntIterator1 prefixSumLoadEnd, UnsignedIntIterator2 workSharingIndexesBegin, UnsignedIntIterator2 workSharingIndexesEnd)
simple CCP load balancing implementation.
Definition: load_balancing.hpp:14
int log_2_upper_bound(int n)
returns the upper bound of the log2
Definition: algorithm.hpp:16
int nearest_power_of_two(int n)
returns the upper bound as a power of two
Definition: algorithm.hpp:9
size_t get_free_mem()
function that get the total amount of available free memory.
Definition: memory.hpp:47
void parallel_generalized_partition_from_iota(idIteratorType idx_in, idIteratorType idx_in_end, long long int const iotaOffset, countIteratorType offset, countIteratorType offset_end, functionType const partitioner)
linear partitioning algorithm into n partitions without an initialized index list in parallel
Definition: algorithm.hpp:170
QuIDS namespace.
Definition: quids.hpp:55
void simulate(it_t &iteration, modifier_t const rule)
function to apply a modifer to a wave function
Definition: quids.hpp:431
std::function< void(const char *step)> debug_t
debuging function type
Definition: quids.hpp:85
class rule rule_t
dynamic (or rule) type
Definition: quids.hpp:79
int load_balancing_bucket_per_thread
number of load balancing buckets per thread
Definition: quids.hpp:63
std::function< PROBA_TYPE(char const *object_begin, char const *object_end)> observable_t
observable definition typedef
Definition: quids.hpp:83
float safety_margin
memory safety margin (0.2 = 80% memory usage target)
Definition: quids.hpp:61
PROBA_TYPE tolerance
tolerance for objects (remove objects with a smaller probability)
Definition: quids.hpp:59
#define GRANULARITY
granularity, i.e. the typical loop size we consider when doing a 2d loop.
Definition: quids_mpi.hpp:26