libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
libldms_msg.ipp
Go to the documentation of this file.
1/* Copyright 2025 NTESS. See the top-level LICENSE.txt file for details.
2 *
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
7#include <cstdlib>
8#include <iostream>
9#include <fstream>
10#include <filesystem>
11#include <unistd.h>
12#include <ldms.h>
13
14namespace adc {
15
16using std::cout;
17
18typedef std::string string;
19typedef std::string_view string_view;
20
21
22/*! \brief libldms_msg_publish publisher_api implementation.
23 This plugin calls the ldms libraries for sending messages.
24
25 Multiple independent instances of this plugin may be used simultaneously.
26 The asynchronous transmission spawns (or recycles) a separate thread to
27 handle transmission.
28 */
/// Plugin health flag; publish() declines to send unless the value is ok.
enum state {
    ok = 0,
    err = 1
};
/// The lifecycle step that must come next for correct operation.
/// The configuration, initialization, publish and finalize paths compare
/// against it before acting.
enum mode {
    pi_config = 0,       ///< configuration is expected next
    pi_init = 1,         ///< initialization is expected next
    pi_pub_or_final = 2  ///< publishing or finalizing is expected next
};
40public:
41 /// \brief name of the channel ADC messages go into
42 /// LDMS aggregators must be subscribed to this name.
43 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_CHANNEL").
44 inline static const char* adc_libldms_msg_publish_plugin_channel_default = "adc_publish_api";
45
46 /// \brief authentication method for channel connections; ldmsd listeners must match.
47 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_AUTH").
48 inline static const char* adc_libldms_msg_publish_plugin_auth_default = "munge";
49
50 /// \brief port for channel connections; ldmsd listeners must match.
51 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_PORT").
52 inline static const char* adc_libldms_msg_publish_plugin_port_default = "412";
53
54 /// \brief host for channel connections; ldmsd must be listening on the host.
55 /// Overridden with env("ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_HOST").
56 inline static const char* adc_libldms_msg_publish_plugin_host_default = "localhost";
57
58private:
59 const std::map< const string, const string > plugin_libldms_msg_publish_config_defaults =
60 {
65 };
66
67 inline static const char *plugin_prefix = "ADC_LIBLDMS_MSG_PUBLISH_PLUGIN_";
68
69 const string vers;
70 const std::vector<string> tags;
71 string auth;
72 string channel;
73 string port;
74 string host;
75 enum state state;
76 bool paused;
77 enum mode mode;
78 bool configured;
79
80 int config(const string& dir, string_view shost, string_view sport, string_view sauth, string_view schannel) {
81 if (mode != pi_config)
82 return 2;
83 uid_t ui = geteuid();
84 port = sport;
85 host = shost;
86 auth = sauth;
87 channel = schannel;
88 mode = pi_init;
89 configured = true;
90 return 0;
91 }
92
93 // find field in m, then with prefix in env, then the default.
94 const string get(const std::map< string, string >& m,
95 string field, const string& env_prefix) {
96 // fields not defined in config_defaults raise an exception.
97 auto it = m.find(field);
98 if (it != m.end()) {
99 return it->second;
100 }
101 string en = env_prefix + field;
102 char *ec = getenv(en.c_str());
103 if (!ec) {
104 return plugin_libldms_msg_publish_config_defaults.at(field);
105 } else {
106 return string(ec);
107 }
108 }
109
110 // invoke ldms_stream_publish with json in f, then delete f.
111 // wrap this in a thread to make it non-blocking eventually.
112 int libldms_msg_publish_send(string& f)
113 {
114// fixme libldms_msg_publish_plugin::libldms_msg_publish_send
115#if 0
116 string qcmd = "(" + prog +
117 " -t json "
118 " -x sock "
119 " -h " + host +
120 " -p " + port +
121 " -a " + auth +
122 " -s " + stream +
123 " -f " + f +
124 "> /dev/null 2>&1 ; /bin/rm " + f + ") &";
125 int err2 = std::system(qcmd.c_str());
126 std::cout << f << std::endl;
127 std::cout << err2 << std::endl;
128#endif
129 return 0;
130 }
131
132public:
133 libldms_msg_publish_plugin() : vers("0.0.0") , tags({"unimplemented"}), state(ok), paused(false), mode(pi_config), configured(false) {
134 std::cout << "Constructing libldms_msg_publish_plugin" << std::endl;
135 }
136
137 int publish(std::shared_ptr<builder_api> b) {
138 if (paused)
139 return 0;
140 if (state != ok)
141 return 1;
142 if (mode != pi_pub_or_final)
143 return 2;
144// fixme // send to ldms_msg api; libldms_msg_publish_plugin::publish
145 return 1;
146 }
147
148 int config(const std::map< std::string, std::string >& m) {
149 return config(m, plugin_prefix);
150 }
151
152 int config(const std::map< std::string, std::string >& m, const std::string& env_prefix) {
153 string host = get(m, "HOST", env_prefix);
154 string port = get(m, "PORT", env_prefix);
155 string auth = get(m, "AUTH", env_prefix);
156 string channel = get(m, "CHANNEL", env_prefix);
157 return config(d, host, port, auth, channel);
158 }
159
160 const std::map< const std::string, const std::string> & get_option_defaults() {
161 return plugin_libldms_msg_publish_config_defaults;
162 }
163
165 std::map <string, string >m;
166 if (!configured)
167 config(m);
168 if (mode != pi_init) {
169 return 2;
170 }
171 if ( state == err ) {
172 std::cout << "libldms_msg_publish plugin initialize found pre-existing error" << std::endl;
173 return 3;
174 }
175 // fixme: try connect to ldmsd; libldms_msg_publish_plugin::initialize
176 std::error_code ec;
177 // attach msg channel
178 mode = pi_pub_or_final;
179 return 0;
180 }
181
182 void finalize() {
183 if (mode == pi_pub_or_final) {
184 state = ok;
185 paused = false;
186 mode = pi_config;
187 } else {
188 std::cout << "libldms_msg_publish plugin finalize on non-running plugin" << std::endl;
189 }
190 }
191
192 void pause() {
193 paused = true;
194 }
195
196 void resume() {
197 paused = false;
198 }
199
201 return "libldms_msg_publish";
202 }
203
205 return vers;
206 }
207
209 std::cout << "Destructing libldms_msg_publish_plugin" << std::endl;
210 }
211};
212
213} // adc
libldms_msg_publish publisher_api implementation. This plugin calls the ldms libraries for sending me...
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
static const char * adc_libldms_msg_publish_plugin_channel_default
name of the channel ADC messages go into. LDMS aggregators must be subscribed to this name....
static const char * adc_libldms_msg_publish_plugin_port_default
port for channel connections; ldmsd listeners must match. Overridden with env("ADC_LIBLDMS_MSG_PUBLIS...
void resume()
Resume publishing. Duplicate calls are allowed.
static const char * adc_libldms_msg_publish_plugin_auth_default
authentication method for channel connections; ldmsd listeners must match. Overridden with env("ADC_L...
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
int config(const std::map< std::string, std::string > &m, const std::string &env_prefix)
void finalize()
Stop publishing and release any resources held for managing publication.
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
static const char * adc_libldms_msg_publish_plugin_host_default
host for channel connections; ldmsd must be listening on the host. Overridden with env("ADC_LIBLDMS_M...
Publisher plugin interface.
Definition publisher.hpp:48
Definition adc.hpp:82
std::string_view string_view
Definition curl.ipp:18
std::string string
Definition curl.ipp:17