libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
multifile.ipp
Go to the documentation of this file.
3#include <cstdlib>
4#include <cstdio>
5#include <iostream>
6#include <fstream>
7#include <filesystem>
8
9namespace adc {
10
11using std::cout;
12
13typedef std::string string;
14typedef std::string_view string_view;
15
16
17/*! \brief Parallel file output publisher_api implementation.
18 This plugin generates writes each message to the configured directory
19 tree with <json></json> delimiters surrounding it.
20 The output directory is "." by default, but may be overriden with
21 a full path defined in env("ADC_MULTIFILE_PLUGIN_DIRECTORY").
22 The resulting mass of files can be reduced independently later
23 by concatenating all files in the tree or more selectively
24 with a single call to the adc::multfile_assemble() function.
25
26 Multiple independent multifile publishers may be created; exact filenames
27 are not user controlled, to avoid collisions. Likewise there is no append
28 mode.
29
30 DIR/$user/[$adc_wfid.]$host.$pid.$start.$publisherptr/$application.$rank.XXXXXX
31 */
33 enum state {
34 ok,
35 err
36 };
37 enum mode {
38 /* the next mode needed for correct operation */
39 pi_config,
40 pi_init,
41 pi_pub_or_final
42 };
43
44private:
45 inline static const std::map< const string, const string > plugin_multifile_config_defaults =
46 {
47 { "DIRECTORY", "."},
48 { "RANK", ""}
49 };
50 inline static const char *plugin_multifile_prefix = "ADC_MULTIFILE_PLUGIN_";
51 const string vers;
52 const std::vector<string> tags;
53 string fdir;
54 string rank;
55 std::ofstream out;
56 enum state state;
57 bool paused;
58 enum mode mode;
59
60 int config(const string dir, const string user_rank) {
61 if (mode != pi_config)
62 return 2;
63 char hname[HOST_NAME_MAX+1];
64 char uname[L_cuserid+1];
65 if (gethostname(hname, HOST_NAME_MAX+1)) {
66 return 2;
67 }
68 if (!cuserid(uname)) {
69 return 2;
70 }
71 std::stringstream ss;
72 // output goes to
73 // dir/user/[wfid.].host.pid.starttime.ptr/application.rank.XXXXXX files
74 // where application[.rank].XXXXXX is opened per process
75 // and application comes in header and rank via config.
76 char *wfid = getenv("ADC_WFID");
77 pid_t pid = getpid();
78 struct timespec ts;
79 clock_gettime(CLOCK_BOOTTIME, &ts);
80 ss << dir << "/" << uname << "/" << ( wfid ? wfid : "") <<
81 (wfid ? "." : "") <<
82 hname << "." << pid << "." <<
83 ts.tv_sec << "." << ts.tv_nsec << "." << this;
84 ss >> fdir;
85 rank = user_rank;
86 mode = pi_init;
87 return 0;
88 }
89
90 // find field in m, then with prefix in env, then the default.
91 const string get(const std::map< string, string >& m,
92 string field, string_view env_prefix) {
93 // fields not defined in config_defaults raise an exception.
94 auto it = m.find(field);
95 if (it != m.end()) {
96 return it->second;
97 }
98 string en = string(env_prefix) += field;
99 char *ec = getenv(en.c_str());
100 if (!ec) {
101 return plugin_multifile_config_defaults.at(field);
102 } else {
103 return string(ec);
104 }
105 }
106
107
108public:
109 multifile_plugin() : vers("1.0.0") , tags({"none"}), state(ok), paused(false), mode(pi_config) {
110 std::cout << "Constructing multifile_plugin" << std::endl;
111 }
112
113 int publish(std::shared_ptr<builder_api> b) {
114 if (paused)
115 return 0;
116 if (state != ok)
117 return 1;
118 if (mode != pi_pub_or_final)
119 return 2;
120 auto header = b->get_section("header");
121 auto app = b->get_value("application");
122
123#if 0
124 // fixme; multifile_plugin::publish
125 // open dump file; messages are null separated, not endl separated, as content may include \n.
126 // need a map of stream to header.application
127 string fpath = fdir + "/" + "multifile.XXXXXX"; //
128 out.open(fpath, std::ofstream::out | (std::ofstream::trunc));
129 if (out.good()) {
130 mode = pi_pub_or_final;
131 return 0;
132 }
133 state = err;
134 return EBADF;
135#endif
136 // write to stream
137 if (out.good()) {
138 out << "<json>" << b->serialize() << "</json>" << std::endl;
139 std::cout << "'multifile' wrote" << std::endl;
140 return 0;
141 }
142 std::cout << "failed out.good" << std::endl;
143 return 1;
144 }
145
146 int config(const std::map< std::string, std::string >& m) {
147 return config(m, plugin_multifile_prefix);
148 }
149
150 int config(const std::map< std::string, std::string >& m, std::string_view env_prefix) {
151 string d = get(m, "DIRECTORY", env_prefix);
152 string r = get(m, "RANK", env_prefix);
153 return config(d, r);
154 }
155
156 const std::map< const std::string, const std::string> & get_option_defaults() {
157 return plugin_multifile_config_defaults;
158 }
159
161 std::map <string, string >m;
162 if (!fdir.size())
163 config(m);
164 if (mode != pi_init) {
165 return 2;
166 }
167 if ( state == err ) {
168 std::cout <<
169 "multifile plugin initialize found pre-existing error"
170 << std::endl;
171 return 3;
172 }
173 std::error_code ec;
174 std::filesystem::create_directories(fdir, ec);
175 if (ec.value() != 0 && ec.value() != EEXIST ) {
176 state = err;
177 std::cout << "unable to create output directory for plugin 'multifile'; "
178 << fdir << " : " << ec.message() << std::endl;
179 return ec.value();
180 }
181 string testfile = fdir + "/.XXXXXX";
182 auto ftemplate = std::make_unique<char[]>(testfile.length()+1);
183 ::std::strcpy(ftemplate.get(), testfile.c_str());
184 int fd = mkstemp(ftemplate.get());
185 int rc = errno;
186 if (fd == -1) {
187 state = err;
188 std::cout << "unable to open file in output directory "
189 << fdir << " for plugin 'multifile': " <<
190 std::strerror(rc) << std::endl;
191 return 4;
192 } else {
193 close(fd);
194 unlink(ftemplate.get());
195 }
196 return 0;
197 }
198
199 void finalize() {
200 if (mode == pi_pub_or_final) {
201 state = ok;
202 paused = false;
203 mode = pi_config;
204 out.close();
205 } else {
206 std::cout << "multifile plugin finalize on non-running plugin" << std::endl;
207 }
208 }
209
210 void pause() {
211 paused = true;
212 }
213
214 void resume() {
215 paused = false;
216 }
217
219 return "multifile";
220 }
221
223 return vers;
224 }
225
227 std::cout << "Destructing multifile_plugin" << std::endl;
228 }
229};
230
231} // adc
Parallel file output publisher_api implementation. This plugin generates writes each message to the c...
Definition multifile.ipp:32
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
int config(const std::map< std::string, std::string > &m, std::string_view env_prefix)
Configure the plugin with the options given and the corresponding environment variables.
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
string_view name() const
void resume()
Resume publishing Duplicate calls are allowed.
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
void finalize()
Stop publishing and release any resources held for managing publication.
string_view version() const
Publisher plugin interface.
Definition publisher.hpp:44
Definition adc.hpp:75
std::string_view string_view
Definition curl.ipp:14
std::string string
Definition curl.ipp:13