libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
libldms_msg.ipp
Go to the documentation of this file.
3#include <cstdlib>
4#include <iostream>
5#include <fstream>
6#include <filesystem>
7#include <unistd.h>
8#include <ldms.h>
9
10namespace adc {
11
12using std::cout;
13
14typedef std::string string;
15typedef std::string_view string_view;
16
17
18/*! \brief libldms_msg_publish publisher_api implementation.
19 This plugin calls the ldms libraries for sending messages.
20
21 Multiple independent instances of this plugin may be used simultaneously.
22 The asynchronous transmission spawns (or recycles) a separate thread to
23 handle transmission.
24 */
/// Health flag of the plugin. publish() returns 1 whenever the current
/// state is not ok; finalize() resets it to ok.
26 enum state {
27 ok,
28 err
29 };
/// Lifecycle stage of the plugin. Each value names the call that is
/// expected next; calls made in the wrong stage return 2.
30 enum mode {
31 /* the next mode needed for correct operation */
32 pi_config, // initial stage (set by the constructor and by finalize()); required by the 5-argument config()
33 pi_init, // set by a successful config()
34 pi_pub_or_final // required by publish() and finalize()
35 };
36public:
37 /// \brief name of the channel ADC messages go into
38 /// LDMS aggregators must be subscribed to this name.
39 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_CHANNEL").
40 inline static const char* adc_libldms_msg_publish_plugin_channel_default = "adc_publish_api";
41
42 /// \brief authentication method for channel connections; ldmsd listeners must match.
43 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_AUTH").
44 inline static const char* adc_libldms_msg_publish_plugin_auth_default = "munge";
45
46 /// \brief port for channel connections; ldmsd listeners must match.
47 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_PORT").
48 inline static const char* adc_libldms_msg_publish_plugin_port_default = "412";
49
50 /// \brief host for channel connections; ldmsd must be listening on the host.
51 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_HOST").
52 inline static const char* adc_libldms_msg_publish_plugin_host_default = "localhost";
53
54private:
55 const std::map< const string, const string > plugin_libldms_msg_publish_config_defaults =
56 {
61 };
62
63 inline static const char *plugin_prefix = "ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_";
64
65 const string vers;
66 const std::vector<string> tags;
67 string auth;
68 string channel;
69 string port;
70 string host;
71 enum state state;
72 bool paused;
73 enum mode mode;
74 bool configured;
75
76 int config(const string& dir, string_view shost, string_view sport, string_view sauth, string_view schannel) {
77 if (mode != pi_config)
78 return 2;
79 uid_t ui = geteuid();
80 port = sport;
81 host = shost;
82 auth = sauth;
83 channel = schannel;
84 mode = pi_init;
85 configured = true;
86 return 0;
87 }
88
89 // find field in m, then with prefix in env, then the default.
90 const string get(const std::map< string, string >& m,
91 string field, const string& env_prefix) {
92 // fields not defined in config_defaults raise an exception.
93 auto it = m.find(field);
94 if (it != m.end()) {
95 return it->second;
96 }
97 string en = env_prefix + field;
98 char *ec = getenv(en.c_str());
99 if (!ec) {
100 return plugin_libldms_msg_publish_config_defaults.at(field);
101 } else {
102 return string(ec);
103 }
104 }
105
106 // Placeholder for sending the json in f and then deleting f.
107 // Intended to be wrapped in a thread eventually so that it does not block.
// As written, nothing is sent: the only active statement is `return 0`.
// The disabled block below is a shell-command sketch; it refers to `prog`
// and `stream`, which are not among the members visible in this listing.
// NOTE(review): f looks like a file name (it is passed to -f and /bin/rm) - confirm.
108 int libldms_msg_publish_send(string& f)
109 {
110// fixme libldms_msg_publish_plugin::libldms_msg_publish_send
111#if 0
112 string qcmd = "(" + prog +
113 " -t json "
114 " -x sock "
115 " -h " + host +
116 " -p " + port +
117 " -a " + auth +
118 " -s " + stream +
119 " -f " + f +
120 "> /dev/null 2>&1 ; /bin/rm " + f + ") &";
121 int err2 = std::system(qcmd.c_str());
122 std::cout << f << std::endl;
123 std::cout << err2 << std::endl;
124#endif
125 return 0;
126 }
127
128public:
129 libldms_msg_publish_plugin() : vers("0.0.0") , tags({"unimplemented"}), state(ok), paused(false), mode(pi_config), configured(false) {
130 std::cout << "Constructing libldms_msg_publish_plugin" << std::endl;
131 }
132
133 int publish(std::shared_ptr<builder_api> b) {
134 if (paused)
135 return 0;
136 if (state != ok)
137 return 1;
138 if (mode != pi_pub_or_final)
139 return 2;
140// fixme // send to ldms_msg api; libldms_msg_publish_plugin::publish
141 return 1;
142 }
143
144 int config(const std::map< std::string, std::string >& m) {
145 return config(m, plugin_prefix);
146 }
147
148 int config(const std::map< std::string, std::string >& m, const std::string& env_prefix) {
149 string host = get(m, "HOST", env_prefix);
150 string port = get(m, "PORT", env_prefix);
151 string auth = get(m, "AUTH", env_prefix);
152 string channel = get(m, "CHANNEL", env_prefix);
153 return config(d, host, port, auth, channel);
154 }
155
156 const std::map< const std::string, const std::string> & get_option_defaults() {
157 return plugin_libldms_msg_publish_config_defaults;
158 }
159
161 std::map <string, string >m;
162 if (!configured)
163 config(m);
164 if (mode != pi_init) {
165 return 2;
166 }
167 if ( state == err ) {
168 std::cout << "libldms_msg_publish plugin initialize found pre-existing error" << std::endl;
169 return 3;
170 }
171 // fixme: try connect to ldmsd; libldms_msg_publish_plugin::initialize
172 std::error_code ec;
173 // attach msg channel
174 mode = pi_pub_or_final;
175 return 0;
176 }
177
178 void finalize() {
179 if (mode == pi_pub_or_final) {
180 state = ok;
181 paused = false;
182 mode = pi_config;
183 } else {
184 std::cout << "libldms_msg_publish plugin finalize on non-running plugin" << std::endl;
185 }
186 }
187
188 void pause() {
189 paused = true;
190 }
191
192 void resume() {
193 paused = false;
194 }
195
197 return "libldms_msg_publish";
198 }
199
201 return vers;
202 }
203
205 std::cout << "Destructing libldms_msg_publish_plugin" << std::endl;
206 }
207};
208
209} // adc
libldms_msg_publish publisher_api implementation. This plugin calls the ldms libraries for sending me...
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
static const char * adc_libldms_msg_publish_plugin_channel_default
name of the channel ADC messages go into. LDMS aggregators must be subscribed to this name....
static const char * adc_libldms_msg_publish_plugin_port_default
port for channel connections; ldmsd listeners must match. Overridden with env("ADC_LIBLDMS_MSG_PUBLIS...
void resume()
Resume publishing. Duplicate calls are allowed.
static const char * adc_libldms_msg_publish_plugin_auth_default
authentication method for channel connections; ldmsd listeners must match. Overridden with env("ADC_L...
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
int config(const std::map< std::string, std::string > &m, const std::string &env_prefix)
void finalize()
Stop publishing and release any resources held for managing publication.
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
static const char * adc_libldms_msg_publish_plugin_host_default
host for channel connections; ldmsd must be listening on the host. Overridden with env("ADC_LIBLDMS_M...
Publisher plugin interface.
Definition publisher.hpp:44
Definition adc.hpp:75
std::string_view string_view
Definition curl.ipp:14
std::string string
Definition curl.ipp:13