journal_sdk/
lib.rs

1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3pub use crate::config::Config;
4use crate::evaluator::{obj2str, Evaluator, Primitive, Type};
5use crate::extensions::crypto::{
6    primitive_s7_crypto_generate, primitive_s7_crypto_sign, primitive_s7_crypto_verify,
7};
8use crate::persistor::{MemoryPersistor, Persistor, PersistorAccessError, PERSISTOR};
9pub use crate::persistor::{Word, SIZE};
10use libc;
11use log::{debug, info};
12use once_cell::sync::Lazy;
13use std::collections::HashMap;
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16
17use evaluator as s7;
18use sha2::{Digest, Sha256};
19use std::ffi::{CStr, CString};
20
21mod config;
22mod evaluator;
23mod persistor;
24mod extensions {
25    pub mod crypto;
26}
27
28pub static JOURNAL: Lazy<Journal> = Lazy::new(|| Journal::new());
29
30const SYNC_NODE_TAG: i64 = 0;
31
32const GENESIS_STR: &str = "(lambda (*sync-state* query) (cons (eval query) *sync-state*))";
33
34pub const NULL: Word = [
35    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
36];
37
38struct Session {
39    record: Word,
40    persistor: MemoryPersistor,
41    cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
42}
43
44impl Session {
45    fn new(
46        record: Word,
47        persistor: MemoryPersistor,
48        cache: Arc<Mutex<HashMap<(String, String, Vec<u8>), Vec<u8>>>>,
49    ) -> Self {
50        Self {
51            record,
52            persistor,
53            cache,
54        }
55    }
56}
57
/// Active evaluation sessions, keyed by the address of the s7 interpreter
/// (`sc as usize`) that runs them. `evaluate_record` inserts an entry before
/// evaluating and removes it when the attempt ends; the `sync-*` primitives
/// look their session up from the `sc` pointer they are called with.
static SESSIONS: Lazy<Mutex<HashMap<usize, Session>>> = Lazy::new(|| Mutex::new(HashMap::new()));
59
/// Scope guard: runs the wrapped closure once, at the moment the guard is dropped.
struct CallOnDrop<F>(F)
where
    F: FnMut();

