Fenix @develop
 
Loading...
Searching...
No Matches
comm_log.h
1#ifndef COMM_LOG_H
2#define COMM_LOG_H
3
4#include <map>
5#include <vector>
6#include <cassert>
7#include <istream>
8#include <ostream>
9#include <optional>
10
11#include <mpi.h>
12
13#include "fenix/tasks/request.hpp"
14#include "fenix/logging/task.h"
15#include "fenix/logging/rank_log.h"
16#include "fenix/logging/collective_log_holder.h"
17
18namespace fenix::logging {
19struct CRegion {
20 int id = -1, first = -1, next = -1;
21 CRegion() = default;
22 CRegion(int m_id) : id(m_id) {};
23 CRegion(int m_id, int idx) : id(m_id), first(idx), next(idx) {}
24 auto operator<=>(const CRegion& o) const { return id <=> o.id; }
25 auto operator==(const CRegion& o) const { return id == o.id; }
26 auto operator<=>(const int& i) const { return id <=> i; }
27 auto operator==(const int& i) const { return id == i; }
28 bool valid() const { return id >= 0 && first >= 0 && next >= 0; }
29 bool empty() const { return valid() && first == next; }
30 bool fresh() const { return empty() && first == 0; }
31 std::string str() const {
32 return "Region " + std::to_string(id) + " [" + std::to_string(first) + "," +
33 std::to_string(next) + ")";
34 }
35};
36
37struct CommLog {
38 CommLog(MPI_Comm& c, int m_max_regions = 2);
39 CommLog(MPI_Comm& c, std::istream& i);
40 void serialize(std::ostream& o);
41
42 MPI_Comm& comm;
43 const int m_rank;
44 int max_regions;
45 int active_region = 0;
46
47 std::map<int, RankLog> rank_logs;
48 std::vector<TaskT> tasks;
49
50 std::vector<CRegion> regions;
51 std::set<CollectiveLogHolder, std::less<>> collectives;
52 CollectiveLogHolder active_op;
53 // Collective index every rank has successfully completed, as of last reset
54 int completed_collective_all = -1;
55 // As above, but index any rank has successfully completed
56 int completed_collective_any = -1;
57 TaskT task;
58
59 RankLog& logs(int r);
60 RankLog& operator[](int r) { return logs(r); }
61
62 // Attempt progress on each task
63 void progress();
64 // Progress pending tasks and this one until this task completes
65 void progress_through(TaskT t);
66 fenix::tasks::Status progress_through(MPI_Request* r);
67
68 int send(const void* b, int n, MPI_Datatype d, int dst, int t) {
69 return logs(dst).send(b, n, d, t);
70 }
71 int irecv(void* b, int n, MPI_Datatype d, int src, int t, MPI_Request* r) {
72 return logs(src).irecv(b, n, d, t, r);
73 }
74
75 // Return from logged collective calls. MPI return code and the log
76 using CollectiveResult =
77 std::pair<int, std::reference_wrapper<const CollectiveLogHolder>>;
78
79 // Begin a logged collective MPI Function, return the log
80 template <auto MPIFunction, typename... Args>
81 int begin(Args... args) {
82 using LogT = mpi_log_t<MPIFunction>;
83
84 if (!region().valid()) append_region({active_region, 0});
85 return begin(
86 CollectiveLogHolder::template create<LogT>(args..., region().next++)
87 );
88 }
89
90 void fenix_pre_recovery();
91 void reset_consistency(int checkpoint_id);
92
93 void begin_region(int region);
94 CRegion& region() { return regions.back(); }
95 const CRegion& region() const { return regions.back(); }
96
97 std::string str(bool with_region = false) const {
98 return "Rank " + std::to_string(m_rank) +
99 " (active=" + std::to_string(active_region) + ")" +
100 (with_region ? " " + region().str() : "");
101 }
102
103 private:
104 // Iprobe MPI for any other rank trying to form consistency
105 void detect_incoming_consistency_request();
106 TaskT form_consistency();
107 void replay_collectives(int start_idx);
108 void append_region(const CRegion& r);
109 void erase_logs(const CRegion& r);
110 void erase_regions(
111 std::vector<CRegion>::iterator begin, std::vector<CRegion>::iterator end
112 );
113
114 // Returns reference to logged op in collectives set
115 int begin(CollectiveLogHolder&& collective_op);
116};
117
// Namespace-scope CommLog slot, declared here and defined in another translation unit; being a std::optional, it may hold no CommLog.
118extern std::optional<CommLog> comm_log;
119
120} //namespace fenix::logging
121
122#endif
Definition collective_log_holder.h:22
Definition task.h:6
Definition mpi_util.hpp:115
Definition comm_log.h:19
Definition comm_log.h:37
Definition rank_log.h:58