1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3pub use crate::config::Config;
4use crate::evaluator::{Evaluator, Primitive, Type, json2lisp, obj2str, lisp2json};
5use crate::extensions::crypto::{
6 primitive_s7_crypto_generate, primitive_s7_crypto_sign, primitive_s7_crypto_verify,
7};
8use crate::persistor::{MemoryPersistor, PERSISTOR, Persistor, PersistorAccessError};
9pub use crate::persistor::{SIZE, Word};
10use libc;
11use log::{debug, info};
12use once_cell::sync::Lazy;
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
18use evaluator as s7;
19use sha2::{Digest, Sha256};
20use std::ffi::{CStr, CString};
21
22mod config;
23pub mod evaluator;
24mod persistor;
25mod extensions {
26 pub mod crypto;
27}
28
29pub static JOURNAL: Lazy<Journal> = Lazy::new(|| Journal::new());
30
31const SYNC_NODE_TAG: i64 = 0;
32
33const GENESIS_STR: &str = "(lambda (*sync-state* query) (cons (eval query) *sync-state*))";
34
35pub const NULL: Word = [
36 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
37];
38
39struct Session {
40 record: Word,
41 persistor: MemoryPersistor,
42 cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
43}
44
45impl Session {
46 fn new(
47 record: Word,
48 persistor: MemoryPersistor,
49 cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
50 ) -> Self {
51 Self {
52 record,
53 persistor,
54 cache,
55 }
56 }
57}
58
59static SESSIONS: Lazy<Mutex<HashMap<usize, Session>>> = Lazy::new(|| Mutex::new(HashMap::new()));
60
/// Scope guard: runs the wrapped closure when the guard is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    /// Invokes the stored closure exactly once, at drop time.
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
68
/// Error type wrapping a single `Word`.
///
/// NOTE(review): not referenced in the visible part of this file — confirm what
/// the wrapped word identifies (presumably the record or node that failed).
#[derive(Debug)]
pub struct JournalAccessError(pub Word);

