libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
multifile.ipp
Go to the documentation of this file.
1/* Copyright 2025 NTESS. See the top-level LICENSE.txt file for details.
2 *
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
7#include <cstdlib>
8#include <cstdio>
9#include <cerrno>
10#include <iostream>
11#include <fstream>
12#include <filesystem>
13#include <vector>
14#include <string>
15#include <chrono>
16// for glob_sendfile_join
17#include <sys/sendfile.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21#include <glob.h>
22
23
24namespace adc {
25
26using std::string;
27using std::string_view;
28
29
/*! \brief Parallel file output publisher_api implementation.
 This plugin writes each message to the configured directory
 tree with <adct-json></adct-json> delimiters surrounding it.
 The output directory is "." by default, but may be overridden with
 a full path given as the DIRECTORY option or in
 env("ADC_MULTIFILE_PLUGIN_DIRECTORY").
 The resulting mass of files can be reduced independently later
 by concatenating all files in the tree or, more selectively,
 with a single call to the adc::consolidate_multifile_logs() function.

 Multiple independent multifile publishers may be created; exact filenames
 are not user controlled, to avoid collisions. Likewise there is no append
 mode.

 DIR/$user/[wfid_$ADC_WFID.]H_$host.P$pid.T$sec.$nsec.p$publisherptr/$application.R$rank.XXXXXX

 Files opened will remain open until the publisher is finalized.
 */
// Health of the plugin. It starts as ok; initialize() switches it to err
// on failure, and publish() refuses to write while it is err.
enum state {
    ok = 0,
    err = 1
};
// The lifecycle step the plugin expects next. config(), initialize() and
// publish() return 2 when called out of this order.
enum mode {
    pi_config = 0,      // config() (or initialize() using defaults) comes next
    pi_init = 1,        // configured; initialize() comes next
    pi_pub_or_final = 2 // running; publish() or finalize() may be called
};
58
59private:
60 inline static const std::map< const string, const string > plugin_multifile_config_defaults =
61 {
62 { "DIRECTORY", "."},
63 { "DEBUG", "0"},
64 { "RANK", ""}
65 };
66 inline static const char *plugin_multifile_prefix = "ADC_MULTIFILE_PLUGIN_";
67 const string vers;
68 const std::vector<string> tags;
69 string fdir;
70 string topdir;
71 string user;
72 string rank;
73 std::map< std::string, std::unique_ptr< std::ofstream > > app_out;
74 int debug;
75 enum state state;
76 bool paused;
77 enum mode mode;
78
79 int config(const string dir, const string user_rank, const string sdebug) {
80 if (mode != pi_config)
81 return 2;
82 char hname[HOST_NAME_MAX+1];
83 char uname[L_cuserid+1];
84 if (gethostname(hname, HOST_NAME_MAX+1)) {
85 return 2;
86 }
87 if (!cuserid(uname)) {
88 return 2;
89 }
90 user = uname;
91 std::stringstream dss(sdebug);
92 dss >> debug;
93 if (debug < 0) {
94 debug = 0;
95 }
96 std::stringstream ss;
97 // output goes to
98 // dir/user/[wfid.].H_host.Ppid.Tstarttime.pptr/application.Rrank.XXXXXX files
99 // where application[.rank].XXXXXX is opened per process
100 // and application comes in header and rank via config.
101 // Here we set up dir/user/[wfid.].H_host.Ppid.Tstarttime.pptr/ in fdir
102 // and later we add application.rank.XXXXXX
103 string wname;
104 char *wfid = getenv("ADC_WFID");
105 if (wfid) {
106 wname = string("wfid_") + string(wfid) + ".";
107 } else {
108 wname = "";
109 }
110 pid_t pid = getpid();
111 struct timespec ts;
112 clock_gettime(CLOCK_BOOTTIME, &ts);
113 ss << dir << "/" << uname << "/" << wname <<
114 "H_"<< hname << ".P" << pid << ".T" <<
115 ts.tv_sec << "." << ts.tv_nsec << ".p" << this;
116 ss >> fdir;
117 topdir = dir;
118 rank = user_rank;
119 mode = pi_init;
120
121 if (debug > 0) {
122 std::cerr << "multifile plugin configured" <<std::endl;
123 }
124 return 0;
125 }
126
127 // find field in m, then with prefix in env, then the default.
128 const string get(const std::map< string, string >& m,
129 string field, string_view env_prefix) {
130 // fields not defined in config_defaults raise an exception.
131 auto it = m.find(field);
132 if (it != m.end()) {
133 return it->second;
134 }
135 string en = string(env_prefix) += field;
136 char *ec = getenv(en.c_str());
137 if (!ec) {
138 return plugin_multifile_config_defaults.at(field);
139 } else {
140 return string(ec);
141 }
142 }
143
/** Create fdir, copying attributes from the nearest ancestor that exists:
 * dirname(fdir), then dir/user, then dir. If none exists, dir is created
 * as 1750 and the rest below it.
 * The aim is to preserve the group policy set at dir/user as the entries
 * below it are created, instead of falling back to the process defaults.
 * \param dir top-level output directory.
 * \param user user-name component directly below \p dir.
 * \param fdir directory to create (normally dir/user/<leaf>).
 * \param ec cleared on success (including when fdir already exists);
 *        otherwise holds the failure. No filesystem_error is thrown.
 */
