libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
ldms_message_publish.ipp
Go to the documentation of this file.
3#include <cstdlib>
4#include <iostream>
5#include <fstream>
6#include <filesystem>
7#include <unistd.h>
8
9namespace adc {
10
11using std::cout;
12
13typedef std::string string;
14typedef std::string_view string_view;
15
16
17/*! \brief ldms_message_publish utility publisher_api implementation.
18 This plugin generates a scratch file (in-memory) and asynchronously
19 sends it to ldmsd by invoking the 'ldms_message_publish' utility
20 available in ldms versions from 4.5. Use the ldmsd_stream_publish_plugin
21 for ldms versions 4.4 and below.
22
23 Multiple independent instances of this plugin may be used simultaneously.
24 The asynchronous tranmission spawns a separate shell process to
25 handle tranmission and file cleanup.
26 */
28 enum state {
29 ok,
30 err
31 };
32 enum mode {
33 /* the next mode needed for correct operation */
34 pi_config,
35 pi_init,
36 pi_pub_or_final
37 };
38public:
39 /// \brief scratch directory for ldms_message_publish utility messages
40 /// This directory must be globally writable and should be fast.
41 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_DIRECTORY").
42 inline static const char* adc_ldms_message_publish_plugin_directory_default = "/dev/shm/adc";
43
44 /// \brief full path to ldms_message_publish utility
45 /// This program should be world accessible.
46 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_PROG").
47 inline static const char* adc_ldms_message_publish_plugin_prog_default = "/usr/sbin/ldms_msg_publish";
48
49 /// \brief channel of the ADC messages published
50 /// LDMS aggregators must be subscribed to this name.
51 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_CHANNEL").
52 inline static const char* adc_ldms_message_publish_plugin_channel_default = "adc_publish_api";
53
54 /// \brief authentication method for ldms message connections; ldmsd listeners must match.
55 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_AUTH").
56 inline static const char* adc_ldms_message_publish_plugin_auth_default = "munge";
57
58 /// \brief port for message connections; ldmsd listeners must match.
59 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_PORT").
60 inline static const char* adc_ldms_message_publish_plugin_port_default = "412";
61
62 /// \brief host for message connections; ldmsd must be listening on the host.
63 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_HOST").
64 inline static const char* adc_ldms_message_publish_plugin_host_default = "localhost";
65
66 /// \brief affinity for spawned processes; defaults to all
67 /// If not all, it should be a range list of cpu numbers, e,g. "0-1,56-57"
68 /// on a 56 core hyperthreaded processor where admin tools should run on 2 lowest cores.
69 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_AFFINITY").
70 inline static const char* adc_ldms_message_publish_plugin_affinity_default = "all";
71
72 /// \brief ADC ldms-message plugin enable debug messages; (default "0": none)
73 /// Overridden with env("ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_DEBUG").
74 inline static const char *adc_ldms_message_publish_plugin_debug_default = "0";
75
76
77private:
78 const std::map< const string, const string > plugin_ldms_message_publish_config_defaults =
87 };
88
89 inline static const char *plugin_prefix = "ADC_LDMS_MESSAGE_PUBLISH_PLUGIN_";
90
91 const string vers;
92 const std::vector<string> tags;
93 string fdir;
94 string prog;
95 string auth;
96 string channel;
97 string port;
98 string host;
99 int debug;
100 enum state state;
101 bool paused;
102 enum mode mode;
103
104 int config(const string& dir, string_view shost, string_view sport, string_view sprog, string_view sauth, string_view schannel, const string& sdebug) {
105 if (mode != pi_config)
106 return 2;
107 uid_t ui = geteuid();
108 fdir = dir + string("/") + std::to_string(ui);
109 port = sport;
110 host = shost;
111 prog = sprog;
112 auth = sauth;
113 channel = schannel;
114 mode = pi_init;
115 std::stringstream ss(sdebug);
116 ss >> debug;
117 if (debug < 0) {
118 debug = 0;
119 }
120 if (debug > 0) {
121 std::cout<< "ldms_message_publish plugin configured" <<std::endl;
122 }
123 return 0;
124 }
125
126 // find field in m, then with prefix in env, then the default.
127 const string get(const std::map< string, string >& m,
128 string field, string_view env_prefix) {
129 // fields not defined in config_defaults raise an exception.
130 auto it = m.find(field);
131 if (it != m.end()) {
132 return it->second;
133 }
134 string en = string(env_prefix) += field;
135 char *ec = getenv(en.c_str());
136 if (!ec) {
137 return plugin_ldms_message_publish_config_defaults.at(field);
138 } else {
139 return string(ec);
140 }
141 }
142
143 string get_temp_file_name() {
144 string tplt = fdir + "/json-lsp-msg-XXXXXX";
145 char * cstr = new char [tplt.length()+1];
146 std::strcpy (cstr, tplt.c_str());
147 int fd = mkstemp(cstr);
148 if (fd >= 0) {
149 string f(cstr);
150 delete[] cstr;
151 close(fd);
152 // current user now owns tempfile.
153 return f;
154 }
155 std::cout<< "mkstemp failed" <<std::endl;
156 delete[] cstr;
157 return "";
158 }
159
160 // invoke ldms_message_publish with json in f, then delete f.
161 // wrap this in a thread to make it non-blocking eventually.
162 int ldms_message_publish_send(string& f)
163 {
164 string qcmd = "(" + prog +
165 " -t json "
166 " -x sock "
167 " -h " + host +
168 " -p " + port +
169 " -a " + auth +
170 " -m " + channel +
171 " -f " + f +
172 "> /dev/null 2>&1 ; /bin/rm -f " + f + ") &";
173 int err2 = std::system(qcmd.c_str());
174 if (debug) {
175 std::cout << f << std::endl;
176 std::cout << err2 << std::endl;
177 }
178 return 0;
179 }
180
181public:
182 ldms_message_publish_plugin() : vers("1.0.0") , tags({"none"}), debug(0),
183 state(ok), paused(false), mode(pi_config) {}
184
185 int publish(std::shared_ptr<builder_api> b) {
186 if (paused)
187 return 0;
188 if (state != ok)
189 return 1;
190 if (mode != pi_pub_or_final)
191 return 2;
192 // write to tmpfile, ldms_message_publish, and then delete
193 // this could be made fancy with a work queue in a later reimplementation.
194 // try for /dev/shm/adc for performance and fall back to tmpdir
195 string fname = get_temp_file_name();
196 if (debug) {
197 std::cout << "name: " << fname <<std::endl;
198 }
199 // open dump file
200 std::ofstream out(fname);
201 if (out.good()) {
202 out << b->serialize() << std::endl;
203 if (out.good()) {
204 if (debug) {
205 std::cout << "'ldms_message_publish' wrote" << std::endl;
206 }
207 out.close();
208 ldms_message_publish_send(fname); // send also removes file
209 return 0;
210 } else {
211 std::cout << "failed write" << std::endl;
212 return 1;
213 }
214 }
215 std::filesystem::remove(fname);
216 std::cout << "failed open " <<fname<< std::endl;
217 return 1;
218 }
219
220 int config(const std::map< std::string, std::string >& m) {
221 return config(m, plugin_prefix);
222 }
223
224 int config(const std::map< std::string, std::string >& m, string_view env_prefix) {
225 string d = get(m, "DIRECTORY", env_prefix);
226 string host = get(m, "HOST", env_prefix);
227 string port = get(m, "PORT", env_prefix);
228 string prog = get(m, "PROG", env_prefix);
229 string auth = get(m, "AUTH", env_prefix);
230 string channel = get(m, "CHANNEL", env_prefix);
231 string sdebug = get(m, "DEBUG", env_prefix);
232 return config(d, host, port, prog, auth, channel, sdebug);
233 }
234
235 const std::map< const std::string, const std::string> & get_option_defaults() {
236 return plugin_ldms_message_publish_config_defaults;
237 }
238
240 std::map <string, string >m;
241 // config if never config'd
242 if (!fdir.size())
243 config(m);
244 if (mode != pi_init) {
245 return 2;
246 }
247 if ( state == err ) {
248 std::cout << "ldms_message_publish plugin initialize found pre-existing error" << std::endl;
249 return 3;
250 }
251 std::error_code ec;
252 std::filesystem::create_directories(fdir, ec);
253 if (ec.value() != 0 && ec.value() != EEXIST ) {
254 state = err;
255 std::cout << "unable to create scratch directory for plugin 'ldms_message_publish'; "
256 << fdir << " : " << ec.message() << std::endl;
257 return ec.value();
258 } else {
259 if (debug) {
260 std::cout << "created " << fdir <<std::endl;
261 }
262 mode = pi_pub_or_final;
263 }
264 return 0;
265 }
266
267 void finalize() {
268 if (mode == pi_pub_or_final) {
269 state = ok;
270 paused = false;
271 mode = pi_config;
272 } else {
273 if (debug) {
274 std::cout << "ldms_message_publish plugin finalize on non-running plugin" << std::endl;
275 }
276 }
277 }
278
279 void pause() {
280 paused = true;
281 }
282
283 void resume() {
284 paused = false;
285 }
286
288 return "ldms_message_publish";
289 }
290
292 return vers;
293 }
294
296 if (debug) {
297 std::cout << "Destructing ldms_message_publish_plugin" << std::endl;
298 }
299 }
300};
301
302} // adc
ldms_message_publish utility publisher_api implementation. This plugin generates a scratch file (in-m...
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
void resume()
Resume publishing Duplicate calls are allowed.
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
static const char * adc_ldms_message_publish_plugin_port_default
port for message connections; ldmsd listeners must match. Overridden with env("ADC_LDMS_MESSAGE_PUBLI...
static const char * adc_ldms_message_publish_plugin_host_default
host for message connections; ldmsd must be listening on the host. Overridden with env("ADC_LDMS_MESS...
int config(const std::map< std::string, std::string > &m, string_view env_prefix)
Configure the plugin with the options given and the corresponding environment variables.
static const char * adc_ldms_message_publish_plugin_affinity_default
affinity for spawned processes; defaults to all If not all, it should be a range list of cpu numbers,...
void finalize()
Stop publishing and release any resources held for managing publication.
static const char * adc_ldms_message_publish_plugin_channel_default
channel of the ADC messages published LDMS aggregators must be subscribed to this name....
static const char * adc_ldms_message_publish_plugin_prog_default
full path to ldms_message_publish utility This program should be world accessible....
static const char * adc_ldms_message_publish_plugin_directory_default
scratch directory for ldms_message_publish utility messages This directory must be globally writable ...
static const char * adc_ldms_message_publish_plugin_debug_default
ADC ldms-message plugin enable debug messages; (default "0": none) Overridden with env("ADC_LDMS_MESS...
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
static const char * adc_ldms_message_publish_plugin_auth_default
authentication method for ldms message connections; ldmsd listeners must match. Overridden with env("...
Publisher plugin interface.
Definition publisher.hpp:44
Definition adc.hpp:75
std::string_view string_view
Definition curl.ipp:14
std::string string
Definition curl.ipp:13