Fenix @develop
 
Loading...
Searching...
No Matches
op_log.h
1#ifndef FENIX_LOGGING_OP_LOG_H
2#define FENIX_LOGGING_OP_LOG_H
#include <compare>
#include <cstddef>
#include <cstring>
#include <istream>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "fenix_opt.hpp"
#include "fenix/mpi_util.hpp"
#include "fenix/logging/serialize.h"
9
10namespace fenix::logging {
11
12void record_mpi_type(MPI_Datatype d);
13void record_mpi_op(MPI_Op o);
14
15// idempotent
16void init_mpi_records();
17
18class OpLog {
19 public:
20 virtual ~OpLog() { req_free(); }
21
22 auto operator<=>(const OpLog& o) const { return m_idx <=> o.m_idx; }
23 auto operator<=>(const int& i) const { return m_idx <=> i; }
24 auto operator==(const OpLog& o) const { return m_idx == o.m_idx; }
25 auto operator==(const int& i) const { return m_idx == i; }
26
27 void serialize(std::ostream& o) const {
28 serialize::write(o, m_idx);
29 this->serialize_impl(o);
30 };
31
32 MPI_Request* req() const { return m_req; }
33 int idx() const { return m_idx; }
34
35 OpLog(const OpLog&) = delete;
36 OpLog& operator=(const OpLog&) = delete;
37
38 virtual void serialize_impl(std::ostream& o) const = 0;
39 virtual std::string str() const = 0;
40
41 protected:
42 OpLog() : m_idx(-1) {}
43 OpLog(int i) : m_idx(i) {}
44 OpLog(std::istream& i) : OpLog(serialize::read<int>(i)) {}
45
46 OpLog& operator=(OpLog&& o) {
47 m_idx = o.m_idx;
48 return *this;
49 }
50
51 void req_set(MPI_Request* new_ptr) const {
52 req_free();
53 m_req = new_ptr;
54 }
55 void req_free() const {
56 if (req_obj != MPI_REQUEST_NULL) {
57 if (!util::mpi_finalized()) MPI_Request_free(&req_obj);
58 req_obj = MPI_REQUEST_NULL;
59 }
60 m_req = &req_obj;
61 }
62
63 int m_idx;
64
65 private:
66 mutable MPI_Request req_obj = MPI_REQUEST_NULL;
67 mutable MPI_Request* m_req = &req_obj;
68};
69
// Trait keyed on a compile-time value; only declared here, so it must be
// specialized elsewhere before it can be used.
// NOTE(review): judging by the names, the key is an MPI function and the
// nested `type` is the log class for that call -- confirm against the
// specializations.
template <auto Fn>
struct mpi_log;

// Shorthand for the nested `type` of mpi_log<Fn>
template <auto Fn>
using mpi_log_t = typename mpi_log<Fn>::type;
75
76class CollectiveLog : public OpLog {
77 public:
78 using OpLog::OpLog;
79
80 // Launch collective for the (locally) first time
81 virtual int begin(MPI_Comm c) const = 0;
82
83 // Asynchronously replay collective
84 virtual void replay(MPI_Comm c) const = 0;
85
86 protected:
87 CollectiveLog& operator=(CollectiveLog&& o) {
88 OpLog::operator=(std::move(o));
89 return *this;
90 }
91};
92
93class MPIBuffer {
94 public:
95 MPIBuffer() = default;
96
97 // No copying, move-only
98 MPIBuffer(const MPIBuffer&) = delete;
99 MPIBuffer& operator=(const MPIBuffer& o) = delete;
100 MPIBuffer(MPIBuffer&& o) { *this = std::move(o); }
101 MPIBuffer& operator=(MPIBuffer&& o) {
102 m_count = o.m_count;
103 o.m_count = 0;
104 m_type = o.m_type;
105 o.m_type = MPI_DATATYPE_NULL;
106 internal_buf = std::move(o.internal_buf);
107 user_buf = o.user_buf;
108 o.user_buf = nullptr;
109 return *this;
110 }
111
112 // Wrap a user's buffer
113 static MPIBuffer wrap(void* user_buffer, int count, MPI_Datatype type) {
114 return MPIBuffer(user_buffer, count, type);
115 }
116 // Create a buffer with uninitialized data
117 static MPIBuffer create(int count, MPI_Datatype type) {
118 return MPIBuffer(count, type);
119 }
120 // Create a buffer and copy from the user's buffer
121 static MPIBuffer copy(const void* user_buffer, int count, MPI_Datatype type) {
122 return MPIBuffer(count, type, static_cast<const char*>(user_buffer));
123 }
124
125 // Check if this object was default initialized or not
126 operator bool() const { return m_type != MPI_DATATYPE_NULL; }
127
128 void copy_to(void* out) {
129 std::memcpy(out, buf(), m_count * util::type_size(m_type));
130 }
131 void copy_from(void* in) {
132 std::memcpy(buf(), in, m_count * util::type_size(m_type));
133 }
134
135 // Release pointer to user buffer, to avoid use-after-free
136 void release_user_buf() const { user_buf = nullptr; }
137
138 void serialize(std::ostream& o) const {
139 serialize::write(o, m_type);
140 serialize::write(o, m_count);
141
142 int size = garbage_data ? 0 : internal_buf.size();
143 serialize::write<int>(o, size);
144 if (size > 0) serialize::write(o, internal_buf.data(), size);
145 }
146 MPIBuffer(std::istream& i) {
147 serialize::read(i, m_type);
148 serialize::read(i, m_count);
149
150 int size = serialize::read<int>(i);
151 if (size > 0) {
152 internal_buf.resize(size);
153 serialize::read(i, &internal_buf[0], size);
154 }
155 }
156
157 void* buf() const {
158 if (user_buf) {
159 // This should always be either a user buf wrapper OR hold its own buf
160 fenix_assert(internal_buf.empty());
161 return user_buf;
162 }
163 if (internal_buf.empty()) {
164 // We're allocating the buffer to store unused output data into during
165 // replay, so track that this should not be serialized
166 garbage_data = true;
167 internal_buf.resize(m_count * util::type_size(m_type));
168 }
169 fenix_assert(internal_buf.size() == m_count * util::type_size(m_type));
170 return internal_buf.data();
171 };
172 int count() const { return m_count; }
173 MPI_Datatype type() const { return m_type; }
174
175 operator void*() const { return buf(); }
176 operator int() const { return m_count; }
177 operator MPI_Datatype() const { return m_type; }
178
179 private:
180 // Wrapping constructor
181 MPIBuffer(void* user_buffer, int count, MPI_Datatype type)
182 : user_buf(user_buffer), m_count(count), m_type(type) {};
183 // Copying constructor
184 MPIBuffer(int count, MPI_Datatype type, const char* b)
185 : internal_buf(b, b + count * util::type_size(type)), m_count(count),
186 m_type(type) {};
187 // Creating constructor
188 MPIBuffer(int count, MPI_Datatype type) : m_count(count), m_type(type) {};
189
190 mutable void* user_buf = nullptr;
191 mutable std::vector<char> internal_buf;
192 // Sometimes internal_buf is only allocated as a receive buffer for replayed
193 // operations. In that case, we don't serialize the data
194 mutable bool garbage_data = false;
195
196 int m_count = 0;
197 MPI_Datatype m_type = MPI_DATATYPE_NULL;
198};
199
200} //namespace fenix::logging
201#endif
Definition op_log.h:76
Definition op_log.h:93
Definition op_log.h:18
Definition op_log.h:71