void create_directories(const string& dir, const string& user, const string& fdir, std::error_code& ec) {
    namespace fs = std::filesystem;
    ec.clear();

    // Non-throwing stat. "Not found" is an expected answer; any other
    // failure is stored in ec and returns false to stop the walk.
    auto lookup = [&ec](const fs::path& p, fs::file_status& st) -> bool {
        std::error_code sec;
        st = fs::status(p, sec);
        if (sec && st.type() != fs::file_type::not_found) {
            ec = sec;
            return false;
        }
        return true;
    };

    const fs::path fpath(fdir);
    const fs::path bdir = fpath.parent_path();
    fs::path du_path = dir;
    du_path /= user;
    const fs::path d_path = dir;
    fs::file_status st;

    // 1. fdir already exists: done, unless it is not a directory.
    if (!lookup(fpath, st)) {
        return;
    }
    if (fs::exists(st)) {
        if (!fs::is_directory(st)) {
            ec = make_error_code(std::errc::not_a_directory);
        }
        return;
    }

    // 2. dirname(fdir) exists: create only the last component.
    if (!lookup(bdir, st)) {
        return;
    }
    if (fs::exists(st)) {
        if (fs::is_directory(st)) {
            fs::create_directory(fdir, bdir, ec);
        } else {
            ec = make_error_code(std::errc::not_a_directory);
        }
        return;
    }

    // 3. dir/user exists: create fdir below it with dir/user's attributes.
    //    Test dir/user itself here; dirname(fdir) is known to be missing.
    if (!lookup(du_path, st)) {
        return;
    }
    if (fs::exists(st)) {
        if (fs::is_directory(st)) {
            fs::create_directory(fdir, du_path, ec);
        } else {
            ec = make_error_code(std::errc::not_a_directory);
        }
        return;
    }

    // 4. dir exists: create dir/user and fdir with dir's attributes, then
    //    make fdir 0750, keeping dir's sticky bit.
    if (!lookup(d_path, st)) {
        return;
    }
    if (fs::exists(st)) {
        if (!fs::is_directory(st)) {
            ec = make_error_code(std::errc::not_a_directory);
            return;
        }
        fs::create_directory(du_path, d_path, ec);
        if (ec) {
            return;
        }
        fs::create_directory(fdir, d_path, ec);
        if (ec) {
            return;
        }
        const fs::perms is_sticky = st.permissions() & fs::perms::sticky_bit;
        fs::permissions(
            fdir,
            fs::perms::owner_all | fs::perms::group_read | fs::perms::group_exec | is_sticky,
            fs::perm_options::replace,
            ec
        );
        return;
    }

    // 5. Nothing exists: create dir as 1750, then dir/user and fdir from it.
    fs::create_directory(dir, ec);
    if (ec) {
        return;
    }
    // Best effort: a permissions failure is not fatal; ec is overwritten below.
    fs::permissions(
        dir,
        fs::perms::owner_all | fs::perms::group_read | fs::perms::group_exec | fs::perms::sticky_bit,
        fs::perm_options::replace,
        ec
    );
    fs::create_directory(du_path, dir, ec);
    if (ec) {
        return;
    }
    fs::create_directory(fdir, dir, ec);
}
236
237 /** If not yet created, add an element to the app_out file mapping.
238 * The element added should always be checked before use.
239 * File is created with perm 640. Group will be inherited from the
240 * file system parent if gid (+s) is set there.
241 */
242 void create_stream(const string& application) {
243 if (app_out.count(application)) {
244 return;
245 }
246 std::stringstream ss;
247 ss << fdir << "/" << application << ".R" << rank << ".XXXXXX";
248 string fpath = ss.str();
249 char *ftemplate = strdup(fpath.c_str());
250 if (!ftemplate) {
251 if (debug) {
252 std::cerr << __FILE__ << ": out of memory" << std::endl;
253 }
254 app_out[application] = std::make_unique< std::ofstream >();
255 return;
256 }
257 int fd = mkstemp(ftemplate);
258 if (fd < 0) {
259 if (debug) {
260 std::cerr << __FILE__ << ": mkstemp failed for " << ftemplate << std::endl;
261 }
262 free(ftemplate);
263 app_out[application] = std::make_unique< std::ofstream >();
264 return;
265 }
266 close(fd);
267 app_out[application] = std::make_unique< std::ofstream >(ftemplate,
268 (std::ofstream::out | std::ofstream::trunc));
269 std::filesystem::permissions(ftemplate,
270 (
271 std::filesystem::perms::owner_read |
272 std::filesystem::perms::owner_write |
273 std::filesystem::perms::group_write |
274 std::filesystem::perms::group_read),
275 std::filesystem::perm_options::add);
276 free(ftemplate);
277 }
278
279public:
280 multifile_plugin() : vers("1.0.0") , tags({"none"}), state(ok), paused(false), mode(pi_config) { }
281
282 int publish(std::shared_ptr<builder_api> b) {
283 if (paused)
284 return 0;
285 if (state != ok)
286 return 1;
287 if (mode != pi_pub_or_final)
288 return 2;
289 auto app = b->get_value_string("/header/application");
290 if (!app) {
291 auto fields = b->get_field_names();
292 if (debug) {
293 std::cerr << "fields: ";
294 for (const auto& f : fields) {
295 std::cerr << f << " ";
296 }
297 std::cerr << std::endl;
298 }
299 if (debug) {
300 std::cerr << __FILE__ <<
301 ": cannot publish without /header/application"
302 << std::endl;
303 std::cerr << b->serialize() << std::endl;
304 std::cerr << "sections:" ;
305 for (auto s : b->get_section_names()) {
306 std::cerr << s << " ";
307 }
308 std::cerr << std::endl;
309 }
310 return 1;
311 }
312
313 create_stream(app);
314 if (app_out[app]->is_open() && app_out[app]->good()) {
315 *(app_out[app]) << string("<adct-json>") << b->serialize() << "</adct-json>" << std::endl;
316 if (debug) {
317 std::cerr << "'multifile' wrote" << std::endl;
318 }
319 return 0;
320 }
321 if (debug) {
322 std::cerr << __FILE__ << " failed out.good" << std::endl;
323 }
324 return 1;
325 }
326
327 int config(const std::map< std::string, std::string >& m) {
328 return config(m, plugin_multifile_prefix);
329 }
330
331 int config(const std::map< std::string, std::string >& m, std::string_view env_prefix) {
332 string d = get(m, "DIRECTORY", env_prefix);
333 string r = get(m, "RANK", env_prefix);
334 string l = get(m, "DEBUG", env_prefix);
335 return config(d, r, l);
336 }
337
338 const std::map< const std::string, const std::string> & get_option_defaults() {
339 return plugin_multifile_config_defaults;
340 }
341
343 std::map <string, string >m;
344 if (!fdir.size())
345 config(m);
346 if (mode != pi_init) {
347 return 2;
348 }
349 if ( state == err ) {
350 if (debug) {
351 std::cerr <<
352 "multifile plugin initialize found pre-existing error"
353 << std::endl;
354 }
355 return 3;
356 }
357 std::error_code ec;
358 create_directories(topdir, user, fdir, ec);
359 if (ec.value() != 0 && ec.value() != EEXIST ) {
360 state = err;
361 if (debug) {
362 std::cerr << "unable to create output directory for plugin 'multifile'; "
363 << fdir << " : " << ec.message() << std::endl;
364 }
365 return ec.value();
366 }
367 string testfile = fdir + "/.XXXXXX";
368 auto ftemplate = std::make_unique<char[]>(testfile.length()+1);
369 ::std::strcpy(ftemplate.get(), testfile.c_str());
370 int fd = mkstemp(ftemplate.get());
371 int rc = errno;
372 if (fd == -1) {
373 state = err;
374 if (debug) {
375 std::cerr << "unable to open file in output directory "
376 << fdir << " for plugin 'multifile': " <<
377 std::strerror(rc) << std::endl;
378 }
379 return 4;
380 } else {
381 close(fd);
382 unlink(ftemplate.get());
383 }
384 mode = pi_pub_or_final;
385 return 0;
386 }
387
388 void finalize() {
389 if (mode == pi_pub_or_final) {
390 state = ok;
391 paused = false;
392 mode = pi_config;
393 app_out.clear();
394 } else {
395 if (debug) {
396 std::cerr << "multifile plugin finalize on non-running plugin" << std::endl;
397 }
398 }
399 }
400
401 void pause() {
402 paused = true;
403 }
404
405 void resume() {
406 paused = false;
407 }
408
410 return "multifile";
411 }
412
414 return vers;
415 }
416
418 if (debug) {
419 std::cerr << "Destructing multifile_plugin" << std::endl;
420 }
421 }
422
423 inline static std::vector<std::string> consolidate_multifile_logs(
424 const string& pattern, std::vector< std::string>& old_paths,
425 int debug=0)
426 {
427 namespace fs = std::filesystem;
428 std::vector<std::string> new_files;
429 std::vector<std::string> unmerged;
430
431 std::vector<std::string> parts;
432 for (const auto& part : fs::path(pattern)) {
433 parts.push_back(part.string());
434 }
435 // strip last 2: [wfid.]H_host.Pval.Tval.paddr/app.Rrank.unique
436 auto appdir = fs::path(pattern).parent_path();
437 auto appleafdir = parts.end()[-2];
438 auto userdir = appdir.parent_path();
439 auto pos = appleafdir.find(".H_");
440 std::string prefix;
441 if (pos != std::string::npos) {
442 prefix = appleafdir.substr(0,pos);
443 } else {
444 const auto now = std::chrono::system_clock::now();
445 const auto esec = std::chrono::duration_cast<
446 std::chrono::seconds >( now.time_since_epoch());
447 const long long isec = esec.count();
448 prefix = std::to_string(isec);
449 }
450 // dir/user/[wfid.]H_host.Pval.Tval.paddr/app.Rrank.unique
451 userdir /= (string("consolidated.") + prefix + ".adct-json.multi.xml");
452 const char *dest = userdir.c_str();
453 std::error_code ec;
454 fs::perms perms = fs::status(appdir, ec).permissions();
455 int perm = static_cast<int>(perms);
456 if (ec) {
457 perm = 0640;
458 }
459 int merge_err = glob_sendfile_join(dest, pattern.c_str(), perm, old_paths, unmerged);
460 if (merge_err) {
461 if (debug) {
462 for (auto f : unmerged) {
463 std::cerr << __FILE__ <<
464 ": unmergeable file: " <<
465 f << std::endl;
466 }
467 for (auto f : old_paths) {
468 std::cerr << __FILE__ <<
469 ": ignoring mergable file: "
470 << f << std::endl;
471 }
472 }
473 old_paths.clear();
474 return new_files;
475 }
476 new_files.push_back(dest);
477 return new_files;
478 }
479
/// Build the glob pattern that matches every file written by multifile
/// publishers for the current user below \p dir, optionally restricted to
/// the workflow id \p wfid:
///   dir/user/[wfid_<wfid>.]H_*.P*.T*.p*/*.R*.*
/// Blanks in \p dir are preserved.
/// \return the pattern, or "" if the user name cannot be determined.
inline static std::string get_multifile_log_path(string_view dir, string_view wfid)
{
    char uname[L_cuserid+1];
    if (!cuserid(uname)) {
        return "";
    }
    // Built by appending rather than by stream extraction, which would
    // stop at the first whitespace character.
    std::string pattern;
    pattern.append(dir).append("/").append(uname).append("/");
    if (!wfid.empty()) {
        pattern += "wfid_";
        pattern += wfid;
        pattern += '.';
    }
    pattern += "H_*.P*.T*.p*/*.R*.*";
    return pattern;
}
501
/*! \brief use sendfile to join all files matching pattern in new dest file.
 * No checking of input format correctness is performed.
 * Directories and other non-regular or unreadable matches are skipped.
 * \param dest file to write the merge result; created or truncated.
 * \param pattern glob pattern to match possible input files (~ is expanded).
 * \param perm permission bits passed to open() for \p dest.
 * \param merged cleared, then filled with the files copied completely.
 * \param unmerged cleared, then filled with regular files found but not
 *        fully copied due to some error.
 *        If this list is not empty, the output file should be abandoned,
 *        as it may contain a partially copied file.
 * \return 0 and the updated merged list (a pattern that matches nothing is
 *         not an error and creates no \p dest), or an errno value, in which
 *         case the file and merged vector content are undefined.
 */
