// Built-in default values for the plugin options; get() ends with
// plugin_multifile_config_defaults.at(field) as its last step.
// (The initializer list lies outside this excerpt.)
60 inline static const std::map< const string, const string > plugin_multifile_config_defaults =
// Prefix handed to config(m, env_prefix) by the one-argument config(m).
66 inline static const char *plugin_multifile_prefix =
"ADC_MULTIFILE_PLUGIN_";
// Initialised to {"none"} in the constructor's member-init list.
68 const std::vector<string> tags;
// Output streams keyed by application name; entries are assigned in create_stream().
73 std::map< std::string, std::unique_ptr< std::ofstream > > app_out;
// Configure the plugin from already-resolved string settings.
//   dir       - top-level output directory; first component of the path assembled below.
//   user_rank - NOTE(review): not referenced on the lines shown here; presumably recorded
//               as the rank that create_stream() puts in file names - confirm.
//   sdebug    - debug setting as text; fed to a stringstream for parsing.
// The guard on mode shows this is only expected while the plugin is in pi_config.
79 int config(
const string dir,
const string user_rank,
const string sdebug) {
80 if (mode != pi_config)
// Host and user names become components of the output path.
// NOTE(review): cuserid() is obsolete (dropped from POSIX.1-2001); the Linux
// man page recommends getpwuid(geteuid()) instead.
82 char hname[HOST_NAME_MAX+1];
83 char uname[L_cuserid+1];
84 if (gethostname(hname, HOST_NAME_MAX+1)) {
87 if (!cuserid(uname)) {
// Parse the debug setting from its string form.
91 std::stringstream dss(sdebug);
// Read ADC_WFID from the environment (presumably a workflow id that feeds
// wname - confirm); getenv() returns null when the variable is unset.
104 char *wfid = getenv(
"ADC_WFID");
110 pid_t pid = getpid();
// CLOCK_BOOTTIME counts time since boot (Linux-specific), so the stamp in the
// path is not a wall-clock time.
112 clock_gettime(CLOCK_BOOTTIME, &ts);
// Path layout: <dir>/<uname>/<wname>H_<hname>.P<pid>.T<sec>.<nsec>.p<this>
// Streaming `this` appends this object's address.
113 ss << dir <<
"/" << uname <<
"/" << wname <<
114 "H_"<< hname <<
".P" << pid <<
".T" <<
115 ts.tv_sec <<
"." << ts.tv_nsec <<
".p" <<
this;
122 std::cerr <<
"multifile plugin configured" <<std::endl;
// Look up one option value. The visible steps, in order: search the caller's
// map m for field, consult the environment variable named by en, and finally
// return the built-in default via plugin_multifile_config_defaults.at(field).
// NOTE(review): the conditions between these steps and the construction of en
// (presumably env_prefix + field) are not part of this excerpt - confirm.
// std::map::at throws std::out_of_range when field has no built-in default.
128 const string get(
const std::map< string, string >& m,
131 auto it = m.find(
field);
136 char *ec = getenv(en.c_str());
138 return plugin_multifile_config_defaults.at(
field);
// Ensure the output directory fdir exists, creating missing ancestors as needed.
// The outcome is reported through ec.
//   dir  - top-level directory (d_path below).
//   user - NOTE(review): not referenced on the lines shown here; presumably
//          appended to dir to form du_path - confirm.
//   fdir - leaf directory to create.
//   ec   - receives the result of the filesystem calls, or
//          std::errc::not_a_directory when an existing path component is not
//          a directory.
// The two-path create_directory(p, existing, ec) overload creates p with
// attributes copied from existing.
149 void create_directories(
const string& dir,
const string& user,
const string& fdir, std::error_code& ec) {
150 namespace fs = std::filesystem;
// First check: fdir itself is already present.
152 auto status_fdir = fs::status(fdir, ec);
153 if (fs::exists(status_fdir)) {
154 if (fs::is_directory(status_fdir) ) {
157 ec = make_error_code(std::errc::not_a_directory);
// Next check: fdir's parent directory is present.
162 auto fpath = fs::path(fdir);
163 auto bdir = fpath.parent_path();
// NOTE(review): this and the later fs::status(path) calls use the throwing
// overload although the function reports through ec - confirm that is intended.
164 auto status_bdir = fs::status(bdir);
165 if (fs::exists(status_bdir)) {
166 if (fs::is_directory(status_bdir) ) {
167 fs::create_directory(fdir, bdir, ec);
170 ec = make_error_code(std::errc::not_a_directory);
// Next check: du_path is present.
176 fs::path du_path = dir;
178 auto status_udir = fs::status(du_path);
179 if (fs::exists(status_udir)) {
// Fixed: test the status just fetched for du_path. This line previously
// re-tested status_bdir, the status of fdir's parent.
180 if (fs::is_directory(status_udir) ) {
181 fs::create_directory(fdir, du_path, ec);
184 ec = make_error_code(std::errc::not_a_directory);
// Next check: only the top-level directory is present; create du_path and
// then fdir beneath it.
190 fs::path d_path = dir;
191 auto status_dir = fs::status(d_path);
192 if (fs::exists(status_dir)) {
193 if (fs::is_directory(status_dir) ) {
194 fs::create_directory(du_path, d_path, ec);
198 fs::create_directory(fdir, d_path, ec);
// Permission set used here: owner rwx, group r-x, plus the sticky bit only if
// the top-level directory already carries it. (The path these permissions
// are applied to is on a line outside this excerpt.)
202 auto is_sticky = status_dir.permissions() & fs::perms::sticky_bit;
205 fs::perms::owner_all | fs::perms::group_read | fs::perms::group_exec | is_sticky,
206 fs::perm_options::replace,
211 ec = make_error_code(std::errc::not_a_directory);
// Last case: create the top-level directory too. The permission set here
// always includes the sticky bit.
217 fs::create_directory(dir, ec);
223 fs::perms::owner_all | fs::perms::group_read | fs::perms::group_exec | fs::perms::sticky_bit,
224 fs::perm_options::replace,
227 fs::create_directory(du_path, dir, ec);
231 fs::create_directory(fdir, dir, ec);
// Set up the output stream for application and store it in app_out.
// The file name is <fdir>/<application>.R<rank>.XXXXXX, with the trailing
// X's replaced by mkstemp().
242 void create_stream(
const string& application) {
// An entry for this application already exists (the branch body is outside
// this excerpt).
243 if (app_out.count(application)) {
246 std::stringstream ss;
247 ss << fdir <<
"/" << application <<
".R" << rank <<
".XXXXXX";
248 string fpath = ss.str();
// mkstemp() rewrites its argument in place, hence the writable copy.
// NOTE(review): the strdup() memory must be free()d and the descriptor
// returned by mkstemp() close()d; neither call appears on the lines shown
// here - confirm.
249 char *ftemplate = strdup(fpath.c_str());
252 std::cerr << __FILE__ <<
": out of memory" << std::endl;
// On this failure path an unopened ofstream is stored, so a later is_open()
// test on the entry fails.
254 app_out[application] = std::make_unique< std::ofstream >();
257 int fd = mkstemp(ftemplate);
260 std::cerr << __FILE__ <<
": mkstemp failed for " << ftemplate << std::endl;
// Same fallback as above: store an unopened stream.
263 app_out[application] = std::make_unique< std::ofstream >();
// Open the file named by the filled-in template, truncating it.
267 app_out[application] = std::make_unique< std::ofstream >(ftemplate,
268 (std::ofstream::out | std::ofstream::trunc));
// Add owner and group read/write permission bits to the file.
269 std::filesystem::permissions(ftemplate,
271 std::filesystem::perms::owner_read |
272 std::filesystem::perms::owner_write |
273 std::filesystem::perms::group_write |
274 std::filesystem::perms::group_read),
275 std::filesystem::perm_options::add);
// Default constructor: version "1.0.0", a single tag "none", state ok,
// not paused, and mode pi_config (the mode the string-argument config()
// checks for).
280 multifile_plugin() : vers(
"1.0.0") , tags({
"none"}), state(ok), paused(
false), mode(pi_config) { }
// Body of the publish step (the signature lies outside this excerpt).
// Checks mode against pi_pub_or_final, reads the application name from b,
// and writes the serialized content of b to that application's stream.
287 if (mode != pi_pub_or_final)
289 auto app = b->get_value_string(
"/header/application");
// Diagnostic dump of the field names held by b.
291 auto fields = b->get_field_names();
293 std::cerr <<
"fields: ";
294 for (
const auto& f : fields) {
295 std::cerr << f <<
" ";
297 std::cerr << std::endl;
// Error report: no /header/application value. Dumps the serialized content
// and the section names to stderr.
300 std::cerr << __FILE__ <<
301 ": cannot publish without /header/application"
303 std::cerr << b->serialize() << std::endl;
304 std::cerr <<
"sections:" ;
305 for (
auto s : b->get_section_names()) {
306 std::cerr << s <<
" ";
308 std::cerr << std::endl;
// Each record goes out on one line wrapped in <adct-json>...</adct-json>;
// std::endl also flushes the stream.
// NOTE(review): app_out[app] default-inserts a null unique_ptr when no entry
// exists for app; presumably create_stream(app) runs on a line outside this
// excerpt - confirm.
314 if (app_out[app]->is_open() && app_out[app]->good()) {
315 *(app_out[app]) <<
string(
"<adct-json>") << b->serialize() <<
"</adct-json>" << std::endl;
317 std::cerr <<
"'multifile' wrote" << std::endl;
322 std::cerr << __FILE__ <<
" failed out.good" << std::endl;
// Configure from an option map, using plugin_multifile_prefix as the
// environment-variable prefix. Forwards to the two-argument overload and
// returns its result.
327 int config(
const std::map< std::string, std::string >& m) {
328 return config(m, plugin_multifile_prefix);
// Configure from an option map with a caller-chosen environment prefix.
// Resolves the DIRECTORY, RANK and DEBUG options through get() and passes
// the three strings to config(dir, user_rank, sdebug), returning its result.
331 int config(
const std::map< std::string, std::string >& m, std::string_view env_prefix) {
332 string d = get(m,
"DIRECTORY", env_prefix);
333 string r = get(m,
"RANK", env_prefix);
334 string l = get(m,
"DEBUG", env_prefix);
335 return config(d, r, l);
339 return plugin_multifile_config_defaults;
343 std::map <string, string >m;
// Body of the initialize step (the signature lies outside this excerpt).
// Visible sequence: check mode and state, create the output directory, probe
// it with a temporary file, then set mode to pi_pub_or_final.
346 if (mode != pi_init) {
349 if ( state == err ) {
352 "multifile plugin initialize found pre-existing error"
// An EEXIST result from directory creation is not treated as a failure.
358 create_directories(topdir, user, fdir, ec);
359 if (ec.value() != 0 && ec.value() != EEXIST ) {
362 std::cerr <<
"unable to create output directory for plugin 'multifile'; "
363 << fdir <<
" : " << ec.message() << std::endl;
// Writability probe: create a hidden temporary file in fdir and unlink it.
// mkstemp() needs a writable buffer, hence the char[] copy of the template.
// NOTE(review): the descriptor fd must be close()d; that call is not on the
// lines shown here - confirm.
367 string testfile = fdir +
"/.XXXXXX";
368 auto ftemplate = std::make_unique<char[]>(testfile.length()+1);
369 ::std::strcpy(ftemplate.get(), testfile.c_str());
370 int fd = mkstemp(ftemplate.get());
375 std::cerr <<
"unable to open file in output directory "
376 << fdir <<
" for plugin 'multifile': " <<
377 std::strerror(rc) << std::endl;
382 unlink(ftemplate.get());
// From here on the mode check at the top of the publish step passes.
384 mode = pi_pub_or_final;
// Fragment of the finalize step (signature outside this excerpt): the work is
// guarded by mode == pi_pub_or_final. The message below is presumably the
// other branch - confirm.
389 if (mode == pi_pub_or_final) {
396 std::cerr <<
"multifile plugin finalize on non-running plugin" << std::endl;
// Trace message; by its text it is presumably emitted from the destructor,
// whose signature lies outside this excerpt - confirm.
419 std::cerr <<
"Destructing multifile_plugin" << std::endl;
// Tail of a parameter list and body of a routine that derives the path of a
// single "consolidated" file from pattern (the function name lies outside
// this excerpt).
//   pattern   - file path pattern; its parent directory is appdir below.
//   old_paths - NOTE(review): only read on the lines shown here (the
//               "ignoring mergable file" loop); where it is filled is not visible.
424 const string& pattern, std::vector< std::string>& old_paths,
427 namespace fs = std::filesystem;
428 std::vector<std::string> new_files;
429 std::vector<std::string> unmerged;
// Split pattern into its path components.
431 std::vector<std::string> parts;
432 for (
const auto& part : fs::path(pattern)) {
433 parts.push_back(part.string());
436 auto appdir = fs::path(pattern).parent_path();
// Second-to-last component, i.e. the name of the directory holding the files.
// NOTE(review): parts.end()[-2] is undefined when pattern has fewer than two
// components; no guard is visible on the lines shown here - confirm.
437 auto appleafdir = parts.end()[-2];
438 auto userdir = appdir.parent_path();
// prefix is the text before ".H_" in the leaf directory name when that marker
// is present; the seconds-since-epoch assignment below is presumably the
// fallback branch - confirm.
439 auto pos = appleafdir.find(
".H_");
441 if (pos != std::string::npos) {
442 prefix = appleafdir.substr(0,pos);
444 const auto now = std::chrono::system_clock::now();
445 const auto esec = std::chrono::duration_cast<
446 std::chrono::seconds >( now.time_since_epoch());
447 const long long isec = esec.count();
448 prefix = std::to_string(isec);
// After this append userdir names the destination file
// <userdir>/consolidated.<prefix>.adct-json.multi.xml, not a directory.
451 userdir /= (
string(
"consolidated.") + prefix +
".adct-json.multi.xml");
// dest points into userdir's storage, so it is valid only while userdir is
// left unmodified.
452 const char *dest = userdir.c_str();
// Permission bits of appdir, converted to int.
454 fs::perms perms = fs::status(appdir, ec).permissions();
455 int perm =
static_cast<int>(perms);
462 for (
auto f : unmerged) {
463 std::cerr << __FILE__ <<
464 ": unmergeable file: " <<
467 for (
auto f : old_paths) {
468 std::cerr << __FILE__ <<
469 ": ignoring mergable file: "
// Record the destination file path.
476 new_files.push_back(dest);
// Fragment that builds a glob pattern (enclosing signature outside this
// excerpt). The pattern has the same shape as the directory path assembled
// in config(dir, user_rank, sdebug) plus the file names from create_stream():
//   <dir>/<uname>/<wname>H_*.P*.T*.p*/*.R*.*
// NOTE(review): cuserid() is obsolete (dropped from POSIX.1-2001); the Linux
// man page recommends getpwuid(geteuid()) instead.
484 std::stringstream ss;
485 char uname[L_cuserid+1];
486 if (!cuserid(uname)) {
496 ss << dir <<
"/" << uname <<
"/" << wname <<
"H_*.P*.T*.p*/*.R*.*";
// Tail of a parameter list and body of a routine that copies the content of
// files matching pattern into dest (the function name lies outside this
// excerpt).
//   perm     - mode argument passed to open() when creating dest.
//   merged   - receives source paths (see the push_back below).
//   unmerged - receives source paths (see the push_back below).
// NOTE(review): the condition that chooses between merged and unmerged is on
// lines not shown here; presumably complete copy versus failure - confirm.
514 int perm, std::vector<std::string>& merged, std::vector<std::string>& unmerged)
// GLOB_NOSORT: matches are returned unsorted. GLOB_TILDE: a leading ~ is expanded.
518 int err = glob(pattern, GLOB_NOSORT|GLOB_TILDE, NULL, &files);
// dest is created if absent and truncated if present.
533 int dest_fd = open(dest, O_WRONLY | O_CREAT |O_CLOEXEC | O_TRUNC , perm);
539 for (
size_t i = 0; i < files.gl_pathc; i++) {
// A source that cannot be opened is skipped without being added to either list.
540 int src_fd = open(files.gl_pathv[i], O_RDONLY);
541 if (src_fd == -1)
continue;
// Only regular files are copied.
544 struct stat stat_buf;
545 if (fstat(src_fd, &stat_buf) == 0 && S_ISREG(stat_buf.st_mode)) {
547 size_t remaining = stat_buf.st_size;
// sendfile() may transfer fewer bytes than requested and updates offset, so
// the loop runs while remaining is non-zero; an EINTR failure is retried.
550 while (remaining > 0) {
551 ssize_t sent = sendfile(dest_fd, src_fd, &offset, remaining);
555 if (sent == -1 && errno == EINTR)
continue;
562 merged.push_back(files.gl_pathv[i]);
564 unmerged.push_back(files.gl_pathv[i]);
// NOTE(review): close(src_fd), close(dest_fd) and globfree(&files) are not on
// the lines shown here - confirm they exist.
572 if (unmerged.size() != 0) {
581 std::vector<size_t> v;