/// Global lock used by `Journal::evaluate_record`: held while committing a new
/// state, and for a whole run once `RUNS` attempts have already been made.
static LOCK: Mutex<()> = Mutex::new(());
/// Number of evaluation attempts made without holding `LOCK` up front.
static RUNS: usize = 3;
74
/// Entry point for evaluating Scheme/JSON queries against synchronic records.
pub struct Journal {
    /// HTTP client used by the `sync-http` and `sync-remote` primitives.
    client: reqwest::Client,
}
98
99impl Journal {
100 fn new() -> Self {
101 match PERSISTOR.root_new(
102 NULL,
103 PERSISTOR
104 .branch_set(
105 PERSISTOR
106 .leaf_set(GENESIS_STR.as_bytes().to_vec())
107 .expect("Failed to create genesis leaf"),
108 NULL,
109 NULL,
110 )
111 .expect("Failed to create genesis branch"),
112 ) {
113 Ok(_) => Self {
114 client: reqwest::Client::new(),
115 },
116 Err(_) => Self {
117 client: reqwest::Client::new(),
118 },
119 }
120 }
121
    /// Evaluates a Scheme `query` against the root record (`NULL`) and returns
    /// the printed result.
    pub fn evaluate(&self, query: &str) -> String {
        self.evaluate_record(NULL, query)
    }
140
141 pub fn evaluate_json(&self, query: Value) -> Value {
142 match json2lisp(&query) {
143 Ok(scheme_query) => {
144 let result = self.evaluate_record(NULL, scheme_query.as_str());
145 match lisp2json(result.as_str()) {
146 Ok(json_result) => json_result,
147 Err(_) => {
148 log::warn!(
149 "Failed to parse Scheme to JSON. Result: {}",
150 result
151 );
152 lisp2json("(error 'parse-error \"Failed to parse Scheme to JSON\")")
153 }
154 .expect("Error parsing the JSON error message"),
155 }
156 }
157 Err(_) => {
158 let query_str = serde_json::to_string(&query)
159 .unwrap_or_else(|_| "<unprintable json>".to_string());
160 log::warn!(
161 "Failed to parse JSON to Scheme. Query: {}",
162 query_str
163 );
164 lisp2json("(error 'parse-error \"Failed to parse JSON to Scheme\")")
165 }
166 .expect("Error parsing the JSON error message"),
167 }
168 }
169
170 pub fn lisp_to_json(&self, query: &str) -> Value {
181 match lisp2json(query) {
182 Ok(json_result) => json_result,
183 Err(_) => {
184 log::warn!(
185 "Failed to parse Scheme to JSON. Query: {}",
186 query
187 );
188 lisp2json("(error 'parse-error \"Failed to parse Scheme to JSON\")")
189 }
190 .expect("Error parsing the JSON error message"),
191 }
192 }
193
194 pub fn json_to_lisp(&self, query: Value) -> String {
205 match json2lisp(&query) {
206 Ok(scheme_result) => scheme_result,
207 Err(_) => {
208 let query_str = serde_json::to_string(&query)
209 .unwrap_or_else(|_| "<unprintable json>".to_string());
210 log::warn!(
211 "Failed to parse JSON to Scheme. Query: {}",
212 query_str
213 );
214 "(error 'parse-error \"Failed to parse JSON to Scheme\")".to_string()
215 }
216 }
217 }
218
    /// Evaluates `query` against `record` and returns the printed output.
    ///
    /// Each attempt runs the record's genesis function in a fresh evaluator
    /// against the record's current state. If the state is unchanged the
    /// output is returned directly. Otherwise the nodes created during the run
    /// are copied into `PERSISTOR` and the record root is switched from the old
    /// state to the new one; when the record changed in the meantime the whole
    /// attempt is repeated. After `RUNS` attempts, further attempts hold `LOCK`
    /// for their whole duration.
    ///
    /// # Panics
    /// Panics when the persistor or session map cannot be accessed, when the
    /// printed result does not have the expected layout, or when a node
    /// reachable from the new state cannot be found in either persistor.
    fn evaluate_record(&self, record: Word, query: &str) -> String {
        let mut runs = 0;
        // Created once outside the loop, so HTTP responses are shared by all retries.
        let cache = Arc::new(Mutex::new(HashMap::new()));

        let start = Instant::now();
        debug!(
            "Evaluating ({})",
            query.chars().take(128).collect::<String>(),
        );

        loop {
            // Once RUNS attempts have completed, take the global lock for the whole attempt.
            let _lock1 = if runs >= RUNS {
                Some(LOCK.lock().expect("Failed to acquire concurrency lock"))
            } else {
                None
            };

            // The genesis function is the leaf stored as the first child of the record's root branch.
            let genesis_func = PERSISTOR
                .leaf_get(
                    PERSISTOR
                        .branch_get(
                            PERSISTOR
                                .root_get(record)
                                .expect("Failed to get root record"),
                        )
                        .expect("Failed to get genesis branch")
                        .0,
                )
                .expect("Failed to get genesis function")
                .to_vec();

            let genesis_str = String::from_utf8_lossy(&genesis_func);

            let state_old = PERSISTOR
                .root_get(record)
                .expect("Failed to get current state");

            // Temporary root created from the old state; removed again when this iteration ends.
            // NOTE(review): presumably this keeps `state_old` reachable during the run — confirm.
            let record_temp = PERSISTOR
                .root_temp(state_old)
                .expect("Failed to create temporary record");

            let _record_dropper = CallOnDrop(|| {
                PERSISTOR
                    .root_delete(record_temp)
                    .expect("Failed to delete temporary record");
            });

            // Render the state word as a byte-vector literal: "#u(b0 b1 ...)".
            let state_str = format!(
                "#u({})",
                state_old
                    .iter()
                    .map(|&num| num.to_string())
                    .collect::<Vec<String>>()
                    .join(" "),
            );

            let evaluator = Evaluator::new(
                vec![(SYNC_NODE_TAG, type_s7_sync_node())]
                    .into_iter()
                    .collect(),
                vec![
                    primitive_s7_sync_hash(),
                    primitive_s7_sync_null(),
                    primitive_s7_sync_node(),
                    primitive_s7_sync_stub(),
                    primitive_s7_sync_is_node(),
                    primitive_s7_sync_is_pair(),
                    primitive_s7_sync_is_stub(),
                    primitive_s7_sync_is_null(),
                    primitive_s7_sync_digest(),
                    primitive_s7_sync_cons(),
                    primitive_s7_sync_car(),
                    primitive_s7_sync_cdr(),
                    primitive_s7_sync_cut(),
                    primitive_s7_sync_create(),
                    primitive_s7_sync_delete(),
                    primitive_s7_sync_all(),
                    primitive_s7_sync_call(),
                    primitive_s7_sync_remote(),
                    primitive_s7_sync_http(),
                    primitive_s7_crypto_generate(),
                    primitive_s7_crypto_sign(),
                    primitive_s7_crypto_verify(),
                ],
            );

            // Register the session under the evaluator's context address so the
            // primitives can find the scratch persistor and cache.
            SESSIONS
                .lock()
                .expect("Failed to acquire sessions lock")
                .insert(
                    evaluator.sc as usize,
                    Session::new(record, MemoryPersistor::new(), cache.clone()),
                );

            // Unregister the session when this iteration ends, whichever way it exits.
            let _session_dropper = CallOnDrop(|| {
                let mut session = SESSIONS
                    .lock()
                    .expect("Failed to acquire sessions lock for cleanup");
                session.remove(&(evaluator.sc as usize));
            });

            // Apply the genesis function to the current state node and the quoted query.
            let expr = format!(
                "((eval {}) (sync-node {}) (quote {}))",
                genesis_str, state_str, query
            );

            let result = evaluator.evaluate(expr.as_str());
            runs += 1;

            // Handle to the scratch persistor that the primitives wrote to during this run.
            let persistor = {
                let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
                &session
                    .get(&(evaluator.sc as usize))
                    .expect("Session not found in SESSIONS map")
                    .persistor
                    .clone()
            };

            // Split the printed result into the output and the new state word.
            // Assumes a non-error result prints as "(<output> . (sync-node #u(<bytes>)))":
            // 16 skips ". (sync-node #u(" after the last '.', and 3 drops the trailing ")))".
            // NOTE(review): the slicing and the byte parse panic if the layout differs.
            let (output, state_new) = match result.starts_with("(error '") {
                true => (result, state_old),
                false => match result.rfind('.') {
                    Some(index) => match *&result[(index + 16)..(result.len() - 3)]
                        .split(' ')
                        .collect::<Vec<&str>>()
                        .iter()
                        .map(|x| x.parse::<u8>().expect("Failed to parse state byte"))
                        .collect::<Vec<u8>>()
                        .try_into()
                    {
                        Ok(state_new) => (String::from(&result[1..(index - 1)]), state_new),
                        Err(_) => (
                            String::from("(error 'sync-format \"Invalid return format\")"),
                            state_old,
                        ),
                    },
                    None => (
                        String::from("(error 'sync-format \"Invalid return format\")"),
                        state_old,
                    ),
                },
            };

            match state_old == state_new {
                // State unchanged: nothing to commit.
                true => {
                    debug!(
                        "Completed ({:?}) {} -> {}",
                        start.elapsed(),
                        query.chars().take(128).collect::<String>(),
                        output,
                    );
                    return output;
                }
                // State changed: commit only if the record still holds the state we started from.
                false => match state_old
                    == PERSISTOR
                        .root_get(record)
                        .expect("Failed to get record state for comparison")
                {
                    true => {
                        // Copies every node reachable from `start_node` that is not
                        // yet in PERSISTOR from `source` into PERSISTOR. Nodes that
                        // PERSISTOR already knows are skipped without descending
                        // into their children.
                        fn iterate(
                            source: &MemoryPersistor,
                            start_node: Word,
                        ) -> Result<(), PersistorAccessError> {
                            let mut stack = vec![start_node];

                            while let Some(node) = stack.pop() {
                                if node == NULL {
                                    continue;
                                } else if let Ok(_) = PERSISTOR.leaf_get(node) {
                                    continue;
                                } else if let Ok(_) = PERSISTOR.stump_get(node) {
                                    continue;
                                } else if let Ok(_) = PERSISTOR.branch_get(node) {
                                    continue;
                                } else if let Ok(content) = source.leaf_get(node) {
                                    PERSISTOR
                                        .leaf_set(content)
                                        .expect("Failed to set leaf content");
                                } else if let Ok(digest) = source.stump_get(node) {
                                    PERSISTOR.stump_set(digest).expect("Failed to set stump");
                                } else if let Ok((left, right, digest)) = source.branch_get(node) {
                                    PERSISTOR
                                        .branch_set(left, right, digest)
                                        .expect("Failed to set branch");
                                    // Left is pushed last so it is visited first.
                                    stack.push(right);
                                    stack.push(left);
                                } else {
                                    return Err(PersistorAccessError(format!(
                                        "Dangling branch {:?}",
                                        node
                                    )));
                                }
                            }

                            Ok(())
                        }

                        {
                            // Take the global lock for the commit unless this attempt already holds it.
                            let _lock2 = match _lock1 {
                                Some(_) => None,
                                None => {
                                    Some(LOCK.lock().expect("Failed to acquire secondary lock"))
                                }
                            };

                            match iterate(&persistor, state_new) {
                                // NOTE(review): root_set appears to swap old -> new only if the
                                // record still holds `state_old`; its failure is treated as a collision.
                                Ok(_) => match PERSISTOR.root_set(record, state_old, state_new) {
                                    Ok(_) => {
                                        debug!(
                                            "Completed ({:?}) {} -> {}",
                                            start.elapsed(),
                                            query.chars().take(128).collect::<String>(),
                                            output,
                                        );
                                        return output;
                                    }
                                    Err(_) => {
                                        info!(
                                            "Rerunning (x{}) due to concurrency collision: {}",
                                            runs, query
                                        );
                                        continue;
                                    }
                                },
                                Err(err) => {
                                    panic!("{:?}", err);
                                }
                            }
                        }
                    }
                    // The record changed while we were evaluating: start over.
                    false => {
                        info!(
                            "Rerunning (x{}) due to concurrency collision: {}",
                            runs, query
                        );
                        continue;
                    }
                },
            }
        }
    }
459}
460
461unsafe fn sync_error(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
462 unsafe {
463 let c_string = CString::new(string).expect("Failed to create CString from string");
464
465 s7::s7_error(
466 sc,
467 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
468 s7::s7_list(sc, 1, s7::s7_make_string(sc, c_string.as_ptr())),
469 )
470 }
471}
472
473fn type_s7_sync_node() -> Type {
474 unsafe extern "C" fn free(_sc: *mut s7::s7_scheme, obj: s7::s7_pointer) -> s7::s7_pointer {
475 unsafe {
476 sync_heap_free(s7::s7_c_object_value(obj));
477 std::ptr::null_mut()
478 }
479 }
480
481 unsafe extern "C" fn mark(_sc: *mut s7::s7_scheme, _obj: s7::s7_pointer) -> s7::s7_pointer {
482 std::ptr::null_mut()
483 }
484
485 unsafe extern "C" fn is_equal(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
486 unsafe {
487 match sync_is_node(s7::s7_cadr(args)) {
488 true => {
489 let word1 = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
490 let word2 = sync_heap_read(s7::s7_c_object_value(s7::s7_cadr(args)));
491 s7::s7_make_boolean(sc, word1 == word2)
492 }
493 false => s7::s7_wrong_type_arg_error(
494 sc,
495 c"equal?".as_ptr(),
496 2,
497 s7::s7_cadr(args),
498 c"a sync-node".as_ptr(),
499 ),
500 }
501 }
502 }
503
504 unsafe extern "C" fn to_string(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
505 unsafe {
506 string_to_s7(
507 sc,
508 format!(
509 "(sync-node #u({}))",
510 sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)))
511 .iter()
512 .map(|&byte| byte.to_string())
513 .collect::<Vec<String>>()
514 .join(" "),
515 )
516 .as_str(),
517 )
518 }
519 }
520
521 Type::new(c"sync-node", free, mark, is_equal, to_string)
522}
523
524fn primitive_s7_sync_stub() -> Primitive {
525 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
526 unsafe {
527 let bv = s7::s7_car(args);
528
529 if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
530 return s7::s7_wrong_type_arg_error(
531 sc,
532 c"sync-cut".as_ptr(),
533 1,
534 s7::s7_car(args),
535 c"a hash-sized byte-vector".as_ptr(),
536 );
537 }
538
539 let mut digest = [0 as u8; SIZE];
540 for i in 0..SIZE {
541 digest[i] = s7::s7_byte_vector_ref(bv, i as i64);
542 }
543
544 let persistor = {
545 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
546 &session
547 .get(&(sc as usize))
548 .expect("Session not found for given context")
549 .persistor
550 .clone()
551 };
552
553 match persistor.stump_set(digest) {
554 Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
555 Err(_) => sync_error(sc, "Journal is unable to create stub node (sync-stub)"),
556 }
557 }
558 }
559
560 Primitive::new(
561 code,
562 c"sync-stub",
563 c"(sync-stub digest) create a sync stub from the provided byte-vector",
564 1,
565 0,
566 false,
567 )
568}
569
570fn primitive_s7_sync_hash() -> Primitive {
571 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
572 unsafe {
573 let data_bv = s7::s7_car(args);
574
575 if !s7::s7_is_byte_vector(data_bv) {
577 return s7::s7_wrong_type_arg_error(
578 sc,
579 c"sync-hash".as_ptr(),
580 1,
581 data_bv,
582 c"a byte-vector".as_ptr(),
583 );
584 }
585
586 let mut data = vec![];
588 for i in 0..s7::s7_vector_length(data_bv) {
589 data.push(s7::s7_byte_vector_ref(data_bv, i as i64))
590 }
591
592 let digest = Sha256::digest(data).to_vec();
593 let digest_bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
594 for i in 0..SIZE {
595 s7::s7_byte_vector_set(digest_bv, i as i64, digest[i]);
596 }
597 digest_bv
598 }
599 }
600
601 Primitive::new(
602 code,
603 c"sync-hash",
604 c"(sync-hash bv) compute the SHA-256 digest of a byte vector",
605 1,
606 0,
607 false,
608 )
609}
610
611fn primitive_s7_sync_node() -> Primitive {
612 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
613 unsafe {
614 let digest = s7::s7_car(args);
615
616 if !s7::s7_is_byte_vector(digest) || s7::s7_vector_length(digest) as usize != SIZE {
617 return s7::s7_wrong_type_arg_error(
618 sc,
619 c"sync-node".as_ptr(),
620 1,
621 digest,
622 c"a hash-sized byte-vector".as_ptr(),
623 );
624 }
625
626 let mut word = [0 as u8; SIZE];
627 for i in 0..SIZE {
628 word[i] = s7::s7_byte_vector_ref(digest, i as i64);
629 }
630
631 let persistor = {
632 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
633 &session
634 .get(&(sc as usize))
635 .expect("Session not found for sync-node")
636 .persistor
637 .clone()
638 };
639
640 if word == NULL
641 || persistor.branch_get(word).is_ok()
642 || PERSISTOR.branch_get(word).is_ok()
643 || persistor.stump_get(word).is_ok()
644 || PERSISTOR.stump_get(word).is_ok()
645 {
646 s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word))
647 } else {
648 sync_error(sc, "Node does not exist in this journal (sync-node)")
649 }
650 }
651 }
652
653 Primitive::new(
654 code,
655 c"sync-node",
656 c"(sync-node digest) returns the sync pair defined by the digest",
657 1,
658 0,
659 false,
660 )
661}
662
663fn primitive_s7_sync_is_node() -> Primitive {
664 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
665 unsafe { s7::s7_make_boolean(sc, sync_is_node(s7::s7_car(args))) }
666 }
667
668 Primitive::new(
669 code,
670 c"sync-node?",
671 c"(sync-node?) returns whether the object is a sync node",
672 1,
673 0,
674 false,
675 )
676}
677
678fn primitive_s7_sync_null() -> Primitive {
679 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
680 unsafe { s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(NULL)) }
681 }
682
683 Primitive::new(
684 code,
685 c"sync-null",
686 c"(sync-null) returns the null synchronic node",
687 0,
688 0,
689 false,
690 )
691}
692
693fn primitive_s7_sync_is_null() -> Primitive {
694 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
695 unsafe {
696 let arg = s7::s7_car(args);
697 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
698 false => s7::s7_wrong_type_arg_error(
699 sc,
700 c"sync-null?".as_ptr(),
701 1,
702 arg,
703 c"a sync-node or byte-vector".as_ptr(),
704 ),
705 true => {
706 let word = sync_heap_read(s7::s7_c_object_value(arg));
707 for i in 0..SIZE {
708 if word[i] != 0 {
709 return s7::s7_make_boolean(sc, false);
710 }
711 }
712 s7::s7_make_boolean(sc, true)
713 }
714 }
715 }
716 }
717
718 Primitive::new(
719 code,
720 c"sync-null?",
721 c"(sync-null? sp) returns a boolean indicating whether sp is equal to sync-null",
722 1,
723 0,
724 false,
725 )
726}
727
728fn primitive_s7_sync_is_pair() -> Primitive {
729 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
730 unsafe {
731 let arg = s7::s7_car(args);
732 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
733 false => s7::s7_wrong_type_arg_error(
734 sc,
735 c"sync-pair?".as_ptr(),
736 1,
737 arg,
738 c"a sync-node or byte-vector".as_ptr(),
739 ),
740 true => {
741 let word = sync_heap_read(s7::s7_c_object_value(arg));
742 let persistor = {
743 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
744 &session
745 .get(&(sc as usize))
746 .expect("Session not found for sync-pair?")
747 .persistor
748 .clone()
749 };
750 s7::s7_make_boolean(
751 sc,
752 persistor.branch_get(word).is_ok() || PERSISTOR.branch_get(word).is_ok(),
753 )
754 }
755 }
756 }
757 }
758
759 Primitive::new(
760 code,
761 c"sync-pair?",
762 c"(sync-pair? sp) returns a boolean indicating whether sp is a pair",
763 1,
764 0,
765 false,
766 )
767}
768
769fn primitive_s7_sync_is_stub() -> Primitive {
770 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
771 unsafe {
772 let arg = s7::s7_car(args);
773 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
774 false => s7::s7_wrong_type_arg_error(
775 sc,
776 c"sync-stub?".as_ptr(),
777 1,
778 arg,
779 c"a sync-node or byte-vector".as_ptr(),
780 ),
781 true => {
782 let word = sync_heap_read(s7::s7_c_object_value(arg));
783 let persistor = {
784 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
785 &session
786 .get(&(sc as usize))
787 .expect("Session not found for sync-stub?")
788 .persistor
789 .clone()
790 };
791 s7::s7_make_boolean(
792 sc,
793 persistor.stump_get(word).is_ok() || PERSISTOR.stump_get(word).is_ok(),
794 )
795 }
796 }
797 }
798 }
799
800 Primitive::new(
801 code,
802 c"sync-stub?",
803 c"(sync-stub? sp) returns a boolean indicating whether sp is a stub",
804 1,
805 0,
806 false,
807 )
808}
809
810fn primitive_s7_sync_digest() -> Primitive {
811 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
812 unsafe {
813 match sync_is_node(s7::s7_car(args)) {
814 false => s7::s7_wrong_type_arg_error(
815 sc,
816 c"sync-digest".as_ptr(),
817 1,
818 s7::s7_car(args),
819 c"a sync-node".as_ptr(),
820 ),
821 true => {
822 let word = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
823 let digest = sync_digest(sc, word).expect("Failed to obtain digest");
824 let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
825 for i in 0..SIZE {
826 s7::s7_byte_vector_set(bv, i as i64, digest[i]);
827 }
828 bv
829 }
830 }
831 }
832 }
833
834 Primitive::new(
835 code,
836 c"sync-digest",
837 c"(sync-digest node) returns the the digest of a sync-node as a byte-vector",
838 1,
839 0,
840 false,
841 )
842}
843
844fn primitive_s7_sync_cons() -> Primitive {
845 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
846 unsafe {
847 let persistor = {
848 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
849 &session
850 .get(&(sc as usize))
851 .expect("Session not found for sync-cons")
852 .persistor
853 .clone()
854 };
855
856 let handle_arg = |obj, number| {
857 if sync_is_node(obj) {
858 Ok(sync_heap_read(s7::s7_c_object_value(obj)))
859 } else if s7::s7_is_byte_vector(obj) {
860 let mut content = vec![];
861 for i in 0..s7::s7_vector_length(obj) {
862 content.push(s7::s7_byte_vector_ref(obj, i as i64))
863 }
864 match persistor.leaf_set(content) {
865 Ok(atom) => Ok(atom),
866 Err(_) => Err(sync_error(
867 sc,
868 "Journal is unable to add leaf node (sync-cons)",
869 )),
870 }
871 } else {
872 Err(s7::s7_wrong_type_arg_error(
873 sc,
874 c"sync-cons".as_ptr(),
875 number,
876 obj,
877 c"a byte vector or a sync node".as_ptr(),
878 ))
879 }
880 };
881
882 match (
883 handle_arg(s7::s7_car(args), 1),
884 handle_arg(s7::s7_cadr(args), 2),
885 ) {
886 (Ok(left), Ok(right)) => match (sync_digest(sc, left), sync_digest(sc, right)) {
887 (Ok(digest_left), Ok(digest_right)) => {
888 let mut joined = [0 as u8; SIZE * 2];
889 joined[..SIZE].copy_from_slice(&digest_left);
890 joined[SIZE..].copy_from_slice(&digest_right);
891 let digest = Word::from(Sha256::digest(joined));
892
893 match persistor.branch_set(left, right, digest) {
894 Ok(pair) => {
895 s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(pair))
896 }
897 Err(_) => {
898 sync_error(sc, "Journal is unable to add pair node (sync-cons)")
899 }
900 }
901 }
902 _ => sync_error(sc, "Journal is unable to obtain node digests (sync-cons)"),
903 },
904 (Err(left), _) => left,
905 (_, Err(right)) => right,
906 }
907 }
908 }
909
910 Primitive::new(
911 code,
912 c"sync-cons",
913 c"(sync-cons first rest) construct a new sync pair node",
914 2,
915 0,
916 false,
917 )
918}
919
920fn primitive_s7_sync_car() -> Primitive {
921 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
922 unsafe {
923 if !sync_is_node(s7::s7_car(args)) {
924 return s7::s7_wrong_type_arg_error(
925 sc,
926 c"sync-car".as_ptr(),
927 1,
928 s7::s7_car(args),
929 c"a sync-pair".as_ptr(),
930 );
931 }
932 sync_cxr(sc, args, c"sync-car", |children| children.0)
933 }
934 }
935
936 Primitive::new(
937 code,
938 c"sync-car",
939 c"(sync-car pair) retrieve the first element of a sync pair",
940 1,
941 0,
942 false,
943 )
944}
945
946fn primitive_s7_sync_cdr() -> Primitive {
947 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
948 unsafe {
949 if !sync_is_node(s7::s7_car(args)) {
950 return s7::s7_wrong_type_arg_error(
951 sc,
952 c"sync-cdr".as_ptr(),
953 1,
954 s7::s7_car(args),
955 c"a sync-pair".as_ptr(),
956 );
957 }
958 sync_cxr(sc, args, c"sync-cdr", |children| children.1)
959 }
960 }
961
962 Primitive::new(
963 code,
964 c"sync-cdr",
965 c"(sync-cdr pair) retrieve the second element of a sync pair",
966 1,
967 0,
968 false,
969 )
970}
971
972fn primitive_s7_sync_cut() -> Primitive {
973 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
974 unsafe {
975 let arg = s7::s7_car(args);
976
977 let handle_digest = |digest| {
978 let persistor = {
979 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
980 &session
981 .get(&(sc as usize))
982 .expect("Session not found for given context")
983 .persistor
984 .clone()
985 };
986 match persistor.stump_set(digest) {
987 Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
988 Err(_) => sync_error(sc, "Journal is unable to add stub node (sync-cut)"),
989 }
990 };
991
992 if s7::s7_is_byte_vector(arg) {
993 let mut content = vec![];
994 for i in 0..s7::s7_vector_length(arg) {
995 content.push(s7::s7_byte_vector_ref(arg, i as i64))
996 }
997 handle_digest(Word::from(Sha256::digest(Sha256::digest(&content))))
998 } else if sync_is_node(arg) {
999 match sync_digest(sc, sync_heap_read(s7::s7_c_object_value(arg))) {
1000 Ok(digest) => handle_digest(digest),
1001 Err(_) => sync_error(sc, "Journal does not recognize input node (sync-cut)"),
1002 }
1003 } else {
1004 s7::s7_wrong_type_arg_error(
1005 sc,
1006 c"sync-cut".as_ptr(),
1007 1,
1008 s7::s7_car(args),
1009 c"a sync-node or byte-vector".as_ptr(),
1010 )
1011 }
1012 }
1013 }
1014
1015 Primitive::new(
1016 code,
1017 c"sync-cut",
1018 c"(sync-cut value) obtain the stub of a sync pair or byte vector",
1019 1,
1020 0,
1021 false,
1022 )
1023}
1024
1025fn primitive_s7_sync_create() -> Primitive {
1026 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1027 unsafe {
1028 let id = s7::s7_car(args);
1029
1030 if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
1031 return s7::s7_wrong_type_arg_error(
1032 sc,
1033 c"sync-create".as_ptr(),
1034 1,
1035 id,
1036 c"a hash-sized byte-vector".as_ptr(),
1037 );
1038 }
1039
1040 let mut record: Word = [0 as u8; SIZE];
1041
1042 for i in 0..SIZE {
1043 record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
1044 }
1045
1046 debug!("Adding record: {}", hex::encode(record));
1047
1048 match PERSISTOR.root_new(
1049 record,
1050 PERSISTOR
1051 .branch_set(
1052 PERSISTOR
1053 .leaf_set(GENESIS_STR.as_bytes().to_vec())
1054 .expect("Failed to create genesis leaf for new record"),
1055 NULL,
1056 NULL,
1057 )
1058 .expect("Failed to create genesis branch for new record"),
1059 ) {
1060 Ok(_) => s7::s7_make_boolean(sc, true),
1061 Err(_) => s7::s7_error(
1062 sc,
1063 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1064 s7::s7_list(
1065 sc,
1066 1,
1067 s7::s7_make_string(sc, c"record ID is already in use".as_ptr()),
1068 ),
1069 ),
1070 }
1071 }
1072 }
1073
1074 Primitive::new(
1075 code,
1076 c"sync-create",
1077 c"(sync-create id) create a new synchronic record with the given 32-byte ID",
1078 1,
1079 0,
1080 false,
1081 )
1082}
1083
1084fn primitive_s7_sync_delete() -> Primitive {
1085 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1086 unsafe {
1087 let id = s7::s7_car(args);
1088
1089 if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
1090 return s7::s7_wrong_type_arg_error(
1091 sc,
1092 c"sync-delete".as_ptr(),
1093 1,
1094 id,
1095 c"a hash-sized byte-vector".as_ptr(),
1096 );
1097 }
1098
1099 let mut record: Word = [0 as u8; SIZE];
1100
1101 for i in 0..s7::s7_vector_length(id) {
1102 record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
1103 }
1104
1105 if record == NULL {
1106 return s7::s7_error(
1107 sc,
1108 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1109 s7::s7_list(
1110 sc,
1111 1,
1112 s7::s7_make_string(sc, c"cannot delete the root record".as_ptr()),
1113 ),
1114 );
1115 }
1116
1117 debug!("Deleting record: {}", hex::encode(record));
1118
1119 match PERSISTOR.root_delete(record) {
1120 Ok(_) => s7::s7_make_boolean(sc, true),
1121 Err(_) => s7::s7_error(
1122 sc,
1123 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1124 s7::s7_list(
1125 sc,
1126 1,
1127 s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1128 ),
1129 ),
1130 }
1131 }
1132 }
1133
1134 Primitive::new(
1135 code,
1136 c"sync-delete",
1137 c"(sync-delete id) delete the synchronic record with the given 32-byte ID",
1138 1,
1139 0,
1140 false,
1141 )
1142}
1143
1144fn primitive_s7_sync_all() -> Primitive {
1145 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
1146 unsafe {
1147 let mut list = s7::s7_list(sc, 0);
1148
1149 for record in PERSISTOR.root_list().into_iter().rev() {
1150 let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
1151 for i in 0..SIZE {
1152 s7::s7_byte_vector_set(bv, i as i64, record[i]);
1153 }
1154
1155 list = s7::s7_cons(sc, bv, list)
1156 }
1157
1158 list
1159 }
1160 }
1161
1162 Primitive::new(
1163 code,
1164 c"sync-all",
1165 c"(sync-all) list all synchronic record IDs in ascending order",
1166 0,
1167 0,
1168 false,
1169 )
1170}
1171
1172fn primitive_s7_sync_call() -> Primitive {
1173 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1174 unsafe {
1175 let message_expr = s7::s7_car(args);
1176 let blocking = s7::s7_cadr(args);
1177
1178 if !s7::s7_is_boolean(blocking) {
1179 return s7::s7_wrong_type_arg_error(
1180 sc,
1181 c"sync-call".as_ptr(),
1182 2,
1183 blocking,
1184 c"a boolean".as_ptr(),
1185 );
1186 }
1187
1188 let record = match s7::s7_is_null(sc, s7::s7_cddr(args)) {
1189 true => {
1190 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1191 session
1192 .get(&(sc as usize))
1193 .expect("Session number not found in sessions map")
1194 .record
1195 }
1196 false => {
1197 let bv = s7::s7_caddr(args);
1198 if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
1200 return s7::s7_wrong_type_arg_error(
1201 sc,
1202 c"sync-call".as_ptr(),
1203 3,
1204 bv,
1205 c"a hash-sized byte-vector".as_ptr(),
1206 );
1207 }
1208
1209 let mut record = [0 as u8; SIZE];
1210 for i in 0..SIZE {
1211 record[i] = s7::s7_byte_vector_ref(bv, i as i64);
1212 }
1213 record
1214 }
1215 };
1216
1217 match PERSISTOR.root_get(record) {
1218 Ok(_) => {
1219 let message = obj2str(sc, message_expr);
1220 if s7::s7_boolean(sc, blocking) {
1221 let result = JOURNAL.evaluate_record(record, message.as_str());
1222 let c_result = CString::new(format!("(quote {})", result))
1223 .expect("Failed to create C string from journal evaluation result");
1224 s7::s7_eval_c_string(sc, c_result.as_ptr())
1225 } else {
1226 tokio::spawn(async move {
1227 JOURNAL.evaluate_record(record, message.as_str());
1228 });
1229 s7::s7_make_boolean(sc, true)
1230 }
1231 }
1232 Err(_) => s7::s7_error(
1233 sc,
1234 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1235 s7::s7_list(
1236 sc,
1237 1,
1238 s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1239 ),
1240 ),
1241 }
1242 }
1243 }
1244
1245 Primitive::new(
1246 code,
1247 c"sync-call",
1248 c"(sync-call query blocking? id) query the provided record ID or self if ID not provided",
1249 2,
1250 1,
1251 false,
1252 )
1253}
1254
1255fn primitive_s7_sync_http() -> Primitive {
1256 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1257 unsafe {
1258 let vec2s7 = |vector: Vec<u8>| {
1259 let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1260 for i in 0..vector.len() {
1261 s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1262 }
1263 bv
1264 };
1265
1266 let method = obj2str(sc, s7::s7_car(args));
1267 let url = obj2str(sc, s7::s7_cadr(args));
1268
1269 let body = if s7::s7_list_length(sc, args) >= 3 {
1270 obj2str(sc, s7::s7_caddr(args))
1271 } else {
1272 String::from("")
1273 };
1274
1275 let cache_mutex = {
1276 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1277 &session
1278 .get(&(sc as usize))
1279 .expect("Session ID not found in active sessions")
1280 .cache
1281 .clone()
1282 };
1283
1284 let mut cache = cache_mutex
1285 .lock()
1286 .expect("Failed to acquire cache mutex lock");
1287
1288 let key = (method.clone(), url.clone(), body.as_bytes().to_vec());
1289
1290 match cache.get(&key) {
1291 Some(bytes) => {
1292 debug!("Cache hit on key {:?}", key);
1293 vec2s7(bytes.to_vec())
1294 }
1295 None => {
1296 let result = tokio::task::block_in_place(move || {
1297 tokio::runtime::Handle::current().block_on(async move {
1298 match method.to_lowercase() {
1299 method if method == "get" => {
1300 JOURNAL
1301 .client
1302 .get(&url[1..url.len() - 1])
1303 .send()
1304 .await?
1305 .bytes()
1306 .await
1307 }
1308 method if method == "post" => {
1309 JOURNAL
1310 .client
1311 .post(&url[1..url.len() - 1])
1312 .body(String::from(&body[1..body.len() - 1]))
1313 .send()
1314 .await?
1315 .bytes()
1316 .await
1317 }
1318 _ => {
1319 panic!("Unsupported HTTP method")
1320 }
1321 }
1322 })
1323 });
1324
1325 match result {
1326 Ok(vector) => {
1327 cache.insert(key, vector.to_vec());
1328 vec2s7(vector.to_vec())
1329 }
1330 Err(_) => {
1331 sync_error(sc, "Journal is unable to fulfill HTTP request (sync-http)")
1332 }
1333 }
1334 }
1335 }
1336 }
1337 }
1338
1339 Primitive::new(
1340 code,
1341 c"sync-http",
1342 c"(sync-http method url . data) make an http request where method is 'get or 'post",
1343 2,
1344 2,
1345 false,
1346 )
1347}
1348
1349fn primitive_s7_sync_remote() -> Primitive {
1350 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1351 unsafe {
1352 let vec2s7 = |mut vector: Vec<u8>| {
1353 vector.insert(0, 39); vector.push(0);
1355 let c_string = CString::from_vec_with_nul(vector)
1356 .expect("Failed to create C string from vector");
1357 s7::s7_eval_c_string(sc, c_string.as_ptr())
1358 };
1359
1360 let url = obj2str(sc, s7::s7_car(args));
1361
1362 let body = obj2str(sc, s7::s7_cadr(args));
1363
1364 let cache_mutex = {
1365 let session = SESSIONS.lock().expect("Failed to acquire session lock");
1366 &session
1367 .get(&(sc as usize))
1368 .expect("Failed to get session from map")
1369 .cache
1370 .clone()
1371 };
1372
1373 let mut cache = cache_mutex.lock().expect("Failed to acquire cache lock");
1374
1375 let key = (String::from("post"), url.clone(), body.as_bytes().to_vec());
1376
1377 match cache.get(&key) {
1378 Some(bytes) => {
1379 debug!("Cache hit on key {:?}", key);
1380 vec2s7(bytes.to_vec())
1381 }
1382 None => {
1383 let result = tokio::task::block_in_place(move || {
1384 tokio::runtime::Handle::current().block_on(async move {
1385 JOURNAL
1386 .client
1387 .post(&url[1..url.len() - 1])
1388 .body(body)
1389 .send()
1390 .await?
1391 .bytes()
1392 .await
1393 })
1394 });
1395
1396 match result {
1397 Ok(bytes) => {
1398 cache.insert(key, bytes.to_vec());
1399 vec2s7(bytes.to_vec())
1400 }
1401 Err(_) => {
1402 sync_error(sc, "Journal is unable to query remote peer (sync-remote)")
1403 }
1404 }
1405 }
1406 }
1407 }
1408 }
1409
1410 Primitive::new(
1411 code,
1412 c"sync-remote",
1413 c"(sync-remote url data) make a post http request with the data payload)",
1414 2,
1415 0,
1416 false,
1417 )
1418}
1419unsafe fn string_to_s7(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
1420 unsafe {
1421 let c_string = CString::new(string).expect("Failed to create CString from string");
1422 let s7_string = s7::s7_make_string(sc, c_string.as_ptr());
1423 s7::s7_object_to_string(sc, s7_string, false)
1424 }
1425}
1426
1427unsafe fn sync_heap_make(word: Word) -> *mut libc::c_void {
1428 unsafe {
1429 let ptr = libc::malloc(SIZE);
1430 let array: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE);
1431 for i in 0..SIZE {
1432 array[i] = word[i] as u8;
1433 }
1434 ptr
1435 }
1436}
1437
1438unsafe fn sync_heap_read(ptr: *mut libc::c_void) -> Word {
1439 unsafe {
1440 std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE)
1441 .try_into()
1442 .expect("Failed to convert slice to Word array")
1443 }
1444}
1445
1446unsafe fn sync_heap_free(ptr: *mut libc::c_void) {
1447 unsafe {
1448 libc::free(ptr);
1449 }
1450}
1451
1452unsafe fn sync_is_node(obj: s7::s7_pointer) -> bool {
1453 unsafe { s7::s7_is_c_object(obj) && s7::s7_c_object_type(obj) == SYNC_NODE_TAG }
1454}
1455
1456unsafe fn sync_cxr(
1457 sc: *mut s7::s7_scheme,
1458 args: s7::s7_pointer,
1459 name: &CStr,
1460 selector: fn((Word, Word)) -> Word,
1461) -> s7::s7_pointer {
1462 unsafe {
1463 let node = s7::s7_car(args);
1464 let word = sync_heap_read(s7::s7_c_object_value(node));
1465
1466 let persistor = {
1467 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1468 &session
1469 .get(&(sc as usize))
1470 .expect("Session not found for given context")
1471 .persistor
1472 .clone()
1473 };
1474
1475 let child_return = |word| {
1476 let node_return = |word| s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word));
1477
1478 let vector_return = |vector: Vec<u8>| {
1479 let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1480 for i in 0..vector.len() {
1481 s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1482 }
1483 bv
1484 };
1485
1486 if word == NULL {
1487 node_return(word)
1488 } else if let Ok(_) = persistor.branch_get(word) {
1489 node_return(word)
1490 } else if let Ok(_) = PERSISTOR.branch_get(word) {
1491 node_return(word)
1492 } else if let Ok(content) = persistor.leaf_get(word) {
1493 vector_return(content)
1494 } else if let Ok(content) = PERSISTOR.leaf_get(word) {
1495 vector_return(content)
1496 } else if let Ok(_) = persistor.stump_get(word) {
1497 node_return(word)
1498 } else if let Ok(_) = PERSISTOR.stump_get(word) {
1499 node_return(word)
1500 } else {
1501 sync_error(
1502 sc,
1503 format!(
1504 "Cannot retrieve items for node that is not a sync-pair ({})",
1505 name.to_string_lossy()
1506 )
1507 .as_str(),
1508 )
1509 }
1510 };
1511
1512 match sync_is_node(node) {
1513 true => match persistor.branch_get(word) {
1514 Ok((left, right, _)) => child_return(selector((left, right))),
1515 Err(_) => match PERSISTOR.branch_get(word) {
1516 Ok((left, right, _)) => child_return(selector((left, right))),
1517 Err(_) => sync_error(
1518 sc,
1519 format!(
1520 "Journal cannot retrieve leaf byte-vector ({})",
1521 name.to_string_lossy()
1522 )
1523 .as_str(),
1524 ),
1525 },
1526 },
1527 false => {
1528 s7::s7_wrong_type_arg_error(sc, name.as_ptr(), 1, node, c"a sync-node".as_ptr())
1529 }
1530 }
1531 }
1532}
1533
1534unsafe fn sync_digest(sc: *mut s7::s7_scheme, word: Word) -> Result<Word, String> {
1535 let persistor = {
1536 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1537 &session
1538 .get(&(sc as usize))
1539 .expect("Session not found for given context")
1540 .persistor
1541 .clone()
1542 };
1543
1544 if word == NULL {
1545 Ok(NULL)
1546 } else if let Ok((_, _, digest)) = persistor.branch_get(word) {
1547 Ok(digest)
1548 } else if let Ok((_, _, digest)) = PERSISTOR.branch_get(word) {
1549 Ok(digest)
1550 } else if let Ok(_) = persistor.leaf_get(word) {
1551 Ok(word)
1552 } else if let Ok(_) = PERSISTOR.leaf_get(word) {
1553 Ok(word)
1554 } else if let Ok(digest) = persistor.stump_get(word) {
1555 Ok(digest)
1556 } else if let Ok(digest) = PERSISTOR.stump_get(word) {
1557 Ok(digest)
1558 } else {
1559 Err("Digest not found in persistor".to_string())
1560 }
1561}