libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
ldmsd_stream_publish.ipp
Go to the documentation of this file.
1/* Copyright 2025 NTESS. See the top-level LICENSE.txt file for details.
2 *
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
7#include <cstdlib>
8#include <iostream>
9#include <fstream>
10#include <filesystem>
11#include <unistd.h>
12
13namespace adc {
14
15using std::cout;
16
// Short aliases used throughout the adc namespace.
using string = std::string;
using string_view = std::string_view;
19
20
21/*! \brief ldmsd_stream_publish utility publisher_api implementation.
22 This plugin generates a scratch file (in-memory) and asynchronously
23 sends it to ldmsd by invoking the 'ldmsd_stream_publish' utility
24 available in ldms versions up to 4.4. Use the ldms_message_publish_plugin
25 for ldms versions 4.5 and above.
26
27 Multiple independent instances of this plugin may be used simultaneously.
28 The asynchronous transmission spawns a separate shell process to
29 handle transmission and file cleanup.
30 */
/// Health of the plugin instance.
enum state {
    ok,  ///< no error recorded; publish() is allowed to proceed
    err  ///< an error was recorded; publish() returns 1 while in this state
};
/// The next lifecycle stage needed for correct operation.
enum mode {
    pi_config,       ///< configuration is expected next
    pi_init,         ///< configured; initialization is expected next
    pi_pub_or_final  ///< initialized; publish() or finalize() may be called
};
42public:
43 /// \brief scratch directory for ldmsd_stream_publish utility messages
44 /// This directory must be globally writable and should be fast.
45 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_DIRECTORY").
46 inline static const char* adc_ldmsd_stream_publish_plugin_directory_default = "/dev/shm/adc";
47
48 /// \brief full path to ldmsd_stream_publish utility
49 /// This program should be world accessible.
50 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_PROG").
51 inline static const char* adc_ldmsd_stream_publish_plugin_prog_default = "/usr/sbin/ldmsd_stream_publish";
52
53 /// \brief name of the stream ADC messages go into
54 /// LDMS aggregators must be subscribed to this name.
55 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_STREAM").
56 inline static const char* adc_ldmsd_stream_publish_plugin_stream_default = "adc_publish_api";
57
58 /// \brief authentication method for stream connections; ldmsd listeners must match.
59 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_AUTH").
60 inline static const char* adc_ldmsd_stream_publish_plugin_auth_default = "munge";
61
62 /// \brief port for stream connections; ldmsd listeners must match.
63 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_PORT").
64 inline static const char* adc_ldmsd_stream_publish_plugin_port_default = "412";
65
66 /// \brief host for stream connections; ldmsd must be listening on the host.
67 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_HOST").
68 inline static const char* adc_ldmsd_stream_publish_plugin_host_default = "localhost";
69
70 /// \brief affinity for spawned processes; defaults to all
71 /// If not all, it should be a range list of cpu numbers, e.g. "0-1,56-57"
72 /// on a 56 core hyperthreaded processor where admin tools should run on 2 lowest cores.
73 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_AFFINITY").
74 inline static const char* adc_ldmsd_stream_publish_plugin_affinity_default = "all";
75
76 /// \brief ADC ldmsd-stream plugin enable debug messages; (default "0": none)
77 /// Overridden with env("ADC_LDMSD_STREAM_PUBLISH_PLUGIN_DEBUG").
78 inline static const char *adc_ldmsd_stream_publish_plugin_debug_default = "0";
79
80
81private:
82 const std::map< const string, const string > plugin_ldmsd_stream_publish_config_defaults =
91 };
92
93 inline static const char *plugin_prefix = "ADC_LDMSD_STREAM_PUBLISH_PLUGIN_";
94
95 const string vers;
96 const std::vector<string> tags;
97 string fdir;
98 string prog;
99 string auth;
100 string stream;
101 string port;
102 string host;
103 int debug;
104 enum state state;
105 bool paused;
106 enum mode mode;
107
108 int config(const string& dir, string_view shost, string_view sport, string_view sprog, string_view sauth, string_view sstream, const string& sdebug) {
109 if (mode != pi_config)
110 return 2;
111 uid_t ui = geteuid();
112 fdir = dir + string("/") + std::to_string(ui);
113 port = sport;
114 host = shost;
115 prog = sprog;
116 auth = sauth;
117 stream = sstream;
118 mode = pi_init;
119 std::stringstream ss(sdebug);
120 ss >> debug;
121 if (debug < 0) {
122 debug = 0;
123 }
124 if (debug > 0) {
125 std::cout<< "ldmsd_stream_publish plugin configured" <<std::endl;
126 }
127 return 0;
128 }
129
130 // find field in m, then with prefix in env, then the default.
131 const string get(const std::map< string, string >& m,
132 string field, string_view env_prefix) {
133 // fields not defined in config_defaults raise an exception.
134 auto it = m.find(field);
135 if (it != m.end()) {
136 return it->second;
137 }
138 string en = string(env_prefix) += field;
139 char *ec = getenv(en.c_str());
140 if (!ec) {
141 return plugin_ldmsd_stream_publish_config_defaults.at(field);
142 } else {
143 return string(ec);
144 }
145 }
146
147 string get_temp_file_name() {
148 string tplt = fdir + "/json-lsp-msg-XXXXXX";
149 char * cstr = new char [tplt.length()+1];
150 std::strcpy (cstr, tplt.c_str());
151 int fd = mkstemp(cstr);
152 if (fd >= 0) {
153 string f(cstr);
154 delete[] cstr;
155 close(fd);
156 // current user now owns tempfile.
157 return f;
158 }
159 std::cout<< "mkstemp failed" <<std::endl;
160 delete[] cstr;
161 return "";
162 }
163
164 // invoke ldms_stream_publish with json in f, then delete f.
165 // wrap this in a thread to make it non-blocking eventually.
166 int ldmsd_stream_publish_send(string& f)
167 {
168 string qcmd = "(" + prog +
169 " -t json "
170 " -x sock "
171 " -h " + host +
172 " -p " + port +
173 " -a " + auth +
174 " -s " + stream +
175 " -f " + f +
176 "> /dev/null 2>&1 ; /bin/rm -f " + f + ") &";
177 int err2 = std::system(qcmd.c_str());
178 if (debug) {
179 std::cout << f << std::endl;
180 std::cout << err2 << std::endl;
181 }
182 return 0;
183 }
184
185public:
186 ldmsd_stream_publish_plugin() : vers("1.0.0") , tags({"none"}), debug(0),
187 state(ok), paused(false), mode(pi_config) {}
188
189 int publish(std::shared_ptr<builder_api> b) {
190 if (paused)
191 return 0;
192 if (state != ok)
193 return 1;
194 if (mode != pi_pub_or_final)
195 return 2;
196 // write to tmpfile, ldms_stream_publish, and then delete
197 // this could be made fancy with a work queue in a later reimplementation.
198 // try for /dev/shm/adc for performance and fall back to tmpdir
199 string fname = get_temp_file_name();
200 if (debug) {
201 std::cout << "name: " << fname <<std::endl;
202 }
203 // open dump file
204 std::ofstream out(fname);
205 if (out.good()) {
206 out << b->serialize() << std::endl;
207 if (out.good()) {
208 if (debug) {
209 std::cout << "'ldmsd_stream_publish' wrote" << std::endl;
210 }
211 out.close();
212 ldmsd_stream_publish_send(fname); // send also removes file
213 return 0;
214 } else {
215 std::cout << "failed write" << std::endl;
216 return 1;
217 }
218 }
219 std::filesystem::remove(fname);
220 std::cout << "failed open " <<fname<< std::endl;
221 return 1;
222 }
223
224 int config(const std::map< std::string, std::string >& m) {
225 return config(m, plugin_prefix);
226 }
227
228 int config(const std::map< std::string, std::string >& m, string_view env_prefix) {
229 string d = get(m, "DIRECTORY", env_prefix);
230 string host = get(m, "HOST", env_prefix);
231 string port = get(m, "PORT", env_prefix);
232 string prog = get(m, "PROG", env_prefix);
233 string auth = get(m, "AUTH", env_prefix);
234 string stream = get(m, "STREAM", env_prefix);
235 string sdebug = get(m, "DEBUG", env_prefix);
236 return config(d, host, port, prog, auth, stream, sdebug);
237 }
238
239 const std::map< const std::string, const std::string> & get_option_defaults() {
240 return plugin_ldmsd_stream_publish_config_defaults;
241 }
242
244 std::map <string, string >m;
245 // config if never config'd
246 if (!fdir.size())
247 config(m);
248 if (mode != pi_init) {
249 return 2;
250 }
251 if ( state == err ) {
252 std::cout << "ldmsd_stream_publish plugin initialize found pre-existing error" << std::endl;
253 return 3;
254 }
255 std::error_code ec;
256 std::filesystem::create_directories(fdir, ec);
257 if (ec.value() != 0 && ec.value() != EEXIST ) {
258 state = err;
259 std::cout << "unable to create scratch directory for plugin 'ldmsd_stream_publish'; "
260 << fdir << " : " << ec.message() << std::endl;
261 return ec.value();
262 } else {
263 if (debug) {
264 std::cout << "created " << fdir <<std::endl;
265 }
266 mode = pi_pub_or_final;
267 }
268 return 0;
269 }
270
271 void finalize() {
272 if (mode == pi_pub_or_final) {
273 state = ok;
274 paused = false;
275 mode = pi_config;
276 } else {
277 if (debug) {
278 std::cout << "ldmsd_stream_publish plugin finalize on non-running plugin" << std::endl;
279 }
280 }
281 }
282
283 void pause() {
284 paused = true;
285 }
286
287 void resume() {
288 paused = false;
289 }
290
292 return "ldmsd_stream_publish";
293 }
294
296 return vers;
297 }
298
300 if (debug) {
301 std::cout << "Destructing ldmsd_stream_publish_plugin" << std::endl;
302 }
303 }
304};
305
306} // adc
ldmsd_stream_publish utility publisher_api implementation. This plugin generates a scratch file (in-m...
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
static const char * adc_ldmsd_stream_publish_plugin_prog_default
full path to ldmsd_stream_publish utility This program should be world accessible....
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
void resume()
Resume publishing. Duplicate calls are allowed.
int config(const std::map< std::string, std::string > &m, string_view env_prefix)
Configure the plugin with the options given and the corresponding environment variables.
static const char * adc_ldmsd_stream_publish_plugin_auth_default
authentication method for stream connections; ldmsd listeners must match. Overridden with env("ADC_LD...
static const char * adc_ldmsd_stream_publish_plugin_port_default
port for stream connections; ldmsd listeners must match. Overridden with env("ADC_LDMSD_STREAM_PUBLIS...
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
static const char * adc_ldmsd_stream_publish_plugin_debug_default
ADC ldmsd-stream plugin enable debug messages; (default "0": none) Overridden with env("ADC_LDMSD_STR...
void finalize()
Stop publishing and release any resources held for managing publication.
static const char * adc_ldmsd_stream_publish_plugin_host_default
host for stream connections; ldmsd must be listening on the host. Overridden with env("ADC_LDMSD_STRE...
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
static const char * adc_ldmsd_stream_publish_plugin_affinity_default
affinity for spawned processes; defaults to all If not all, it should be a range list of cpu numbers,...
static const char * adc_ldmsd_stream_publish_plugin_stream_default
name of the stream ADC messages go into LDMS aggregators must be subscribed to this name....
static const char * adc_ldmsd_stream_publish_plugin_directory_default
scratch directory for ldmsd_stream_publish utility messages This directory must be globally writable ...
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
Publisher plugin interface.
Definition publisher.hpp:48
Definition adc.hpp:82
std::string_view string_view
Definition curl.ipp:18
std::string string
Definition curl.ipp:17