1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3pub use crate::config::Config;
4use crate::evaluator::{obj2str, Evaluator, Primitive, Type};
5use crate::extensions::crypto::{
6 primitive_s7_crypto_generate, primitive_s7_crypto_sign, primitive_s7_crypto_verify,
7};
8use crate::persistor::{MemoryPersistor, Persistor, PersistorAccessError, PERSISTOR};
9pub use crate::persistor::{Word, SIZE};
10use libc;
11use log::{debug, info};
12use once_cell::sync::Lazy;
13use std::collections::HashMap;
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16
17use evaluator as s7;
18use sha2::{Digest, Sha256};
19use std::ffi::{CStr, CString};
20
21mod config;
22mod evaluator;
23mod persistor;
24mod extensions {
25 pub mod crypto;
26}
27
/// Process-wide [`Journal`] instance, constructed lazily on first access.
pub static JOURNAL: Lazy<Journal> = Lazy::new(|| Journal::new());

/// Type tag under which the sync-node c-object type is registered with the evaluator.
const SYNC_NODE_TAG: i64 = 0;

/// Scheme source of the genesis function stored as the first element of a new record's state.
const GENESIS_STR: &str = "(lambda (*sync-state* query) (cons (eval query) *sync-state*))";

/// The all-zero word: used in this file both as the null node and as the ID of the root record.
pub const NULL: Word = [
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
37
38struct Session {
39 record: Word,
40 persistor: MemoryPersistor,
41 cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
42}
43
44impl Session {
45 fn new(
46 record: Word,
47 persistor: MemoryPersistor,
48 cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
49 ) -> Self {
50 Self {
51 record,
52 persistor,
53 cache,
54 }
55 }
56}
57
/// Active evaluation sessions, keyed by the evaluator's scheme pointer cast to `usize`.
static SESSIONS: Lazy<Mutex<HashMap<usize, Session>>> = Lazy::new(|| Mutex::new(HashMap::new()));
59
/// Scope guard: the wrapped closure is invoked once, when the guard is dropped.
struct CallOnDrop<F>(F)
where
    F: FnMut();

impl<F> Drop for CallOnDrop<F>
where
    F: FnMut(),
{
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
67
/// Error value wrapping a single [`Word`].
///
/// NOTE(review): nothing in this file constructs it; presumably the word
/// identifies the record or node that could not be accessed — confirm against callers.
#[derive(Debug)]
pub struct JournalAccessError(pub Word);
70
/// Global lock taken by `Journal::evaluate_record` around its commit step, and for a
/// whole attempt once a query has already been run `RUNS` times.
static LOCK: Mutex<()> = Mutex::new(());
/// Number of attempts `evaluate_record` makes before it takes `LOCK` up front.
static RUNS: usize = 3;
73
/// Evaluates Scheme queries against records held in the global `PERSISTOR`.
pub struct Journal {
    /// HTTP client used by the `sync-http` and `sync-remote` primitives.
    client: reqwest::Client,
}
97
98impl Journal {
99 fn new() -> Self {
100 match PERSISTOR.root_new(
101 NULL,
102 PERSISTOR
103 .branch_set(
104 PERSISTOR
105 .leaf_set(GENESIS_STR.as_bytes().to_vec())
106 .expect("Failed to create genesis leaf"),
107 NULL,
108 NULL,
109 )
110 .expect("Failed to create genesis branch"),
111 ) {
112 Ok(_) => Self {
113 client: reqwest::Client::new(),
114 },
115 Err(_) => Self {
116 client: reqwest::Client::new(),
117 },
118 }
119 }
120
121 pub fn evaluate(&self, query: &str) -> String {
137 self.evaluate_record(NULL, query)
138 }
139
140 fn evaluate_record(&self, record: Word, query: &str) -> String {
141 let mut runs = 0;
142 let cache = Arc::new(Mutex::new(HashMap::new()));
143
144 let start = Instant::now();
145 debug!(
146 "Evaluating ({})",
147 query.chars().take(128).collect::<String>(),
148 );
149
150 loop {
151 let _lock1 = if runs >= RUNS {
152 Some(LOCK.lock().expect("Failed to acquire concurrency lock"))
153 } else {
154 None
155 };
156
157 let genesis_func = PERSISTOR
158 .leaf_get(
159 PERSISTOR
160 .branch_get(
161 PERSISTOR
162 .root_get(record)
163 .expect("Failed to get root record"),
164 )
165 .expect("Failed to get genesis branch")
166 .0,
167 )
168 .expect("Failed to get genesis function")
169 .to_vec();
170
171 let genesis_str = String::from_utf8_lossy(&genesis_func);
172
173 let state_old = PERSISTOR
174 .root_get(record)
175 .expect("Failed to get current state");
176
177 let record_temp = PERSISTOR
178 .root_temp(state_old)
179 .expect("Failed to create temporary record");
180
181 let _record_dropper = CallOnDrop(|| {
182 PERSISTOR
183 .root_delete(record_temp)
184 .expect("Failed to delete temporary record");
185 });
186
187 let state_str = format!(
188 "#u({})",
189 state_old
190 .iter()
191 .map(|&num| num.to_string())
192 .collect::<Vec<String>>()
193 .join(" "),
194 );
195
196 let evaluator = Evaluator::new(
197 vec![(SYNC_NODE_TAG, type_s7_sync_node())]
198 .into_iter()
199 .collect(),
200 vec![
201 primitive_s7_sync_hash(),
202 primitive_s7_sync_null(),
203 primitive_s7_sync_node(),
204 primitive_s7_sync_stub(),
205 primitive_s7_sync_is_node(),
206 primitive_s7_sync_is_pair(),
207 primitive_s7_sync_is_stub(),
208 primitive_s7_sync_is_null(),
209 primitive_s7_sync_digest(),
210 primitive_s7_sync_cons(),
211 primitive_s7_sync_car(),
212 primitive_s7_sync_cdr(),
213 primitive_s7_sync_cut(),
214 primitive_s7_sync_create(),
215 primitive_s7_sync_delete(),
216 primitive_s7_sync_all(),
217 primitive_s7_sync_call(),
218 primitive_s7_sync_remote(),
219 primitive_s7_sync_http(),
220 primitive_s7_crypto_generate(),
221 primitive_s7_crypto_sign(),
222 primitive_s7_crypto_verify(),
223 ],
224 );
225
226 SESSIONS
227 .lock()
228 .expect("Failed to acquire sessions lock")
229 .insert(
230 evaluator.sc as usize,
231 Session::new(record, MemoryPersistor::new(), cache.clone()),
232 );
233
234 let _session_dropper = CallOnDrop(|| {
235 let mut session = SESSIONS
236 .lock()
237 .expect("Failed to acquire sessions lock for cleanup");
238 session.remove(&(evaluator.sc as usize));
239 });
240
241 let expr = format!(
242 "((eval {}) (sync-node {}) (quote {}))",
243 genesis_str, state_str, query
244 );
245
246 let result = evaluator.evaluate(expr.as_str());
247 runs += 1;
248
249 let persistor = {
250 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
251 &session
252 .get(&(evaluator.sc as usize))
253 .expect("Session not found in SESSIONS map")
254 .persistor
255 .clone()
256 };
257
258 let (output, state_new) = match result.starts_with("(error '") {
259 true => (result, state_old),
260 false => match result.rfind('.') {
261 Some(index) => match *&result[(index + 16)..(result.len() - 3)]
262 .split(' ')
263 .collect::<Vec<&str>>()
264 .iter()
265 .map(|x| x.parse::<u8>().expect("Failed to parse state byte"))
266 .collect::<Vec<u8>>()
267 .try_into()
268 {
269 Ok(state_new) => (String::from(&result[1..(index - 1)]), state_new),
270 Err(_) => (
271 String::from("(error 'sync-format \"Invalid return format\")"),
272 state_old,
273 ),
274 },
275 None => (
276 String::from("(error 'sync-format \"Invalid return format\")"),
277 state_old,
278 ),
279 },
280 };
281
282 match state_old == state_new {
283 true => {
284 debug!(
285 "Completed ({:?}) {} -> {}",
286 start.elapsed(),
287 query.chars().take(128).collect::<String>(),
288 output,
289 );
290 return output;
291 }
292 false => match state_old
293 == PERSISTOR
294 .root_get(record)
295 .expect("Failed to get record state for comparison")
296 {
297 true => {
298 fn recurse(
299 source: &MemoryPersistor,
300 node: Word,
301 ) -> Result<(), PersistorAccessError> {
302 if node == NULL {
303 Ok(())
304 } else if let Ok(_) = PERSISTOR.leaf_get(node) {
305 Ok(())
306 } else if let Ok(_) = PERSISTOR.stump_get(node) {
307 Ok(())
308 } else if let Ok(_) = PERSISTOR.branch_get(node) {
309 Ok(())
310 } else if let Ok(content) = source.leaf_get(node) {
311 PERSISTOR
312 .leaf_set(content)
313 .expect("Failed to set leaf content");
314 Ok(())
315 } else if let Ok(digest) = source.stump_get(node) {
316 PERSISTOR
317 .stump_set(digest)
318 .expect("Failed to set stump");
319 Ok(())
320 } else if let Ok((left, right, digest)) = source.branch_get(node) {
321 PERSISTOR
322 .branch_set(left, right, digest)
323 .expect("Failed to set branch");
324 match recurse(&source, left) {
325 Ok(_) => recurse(&source, right),
326 err => err,
327 }
328 } else {
329 Err(PersistorAccessError(format!("Dangling branch {:?}", node)))
330 }
331 }
332
333 {
334 let _lock2 = match _lock1 {
335 Some(_) => None,
336 None => {
337 Some(LOCK.lock().expect("Failed to acquire secondary lock"))
338 }
339 };
340
341 match recurse(&persistor, state_new) {
342 Ok(_) => match PERSISTOR.root_set(record, state_old, state_new) {
343 Ok(_) => {
344 debug!(
345 "Completed ({:?}) {} -> {}",
346 start.elapsed(),
347 query.chars().take(128).collect::<String>(),
348 output,
349 );
350 return output;
351 }
352 Err(_) => {
353 info!(
354 "Rerunning (x{}) due to concurrency collision: {}",
355 runs, query
356 );
357 continue;
358 }
359 },
360 Err(err) => {
361 panic!("{:?}", err);
362 }
363 }
364 }
365 }
366 false => {
367 info!(
368 "Rerunning (x{}) due to concurrency collision: {}",
369 runs, query
370 );
371 continue;
372 }
373 },
374 }
375 }
376 }
377}
378
379unsafe fn sync_error(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
380 let c_string = CString::new(string).expect("Failed to create CString from string");
381
382 s7::s7_error(
383 sc,
384 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
385 s7::s7_list(sc, 1, s7::s7_make_string(sc, c_string.as_ptr())),
386 )
387}
388
389fn type_s7_sync_node() -> Type {
390 unsafe extern "C" fn free(_sc: *mut s7::s7_scheme, obj: s7::s7_pointer) -> s7::s7_pointer {
391 sync_heap_free(s7::s7_c_object_value(obj));
392 std::ptr::null_mut()
393 }
394
395 unsafe extern "C" fn mark(_sc: *mut s7::s7_scheme, _obj: s7::s7_pointer) -> s7::s7_pointer {
396 std::ptr::null_mut()
397 }
398
399 unsafe extern "C" fn is_equal(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
400 match sync_is_node(s7::s7_cadr(args)) {
401 true => {
402 let word1 = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
403 let word2 = sync_heap_read(s7::s7_c_object_value(s7::s7_cadr(args)));
404 s7::s7_make_boolean(sc, word1 == word2)
405 }
406 false => s7::s7_wrong_type_arg_error(
407 sc,
408 c"equal?".as_ptr(),
409 2,
410 s7::s7_cadr(args),
411 c"a sync-node".as_ptr(),
412 ),
413 }
414 }
415
416 unsafe extern "C" fn to_string(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
417 string_to_s7(
418 sc,
419 format!(
420 "(sync-node #u({}))",
421 sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)))
422 .iter()
423 .map(|&byte| byte.to_string())
424 .collect::<Vec<String>>()
425 .join(" "),
426 )
427 .as_str(),
428 )
429 }
430
431 Type::new(c"sync-node", free, mark, is_equal, to_string)
432}
433
434fn primitive_s7_sync_stub() -> Primitive {
435 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
436 let bv = s7::s7_car(args);
437
438 if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
439 return s7::s7_wrong_type_arg_error(
440 sc,
441 c"sync-cut".as_ptr(),
442 1,
443 s7::s7_car(args),
444 c"a hash-sized byte-vector".as_ptr(),
445 );
446 }
447
448 let mut digest = [0 as u8; SIZE];
449 for i in 0..SIZE {
450 digest[i] = s7::s7_byte_vector_ref(bv, i as i64);
451 }
452
453 let persistor = {
454 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
455 &session
456 .get(&(sc as usize))
457 .expect("Session not found for given context")
458 .persistor
459 .clone()
460 };
461
462 match persistor.stump_set(digest) {
463 Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
464 Err(_) => sync_error(sc, "Journal is unable to create stub node (sync-stub)"),
465 }
466 }
467
468 Primitive::new(
469 code,
470 c"sync-stub",
471 c"(sync-stub digest) create a sync stub from the provided byte-vector",
472 1,
473 0,
474 false,
475 )
476}
477
478fn primitive_s7_sync_hash() -> Primitive {
479 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
480 let data_bv = s7::s7_car(args);
481
482 if !s7::s7_is_byte_vector(data_bv) {
484 return s7::s7_wrong_type_arg_error(
485 sc,
486 c"sync-hash".as_ptr(),
487 1,
488 data_bv,
489 c"a byte-vector".as_ptr(),
490 );
491 }
492
493 let mut data = vec![];
495 for i in 0..s7::s7_vector_length(data_bv) {
496 data.push(s7::s7_byte_vector_ref(data_bv, i as i64))
497 }
498
499 let digest = Sha256::digest(data).to_vec();
500 let digest_bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
501 for i in 0..SIZE {
502 s7::s7_byte_vector_set(digest_bv, i as i64, digest[i]);
503 }
504 digest_bv
505 }
506
507 Primitive::new(
508 code,
509 c"sync-hash",
510 c"(sync-hash bv) compute the SHA-256 digest of a byte vector",
511 1,
512 0,
513 false,
514 )
515}
516
517fn primitive_s7_sync_node() -> Primitive {
518 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
519 let digest = s7::s7_car(args);
520
521 if !s7::s7_is_byte_vector(digest) || s7::s7_vector_length(digest) as usize != SIZE {
522 return s7::s7_wrong_type_arg_error(
523 sc,
524 c"sync-node".as_ptr(),
525 1,
526 digest,
527 c"a hash-sized byte-vector".as_ptr(),
528 );
529 }
530
531 let mut word = [0 as u8; SIZE];
532 for i in 0..SIZE {
533 word[i] = s7::s7_byte_vector_ref(digest, i as i64);
534 }
535
536 let persistor = {
537 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
538 &session
539 .get(&(sc as usize))
540 .expect("Session not found for sync-node")
541 .persistor
542 .clone()
543 };
544
545 if word == NULL
546 || persistor.branch_get(word).is_ok()
547 || PERSISTOR.branch_get(word).is_ok()
548 || persistor.stump_get(word).is_ok()
549 || PERSISTOR.stump_get(word).is_ok()
550 {
551 s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word))
552 } else {
553 sync_error(sc, "Node does not exist in this journal (sync-node)")
554 }
555 }
556
557 Primitive::new(
558 code,
559 c"sync-node",
560 c"(sync-node digest) returns the sync pair defined by the digest",
561 1,
562 0,
563 false,
564 )
565}
566
567fn primitive_s7_sync_is_node() -> Primitive {
568 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
569 s7::s7_make_boolean(sc, sync_is_node(s7::s7_car(args)))
570 }
571
572 Primitive::new(
573 code,
574 c"sync-node?",
575 c"(sync-node?) returns whether the object is a sync node",
576 1,
577 0,
578 false,
579 )
580}
581
582fn primitive_s7_sync_null() -> Primitive {
583 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
584 s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(NULL))
585 }
586
587 Primitive::new(
588 code,
589 c"sync-null",
590 c"(sync-null) returns the null synchronic node",
591 0,
592 0,
593 false,
594 )
595}
596
597fn primitive_s7_sync_is_null() -> Primitive {
598 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
599 let arg = s7::s7_car(args);
600 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
601 false => s7::s7_wrong_type_arg_error(
602 sc,
603 c"sync-null?".as_ptr(),
604 1,
605 arg,
606 c"a sync-node or byte-vector".as_ptr(),
607 ),
608 true => {
609 let word = sync_heap_read(s7::s7_c_object_value(arg));
610 for i in 0..SIZE {
611 if word[i] != 0 {
612 return s7::s7_make_boolean(sc, false);
613 }
614 }
615 s7::s7_make_boolean(sc, true)
616 }
617 }
618 }
619
620 Primitive::new(
621 code,
622 c"sync-null?",
623 c"(sync-null? sp) returns a boolean indicating whether sp is equal to sync-null",
624 1,
625 0,
626 false,
627 )
628}
629
630fn primitive_s7_sync_is_pair() -> Primitive {
631 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
632 let arg = s7::s7_car(args);
633 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
634 false => s7::s7_wrong_type_arg_error(
635 sc,
636 c"sync-pair?".as_ptr(),
637 1,
638 arg,
639 c"a sync-node or byte-vector".as_ptr(),
640 ),
641 true => {
642 let word = sync_heap_read(s7::s7_c_object_value(arg));
643 let persistor = {
644 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
645 &session
646 .get(&(sc as usize))
647 .expect("Session not found for sync-pair?")
648 .persistor
649 .clone()
650 };
651 s7::s7_make_boolean(
652 sc,
653 persistor.branch_get(word).is_ok() || PERSISTOR.branch_get(word).is_ok(),
654 )
655 }
656 }
657 }
658
659 Primitive::new(
660 code,
661 c"sync-pair?",
662 c"(sync-pair? sp) returns a boolean indicating whether sp is a pair",
663 1,
664 0,
665 false,
666 )
667}
668
669fn primitive_s7_sync_is_stub() -> Primitive {
670 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
671 let arg = s7::s7_car(args);
672 match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
673 false => s7::s7_wrong_type_arg_error(
674 sc,
675 c"sync-stub?".as_ptr(),
676 1,
677 arg,
678 c"a sync-node or byte-vector".as_ptr(),
679 ),
680 true => {
681 let word = sync_heap_read(s7::s7_c_object_value(arg));
682 let persistor = {
683 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
684 &session
685 .get(&(sc as usize))
686 .expect("Session not found for sync-stub?")
687 .persistor
688 .clone()
689 };
690 s7::s7_make_boolean(
691 sc,
692 persistor.stump_get(word).is_ok() || PERSISTOR.stump_get(word).is_ok(),
693 )
694 }
695 }
696 }
697
698 Primitive::new(
699 code,
700 c"sync-stub?",
701 c"(sync-stub? sp) returns a boolean indicating whether sp is a stub",
702 1,
703 0,
704 false,
705 )
706}
707
708fn primitive_s7_sync_digest() -> Primitive {
709 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
710 match sync_is_node(s7::s7_car(args)) {
711 false => s7::s7_wrong_type_arg_error(
712 sc,
713 c"sync-digest".as_ptr(),
714 1,
715 s7::s7_car(args),
716 c"a sync-node".as_ptr(),
717 ),
718 true => {
719 let word = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
720 let digest = sync_digest(sc, word).expect("Failed to obtain digest");
721 let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
722 for i in 0..SIZE {
723 s7::s7_byte_vector_set(bv, i as i64, digest[i]);
724 }
725 bv
726 }
727 }
728 }
729
730 Primitive::new(
731 code,
732 c"sync-digest",
733 c"(sync-digest node) returns the the digest of a sync-node as a byte-vector",
734 1,
735 0,
736 false,
737 )
738}
739
740fn primitive_s7_sync_cons() -> Primitive {
741 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
742 let persistor = {
743 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
744 &session
745 .get(&(sc as usize))
746 .expect("Session not found for sync-cons")
747 .persistor
748 .clone()
749 };
750
751 let handle_arg = |obj, number| {
752 if sync_is_node(obj) {
753 Ok(sync_heap_read(s7::s7_c_object_value(obj)))
754 } else if s7::s7_is_byte_vector(obj) {
755 let mut content = vec![];
756 for i in 0..s7::s7_vector_length(obj) {
757 content.push(s7::s7_byte_vector_ref(obj, i as i64))
758 }
759 match persistor.leaf_set(content) {
760 Ok(atom) => Ok(atom),
761 Err(_) => Err(sync_error(
762 sc,
763 "Journal is unable to add leaf node (sync-cons)",
764 )),
765 }
766 } else {
767 Err(s7::s7_wrong_type_arg_error(
768 sc,
769 c"sync-cons".as_ptr(),
770 number,
771 obj,
772 c"a byte vector or a sync node".as_ptr(),
773 ))
774 }
775 };
776
777 match (
778 handle_arg(s7::s7_car(args), 1),
779 handle_arg(s7::s7_cadr(args), 2),
780 ) {
781 (Ok(left), Ok(right)) => match (sync_digest(sc, left), sync_digest(sc, right)) {
782 (Ok(digest_left), Ok(digest_right)) => {
783 let mut joined = [0 as u8; SIZE * 2];
784 joined[..SIZE].copy_from_slice(&digest_left);
785 joined[SIZE..].copy_from_slice(&digest_right);
786 let digest = Word::from(Sha256::digest(joined));
787
788 match persistor.branch_set(left, right, digest) {
789 Ok(pair) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(pair)),
790 Err(_) => sync_error(sc, "Journal is unable to add pair node (sync-cons)"),
791 }
792 }
793 _ => sync_error(sc, "Journal is unable to obtain node digests (sync-cons)"),
794 },
795 (Err(left), _) => left,
796 (_, Err(right)) => right,
797 }
798 }
799
800 Primitive::new(
801 code,
802 c"sync-cons",
803 c"(sync-cons first rest) construct a new sync pair node",
804 2,
805 0,
806 false,
807 )
808}
809
810fn primitive_s7_sync_car() -> Primitive {
811 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
812 if !sync_is_node(s7::s7_car(args)) {
813 return s7::s7_wrong_type_arg_error(
814 sc,
815 c"sync-car".as_ptr(),
816 1,
817 s7::s7_car(args),
818 c"a sync-pair".as_ptr(),
819 );
820 }
821 sync_cxr(sc, args, c"sync-car", |children| children.0)
822 }
823
824 Primitive::new(
825 code,
826 c"sync-car",
827 c"(sync-car pair) retrieve the first element of a sync pair",
828 1,
829 0,
830 false,
831 )
832}
833
834fn primitive_s7_sync_cdr() -> Primitive {
835 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
836 if !sync_is_node(s7::s7_car(args)) {
837 return s7::s7_wrong_type_arg_error(
838 sc,
839 c"sync-cdr".as_ptr(),
840 1,
841 s7::s7_car(args),
842 c"a sync-pair".as_ptr(),
843 );
844 }
845 sync_cxr(sc, args, c"sync-cdr", |children| children.1)
846 }
847
848 Primitive::new(
849 code,
850 c"sync-cdr",
851 c"(sync-cdr pair) retrieve the second element of a sync pair",
852 1,
853 0,
854 false,
855 )
856}
857
858fn primitive_s7_sync_cut() -> Primitive {
859 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
860 let arg = s7::s7_car(args);
861
862 let handle_digest = |digest| {
863 let persistor = {
864 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
865 &session
866 .get(&(sc as usize))
867 .expect("Session not found for given context")
868 .persistor
869 .clone()
870 };
871 match persistor.stump_set(digest) {
872 Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
873 Err(_) => sync_error(sc, "Journal is unable to add stub node (sync-cut)"),
874 }
875 };
876
877 if s7::s7_is_byte_vector(arg) {
878 let mut content = vec![];
879 for i in 0..s7::s7_vector_length(arg) {
880 content.push(s7::s7_byte_vector_ref(arg, i as i64))
881 }
882 handle_digest(Word::from(Sha256::digest(Sha256::digest(&content))))
883 } else if sync_is_node(arg) {
884 match sync_digest(sc, sync_heap_read(s7::s7_c_object_value(arg))) {
885 Ok(digest) => handle_digest(digest),
886 Err(_) => sync_error(sc, "Journal does not recognize input node (sync-cut)"),
887 }
888 } else {
889 s7::s7_wrong_type_arg_error(
890 sc,
891 c"sync-cut".as_ptr(),
892 1,
893 s7::s7_car(args),
894 c"a sync-node or byte-vector".as_ptr(),
895 )
896 }
897 }
898
899 Primitive::new(
900 code,
901 c"sync-cut",
902 c"(sync-cut value) obtain the stub of a sync pair or byte vector",
903 1,
904 0,
905 false,
906 )
907}
908
909fn primitive_s7_sync_create() -> Primitive {
910 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
911 let id = s7::s7_car(args);
912
913 if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
914 return s7::s7_wrong_type_arg_error(
915 sc,
916 c"sync-create".as_ptr(),
917 1,
918 id,
919 c"a hash-sized byte-vector".as_ptr(),
920 );
921 }
922
923 let mut record: Word = [0 as u8; SIZE];
924
925 for i in 0..SIZE {
926 record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
927 }
928
929 debug!("Adding record: {}", hex::encode(record));
930
931 match PERSISTOR.root_new(
932 record,
933 PERSISTOR
934 .branch_set(
935 PERSISTOR
936 .leaf_set(GENESIS_STR.as_bytes().to_vec())
937 .expect("Failed to create genesis leaf for new record"),
938 NULL,
939 NULL,
940 )
941 .expect("Failed to create genesis branch for new record"),
942 ) {
943 Ok(_) => s7::s7_make_boolean(sc, true),
944 Err(_) => s7::s7_error(
945 sc,
946 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
947 s7::s7_list(
948 sc,
949 1,
950 s7::s7_make_string(sc, c"record ID is already in use".as_ptr()),
951 ),
952 ),
953 }
954 }
955
956 Primitive::new(
957 code,
958 c"sync-create",
959 c"(sync-create id) create a new synchronic record with the given 32-byte ID",
960 1,
961 0,
962 false,
963 )
964}
965
966fn primitive_s7_sync_delete() -> Primitive {
967 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
968 let id = s7::s7_car(args);
969
970 if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
971 return s7::s7_wrong_type_arg_error(
972 sc,
973 c"sync-delete".as_ptr(),
974 1,
975 id,
976 c"a hash-sized byte-vector".as_ptr(),
977 );
978 }
979
980 let mut record: Word = [0 as u8; SIZE];
981
982 for i in 0..s7::s7_vector_length(id) {
983 record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
984 }
985
986 if record == NULL {
987 return s7::s7_error(
988 sc,
989 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
990 s7::s7_list(
991 sc,
992 1,
993 s7::s7_make_string(sc, c"cannot delete the root record".as_ptr()),
994 ),
995 );
996 }
997
998 debug!("Deleting record: {}", hex::encode(record));
999
1000 match PERSISTOR.root_delete(record) {
1001 Ok(_) => s7::s7_make_boolean(sc, true),
1002 Err(_) => s7::s7_error(
1003 sc,
1004 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1005 s7::s7_list(
1006 sc,
1007 1,
1008 s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1009 ),
1010 ),
1011 }
1012 }
1013
1014 Primitive::new(
1015 code,
1016 c"sync-delete",
1017 c"(sync-delete id) delete the synchronic record with the given 32-byte ID",
1018 1,
1019 0,
1020 false,
1021 )
1022}
1023
1024fn primitive_s7_sync_all() -> Primitive {
1025 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
1026 let mut list = s7::s7_list(sc, 0);
1027
1028 for record in PERSISTOR.root_list().into_iter().rev() {
1029 let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
1030 for i in 0..SIZE {
1031 s7::s7_byte_vector_set(bv, i as i64, record[i]);
1032 }
1033
1034 list = s7::s7_cons(sc, bv, list)
1035 }
1036
1037 list
1038 }
1039
1040 Primitive::new(
1041 code,
1042 c"sync-all",
1043 c"(sync-all) list all synchronic record IDs in ascending order",
1044 0,
1045 0,
1046 false,
1047 )
1048}
1049
1050fn primitive_s7_sync_call() -> Primitive {
1051 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1052 let message_expr = s7::s7_car(args);
1053 let blocking = s7::s7_cadr(args);
1054
1055 if !s7::s7_is_boolean(blocking) {
1056 return s7::s7_wrong_type_arg_error(
1057 sc,
1058 c"sync-call".as_ptr(),
1059 2,
1060 blocking,
1061 c"a boolean".as_ptr(),
1062 );
1063 }
1064
1065 let record = match s7::s7_is_null(sc, s7::s7_cddr(args)) {
1066 true => {
1067 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1068 session
1069 .get(&(sc as usize))
1070 .expect("Session number not found in sessions map")
1071 .record
1072 }
1073 false => {
1074 let bv = s7::s7_caddr(args);
1075 if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
1077 return s7::s7_wrong_type_arg_error(
1078 sc,
1079 c"sync-call".as_ptr(),
1080 3,
1081 bv,
1082 c"a hash-sized byte-vector".as_ptr(),
1083 );
1084 }
1085
1086 let mut record = [0 as u8; SIZE];
1087 for i in 0..SIZE {
1088 record[i] = s7::s7_byte_vector_ref(bv, i as i64);
1089 }
1090 record
1091 }
1092 };
1093
1094 match PERSISTOR.root_get(record) {
1095 Ok(_) => {
1096 let message = obj2str(sc, message_expr);
1097 if s7::s7_boolean(sc, blocking) {
1098 let result = JOURNAL.evaluate_record(record, message.as_str());
1099 let c_result = CString::new(format!("(quote {})", result))
1100 .expect("Failed to create C string from journal evaluation result");
1101 s7::s7_eval_c_string(sc, c_result.as_ptr())
1102 } else {
1103 tokio::spawn(async move {
1104 JOURNAL.evaluate_record(record, message.as_str());
1105 });
1106 s7::s7_make_boolean(sc, true)
1107 }
1108 }
1109 Err(_) => s7::s7_error(
1110 sc,
1111 s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1112 s7::s7_list(
1113 sc,
1114 1,
1115 s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1116 ),
1117 ),
1118 }
1119 }
1120
1121 Primitive::new(
1122 code,
1123 c"sync-call",
1124 c"(sync-call query blocking? id) query the provided record ID or self if ID not provided",
1125 2,
1126 1,
1127 false,
1128 )
1129}
1130
1131fn primitive_s7_sync_http() -> Primitive {
1132 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1133 let vec2s7 = |vector: Vec<u8>| {
1134 let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1135 for i in 0..vector.len() {
1136 s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1137 }
1138 bv
1139 };
1140
1141 let method = obj2str(sc, s7::s7_car(args));
1142 let url = obj2str(sc, s7::s7_cadr(args));
1143
1144 let body = if s7::s7_list_length(sc, args) >= 3 {
1145 obj2str(sc, s7::s7_caddr(args))
1146 } else {
1147 String::from("")
1148 };
1149
1150 let cache_mutex = {
1151 let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1152 &session
1153 .get(&(sc as usize))
1154 .expect("Session ID not found in active sessions")
1155 .cache
1156 .clone()
1157 };
1158
1159 let mut cache = cache_mutex
1160 .lock()
1161 .expect("Failed to acquire cache mutex lock");
1162
1163 let key = (method.clone(), url.clone(), body.as_bytes().to_vec());
1164
1165 match cache.get(&key) {
1166 Some(bytes) => {
1167 debug!("Cache hit on key {:?}", key);
1168 vec2s7(bytes.to_vec())
1169 }
1170 None => {
1171 let result = tokio::task::block_in_place(move || {
1172 tokio::runtime::Handle::current().block_on(async move {
1173 match method.to_lowercase() {
1174 method if method == "get" => {
1175 JOURNAL
1176 .client
1177 .get(&url[1..url.len() - 1])
1178 .send()
1179 .await?
1180 .bytes()
1181 .await
1182 }
1183 method if method == "post" => {
1184 JOURNAL
1185 .client
1186 .post(&url[1..url.len() - 1])
1187 .body(String::from(&body[1..body.len() - 1]))
1188 .send()
1189 .await?
1190 .bytes()
1191 .await
1192 }
1193 _ => {
1194 panic!("Unsupported HTTP method")
1195 }
1196 }
1197 })
1198 });
1199
1200 match result {
1201 Ok(vector) => {
1202 cache.insert(key, vector.to_vec());
1203 vec2s7(vector.to_vec())
1204 }
1205 Err(_) => {
1206 sync_error(sc, "Journal is unable to fulfill HTTP request (sync-http)")
1207 }
1208 }
1209 }
1210 }
1211 }
1212
1213 Primitive::new(
1214 code,
1215 c"sync-http",
1216 c"(sync-http method url . data) make an http request where method is 'get or 'post",
1217 2,
1218 2,
1219 false,
1220 )
1221}
1222
1223fn primitive_s7_sync_remote() -> Primitive {
1224 unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1225 let vec2s7 = |mut vector: Vec<u8>| {
1226 vector.insert(0, 39); vector.push(0);
1228 let c_string =
1229 CString::from_vec_with_nul(vector).expect("Failed to create C string from vector");
1230 s7::s7_eval_c_string(sc, c_string.as_ptr())
1231 };
1232
1233 let url = obj2str(sc, s7::s7_car(args));
1234
1235 let body = obj2str(sc, s7::s7_cadr(args));
1236
1237 let cache_mutex = {
1238 let session = SESSIONS.lock().expect("Failed to acquire session lock");
1239 &session
1240 .get(&(sc as usize))
1241 .expect("Failed to get session from map")
1242 .cache
1243 .clone()
1244 };
1245
1246 let mut cache = cache_mutex.lock().expect("Failed to acquire cache lock");
1247
1248 let key = (String::from("post"), url.clone(), body.as_bytes().to_vec());
1249
1250 match cache.get(&key) {
1251 Some(bytes) => {
1252 debug!("Cache hit on key {:?}", key);
1253 vec2s7(bytes.to_vec())
1254 }
1255 None => {
1256 let result = tokio::task::block_in_place(move || {
1257 tokio::runtime::Handle::current().block_on(async move {
1258 JOURNAL
1259 .client
1260 .post(&url[1..url.len() - 1])
1261 .body(body)
1262 .send()
1263 .await?
1264 .bytes()
1265 .await
1266 })
1267 });
1268
1269 match result {
1270 Ok(bytes) => {
1271 cache.insert(key, bytes.to_vec());
1272 vec2s7(bytes.to_vec())
1273 }
1274 Err(_) => {
1275 sync_error(sc, "Journal is unable to query remote peer (sync-remote)")
1276 }
1277 }
1278 }
1279 }
1280 }
1281
1282 Primitive::new(
1283 code,
1284 c"sync-remote",
1285 c"(sync-remote url data) make a post http request with the data payload)",
1286 2,
1287 0,
1288 false,
1289 )
1290}
1291unsafe fn string_to_s7(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
1292 let c_string = CString::new(string).expect("Failed to create CString from string");
1293 let s7_string = s7::s7_make_string(sc, c_string.as_ptr());
1294 s7::s7_object_to_string(sc, s7_string, false)
1295}
1296
1297unsafe fn sync_heap_make(word: Word) -> *mut libc::c_void {
1298 let ptr = libc::malloc(SIZE);
1299 let array: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE);
1300 for i in 0..SIZE {
1301 array[i] = word[i] as u8;
1302 }
1303 ptr
1304}
1305
1306unsafe fn sync_heap_read(ptr: *mut libc::c_void) -> Word {
1307 std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE)
1308 .try_into()
1309 .expect("Failed to convert slice to Word array")
1310}
1311
/// Releases a pointer with `libc::free`; used as the sync-node type's free callback
/// for payloads allocated by `sync_heap_make`.
unsafe fn sync_heap_free(ptr: *mut libc::c_void) {
    libc::free(ptr);
}
1315
1316unsafe fn sync_is_node(obj: s7::s7_pointer) -> bool {
1317 s7::s7_is_c_object(obj) && s7::s7_c_object_type(obj) == SYNC_NODE_TAG
1318}
1319
1320unsafe fn sync_cxr(
1321 sc: *mut s7::s7_scheme,
1322 args: s7::s7_pointer,
1323 name: &CStr,
1324 selector: fn((Word, Word)) -> Word,
1325) -> s7::s7_pointer {
1326 let node = s7::s7_car(args);
1327 let word = sync_heap_read(s7::s7_c_object_value(node));
1328
1329 let persistor = {
1330 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1331 &session
1332 .get(&(sc as usize))
1333 .expect("Session not found for given context")
1334 .persistor
1335 .clone()
1336 };
1337
1338 let child_return = |word| {
1339 let node_return = |word| s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word));
1340
1341 let vector_return = |vector: Vec<u8>| {
1342 let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1343 for i in 0..vector.len() {
1344 s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1345 }
1346 bv
1347 };
1348
1349 if word == NULL {
1350 node_return(word)
1351 } else if let Ok(_) = persistor.branch_get(word) {
1352 node_return(word)
1353 } else if let Ok(_) = PERSISTOR.branch_get(word) {
1354 node_return(word)
1355 } else if let Ok(content) = persistor.leaf_get(word) {
1356 vector_return(content)
1357 } else if let Ok(content) = PERSISTOR.leaf_get(word) {
1358 vector_return(content)
1359 } else if let Ok(_) = persistor.stump_get(word) {
1360 node_return(word)
1361 } else if let Ok(_) = PERSISTOR.stump_get(word) {
1362 node_return(word)
1363 } else {
1364 sync_error(
1365 sc,
1366 format!(
1367 "Cannot retrieve items for node that is not a sync-pair ({})",
1368 name.to_string_lossy()
1369 )
1370 .as_str(),
1371 )
1372 }
1373 };
1374
1375 match sync_is_node(node) {
1376 true => match persistor.branch_get(word) {
1377 Ok((left, right, _)) => child_return(selector((left, right))),
1378 Err(_) => match PERSISTOR.branch_get(word) {
1379 Ok((left, right, _)) => child_return(selector((left, right))),
1380 Err(_) => sync_error(
1381 sc,
1382 format!(
1383 "Journal cannot retrieve leaf byte-vector ({})",
1384 name.to_string_lossy()
1385 )
1386 .as_str(),
1387 ),
1388 },
1389 },
1390 false => s7::s7_wrong_type_arg_error(sc, name.as_ptr(), 1, node, c"a sync-node".as_ptr()),
1391 }
1392}
1393
1394unsafe fn sync_digest(sc: *mut s7::s7_scheme, word: Word) -> Result<Word, String> {
1395 let persistor = {
1396 let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1397 &session
1398 .get(&(sc as usize))
1399 .expect("Session not found for given context")
1400 .persistor
1401 .clone()
1402 };
1403
1404 if word == NULL {
1405 Ok(NULL)
1406 } else if let Ok((_, _, digest)) = persistor.branch_get(word) {
1407 Ok(digest)
1408 } else if let Ok((_, _, digest)) = PERSISTOR.branch_get(word) {
1409 Ok(digest)
1410 } else if let Ok(_) = persistor.leaf_get(word) {
1411 Ok(word)
1412 } else if let Ok(_) = PERSISTOR.leaf_get(word) {
1413 Ok(word)
1414 } else if let Ok(digest) = persistor.stump_get(word) {
1415 Ok(digest)
1416 } else if let Ok(digest) = PERSISTOR.stump_get(word) {
1417 Ok(digest)
1418 } else {
1419 Err("Digest not found in persistor".to_string())
1420 }
1421}