inline static int glob_sendfile_join(const char *dest, const char *pattern,
    int perm, std::vector<std::string>& merged, std::vector<std::string>& unmerged)
{
    merged.clear();
    unmerged.clear();   // a stale entry would otherwise force EINVAL below
    glob_t files;
    const int gerr = glob(pattern, GLOB_NOSORT|GLOB_TILDE, NULL, &files);
    switch (gerr) {
    case 0:
        break;
    case GLOB_NOMATCH:
        globfree(&files);
        return 0;
    case GLOB_NOSPACE:
        globfree(&files);
        return ENOMEM;
    case GLOB_ABORTED:
        globfree(&files);
        return EPERM;
    default:
        // Any other glob() result is unexpected: do not use the list.
        globfree(&files);
        return EINVAL;
    }

    const int dest_fd = open(dest, O_WRONLY | O_CREAT | O_CLOEXEC | O_TRUNC, perm);
    if (dest_fd == -1) {
        const int oerr = errno;   // capture before globfree() can touch errno
        globfree(&files);
        return oerr;
    }

    for (size_t i = 0; i < files.gl_pathc; i++) {
        const char *src = files.gl_pathv[i];
        const int src_fd = open(src, O_RDONLY | O_CLOEXEC);
        if (src_fd == -1) continue;

        struct stat st;
        if (fstat(src_fd, &st) == 0 && S_ISREG(st.st_mode)) {
            off_t offset = 0;
            size_t remaining = static_cast<size_t>(st.st_size);
            int copy_err = 0;
            // Loop until the entire file is transferred.
            while (remaining > 0) {
                const ssize_t sent = sendfile(dest_fd, src_fd, &offset, remaining);
                if (sent == -1 && errno == EINTR) {
                    continue;   // interrupted by a signal: retry
                }
                if (sent <= 0) {
                    // sent == 0: the file shrank during the copy. errno is
                    // not set in that case, so report EIO explicitly.
                    copy_err = (sent == 0) ? EIO : errno;
                    break;
                }
                remaining -= static_cast<size_t>(sent);
            }
            if (copy_err == 0) {
                merged.push_back(src);
            } else {
                unmerged.push_back(src);
            }
        }
        close(src_fd);
    }

    // close() can report deferred write errors (e.g. on network file systems).
    int rc = 0;
    if (close(dest_fd) != 0) {
        rc = errno;
    }
    globfree(&files);
    if (!unmerged.empty()) {
        return EINVAL;
    }
    return rc;
}
578
/** Intended to check a consolidated multifile log for well-formed
 * <adct-json> records. NOT IMPLEMENTED YET: the file is not opened,
 * \p record_count is reset to 0 and an empty list is returned; the file
 * name and check_json arguments are ignored.
 * \param record_count set to 0.
 * \return an empty vector (the planned algorithm below collects the start
 *         positions of malformed records).
 */
