libadc-cxx 1.0.0
Structured logging for scientific computing
Loading...
Searching...
No Matches
curl.ipp
Go to the documentation of this file.
3#include <cstdlib>
4#include <iostream>
5#include <fstream>
6#include <filesystem>
7#include <unistd.h>
8
9namespace adc {
10
11using std::cout;
12
/// Namespace-local shorthands for the standard string types.
using string = std::string;
using string_view = std::string_view;
15
16/*! \brief Curl utility publisher_api implementation.
17 This plugin generates a scratch file (in-memory) and asynchronously
18 sends it to the configured web service by invoking the 'curl' utility.
19 Multiple independent instances of this plugin may be used simultaneously.
20 */
21class curl_plugin : public publisher_api {
22 enum state {
23 ok,
24 err
25 };
26 enum mode {
27 /* the next mode needed for correct operation */
28 pi_config,
29 pi_init,
30 pi_pub_or_final
31 };
32
33public:
34 /// \brief default scratch directory for curl utility messages.
35 /// This directory must be globally writable and should be fast.
36 /// Overridden with env("ADC_CURL_PLUGIN_DIRECTORY").
37 inline static const char *adc_curl_plugin_directory_default = "/dev/shm/adc";
38
39 /// \brief default full path to curl utility
40 /// This program should be world accessible.
41 /// Overridden with env("ADC_CURL_PLUGIN_PROG").
42 inline static const char *adc_curl_plugin_prog_default = "/usr/bin/curl";
43
44 /// \brief default ADC server https port.
45 /// The server should be highly available.
46 /// Overridden with env("ADC_CURL_PLUGIN_PORT").
47 inline static const char *adc_curl_plugin_port_default = "443";
48
49 /// \brief ADC data ingest server host.
50 /// The server should be highly available; the
51 /// default value of https://localhost must be overridden by an
52 /// environment variable unless the test environment includes a test server.
53 /// Overridden with env("ADC_CURL_PLUGIN_URL").
54 inline static const char *adc_curl_plugin_url_default = "https://localhost";
55
56 /// \brief ADC curl plugin enable debug messages; (default "0": none)
57 /// "2" provides message send debugging
58 /// "1" provides lighter debugging
59 /// Overridden with env("ADC_CURL_PLUGIN_DEBUG").
60 inline static const char *adc_curl_plugin_debug_default = "0";
61
62private:
63 inline static const char *plugin_prefix = "ADC_CURL_PLUGIN_";
64 inline static const std::map< const string, const string > plugin_config_defaults =
65 { {"DIRECTORY", adc_curl_plugin_directory_default},
70 };
71
72 const string vers;
73 const std::vector<string> tags;
74 string fdir;
75 string prog;
76 string port;
77 string url;
78 int debug;
79 enum state state;
80 bool paused;
81 enum mode mode;
82
83 int config(const string& dir, string_view surl, string_view sport, string_view sprog, const string& sdebug) {
84 if (mode != pi_config)
85 return 2;
86
87 uid_t ui = geteuid();
88 fdir = dir + string("/") + std::to_string(ui);
89 port = sport;
90 url = surl;
91 prog = sprog;
92 mode = pi_init;
93 std::stringstream ss(sdebug);
94 ss >> debug;
95 if (debug < 0) {
96 debug = 0;
97 }
98 if (debug > 0) {
99 std::cout<< "curl plugin configured" <<std::endl;
100 }
101 return 0;
102 }
103
104 // find field in m, then with prefix in env, then the default.
105 const string get(const std::map< string, string >& m,
106 string field, string_view env_prefix) {
107 // fields not defined in config_defaults raise an exception.
108 auto it = m.find(field);
109 if (it != m.end()) {
110 return it->second;
111 }
112 string en = string(env_prefix) += field;
113 char *ec = getenv(en.c_str());
114 if (!ec) {
115 return plugin_config_defaults.at(field);
116 } else {
117 return string(ec);
118 }
119 }
120
121 string get_temp_file_name() {
122 string tplt = fdir + "/json-curl-msg-XXXXXX";
123 char * cstr = new char [tplt.length()+1];
124 std::strcpy (cstr, tplt.c_str());
125 int fd = mkstemp(cstr);
126 if (fd >= 0) {
127 string f(cstr);
128 delete[] cstr;
129 close(fd);
130 // current user now owns tempfile.
131 return f;
132 }
133 std::cout<< "mkstemp failed" <<std::endl;
134 delete[] cstr;
135 return "";
136 }
137
138 // invoke curl with json in f, then delete f.
139 // wrap this in a thread to make it non-blocking eventually.
140 int curl_send(string& f)
141 {
142 if (debug == 2) {
143 // do not suppress stderr/stdout from system call
144 string cmd = "(" + prog +
145 " -X POST -s -w \"\n%{http_code}\n\" -H \"Content-Type: application/json\" -d @"
146 + f + " " + url + " || /bin/rm -f " + f + ") &";
147 int err1 = std::system(cmd.c_str());
148 std::cout << "cmd:" << cmd << std::endl;
149 std::cout << err1 << std::endl;
150 std::cout << f << std::endl;
151 } else {
152 // suppress stderr/stdout from system call
153 string qcmd = "(" + prog +
154 " -X POST -s -w \"\n%{http_code}\n\" -H \"Content-Type: application/json\" -d @"
155 + f + " " + url +
156 " > /dev/null 2>&1 ; /bin/rm -f " + f + ") &";
157 int err2 = std::system(qcmd.c_str());
158 if (debug) {
159 std::cout << err2 << std::endl;
160 }
161 }
162 return 0;
163 }
164
165public:
166 curl_plugin() : vers("1.0.0") , state(ok), paused(false), mode(pi_config) { }
167
168 /*!
169 */
170 int publish(std::shared_ptr<builder_api> b) {
171 if (paused)
172 return 0;
173 if (state != ok)
174 return 1;
175 if (mode != pi_pub_or_final)
176 return 2;
177 // write to tmpfile, curl, and then delete
178 // this could be made fancy with a work queue in a later reimplementation.
179 // try for /dev/shm/adc for performance and fall back to tmpdir
180 string fname = get_temp_file_name();
181 if (debug) {
182 std::cout << "name: " << fname <<std::endl;
183 }
184 // open dump file
185 std::ofstream out(fname);
186 if (out.good()) {
187 out << b->serialize() << std::endl;
188 if (out.good()) {
189 if (debug) {
190 std::cout << "'curl' wrote" << std::endl;
191 }
192 out.close();
193 curl_send(fname); // send also removes file
194 return 0;
195 } else {
196 std::cout << "failed write" << std::endl;
197 return 1;
198 }
199 }
200 std::filesystem::remove(fname);
201 std::cout << "failed open " << fname << std::endl;
202 return 1;
203 }
204
205 /*!
206 */
207 int config(const std::map< std::string, std::string >& m) {
208 return config(m, plugin_prefix);
209 }
210
211 /*!
212 */
213 int config(const std::map< std::string, std::string >& m, string_view env_prefix) {
214 string d = get(m, "DIRECTORY", env_prefix);
215 string url = get(m, "URL", env_prefix);
216 string port = get(m, "PORT", env_prefix);
217 string prog = get(m, "PROG", env_prefix);
218 string sdebug = get(m, "DEBUG", env_prefix);
219 return config(d, url, port, prog, sdebug);
220 }
221
222 const std::map< const std::string, const std::string> & get_option_defaults() {
223 return plugin_config_defaults;
224 }
225
227 std::map <string, string >m;
228 // config if never config'd
229 if (!fdir.size())
230 config(m);
231 if (mode != pi_init) {
232 return 2;
233 }
234 if ( state == err ) {
235 std::cout << "curl plugin initialize found pre-existing error" << std::endl;
236 return 3;
237 }
238 std::error_code ec;
239 std::filesystem::create_directories(fdir, ec);
240 if (ec.value() != 0 && ec.value() != EEXIST ) {
241 state = err;
242 std::cout << "unable to create scratch directory for plugin 'curl'; "
243 << fdir << " : " << ec.message() << std::endl;
244 return ec.value();
245 } else {
246 if (debug) {
247 std::cout << "created " << fdir <<std::endl;
248 }
249 mode = pi_pub_or_final;
250 }
251 return 0;
252 }
253
254 void finalize() {
255 if (mode == pi_pub_or_final) {
256 state = ok;
257 paused = false;
258 mode = pi_config;
259 } else {
260 if (debug) {
261 std::cout << "curl plugin finalize on non-running plugin" << std::endl;
262 }
263 }
264 }
265
266 void pause() {
267 paused = true;
268 }
269
270 void resume() {
271 paused = false;
272 }
273
275 return "curl";
276 }
277
279 return vers;
280 }
281
283 if (debug) {
284 std::cout << "Destructing curl_plugin" << std::endl;
285 }
286 }
287};
288
289} // adc
Curl utility publisher_api implementation. This plugin generates a scratch file (in-memory) and async...
Definition curl.ipp:21
int config(const std::map< std::string, std::string > &m, string_view env_prefix)
Configure the plugin with the options given and the corresponding environment variables.
Definition curl.ipp:213
void pause()
Pause publishing until a call to resume. Duplicate calls are allowed.
Definition curl.ipp:266
static const char * adc_curl_plugin_prog_default
default full path to curl utility This program should be world accessible. Overridden with env("ADC_C...
Definition curl.ipp:42
int initialize()
Ready the plugin to publish following the configuration options set or defaulted.
Definition curl.ipp:226
string_view name() const
Definition curl.ipp:274
string_view version() const
Definition curl.ipp:278
int config(const std::map< std::string, std::string > &m)
Configure the plugin with the options given.
Definition curl.ipp:207
const std::map< const std::string, const std::string > & get_option_defaults()
Look up the settable options and their defaults.
Definition curl.ipp:222
static const char * adc_curl_plugin_debug_default
ADC curl plugin enable debug messages; (default "0": none) "2" provides message send debugging "1" pr...
Definition curl.ipp:60
void resume()
Resume publishing. Duplicate calls are allowed.
Definition curl.ipp:270
void finalize()
Stop publishing and release any resources held for managing publication.
Definition curl.ipp:254
int publish(std::shared_ptr< builder_api > b)
Publish the content of the builder.
Definition curl.ipp:170
static const char * adc_curl_plugin_directory_default
default scratch directory for curl utility messages. This directory must be globally writable and sho...
Definition curl.ipp:37
static const char * adc_curl_plugin_port_default
default ADC server https port. The server should be highly available. Overridden with env("ADC_CURL_P...
Definition curl.ipp:47
static const char * adc_curl_plugin_url_default
ADC data ingest server host. The server should be highly available; the default value of https://loca...
Definition curl.ipp:54
Publisher plugin interface.
Definition publisher.hpp:44
Definition adc.hpp:75
std::string_view string_view
Definition curl.ipp:14
std::string string
Definition curl.ipp:13