14 const void* send,
void* recv,
int count, MPI_Datatype type, MPI_Op o,
15 int root_rank, MPI_Comm c,
int idx
18 sbuf(MPIBuffer::copy(send == MPI_IN_PLACE ? recv : send, count, type)) {
19 if (root == util::comm_rank(c)) {
20 rbuf = MPIBuffer::wrap(recv, count, type);
// Fragment of what is presumably the move-assignment operator (its signature
// is outside this view): forwards `o` to CollectiveLog::operator= and then
// moves the send and receive buffers out of `o`.
// NOTE(review): `o.sbuf` / `o.rbuf` are read after `std::move(o)` was handed
// to the base-class assignment; this relies on the base only consuming its
// own sub-object -- confirm against CollectiveLog.
26 CollectiveLog::operator=(std::move(o));
29 sbuf = std::move(o.sbuf);
30 rbuf = std::move(o.rbuf);
// Deserialization fragment (enclosing signature is outside this view): reads
// root, op and sbuf from `i`, the same order in which serialize_impl writes
// them.
37 serialize::read(i, root);
38 serialize::read(i, op);
39 serialize::read(i, sbuf);
// rbuf is not read from the stream; it is created from sbuf instead.
// NOTE(review): presumably sbuf converts to the count and datatype arguments
// so the receive buffer matches the send buffer's layout -- confirm against
// MPIBuffer::create.
40 rbuf = MPIBuffer::create(sbuf, sbuf);
// Writes this record to the output stream `s`: root, then op, then sbuf.
// The visible lines do not write rbuf.
42 void serialize_impl(std::ostream& s)
const override {
43 serialize::write(s, root);
44 serialize::write(s, op);
45 serialize::write(s, sbuf);
// Returns a printable label of the form "Reduce <m_idx> (root = <root>)",
// with both numbers formatted by std::to_string.
48 std::string str()
const override {
49 return "Reduce " + std::to_string(m_idx) +
50 " (root = " + std::to_string(root) +
")";
// Starts the reduction on communicator `c` with PMPI_Ireduce and, when that
// call returns MPI_SUCCESS, waits for the request with PMPI_Wait.
// Some lines of this function are not visible here, so the returned value
// and any surrounding conditions cannot be stated from this view.
53 int begin(MPI_Comm c)
const override {
// Only the rank equal to `root` passes a receive buffer; every other rank
// passes nullptr.
55 void* recv = root == util::comm_rank(c) ? rbuf.buf() :
nullptr;
// NOTE(review): sbuf is passed as the send buffer, the count and the
// datatype -- presumably via MPIBuffer conversions; confirm.
56 int ret = PMPI_Ireduce(sbuf, recv, sbuf, sbuf, op, root, c, req());
57 if (ret == MPI_SUCCESS) ret = PMPI_Wait(req(), MPI_STATUS_IGNORE);
// The effect of release_user_buf() is defined by MPIBuffer, which is not
// visible in this view.
59 rbuf.release_user_buf();
63 void replay(MPI_Comm c)
const override {
65 void* recv = root == util::comm_rank(c) ? rbuf.buf() :
nullptr;
66 int ret = PMPI_Ireduce(sbuf, recv, sbuf, sbuf, op, root, c, req());
68 ret == MPI_SUCCESS,
"Non-process MPI error during collective replay\n"