inline static std::vector<size_t> validate_multifile_log(string_view /*filename*/, bool /*check_json*/, size_t & record_count)
{
    record_count = 0;
    // fixme validate_multifile_log -- planned algorithm:
    // open file
    // state=closed
    // curpos = 0
    // begin = 0
    // while( next = find_next_regex <[/]adct-json>, curpos ) != end
    //     if next[1] == 'a': # found start tag
    //         if state == closed
    //             state = opened
    //             begin = curpos
    //             curpos = after(next)
    //         else # broken missing tail
    //             state = closed
    //             v.push_back(begin)
    //             curpos = next
    //     else # found end tag
    //         if state == opened
    //             state = closed
    //             record_count++
    //             if check_json
    //                 parse check (after(begin), curpos-1)
    //             curpos=after(next)
    //         else # broken missing head
    //             v.push_back(begin)
    //             curpos=after(next)
    // if state != closed
    //     extra at end of file.
    //     v.push_back(begin)
    // else
    //     if !eof
    //         trailing junk error
    return std::vector<size_t>{};
}
616};
617
618} // adc
Parallel file output publisher_api implementation. This plugin writes each message to the c...
Definition multifile.ipp:47
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
static std::vector< size_t > validate_multifile_log(string_view, bool, size_t &record_count)
static std::vector< std::string > consolidate_multifile_logs(const string &pattern, std::vector< std::string > &old_paths, int debug=0)
int config(const std::map< std::string, std::string > &m, std::string_view env_prefix)
Configure the plugin with the options given and the corresponding environment variables.
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
string_view name() const
static std::string get_multifile_log_path(string_view dir, string_view wfid)
void resume()
Resume publishing. Duplicate calls are allowed.
static int glob_sendfile_join(const char *dest, const char *pattern, int perm, std::vector< std::string > &merged, std::vector< std::string > &unmerged)
use sendfile to join all files matching pattern in new dest file. No checking of input format correct...
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
void finalize()
Stop publishing and release any resources held for managing publication.
string_view version() const
Publisher plugin interface.
Definition publisher.hpp:48
Definition adc.hpp:82
std::string_view string_view
Definition curl.ipp:18
std::string string
Definition curl.ipp:17