impl<F> Drop for CallOnDrop<F>
where
    F: FnMut(),
{
    /// Invoke the stored closure.
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
67
/// Error type wrapping a single [`Word`].
// NOTE(review): never constructed in the visible part of this file; presumably
// the word identifies the record that could not be accessed — confirm.
#[derive(Debug)]
pub struct JournalAccessError(pub Word);

/// Global mutex used by `evaluate_record` to serialize commits and, after
/// `RUNS` attempts, whole evaluation attempts.
static LOCK: Mutex<()> = Mutex::new(());
/// Number of attempts `evaluate_record` makes before it takes `LOCK` for the
/// entire attempt instead of only around the commit.
static RUNS: usize = 3;
73
/// Journals are the primary way that application developers
/// interact with the synchronic web.
///
/// Conceptually, a Journal is a
/// service that interacts with users and other Journals (nodes) to
/// persist synchronic web state. Behind the scenes, it is
/// responsible for two capabilities:
///
/// * __Persistence__: managing bytes on the global hash graph
///
/// * __Evaluation__: executing code in the global Lisp environment
///
/// __Records__ are the primary way that developers interface with
/// Journals. A Record is a mapping between a constant identifier and
/// mutable state. Both identifiers and state are represented as
/// fixed-size __Words__ that are the outputs of a cryptographic hash
/// function. When a new record is created, the Journal returns a
/// record secret that is the second hash preimage of the identifier.
/// This is intended to be used so that applications can bootstrap
/// records into increasingly sophisticated notions of identity.
pub struct Journal {
    // HTTP client created once in `Journal::new`.
    // NOTE(review): not read in the visible part of this file; presumably used
    // by the network-facing primitives — confirm.
    client: reqwest::Client,
}
97
98impl Journal {
99    fn new() -> Self {
100        match PERSISTOR.root_new(
101            NULL,
102            PERSISTOR
103                .branch_set(
104                    PERSISTOR
105                        .leaf_set(GENESIS_STR.as_bytes().to_vec())
106                        .expect("Failed to create genesis leaf"),
107                    NULL,
108                    NULL,
109                )
110                .expect("Failed to create genesis branch"),
111        ) {
112            Ok(_) => Self {
113                client: reqwest::Client::new(),
114            },
115            Err(_) => Self {
116                client: reqwest::Client::new(),
117            },
118        }
119    }
120
    /// Evaluate a Lisp expression within the root Record (`NULL`).
    ///
    /// This forwards to `evaluate_record` with `NULL` as the record and
    /// returns the output it produces as a string.
    ///
    /// # Examples
    /// ```
    /// use journal_sdk::JOURNAL;
    ///
    /// // Simple expression
    /// let output = JOURNAL.evaluate("(+ 1 2)");
    /// assert!(output == "3");
    ///
    /// // Complex expression
    /// let output = JOURNAL.evaluate(
    ///     "(begin (define (add2 x) (+ x 2)) (add2 1))",
    /// );
    /// assert!(output == "3");
    /// ```
    pub fn evaluate(&self, query: &str) -> String {
        self.evaluate_record(NULL, query)
    }
139
140    fn evaluate_record(&self, record: Word, query: &str) -> String {
141        let mut runs = 0;
142        let cache = Arc::new(Mutex::new(HashMap::new()));
143
144        let start = Instant::now();
145        debug!(
146            "Evaluating ({})",
147            query.chars().take(128).collect::<String>(),
148        );
149
150        loop {
151            let _lock1 = if runs >= RUNS {
152                Some(LOCK.lock().expect("Failed to acquire concurrency lock"))
153            } else {
154                None
155            };
156
157            let genesis_func = PERSISTOR
158                .leaf_get(
159                    PERSISTOR
160                        .branch_get(
161                            PERSISTOR
162                                .root_get(record)
163                                .expect("Failed to get root record"),
164                        )
165                        .expect("Failed to get genesis branch")
166                        .0,
167                )
168                .expect("Failed to get genesis function")
169                .to_vec();
170
171            let genesis_str = String::from_utf8_lossy(&genesis_func);
172
173            let state_old = PERSISTOR
174                .root_get(record)
175                .expect("Failed to get current state");
176
177            let record_temp = PERSISTOR
178                .root_temp(state_old)
179                .expect("Failed to create temporary record");
180
181            let _record_dropper = CallOnDrop(|| {
182                PERSISTOR
183                    .root_delete(record_temp)
184                    .expect("Failed to delete temporary record");
185            });
186
187            let state_str = format!(
188                "#u({})",
189                state_old
190                    .iter()
191                    .map(|&num| num.to_string())
192                    .collect::<Vec<String>>()
193                    .join(" "),
194            );
195
196            let evaluator = Evaluator::new(
197                vec![(SYNC_NODE_TAG, type_s7_sync_node())]
198                    .into_iter()
199                    .collect(),
200                vec![
201                    primitive_s7_sync_hash(),
202                    primitive_s7_sync_null(),
203                    primitive_s7_sync_node(),
204                    primitive_s7_sync_stub(),
205                    primitive_s7_sync_is_node(),
206                    primitive_s7_sync_is_pair(),
207                    primitive_s7_sync_is_stub(),
208                    primitive_s7_sync_is_null(),
209                    primitive_s7_sync_digest(),
210                    primitive_s7_sync_cons(),
211                    primitive_s7_sync_car(),
212                    primitive_s7_sync_cdr(),
213                    primitive_s7_sync_cut(),
214                    primitive_s7_sync_create(),
215                    primitive_s7_sync_delete(),
216                    primitive_s7_sync_all(),
217                    primitive_s7_sync_call(),
218                    primitive_s7_sync_remote(),
219                    primitive_s7_sync_http(),
220                    primitive_s7_crypto_generate(),
221                    primitive_s7_crypto_sign(),
222                    primitive_s7_crypto_verify(),
223                ],
224            );
225
226            SESSIONS
227                .lock()
228                .expect("Failed to acquire sessions lock")
229                .insert(
230                    evaluator.sc as usize,
231                    Session::new(record, MemoryPersistor::new(), cache.clone()),
232                );
233
234            let _session_dropper = CallOnDrop(|| {
235                let mut session = SESSIONS
236                    .lock()
237                    .expect("Failed to acquire sessions lock for cleanup");
238                session.remove(&(evaluator.sc as usize));
239            });
240
241            let expr = format!(
242                "((eval {}) (sync-node {}) (quote {}))",
243                genesis_str, state_str, query
244            );
245
246            let result = evaluator.evaluate(expr.as_str());
247            runs += 1;
248
249            let persistor = {
250                let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
251                &session
252                    .get(&(evaluator.sc as usize))
253                    .expect("Session not found in SESSIONS map")
254                    .persistor
255                    .clone()
256            };
257
258            let (output, state_new) = match result.starts_with("(error '") {
259                true => (result, state_old),
260                false => match result.rfind('.') {
261                    Some(index) => match *&result[(index + 16)..(result.len() - 3)]
262                        .split(' ')
263                        .collect::<Vec<&str>>()
264                        .iter()
265                        .map(|x| x.parse::<u8>().expect("Failed to parse state byte"))
266                        .collect::<Vec<u8>>()
267                        .try_into()
268                    {
269                        Ok(state_new) => (String::from(&result[1..(index - 1)]), state_new),
270                        Err(_) => (
271                            String::from("(error 'sync-format \"Invalid return format\")"),
272                            state_old,
273                        ),
274                    },
275                    None => (
276                        String::from("(error 'sync-format \"Invalid return format\")"),
277                        state_old,
278                    ),
279                },
280            };
281
282            match state_old == state_new {
283                true => {
284                    debug!(
285                        "Completed ({:?}) {} -> {}",
286                        start.elapsed(),
287                        query.chars().take(128).collect::<String>(),
288                        output,
289                    );
290                    return output;
291                }
292                false => match state_old
293                    == PERSISTOR
294                        .root_get(record)
295                        .expect("Failed to get record state for comparison")
296                {
297                    true => {
298                        fn recurse(
299                            source: &MemoryPersistor,
300                            node: Word,
301                        ) -> Result<(), PersistorAccessError> {
302                            if node == NULL {
303                                Ok(())
304                            } else if let Ok(_) = PERSISTOR.leaf_get(node) {
305                                Ok(())
306                            } else if let Ok(_) = PERSISTOR.stump_get(node) {
307                                Ok(())
308                            } else if let Ok(_) = PERSISTOR.branch_get(node) {
309                                Ok(())
310                            } else if let Ok(content) = source.leaf_get(node) {
311                                PERSISTOR
312                                    .leaf_set(content)
313                                    .expect("Failed to set leaf content");
314                                Ok(())
315                            } else if let Ok(digest) = source.stump_get(node) {
316                                PERSISTOR
317                                    .stump_set(digest)
318                                    .expect("Failed to set stump");
319                                Ok(())
320                            } else if let Ok((left, right, digest)) = source.branch_get(node) {
321                                PERSISTOR
322                                    .branch_set(left, right, digest)
323                                    .expect("Failed to set branch");
324                                match recurse(&source, left) {
325                                    Ok(_) => recurse(&source, right),
326                                    err => err,
327                                }
328                            } else {
329                                Err(PersistorAccessError(format!("Dangling branch {:?}", node)))
330                            }
331                        }
332
333                        {
334                            let _lock2 = match _lock1 {
335                                Some(_) => None,
336                                None => {
337                                    Some(LOCK.lock().expect("Failed to acquire secondary lock"))
338                                }
339                            };
340
341                            match recurse(&persistor, state_new) {
342                                Ok(_) => match PERSISTOR.root_set(record, state_old, state_new) {
343                                    Ok(_) => {
344                                        debug!(
345                                            "Completed ({:?}) {} -> {}",
346                                            start.elapsed(),
347                                            query.chars().take(128).collect::<String>(),
348                                            output,
349                                        );
350                                        return output;
351                                    }
352                                    Err(_) => {
353                                        info!(
354                                            "Rerunning (x{}) due to concurrency collision: {}",
355                                            runs, query
356                                        );
357                                        continue;
358                                    }
359                                },
360                                Err(err) => {
361                                    panic!("{:?}", err);
362                                }
363                            }
364                        }
365                    }
366                    false => {
367                        info!(
368                            "Rerunning (x{}) due to concurrency collision: {}",
369                            runs, query
370                        );
371                        continue;
372                    }
373                },
374            }
375        }
376    }
377}
378
379unsafe fn sync_error(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
380    let c_string = CString::new(string).expect("Failed to create CString from string");
381
382    s7::s7_error(
383        sc,
384        s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
385        s7::s7_list(sc, 1, s7::s7_make_string(sc, c_string.as_ptr())),
386    )
387}
388
389fn type_s7_sync_node() -> Type {
390    unsafe extern "C" fn free(_sc: *mut s7::s7_scheme, obj: s7::s7_pointer) -> s7::s7_pointer {
391        sync_heap_free(s7::s7_c_object_value(obj));
392        std::ptr::null_mut()
393    }
394
395    unsafe extern "C" fn mark(_sc: *mut s7::s7_scheme, _obj: s7::s7_pointer) -> s7::s7_pointer {
396        std::ptr::null_mut()
397    }
398
399    unsafe extern "C" fn is_equal(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
400        match sync_is_node(s7::s7_cadr(args)) {
401            true => {
402                let word1 = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
403                let word2 = sync_heap_read(s7::s7_c_object_value(s7::s7_cadr(args)));
404                s7::s7_make_boolean(sc, word1 == word2)
405            }
406            false => s7::s7_wrong_type_arg_error(
407                sc,
408                c"equal?".as_ptr(),
409                2,
410                s7::s7_cadr(args),
411                c"a sync-node".as_ptr(),
412            ),
413        }
414    }
415
416    unsafe extern "C" fn to_string(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
417        string_to_s7(
418            sc,
419            format!(
420                "(sync-node #u({}))",
421                sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)))
422                    .iter()
423                    .map(|&byte| byte.to_string())
424                    .collect::<Vec<String>>()
425                    .join(" "),
426            )
427            .as_str(),
428        )
429    }
430
431    Type::new(c"sync-node", free, mark, is_equal, to_string)
432}
433
434fn primitive_s7_sync_stub() -> Primitive {
435    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
436        let bv = s7::s7_car(args);
437
438        if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
439            return s7::s7_wrong_type_arg_error(
440                sc,
441                c"sync-cut".as_ptr(),
442                1,
443                s7::s7_car(args),
444                c"a hash-sized byte-vector".as_ptr(),
445            );
446        }
447
448        let mut digest = [0 as u8; SIZE];
449        for i in 0..SIZE {
450            digest[i] = s7::s7_byte_vector_ref(bv, i as i64);
451        }
452
453        let persistor = {
454            let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
455            &session
456                .get(&(sc as usize))
457                .expect("Session not found for given context")
458                .persistor
459                .clone()
460        };
461
462        match persistor.stump_set(digest) {
463            Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
464            Err(_) => sync_error(sc, "Journal is unable to create stub node (sync-stub)"),
465        }
466    }
467
468    Primitive::new(
469        code,
470        c"sync-stub",
471        c"(sync-stub digest) create a sync stub from the provided byte-vector",
472        1,
473        0,
474        false,
475    )
476}
477
478fn primitive_s7_sync_hash() -> Primitive {
479    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
480        let data_bv = s7::s7_car(args);
481
482        // check the input arguments
483        if !s7::s7_is_byte_vector(data_bv) {
484            return s7::s7_wrong_type_arg_error(
485                sc,
486                c"sync-hash".as_ptr(),
487                1,
488                data_bv,
489                c"a byte-vector".as_ptr(),
490            );
491        }
492
493        // convert to rust data types
494        let mut data = vec![];
495        for i in 0..s7::s7_vector_length(data_bv) {
496            data.push(s7::s7_byte_vector_ref(data_bv, i as i64))
497        }
498
499        let digest = Sha256::digest(data).to_vec();
500        let digest_bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
501        for i in 0..SIZE {
502            s7::s7_byte_vector_set(digest_bv, i as i64, digest[i]);
503        }
504        digest_bv
505    }
506
507    Primitive::new(
508        code,
509        c"sync-hash",
510        c"(sync-hash bv) compute the SHA-256 digest of a byte vector",
511        1,
512        0,
513        false,
514    )
515}
516
517fn primitive_s7_sync_node() -> Primitive {
518    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
519        let digest = s7::s7_car(args);
520
521        if !s7::s7_is_byte_vector(digest) || s7::s7_vector_length(digest) as usize != SIZE {
522            return s7::s7_wrong_type_arg_error(
523                sc,
524                c"sync-node".as_ptr(),
525                1,
526                digest,
527                c"a hash-sized byte-vector".as_ptr(),
528            );
529        }
530
531        let mut word = [0 as u8; SIZE];
532        for i in 0..SIZE {
533            word[i] = s7::s7_byte_vector_ref(digest, i as i64);
534        }
535
536        let persistor = {
537            let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
538            &session
539                .get(&(sc as usize))
540                .expect("Session not found for sync-node")
541                .persistor
542                .clone()
543        };
544
545        if word == NULL
546            || persistor.branch_get(word).is_ok()
547            || PERSISTOR.branch_get(word).is_ok()
548            || persistor.stump_get(word).is_ok()
549            || PERSISTOR.stump_get(word).is_ok()
550        {
551            s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word))
552        } else {
553            sync_error(sc, "Node does not exist in this journal (sync-node)")
554        }
555    }
556
557    Primitive::new(
558        code,
559        c"sync-node",
560        c"(sync-node digest) returns the sync pair defined by the digest",
561        1,
562        0,
563        false,
564    )
565}
566
567fn primitive_s7_sync_is_node() -> Primitive {
568    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
569        s7::s7_make_boolean(sc, sync_is_node(s7::s7_car(args)))
570    }
571
572    Primitive::new(
573        code,
574        c"sync-node?",
575        c"(sync-node?) returns whether the object is a sync node",
576        1,
577        0,
578        false,
579    )
580}
581
582fn primitive_s7_sync_null() -> Primitive {
583    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
584        s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(NULL))
585    }
586
587    Primitive::new(
588        code,
589        c"sync-null",
590        c"(sync-null) returns the null synchronic node",
591        0,
592        0,
593        false,
594    )
595}
596
597fn primitive_s7_sync_is_null() -> Primitive {
598    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
599        let arg = s7::s7_car(args);
600        match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
601            false => s7::s7_wrong_type_arg_error(
602                sc,
603                c"sync-null?".as_ptr(),
604                1,
605                arg,
606                c"a sync-node or byte-vector".as_ptr(),
607            ),
608            true => {
609                let word = sync_heap_read(s7::s7_c_object_value(arg));
610                for i in 0..SIZE {
611                    if word[i] != 0 {
612                        return s7::s7_make_boolean(sc, false);
613                    }
614                }
615                s7::s7_make_boolean(sc, true)
616            }
617        }
618    }
619
620    Primitive::new(
621        code,
622        c"sync-null?",
623        c"(sync-null? sp) returns a boolean indicating whether sp is equal to sync-null",
624        1,
625        0,
626        false,
627    )
628}
629
630fn primitive_s7_sync_is_pair() -> Primitive {
631    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
632        let arg = s7::s7_car(args);
633        match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
634            false => s7::s7_wrong_type_arg_error(
635                sc,
636                c"sync-pair?".as_ptr(),
637                1,
638                arg,
639                c"a sync-node or byte-vector".as_ptr(),
640            ),
641            true => {
642                let word = sync_heap_read(s7::s7_c_object_value(arg));
643                let persistor = {
644                    let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
645                    &session
646                        .get(&(sc as usize))
647                        .expect("Session not found for sync-pair?")
648                        .persistor
649                        .clone()
650                };
651                s7::s7_make_boolean(
652                    sc,
653                    persistor.branch_get(word).is_ok() || PERSISTOR.branch_get(word).is_ok(),
654                )
655            }
656        }
657    }
658
659    Primitive::new(
660        code,
661        c"sync-pair?",
662        c"(sync-pair? sp) returns a boolean indicating whether sp is a pair",
663        1,
664        0,
665        false,
666    )
667}
668
669fn primitive_s7_sync_is_stub() -> Primitive {
670    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
671        let arg = s7::s7_car(args);
672        match sync_is_node(arg) || s7::s7_is_byte_vector(arg) {
673            false => s7::s7_wrong_type_arg_error(
674                sc,
675                c"sync-stub?".as_ptr(),
676                1,
677                arg,
678                c"a sync-node or byte-vector".as_ptr(),
679            ),
680            true => {
681                let word = sync_heap_read(s7::s7_c_object_value(arg));
682                let persistor = {
683                    let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
684                    &session
685                        .get(&(sc as usize))
686                        .expect("Session not found for sync-stub?")
687                        .persistor
688                        .clone()
689                };
690                s7::s7_make_boolean(
691                    sc,
692                    persistor.stump_get(word).is_ok() || PERSISTOR.stump_get(word).is_ok(),
693                )
694            }
695        }
696    }
697
698    Primitive::new(
699        code,
700        c"sync-stub?",
701        c"(sync-stub? sp) returns a boolean indicating whether sp is a stub",
702        1,
703        0,
704        false,
705    )
706}
707
708fn primitive_s7_sync_digest() -> Primitive {
709    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
710        match sync_is_node(s7::s7_car(args)) {
711            false => s7::s7_wrong_type_arg_error(
712                sc,
713                c"sync-digest".as_ptr(),
714                1,
715                s7::s7_car(args),
716                c"a sync-node".as_ptr(),
717            ),
718            true => {
719                let word = sync_heap_read(s7::s7_c_object_value(s7::s7_car(args)));
720                let digest = sync_digest(sc, word).expect("Failed to obtain digest");
721                let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
722                for i in 0..SIZE {
723                    s7::s7_byte_vector_set(bv, i as i64, digest[i]);
724                }
725                bv
726            }
727        }
728    }
729
730    Primitive::new(
731        code,
732        c"sync-digest",
733        c"(sync-digest node) returns the the digest of a sync-node as a byte-vector",
734        1,
735        0,
736        false,
737    )
738}
739
740fn primitive_s7_sync_cons() -> Primitive {
741    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
742        let persistor = {
743            let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
744            &session
745                .get(&(sc as usize))
746                .expect("Session not found for sync-cons")
747                .persistor
748                .clone()
749        };
750
751        let handle_arg = |obj, number| {
752            if sync_is_node(obj) {
753                Ok(sync_heap_read(s7::s7_c_object_value(obj)))
754            } else if s7::s7_is_byte_vector(obj) {
755                let mut content = vec![];
756                for i in 0..s7::s7_vector_length(obj) {
757                    content.push(s7::s7_byte_vector_ref(obj, i as i64))
758                }
759                match persistor.leaf_set(content) {
760                    Ok(atom) => Ok(atom),
761                    Err(_) => Err(sync_error(
762                        sc,
763                        "Journal is unable to add leaf node (sync-cons)",
764                    )),
765                }
766            } else {
767                Err(s7::s7_wrong_type_arg_error(
768                    sc,
769                    c"sync-cons".as_ptr(),
770                    number,
771                    obj,
772                    c"a byte vector or a sync node".as_ptr(),
773                ))
774            }
775        };
776
777        match (
778            handle_arg(s7::s7_car(args), 1),
779            handle_arg(s7::s7_cadr(args), 2),
780        ) {
781            (Ok(left), Ok(right)) => match (sync_digest(sc, left), sync_digest(sc, right)) {
782                (Ok(digest_left), Ok(digest_right)) => {
783                    let mut joined = [0 as u8; SIZE * 2];
784                    joined[..SIZE].copy_from_slice(&digest_left);
785                    joined[SIZE..].copy_from_slice(&digest_right);
786                    let digest = Word::from(Sha256::digest(joined));
787
788                    match persistor.branch_set(left, right, digest) {
789                        Ok(pair) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(pair)),
790                        Err(_) => sync_error(sc, "Journal is unable to add pair node (sync-cons)"),
791                    }
792                }
793                _ => sync_error(sc, "Journal is unable to obtain node digests (sync-cons)"),
794            },
795            (Err(left), _) => left,
796            (_, Err(right)) => right,
797        }
798    }
799
800    Primitive::new(
801        code,
802        c"sync-cons",
803        c"(sync-cons first rest) construct a new sync pair node",
804        2,
805        0,
806        false,
807    )
808}
809
810fn primitive_s7_sync_car() -> Primitive {
811    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
812        if !sync_is_node(s7::s7_car(args)) {
813            return s7::s7_wrong_type_arg_error(
814                sc,
815                c"sync-car".as_ptr(),
816                1,
817                s7::s7_car(args),
818                c"a sync-pair".as_ptr(),
819            );
820        }
821        sync_cxr(sc, args, c"sync-car", |children| children.0)
822    }
823
824    Primitive::new(
825        code,
826        c"sync-car",
827        c"(sync-car pair) retrieve the first element of a sync pair",
828        1,
829        0,
830        false,
831    )
832}
833
834fn primitive_s7_sync_cdr() -> Primitive {
835    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
836        if !sync_is_node(s7::s7_car(args)) {
837            return s7::s7_wrong_type_arg_error(
838                sc,
839                c"sync-cdr".as_ptr(),
840                1,
841                s7::s7_car(args),
842                c"a sync-pair".as_ptr(),
843            );
844        }
845        sync_cxr(sc, args, c"sync-cdr", |children| children.1)
846    }
847
848    Primitive::new(
849        code,
850        c"sync-cdr",
851        c"(sync-cdr pair) retrieve the second element of a sync pair",
852        1,
853        0,
854        false,
855    )
856}
857
858fn primitive_s7_sync_cut() -> Primitive {
859    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
860        let arg = s7::s7_car(args);
861
862        let handle_digest = |digest| {
863            let persistor = {
864                let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
865                &session
866                    .get(&(sc as usize))
867                    .expect("Session not found for given context")
868                    .persistor
869                    .clone()
870            };
871            match persistor.stump_set(digest) {
872                Ok(stump) => s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(stump)),
873                Err(_) => sync_error(sc, "Journal is unable to add stub node (sync-cut)"),
874            }
875        };
876
877        if s7::s7_is_byte_vector(arg) {
878            let mut content = vec![];
879            for i in 0..s7::s7_vector_length(arg) {
880                content.push(s7::s7_byte_vector_ref(arg, i as i64))
881            }
882            handle_digest(Word::from(Sha256::digest(Sha256::digest(&content))))
883        } else if sync_is_node(arg) {
884            match sync_digest(sc, sync_heap_read(s7::s7_c_object_value(arg))) {
885                Ok(digest) => handle_digest(digest),
886                Err(_) => sync_error(sc, "Journal does not recognize input node (sync-cut)"),
887            }
888        } else {
889            s7::s7_wrong_type_arg_error(
890                sc,
891                c"sync-cut".as_ptr(),
892                1,
893                s7::s7_car(args),
894                c"a sync-node or byte-vector".as_ptr(),
895            )
896        }
897    }
898
899    Primitive::new(
900        code,
901        c"sync-cut",
902        c"(sync-cut value) obtain the stub of a sync pair or byte vector",
903        1,
904        0,
905        false,
906    )
907}
908
909fn primitive_s7_sync_create() -> Primitive {
910    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
911        let id = s7::s7_car(args);
912
913        if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
914            return s7::s7_wrong_type_arg_error(
915                sc,
916                c"sync-create".as_ptr(),
917                1,
918                id,
919                c"a hash-sized byte-vector".as_ptr(),
920            );
921        }
922
923        let mut record: Word = [0 as u8; SIZE];
924
925        for i in 0..SIZE {
926            record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
927        }
928
929        debug!("Adding record: {}", hex::encode(record));
930
931        match PERSISTOR.root_new(
932            record,
933            PERSISTOR
934                .branch_set(
935                    PERSISTOR
936                        .leaf_set(GENESIS_STR.as_bytes().to_vec())
937                        .expect("Failed to create genesis leaf for new record"),
938                    NULL,
939                    NULL,
940                )
941                .expect("Failed to create genesis branch for new record"),
942        ) {
943            Ok(_) => s7::s7_make_boolean(sc, true),
944            Err(_) => s7::s7_error(
945                sc,
946                s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
947                s7::s7_list(
948                    sc,
949                    1,
950                    s7::s7_make_string(sc, c"record ID is already in use".as_ptr()),
951                ),
952            ),
953        }
954    }
955
956    Primitive::new(
957        code,
958        c"sync-create",
959        c"(sync-create id) create a new synchronic record with the given 32-byte ID",
960        1,
961        0,
962        false,
963    )
964}
965
966fn primitive_s7_sync_delete() -> Primitive {
967    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
968        let id = s7::s7_car(args);
969
970        if !s7::s7_is_byte_vector(id) || s7::s7_vector_length(id) as usize != SIZE {
971            return s7::s7_wrong_type_arg_error(
972                sc,
973                c"sync-delete".as_ptr(),
974                1,
975                id,
976                c"a hash-sized byte-vector".as_ptr(),
977            );
978        }
979
980        let mut record: Word = [0 as u8; SIZE];
981
982        for i in 0..s7::s7_vector_length(id) {
983            record[i as usize] = s7::s7_byte_vector_ref(id, i as i64)
984        }
985
986        if record == NULL {
987            return s7::s7_error(
988                sc,
989                s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
990                s7::s7_list(
991                    sc,
992                    1,
993                    s7::s7_make_string(sc, c"cannot delete the root record".as_ptr()),
994                ),
995            );
996        }
997
998        debug!("Deleting record: {}", hex::encode(record));
999
1000        match PERSISTOR.root_delete(record) {
1001            Ok(_) => s7::s7_make_boolean(sc, true),
1002            Err(_) => s7::s7_error(
1003                sc,
1004                s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1005                s7::s7_list(
1006                    sc,
1007                    1,
1008                    s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1009                ),
1010            ),
1011        }
1012    }
1013
1014    Primitive::new(
1015        code,
1016        c"sync-delete",
1017        c"(sync-delete id) delete the synchronic record with the given 32-byte ID",
1018        1,
1019        0,
1020        false,
1021    )
1022}
1023
1024fn primitive_s7_sync_all() -> Primitive {
1025    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, _args: s7::s7_pointer) -> s7::s7_pointer {
1026        let mut list = s7::s7_list(sc, 0);
1027
1028        for record in PERSISTOR.root_list().into_iter().rev() {
1029            let bv = s7::s7_make_byte_vector(sc, SIZE as i64, 1, std::ptr::null_mut());
1030            for i in 0..SIZE {
1031                s7::s7_byte_vector_set(bv, i as i64, record[i]);
1032            }
1033
1034            list = s7::s7_cons(sc, bv, list)
1035        }
1036
1037        list
1038    }
1039
1040    Primitive::new(
1041        code,
1042        c"sync-all",
1043        c"(sync-all) list all synchronic record IDs in ascending order",
1044        0,
1045        0,
1046        false,
1047    )
1048}
1049
1050fn primitive_s7_sync_call() -> Primitive {
1051    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1052        let message_expr = s7::s7_car(args);
1053        let blocking = s7::s7_cadr(args);
1054
1055        if !s7::s7_is_boolean(blocking) {
1056            return s7::s7_wrong_type_arg_error(
1057                sc,
1058                c"sync-call".as_ptr(),
1059                2,
1060                blocking,
1061                c"a boolean".as_ptr(),
1062            );
1063        }
1064
1065        let record = match s7::s7_is_null(sc, s7::s7_cddr(args)) {
1066            true => {
1067                let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1068                session
1069                    .get(&(sc as usize))
1070                    .expect("Session number not found in sessions map")
1071                    .record
1072            }
1073            false => {
1074                let bv = s7::s7_caddr(args);
1075                // check the input arguments
1076                if !s7::s7_is_byte_vector(bv) || s7::s7_vector_length(bv) as usize != SIZE {
1077                    return s7::s7_wrong_type_arg_error(
1078                        sc,
1079                        c"sync-call".as_ptr(),
1080                        3,
1081                        bv,
1082                        c"a hash-sized byte-vector".as_ptr(),
1083                    );
1084                }
1085
1086                let mut record = [0 as u8; SIZE];
1087                for i in 0..SIZE {
1088                    record[i] = s7::s7_byte_vector_ref(bv, i as i64);
1089                }
1090                record
1091            }
1092        };
1093
1094        match PERSISTOR.root_get(record) {
1095            Ok(_) => {
1096                let message = obj2str(sc, message_expr);
1097                if s7::s7_boolean(sc, blocking) {
1098                    let result = JOURNAL.evaluate_record(record, message.as_str());
1099                    let c_result = CString::new(format!("(quote {})", result))
1100                        .expect("Failed to create C string from journal evaluation result");
1101                    s7::s7_eval_c_string(sc, c_result.as_ptr())
1102                } else {
1103                    tokio::spawn(async move {
1104                        JOURNAL.evaluate_record(record, message.as_str());
1105                    });
1106                    s7::s7_make_boolean(sc, true)
1107                }
1108            }
1109            Err(_) => s7::s7_error(
1110                sc,
1111                s7::s7_make_symbol(sc, c"sync-web-error".as_ptr()),
1112                s7::s7_list(
1113                    sc,
1114                    1,
1115                    s7::s7_make_string(sc, c"record ID does not exist".as_ptr()),
1116                ),
1117            ),
1118        }
1119    }
1120
1121    Primitive::new(
1122        code,
1123        c"sync-call",
1124        c"(sync-call query blocking? id) query the provided record ID or self if ID not provided",
1125        2,
1126        1,
1127        false,
1128    )
1129}
1130
1131fn primitive_s7_sync_http() -> Primitive {
1132    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1133        let vec2s7 = |vector: Vec<u8>| {
1134            let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1135            for i in 0..vector.len() {
1136                s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1137            }
1138            bv
1139        };
1140
1141        let method = obj2str(sc, s7::s7_car(args));
1142        let url = obj2str(sc, s7::s7_cadr(args));
1143
1144        let body = if s7::s7_list_length(sc, args) >= 3 {
1145            obj2str(sc, s7::s7_caddr(args))
1146        } else {
1147            String::from("")
1148        };
1149
1150        let cache_mutex = {
1151            let session = SESSIONS.lock().expect("Failed to acquire sessions lock");
1152            &session
1153                .get(&(sc as usize))
1154                .expect("Session ID not found in active sessions")
1155                .cache
1156                .clone()
1157        };
1158
1159        let mut cache = cache_mutex
1160            .lock()
1161            .expect("Failed to acquire cache mutex lock");
1162
1163        let key = (method.clone(), url.clone(), body.as_bytes().to_vec());
1164
1165        match cache.get(&key) {
1166            Some(bytes) => {
1167                debug!("Cache hit on key {:?}", key);
1168                vec2s7(bytes.to_vec())
1169            }
1170            None => {
1171                let result = tokio::task::block_in_place(move || {
1172                    tokio::runtime::Handle::current().block_on(async move {
1173                        match method.to_lowercase() {
1174                            method if method == "get" => {
1175                                JOURNAL
1176                                    .client
1177                                    .get(&url[1..url.len() - 1])
1178                                    .send()
1179                                    .await?
1180                                    .bytes()
1181                                    .await
1182                            }
1183                            method if method == "post" => {
1184                                JOURNAL
1185                                    .client
1186                                    .post(&url[1..url.len() - 1])
1187                                    .body(String::from(&body[1..body.len() - 1]))
1188                                    .send()
1189                                    .await?
1190                                    .bytes()
1191                                    .await
1192                            }
1193                            _ => {
1194                                panic!("Unsupported HTTP method")
1195                            }
1196                        }
1197                    })
1198                });
1199
1200                match result {
1201                    Ok(vector) => {
1202                        cache.insert(key, vector.to_vec());
1203                        vec2s7(vector.to_vec())
1204                    }
1205                    Err(_) => {
1206                        sync_error(sc, "Journal is unable to fulfill HTTP request (sync-http)")
1207                    }
1208                }
1209            }
1210        }
1211    }
1212
1213    Primitive::new(
1214        code,
1215        c"sync-http",
1216        c"(sync-http method url . data) make an http request where method is 'get or 'post",
1217        2,
1218        2,
1219        false,
1220    )
1221}
1222
1223fn primitive_s7_sync_remote() -> Primitive {
1224    unsafe extern "C" fn code(sc: *mut s7::s7_scheme, args: s7::s7_pointer) -> s7::s7_pointer {
1225        let vec2s7 = |mut vector: Vec<u8>| {
1226            vector.insert(0, 39); // add quote character so that it evaluates correctly
1227            vector.push(0);
1228            let c_string =
1229                CString::from_vec_with_nul(vector).expect("Failed to create C string from vector");
1230            s7::s7_eval_c_string(sc, c_string.as_ptr())
1231        };
1232
1233        let url = obj2str(sc, s7::s7_car(args));
1234
1235        let body = obj2str(sc, s7::s7_cadr(args));
1236
1237        let cache_mutex = {
1238            let session = SESSIONS.lock().expect("Failed to acquire session lock");
1239            &session
1240                .get(&(sc as usize))
1241                .expect("Failed to get session from map")
1242                .cache
1243                .clone()
1244        };
1245
1246        let mut cache = cache_mutex.lock().expect("Failed to acquire cache lock");
1247
1248        let key = (String::from("post"), url.clone(), body.as_bytes().to_vec());
1249
1250        match cache.get(&key) {
1251            Some(bytes) => {
1252                debug!("Cache hit on key {:?}", key);
1253                vec2s7(bytes.to_vec())
1254            }
1255            None => {
1256                let result = tokio::task::block_in_place(move || {
1257                    tokio::runtime::Handle::current().block_on(async move {
1258                        JOURNAL
1259                            .client
1260                            .post(&url[1..url.len() - 1])
1261                            .body(body)
1262                            .send()
1263                            .await?
1264                            .bytes()
1265                            .await
1266                    })
1267                });
1268
1269                match result {
1270                    Ok(bytes) => {
1271                        cache.insert(key, bytes.to_vec());
1272                        vec2s7(bytes.to_vec())
1273                    }
1274                    Err(_) => {
1275                        sync_error(sc, "Journal is unable to query remote peer (sync-remote)")
1276                    }
1277                }
1278            }
1279        }
1280    }
1281
1282    Primitive::new(
1283        code,
1284        c"sync-remote",
1285        c"(sync-remote url data) make a post http request with the data payload)",
1286        2,
1287        0,
1288        false,
1289    )
1290}
1291unsafe fn string_to_s7(sc: *mut s7::s7_scheme, string: &str) -> s7::s7_pointer {
1292    let c_string = CString::new(string).expect("Failed to create CString from string");
1293    let s7_string = s7::s7_make_string(sc, c_string.as_ptr());
1294    s7::s7_object_to_string(sc, s7_string, false)
1295}
1296
1297unsafe fn sync_heap_make(word: Word) -> *mut libc::c_void {
1298    let ptr = libc::malloc(SIZE);
1299    let array: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE);
1300    for i in 0..SIZE {
1301        array[i] = word[i] as u8;
1302    }
1303    ptr
1304}
1305
1306unsafe fn sync_heap_read(ptr: *mut libc::c_void) -> Word {
1307    std::slice::from_raw_parts_mut(ptr as *mut u8, SIZE)
1308        .try_into()
1309        .expect("Failed to convert slice to Word array")
1310}
1311
1312unsafe fn sync_heap_free(ptr: *mut libc::c_void) {
1313    libc::free(ptr);
1314}
1315
1316unsafe fn sync_is_node(obj: s7::s7_pointer) -> bool {
1317    s7::s7_is_c_object(obj) && s7::s7_c_object_type(obj) == SYNC_NODE_TAG
1318}
1319
1320unsafe fn sync_cxr(
1321    sc: *mut s7::s7_scheme,
1322    args: s7::s7_pointer,
1323    name: &CStr,
1324    selector: fn((Word, Word)) -> Word,
1325) -> s7::s7_pointer {
1326    let node = s7::s7_car(args);
1327    let word = sync_heap_read(s7::s7_c_object_value(node));
1328
1329    let persistor = {
1330        let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1331        &session
1332            .get(&(sc as usize))
1333            .expect("Session not found for given context")
1334            .persistor
1335            .clone()
1336    };
1337
1338    let child_return = |word| {
1339        let node_return = |word| s7::s7_make_c_object(sc, SYNC_NODE_TAG, sync_heap_make(word));
1340
1341        let vector_return = |vector: Vec<u8>| {
1342            let bv = s7::s7_make_byte_vector(sc, vector.len() as i64, 1, std::ptr::null_mut());
1343            for i in 0..vector.len() {
1344                s7::s7_byte_vector_set(bv, i as i64, vector[i]);
1345            }
1346            bv
1347        };
1348
1349        if word == NULL {
1350            node_return(word)
1351        } else if let Ok(_) = persistor.branch_get(word) {
1352            node_return(word)
1353        } else if let Ok(_) = PERSISTOR.branch_get(word) {
1354            node_return(word)
1355        } else if let Ok(content) = persistor.leaf_get(word) {
1356            vector_return(content)
1357        } else if let Ok(content) = PERSISTOR.leaf_get(word) {
1358            vector_return(content)
1359        } else if let Ok(_) = persistor.stump_get(word) {
1360            node_return(word)
1361        } else if let Ok(_) = PERSISTOR.stump_get(word) {
1362            node_return(word)
1363        } else {
1364            sync_error(
1365                sc,
1366                format!(
1367                    "Cannot retrieve items for node that is not a sync-pair ({})",
1368                    name.to_string_lossy()
1369                )
1370                .as_str(),
1371            )
1372        }
1373    };
1374
1375    match sync_is_node(node) {
1376        true => match persistor.branch_get(word) {
1377            Ok((left, right, _)) => child_return(selector((left, right))),
1378            Err(_) => match PERSISTOR.branch_get(word) {
1379                Ok((left, right, _)) => child_return(selector((left, right))),
1380                Err(_) => sync_error(
1381                    sc,
1382                    format!(
1383                        "Journal cannot retrieve leaf byte-vector ({})",
1384                        name.to_string_lossy()
1385                    )
1386                    .as_str(),
1387                ),
1388            },
1389        },
1390        false => s7::s7_wrong_type_arg_error(sc, name.as_ptr(), 1, node, c"a sync-node".as_ptr()),
1391    }
1392}
1393
1394unsafe fn sync_digest(sc: *mut s7::s7_scheme, word: Word) -> Result<Word, String> {
1395    let persistor = {
1396        let session = SESSIONS.lock().expect("Failed to acquire SESSIONS lock");
1397        &session
1398            .get(&(sc as usize))
1399            .expect("Session not found for given context")
1400            .persistor
1401            .clone()
1402    };
1403
1404    if word == NULL {
1405        Ok(NULL)
1406    } else if let Ok((_, _, digest)) = persistor.branch_get(word) {
1407        Ok(digest)
1408    } else if let Ok((_, _, digest)) = PERSISTOR.branch_get(word) {
1409        Ok(digest)
1410    } else if let Ok(_) = persistor.leaf_get(word) {
1411        Ok(word)
1412    } else if let Ok(_) = PERSISTOR.leaf_get(word) {
1413        Ok(word)
1414    } else if let Ok(digest) = persistor.stump_get(word) {
1415        Ok(digest)
1416    } else if let Ok(digest) = PERSISTOR.stump_get(word) {
1417        Ok(digest)
1418    } else {
1419        Err("Digest not found in persistor".to_string())
1420    }
1421}