// journal_sdk/persistor.rs

1use crate::config::Config;
2use once_cell::sync::Lazy;
3use rand::RngCore;
4use rand::prelude::SliceRandom;
5use sha2::{Digest, Sha256};
6use std::collections::{HashMap, HashSet};
7use std::sync::{Arc, RwLock};
8
9use rocksdb::{ColumnFamilyDescriptor, DB, Options, WriteBatch};
10
11pub static PERSISTOR: Lazy<Box<dyn Persistor + Send + Sync>> = Lazy::new(|| {
12    let config = Config::new();
13    match config.database.as_str() {
14        "" => Box::new(MemoryPersistor::new()),
15        path => Box::new(DatabasePersistor::new(path)),
16    }
17});
18
/// Size, in bytes, of every hash value handled by this module (32, the SHA-256 output length).
pub const SIZE: usize = 32;

/// Fixed-size array of `SIZE` bytes used as a hash pointer (currently a SHA-256 digest).
pub type Word = [u8; SIZE];
24
/// Error returned by persistor operations; the wrapped string is the human-readable message.
///
/// Implements `Display` and `std::error::Error` so the error can be printed without going
/// through `Debug` and propagated with `?` into `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub struct PersistorAccessError(pub String);

impl std::fmt::Display for PersistorAccessError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for PersistorAccessError {}
28
/// Storage interface for a content-addressed tree made of roots, branches, leaves and stumps.
///
/// The per-method notes describe what the implementations visible in this file do; other
/// implementors may behave differently.
pub trait Persistor {
    /// Lists the handles of persistent (non-temporary) roots.
    fn root_list(&self) -> Vec<Word>;
    /// Registers `handle` as a persistent root pointing at `root` and returns the handle.
    /// Fails if the handle is already registered.
    fn root_new(&self, handle: Word, root: Word) -> Result<Word, PersistorAccessError>;
    /// Registers `root` under a freshly generated random handle marked as temporary and
    /// returns that handle.
    fn root_temp(&self, root: Word) -> Result<Word, PersistorAccessError>;
    /// Returns the root node stored under `handle`, or an error if the handle is unknown.
    fn root_get(&self, handle: Word) -> Result<Word, PersistorAccessError>;
    /// Compare-and-swap on a persistent root: when `handle` currently points at `old`, nodes
    /// reachable from `new` that are missing locally are copied out of `source`, `old` is
    /// released, and the handle is repointed at `new`. Returns the handle on success.
    fn root_set(
        &self,
        handle: Word,
        old: Word,
        new: Word,
        source: &dyn Persistor,
    ) -> Result<Word, PersistorAccessError>;
    /// Removes `handle` and releases the reference it held on its root node.
    fn root_delete(&self, handle: Word) -> Result<(), PersistorAccessError>;
    /// Stores a branch and returns its key, the SHA-256 of `left || right || digest`.
    fn branch_set(
        &self,
        left: Word,
        right: Word,
        digest: Word,
    ) -> Result<Word, PersistorAccessError>;
    /// Returns the `(left, right, digest)` triple stored under `branch`.
    fn branch_get(&self, branch: Word) -> Result<(Word, Word, Word), PersistorAccessError>;
    /// Stores `content` and returns its key, the SHA-256 of the SHA-256 of the content.
    fn leaf_set(&self, content: Vec<u8>) -> Result<Word, PersistorAccessError>;
    /// Returns the content stored under `leaf`.
    fn leaf_get(&self, leaf: Word) -> Result<Vec<u8>, PersistorAccessError>;
    /// Stores `digest` and returns its key, the SHA-256 of the digest.
    fn stump_set(&self, digest: Word) -> Result<Word, PersistorAccessError>;
    /// Returns the digest stored under `stump`.
    fn stump_get(&self, stump: Word) -> Result<Word, PersistorAccessError>;
}
54
/// In-memory persistor backed by hash maps.
///
/// Each map sits behind its own `Arc<RwLock<..>>`, so a clone of the persistor (it derives
/// `Clone`) is another handle onto the same underlying maps.
#[derive(Clone)]
pub struct MemoryPersistor {
    /// Handle -> (root node, `true` for a persistent root, `false` for a temporary one).
    roots: Arc<RwLock<HashMap<Word, (Word, bool)>>>,
    /// Branch key -> (left child, right child, digest).
    branches: Arc<RwLock<HashMap<Word, (Word, Word, Word)>>>,
    /// Leaf key -> leaf content.
    leaves: Arc<RwLock<HashMap<Word, Vec<u8>>>>,
    /// Stump key -> digest.
    stumps: Arc<RwLock<HashMap<Word, Word>>>,
    /// Node key -> reference count; an entry is removed when its count would reach zero.
    references: Arc<RwLock<HashMap<Word, usize>>>,
}
63
64impl MemoryPersistor {
65    pub fn new() -> Self {
66        Self {
67            roots: Arc::new(RwLock::new(HashMap::new())),
68            branches: Arc::new(RwLock::new(HashMap::new())),
69            leaves: Arc::new(RwLock::new(HashMap::new())),
70            stumps: Arc::new(RwLock::new(HashMap::new())),
71            references: Arc::new(RwLock::new(HashMap::new())),
72        }
73    }
74
75    fn reference_increment(&self, node: Word) {
76        let mut references = self.references.write().expect("Failed to lock references");
77        match references.get(&node) {
78            Some(count) => {
79                let count_ = *count;
80                references.insert(node, count_ + 1);
81            }
82            None => {
83                references.insert(node, 1);
84            }
85        };
86    }
87
88    fn reference_decrement(&self, node: Word) {
89        let mut references = self.references.write().expect("Failed to lock references");
90        match references.get(&node) {
91            Some(count_old) => {
92                let count_new = *count_old - 1;
93                if count_new > 0 {
94                    references.insert(node, count_new);
95                } else {
96                    references.remove(&node);
97                    let mut branches = self.branches.write().expect("Failed to lock branches");
98                    if let Some((left, right, _)) = branches.get(&node) {
99                        let left_ = *left;
100                        let right_ = *right;
101                        branches.remove(&node);
102                        drop(references);
103                        drop(branches);
104                        self.reference_decrement(left_);
105                        self.reference_decrement(right_);
106                    } else {
107                        let mut leaves = self.leaves.write().expect("Failed to lock leaves");
108                        let mut stumps = self.stumps.write().expect("Failed to lock stumps");
109                        if let Some(_) = leaves.get(&node) {
110                            leaves.remove(&node);
111                        } else if let Some(_) = stumps.get(&node) {
112                            stumps.remove(&node);
113                        }
114                    }
115                }
116            }
117            None => {}
118        };
119    }
120
121    fn merged_branch(
122        &self,
123        node: Word,
124        plan: &MergePlan,
125    ) -> Result<Option<(Word, Word, Word)>, PersistorAccessError> {
126        if let Some(branch) = plan.branches.get(&node) {
127            return Ok(Some(*branch));
128        }
129
130        match self
131            .branches
132            .read()
133            .expect("Failed to lock branches")
134            .get(&node)
135        {
136            Some((left, right, digest)) => Ok(Some((*left, *right, *digest))),
137            None => Ok(None),
138        }
139    }
140
141    fn base_refcount(&self, node: Word) -> isize {
142        self.references
143            .read()
144            .expect("Failed to lock references")
145            .get(&node)
146            .copied()
147            .unwrap_or(0) as isize
148    }
149
150    fn merge_collect(
151        &self,
152        source: &dyn Persistor,
153        node: Word,
154        plan: &mut MergePlan,
155        seen: &mut HashSet<Word>,
156    ) -> Result<(), PersistorAccessError> {
157        if node == [0 as u8; SIZE] || !seen.insert(node) {
158            return Ok(());
159        }
160
161        if plan.branches.contains_key(&node)
162            || plan.leaves.contains_key(&node)
163            || plan.stumps.contains_key(&node)
164        {
165            return Ok(());
166        }
167
168        if self
169            .leaves
170            .read()
171            .expect("Failed to lock leaves")
172            .contains_key(&node)
173            || self
174                .stumps
175                .read()
176                .expect("Failed to lock stumps")
177                .contains_key(&node)
178            || self
179                .branches
180                .read()
181                .expect("Failed to lock branches")
182                .contains_key(&node)
183        {
184            return Ok(());
185        }
186
187        if let Ok(content) = source.leaf_get(node) {
188            plan.leaves.insert(node, content);
189            return Ok(());
190        }
191
192        if let Ok(digest) = source.stump_get(node) {
193            plan.stumps.insert(node, digest);
194            return Ok(());
195        }
196
197        if let Ok((left, right, digest)) = source.branch_get(node) {
198            self.merge_collect(source, left, plan, seen)?;
199            self.merge_collect(source, right, plan, seen)?;
200            plan.branches.insert(node, (left, right, digest));
201            plan.delta_add(left, 1);
202            plan.delta_add(right, 1);
203            return Ok(());
204        }
205
206        Ok(())
207    }
208
209    fn release_plan(&self, node: Word, plan: &mut MergePlan) -> Result<(), PersistorAccessError> {
210        if node == [0 as u8; SIZE] {
211            return Ok(());
212        }
213
214        let effective = self.base_refcount(node) + plan.deltas.get(&node).copied().unwrap_or(0);
215        if effective <= 0 {
216            return Ok(());
217        }
218
219        plan.delta_add(node, -1);
220
221        if effective == 1 {
222            if !plan.deletes.insert(node) {
223                return Ok(());
224            }
225
226            if let Some((left, right, _)) = self.merged_branch(node, plan)? {
227                self.release_plan(left, plan)?;
228                self.release_plan(right, plan)?;
229            }
230        }
231
232        Ok(())
233    }
234}
235
236impl Persistor for MemoryPersistor {
237    fn root_list(&self) -> Vec<Word> {
238        let mut keys: Vec<Word> = self
239            .roots
240            .read()
241            .expect("Failed to get locked roots")
242            .iter()
243            .filter(|&(_, &(_, is_persistent))| is_persistent)
244            .map(|(key, _)| key)
245            .cloned()
246            .collect();
247        keys.sort();
248        keys
249    }
250
251    fn root_new(&self, handle: Word, root: Word) -> Result<Word, PersistorAccessError> {
252        let mut roots = self.roots.write().expect("Failed to lock roots map");
253        match roots.get(&handle) {
254            Some(_) => Err(PersistorAccessError(format!(
255                "Handle {:?} already exists",
256                handle
257            ))),
258            None => {
259                self.reference_increment(root);
260                roots.insert(handle, (root, true));
261                Ok(handle)
262            }
263        }
264    }
265
266    fn root_temp(&self, root: Word) -> Result<Word, PersistorAccessError> {
267        let mut roots = self.roots.write().expect("Failed to lock roots map");
268        let mut handle_: Word = [0 as u8; 32];
269        rand::thread_rng().fill_bytes(&mut handle_);
270        match roots.get(&handle_) {
271            Some(_) => Err(PersistorAccessError(format!(
272                "Handle {:?} already exists",
273                handle_
274            ))),
275            None => {
276                self.reference_increment(root);
277                roots.insert(handle_, (root, false));
278                Ok(handle_)
279            }
280        }
281    }
282
283    fn root_get(&self, handle: Word) -> Result<Word, PersistorAccessError> {
284        match self
285            .roots
286            .read()
287            .expect("Failed to lock roots map")
288            .get(&handle)
289        {
290            Some((root, _)) => Ok(*root),
291            None => Err(PersistorAccessError(format!(
292                "Handle {:?} not found",
293                handle
294            ))),
295        }
296    }
297
298    fn root_set(
299        &self,
300        handle: Word,
301        old: Word,
302        new: Word,
303        source: &dyn Persistor,
304    ) -> Result<Word, PersistorAccessError> {
305        let status = {
306            let roots = self.roots.write().expect("Failed to lock roots map");
307            match roots.get(&handle) {
308                Some((root, true)) if *root == old => 0,
309                Some((_, false)) => 1,
310                Some((_, true)) => 2,
311                None => 3,
312            }
313        };
314
315        match status {
316            0 => {
317                let mut plan = MergePlan::new();
318                let mut seen = HashSet::new();
319                self.merge_collect(source, new, &mut plan, &mut seen)?;
320                plan.delta_add(new, 1);
321                self.release_plan(old, &mut plan)?;
322
323                {
324                    let mut leaves = self.leaves.write().expect("Failed to lock leaves");
325                    for (node, content) in plan.leaves.iter() {
326                        leaves.insert(*node, content.clone());
327                    }
328                    for node in plan.deletes.iter() {
329                        leaves.remove(node);
330                    }
331                }
332
333                {
334                    let mut stumps = self.stumps.write().expect("Failed to lock stumps");
335                    for (node, digest) in plan.stumps.iter() {
336                        stumps.insert(*node, *digest);
337                    }
338                    for node in plan.deletes.iter() {
339                        stumps.remove(node);
340                    }
341                }
342
343                {
344                    let mut branches = self.branches.write().expect("Failed to lock branches");
345                    for (node, branch) in plan.branches.iter() {
346                        branches.insert(*node, *branch);
347                    }
348                    for node in plan.deletes.iter() {
349                        branches.remove(node);
350                    }
351                }
352
353                {
354                    let mut references = self.references.write().expect("Failed to lock references");
355                    for (node, delta) in plan.deltas.iter() {
356                        let base = references.get(node).copied().unwrap_or(0) as isize;
357                        let next = base + delta;
358                        if next > 0 {
359                            references.insert(*node, next as usize);
360                        } else {
361                            references.remove(node);
362                        }
363                    }
364                }
365
366                let mut roots = self.roots.write().expect("Failed to lock roots map");
367                roots.insert(handle, (new, true));
368                Ok(handle)
369            }
370            1 => Err(PersistorAccessError(format!(
371                "Handle {:?} is temporary",
372                handle
373            ))),
374            2 => Err(PersistorAccessError(format!(
375                "Handle {:?} changed since compare",
376                handle
377            ))),
378            _ => Err(PersistorAccessError(format!(
379                "Handle {:?} not found",
380                handle
381            ))),
382        }
383    }
384
385    fn root_delete(&self, handle: Word) -> Result<(), PersistorAccessError> {
386        let mut roots = self.roots.write().expect("Failed to lock roots map");
387        match roots.get(&handle) {
388            Some((old, _)) => {
389                let old_ = *old;
390                roots.remove(&handle);
391                self.reference_decrement(old_);
392                Ok(())
393            }
394            None => Err(PersistorAccessError(format!(
395                "Handle {:?} not found",
396                handle
397            ))),
398        }
399    }
400
401    fn branch_set(
402        &self,
403        left: Word,
404        right: Word,
405        digest: Word,
406    ) -> Result<Word, PersistorAccessError> {
407        let mut joined = [0 as u8; SIZE * 3];
408        joined[..SIZE].copy_from_slice(&left);
409        joined[SIZE..SIZE * 2].copy_from_slice(&right);
410        joined[SIZE * 2..].copy_from_slice(&digest);
411
412        let branch = Sha256::digest(joined);
413        let mut branches = self.branches.write().expect("Failed to lock branches map");
414        branches.insert(branch.into(), (left, right, digest));
415        drop(branches);
416        self.reference_increment(left);
417        self.reference_increment(right);
418        Ok(Word::from(branch))
419    }
420
421    fn branch_get(&self, branch: Word) -> Result<(Word, Word, Word), PersistorAccessError> {
422        let branches = self.branches.read().expect("Failed to lock branches map");
423        match branches.get(&branch) {
424            Some((left, right, digest)) => {
425                let mut joined = [0 as u8; SIZE * 3];
426                joined[..SIZE].copy_from_slice(left);
427                joined[SIZE..SIZE * 2].copy_from_slice(right);
428                joined[SIZE * 2..].copy_from_slice(digest);
429                assert!(Vec::from(branch) == Sha256::digest(joined).to_vec());
430                Ok((*left, *right, *digest))
431            }
432            None => Err(PersistorAccessError(format!(
433                "Branch {:?} not found",
434                branch
435            ))),
436        }
437    }
438
439    fn leaf_set(&self, content: Vec<u8>) -> Result<Word, PersistorAccessError> {
440        let leaf = Word::from(Sha256::digest(Sha256::digest(&content)));
441        self.leaves
442            .write()
443            .expect("Failed to lock leaves map")
444            .insert(leaf, content);
445        Ok(leaf)
446    }
447
448    fn leaf_get(&self, leaf: Word) -> Result<Vec<u8>, PersistorAccessError> {
449        let leaves = self.leaves.read().expect("Failed to lock leaves map");
450        match leaves.get(&leaf) {
451            Some(content) => {
452                assert!(Vec::from(leaf) == Sha256::digest(Sha256::digest(content)).to_vec());
453                Ok(content.to_vec())
454            }
455            None => Err(PersistorAccessError(format!("Leaf {:?} not found", leaf))),
456        }
457    }
458
459    fn stump_set(&self, digest: Word) -> Result<Word, PersistorAccessError> {
460        let stump = Sha256::digest(digest);
461        self.stumps
462            .write()
463            .expect("Failed to lock stump map")
464            .insert(stump.into(), digest);
465        Ok(Word::from(stump))
466    }
467
468    fn stump_get(&self, stump: Word) -> Result<Word, PersistorAccessError> {
469        let stumps = self.stumps.read().expect("Failed to lock stumps map");
470        match stumps.get(&stump) {
471            Some(digest) => {
472                assert!(Vec::from(stump) == Sha256::digest(Vec::from(digest)).to_vec());
473                Ok(*digest)
474            }
475            None => Err(PersistorAccessError(format!("Stump {:?} not found", stump))),
476        }
477    }
478}
479
/// Persistor backed by a RocksDB database; `DatabasePersistor::new` opens it with the column
/// families `roots`, `branches`, `leaves`, `stumps` and `references`.
pub struct DatabasePersistor {
    /// Database handle; methods take the read or write side of this lock before using it.
    db: RwLock<DB>,
}
483
484struct MergePlan {
485    branches: HashMap<Word, (Word, Word, Word)>,
486    leaves: HashMap<Word, Vec<u8>>,
487    stumps: HashMap<Word, Word>,
488    deltas: HashMap<Word, isize>,
489    deletes: HashSet<Word>,
490}
491
492impl MergePlan {
493    fn new() -> Self {
494        Self {
495            branches: HashMap::new(),
496            leaves: HashMap::new(),
497            stumps: HashMap::new(),
498            deltas: HashMap::new(),
499            deletes: HashSet::new(),
500        }
501    }
502
503    fn delta_add(&mut self, node: Word, amount: isize) {
504        let next = self.deltas.get(&node).copied().unwrap_or(0) + amount;
505        if next == 0 {
506            self.deltas.remove(&node);
507        } else {
508            self.deltas.insert(node, next);
509        }
510    }
511}
512
513impl DatabasePersistor {
514    pub fn new(path: &str) -> Self {
515        let mut opts = Options::default();
516        opts.create_if_missing(true);
517        opts.create_missing_column_families(true);
518
519        let cfs = vec![
520            ColumnFamilyDescriptor::new("roots", Options::default()),
521            ColumnFamilyDescriptor::new("branches", Options::default()),
522            ColumnFamilyDescriptor::new("leaves", Options::default()),
523            ColumnFamilyDescriptor::new("stumps", Options::default()),
524            ColumnFamilyDescriptor::new("references", Options::default()),
525        ];
526        let persistor = Self {
527            db: RwLock::new(
528                DB::open_cf_descriptors(&opts, path, cfs).expect("Failed to open database"),
529            ),
530        };
531
532        // TODO: clear this due to memory leakage
533        {
534            let mut handles: Vec<Word> = Vec::new();
535            let db = persistor
536                .db
537                .read()
538                .expect("Failed to acquire database lock");
539            let mut iter = db.raw_iterator_cf(
540                db.cf_handle("roots")
541                    .expect("Failed to get roots column family"),
542            );
543            iter.seek_to_first();
544            while iter.valid() {
545                if (*iter.value().expect("Failed to get iterator value"))[SIZE] == false as u8 {
546                    handles.push(
547                        (*iter.key().expect("Failed to get iterator key"))
548                            .try_into()
549                            .expect("Failed to convert key to Word"),
550                    );
551                }
552                iter.next();
553            }
554            for handle in handles {
555                db.delete_cf(
556                    db.cf_handle("roots")
557                        .expect("Failed to get roots column family"),
558                    handle,
559                )
560                .expect("Failed to delete value from roots");
561            }
562        }
563
564        persistor
565    }
566
567    fn reference_increment(&self, db: &DB, node: Word) {
568        let references = db
569            .cf_handle("references")
570            .expect("Failed to get references handle");
571        match db.get_cf(references, node) {
572            Ok(Some(count)) => {
573                let count_old =
574                    usize::from_ne_bytes(count.try_into().expect("Invalid count bytes"));
575                let count_new = count_old + 1;
576                db.put_cf(references, node, count_new.to_ne_bytes())
577                    .expect("Failed to increment reference count");
578            }
579            Ok(None) => {
580                db.put_cf(references, node, (1 as usize).to_ne_bytes())
581                    .expect("Failed to set initial reference count");
582            }
583            Err(e) => {
584                panic! {"{}", e}
585            }
586        };
587    }
588
589    fn reference_decrement(&self, db: &DB, node: Word) {
590        let branches = db
591            .cf_handle("branches")
592            .expect("Failed to get branches handle");
593        let leaves = db.cf_handle("leaves").expect("Failed to get leaves handle");
594        let stumps = db.cf_handle("stumps").expect("Failed to get stumps handle");
595        let references = db
596            .cf_handle("references")
597            .expect("Failed to get references handle");
598        match db
599            .get_cf(references, node)
600            .expect("Failed to get reference count")
601        {
602            Some(count_old) => {
603                let count_old =
604                    usize::from_ne_bytes(count_old.try_into().expect("Invalid count bytes"));
605                let count_new = count_old - 1;
606                if count_new > 0 {
607                    db.put_cf(references, node, count_new.to_ne_bytes())
608                        .expect("Failed to update reference count");
609                } else {
610                    db.delete_cf(references, node)
611                        .expect("Failed to delete reference");
612                    if let Some(value) = db.get_cf(branches, node).expect("Failed to get branch") {
613                        let left: Word = value[..SIZE].try_into().expect("Invalid left node bytes");
614                        let right: Word = value[SIZE..SIZE * 2]
615                            .try_into()
616                            .expect("Invalid right node bytes");
617                        db.delete_cf(branches, node)
618                            .expect("Failed to delete branch");
619                        self.reference_decrement(db, left);
620                        self.reference_decrement(db, right);
621                    } else {
622                        if let Some(_) = db.get_cf(leaves, node).expect("Failed to get leaf") {
623                            db.delete_cf(leaves, node).expect("Failed to delete leaf");
624                        } else if let Some(_) =
625                            db.get_cf(stumps, node).expect("Failed to get stump")
626                        {
627                            db.delete_cf(stumps, node).expect("Failed to delete stump");
628                        }
629                    }
630                }
631            }
632            None => {}
633        };
634    }
635
636    fn merge_collect(
637        &self,
638        db: &DB,
639        source: &dyn Persistor,
640        node: Word,
641        plan: &mut MergePlan,
642        seen: &mut HashSet<Word>,
643    ) -> Result<(), PersistorAccessError> {
644        if node == [0 as u8; SIZE] || !seen.insert(node) {
645            return Ok(());
646        }
647
648        let branches = db
649            .cf_handle("branches")
650            .expect("Failed to get branches handle");
651        let leaves = db.cf_handle("leaves").expect("Failed to get leaves handle");
652        let stumps = db.cf_handle("stumps").expect("Failed to get stumps handle");
653
654        if plan.branches.contains_key(&node)
655            || plan.leaves.contains_key(&node)
656            || plan.stumps.contains_key(&node)
657        {
658            return Ok(());
659        }
660
661        if db.get_cf(leaves, node).expect("Failed to get leaf").is_some()
662            || db.get_cf(stumps, node).expect("Failed to get stump").is_some()
663            || db.get_cf(branches, node).expect("Failed to get branch").is_some()
664        {
665            return Ok(());
666        }
667
668        if let Ok(content) = source.leaf_get(node) {
669            plan.leaves.insert(node, content);
670            return Ok(());
671        }
672
673        if let Ok(digest) = source.stump_get(node) {
674            plan.stumps.insert(node, digest);
675            return Ok(());
676        }
677
678        if let Ok((left, right, digest)) = source.branch_get(node) {
679            self.merge_collect(db, source, left, plan, seen)?;
680            self.merge_collect(db, source, right, plan, seen)?;
681            plan.branches.insert(node, (left, right, digest));
682            plan.delta_add(left, 1);
683            plan.delta_add(right, 1);
684            return Ok(());
685        }
686
687        Ok(())
688    }
689
690    fn merged_branch(
691        &self,
692        db: &DB,
693        node: Word,
694        plan: &MergePlan,
695    ) -> Result<Option<(Word, Word, Word)>, PersistorAccessError> {
696        if let Some(branch) = plan.branches.get(&node) {
697            return Ok(Some(*branch));
698        }
699
700        let branches = db
701            .cf_handle("branches")
702            .expect("Failed to get branches handle");
703        match db.get_cf(branches, node) {
704            Ok(Some(value)) => {
705                let left = value[..SIZE].try_into().expect("Invalid left branch size");
706                let right = value[SIZE..SIZE * 2]
707                    .try_into()
708                    .expect("Invalid right branch size");
709                let digest = value[SIZE * 2..]
710                    .try_into()
711                    .expect("Invalid digest branch size");
712                Ok(Some((left, right, digest)))
713            }
714            Ok(None) => Ok(None),
715            Err(e) => Err(PersistorAccessError(format!("{}", e))),
716        }
717    }
718
719    fn base_refcount(&self, db: &DB, node: Word) -> Result<isize, PersistorAccessError> {
720        let references = db
721            .cf_handle("references")
722            .expect("Failed to get references handle");
723        match db.get_cf(references, node) {
724            Ok(Some(count)) => Ok(
725                usize::from_ne_bytes(count.try_into().expect("Invalid count bytes")) as isize,
726            ),
727            Ok(None) => Ok(0),
728            Err(e) => Err(PersistorAccessError(format!("{}", e))),
729        }
730    }
731
732    fn release_plan(
733        &self,
734        db: &DB,
735        node: Word,
736        plan: &mut MergePlan,
737    ) -> Result<(), PersistorAccessError> {
738        if node == [0 as u8; SIZE] {
739            return Ok(());
740        }
741
742        let effective = self.base_refcount(db, node)? + plan.deltas.get(&node).copied().unwrap_or(0);
743        if effective <= 0 {
744            return Ok(());
745        }
746
747        plan.delta_add(node, -1);
748
749        if effective == 1 {
750            if !plan.deletes.insert(node) {
751                return Ok(());
752            }
753
754            if let Some((left, right, _)) = self.merged_branch(db, node, plan)? {
755                self.release_plan(db, left, plan)?;
756                self.release_plan(db, right, plan)?;
757            }
758        }
759
760        Ok(())
761    }
762}
763
764impl Persistor for DatabasePersistor {
765    fn root_list(&self) -> Vec<Word> {
766        let mut handles: Vec<Word> = Vec::new();
767        let db = self.db.read().expect("Failed to acquire db lock");
768        let roots = db
769            .cf_handle("roots")
770            .expect("Failed to get roots column family");
771        let mut iter = db.raw_iterator_cf(roots);
772        iter.seek_to_first();
773        while iter.valid() {
774            if (*iter.value().expect("Failed to get iterator value"))[SIZE] != false as u8 {
775                handles.push(
776                    (*iter.key().expect("Failed to get iterator key"))
777                        .try_into()
778                        .expect("Failed to convert key to Word"),
779                );
780            }
781            iter.next();
782        }
783
784        handles.shuffle(&mut rand::thread_rng());
785        handles
786    }
787
788    fn root_new(&self, handle: Word, root: Word) -> Result<Word, PersistorAccessError> {
789        let mut root_marked = [0 as u8; SIZE + 1];
790        root_marked[..SIZE].copy_from_slice(&root);
791        root_marked[SIZE] = true as u8;
792
793        let db = self.db.write().expect("Failed to acquire db lock");
794        let roots = db
795            .cf_handle("roots")
796            .expect("Failed to get roots column family");
797        match db.get_cf(roots, handle) {
798            Ok(Some(_)) => Err(PersistorAccessError(format!(
799                "Handle {:?} already exists",
800                handle
801            ))),
802            Ok(None) => {
803                self.reference_increment(&db, root);
804                db.put_cf(roots, handle, root_marked)
805                    .expect("Failed to put root in db");
806                Ok(handle)
807            }
808            Err(e) => Err(PersistorAccessError(format!("{}", e))),
809        }
810    }
811
812    fn root_temp(&self, root: Word) -> Result<Word, PersistorAccessError> {
813        let db = self.db.write().expect("Failed to acquire db lock");
814        let roots = db
815            .cf_handle("roots")
816            .expect("Failed to get roots column family");
817        let mut root_marked = [0 as u8; SIZE + 1];
818        root_marked[..SIZE].copy_from_slice(&root);
819        root_marked[SIZE] = false as u8;
820
821        let mut handle_: Word = [0 as u8; 32];
822        rand::thread_rng().fill_bytes(&mut handle_);
823
824        match db.get_cf(roots, handle_) {
825            Ok(Some(_)) => Err(PersistorAccessError(format!(
826                "Handle {:?} already exists",
827                handle_
828            ))),
829            Ok(None) => {
830                self.reference_increment(&db, root);
831                db.put_cf(roots, handle_, root_marked)
832                    .expect("Failed to put root in db");
833                Ok(handle_)
834            }
835            Err(e) => Err(PersistorAccessError(format!("{}", e))),
836        }
837    }
838
839    fn root_get(&self, handle: Word) -> Result<Word, PersistorAccessError> {
840        let db = self.db.read().expect("Failed to acquire db lock");
841        let roots = db.cf_handle("roots").expect("Failed to get roots handle");
842        match db.get_cf(roots, handle) {
843            Ok(Some(root_marked)) => Ok(((*root_marked)[..SIZE])
844                .try_into()
845                .expect("Invalid root size")),
846            Ok(None) => Err(PersistorAccessError(format!(
847                "Handle {:?} not found",
848                handle
849            ))),
850            Err(e) => Err(PersistorAccessError(format!("{}", e))),
851        }
852    }
853
854    fn root_set(
855        &self,
856        handle: Word,
857        old: Word,
858        new: Word,
859        source: &dyn Persistor,
860    ) -> Result<Word, PersistorAccessError> {
861        let db = self.db.write().expect("Failed to acquire db lock");
862        let roots = db.cf_handle("roots").expect("Failed to get roots handle");
863        match db.get_cf(roots, handle) {
864            Ok(Some(root_marked)) => match root_marked[SIZE] != false as u8 {
865                true => match root_marked[..SIZE] == old.to_vec() {
866                    true => {
867                        let mut plan = MergePlan::new();
868                        let mut seen = HashSet::new();
869                        self.merge_collect(&db, source, new, &mut plan, &mut seen)?;
870                        plan.delta_add(new, 1);
871                        self.release_plan(&db, old, &mut plan)?;
872
873                        let mut new_marked = [0 as u8; SIZE + 1];
874                        new_marked[..SIZE].copy_from_slice(&new);
875                        new_marked[SIZE] = true as u8;
876
877                        let branches = db
878                            .cf_handle("branches")
879                            .expect("Failed to get branches handle");
880                        let leaves = db.cf_handle("leaves").expect("Failed to get leaves handle");
881                        let stumps = db.cf_handle("stumps").expect("Failed to get stumps handle");
882                        let references = db
883                            .cf_handle("references")
884                            .expect("Failed to get references handle");
885
886                        let mut batch = WriteBatch::default();
887
888                        for (node, content) in plan.leaves.iter() {
889                            batch.put_cf(leaves, node, content);
890                        }
891
892                        for (node, digest) in plan.stumps.iter() {
893                            batch.put_cf(stumps, node, digest);
894                        }
895
896                        for (node, (left, right, digest)) in plan.branches.iter() {
897                            let mut joined = [0 as u8; SIZE * 3];
898                            joined[..SIZE].copy_from_slice(left);
899                            joined[SIZE..SIZE * 2].copy_from_slice(right);
900                            joined[SIZE * 2..].copy_from_slice(digest);
901                            batch.put_cf(branches, node, joined);
902                        }
903
904                        for (node, delta) in plan.deltas.iter() {
905                            let base = self.base_refcount(&db, *node)?;
906                            let next = base + delta;
907                            if next > 0 {
908                                batch.put_cf(references, node, (next as usize).to_ne_bytes());
909                            } else {
910                                batch.delete_cf(references, node);
911                            }
912                        }
913
914                        for node in plan.deletes.iter() {
915                            if plan.branches.contains_key(node)
916                                || db.get_cf(branches, node).expect("Failed to get branch").is_some()
917                            {
918                                batch.delete_cf(branches, node);
919                            } else if plan.leaves.contains_key(node)
920                                || db.get_cf(leaves, node).expect("Failed to get leaf").is_some()
921                            {
922                                batch.delete_cf(leaves, node);
923                            } else if plan.stumps.contains_key(node)
924                                || db.get_cf(stumps, node).expect("Failed to get stump").is_some()
925                            {
926                                batch.delete_cf(stumps, node);
927                            }
928                        }
929
930                        batch.put_cf(roots, handle, new_marked);
931                        db.write(batch).expect("Failed to apply root merge batch");
932                        Ok(handle)
933                    }
934                    false => Err(PersistorAccessError(format!(
935                        "Handle {:?} changed since compare",
936                        handle
937                    ))),
938                },
939                false => Err(PersistorAccessError(format!(
940                    "Handle {:?} is temporary",
941                    handle
942                ))),
943            },
944            Ok(None) => Err(PersistorAccessError(format!(
945                "Handle {:?} not found",
946                handle
947            ))),
948            Err(e) => Err(PersistorAccessError(format!("{}", e))),
949        }
950    }
951
952    fn root_delete(&self, handle: Word) -> Result<(), PersistorAccessError> {
953        let db = self.db.write().expect("Failed to acquire db lock");
954        let roots = db.cf_handle("roots").expect("Failed to get roots handle");
955        match db.get_cf(roots, handle) {
956            Ok(Some(root_marked)) => {
957                let root: Word = root_marked[..SIZE].try_into().expect("Invalid root size");
958                db.delete_cf(roots, handle).expect("Failed to delete root");
959                self.reference_decrement(&db, root);
960                Ok(())
961            }
962            Ok(None) => Err(PersistorAccessError(format!(
963                "Handle {:?} not found",
964                handle
965            ))),
966            Err(e) => Err(PersistorAccessError(format!("{}", e))),
967        }
968    }
969
970    fn branch_set(
971        &self,
972        left: Word,
973        right: Word,
974        digest: Word,
975    ) -> Result<Word, PersistorAccessError> {
976        let mut joined = [0 as u8; SIZE * 3];
977        joined[..SIZE].copy_from_slice(&left);
978        joined[SIZE..SIZE * 2].copy_from_slice(&right);
979        joined[SIZE * 2..].copy_from_slice(&digest);
980
981        let branch = Sha256::digest(joined);
982
983        let db = self.db.write().expect("Failed to acquire db lock");
984        let branches = db
985            .cf_handle("branches")
986            .expect("Failed to get branches handle");
987        db.put_cf(branches, branch, joined)
988            .expect("Failed to put branch");
989        self.reference_increment(&db, left);
990        self.reference_increment(&db, right);
991
992        Ok(Word::from(branch))
993    }
994
995    fn branch_get(&self, branch: Word) -> Result<(Word, Word, Word), PersistorAccessError> {
996        let db = self.db.read().expect("Failed to acquire db lock");
997        let branches = db
998            .cf_handle("branches")
999            .expect("Failed to get branches handle");
1000        match db.get_cf(branches, branch) {
1001            Ok(Some(value)) => {
1002                assert!(Vec::from(branch) == Sha256::digest(value.clone()).to_vec());
1003                let left = &value[..SIZE].try_into().expect("Invalid left branch size");
1004                let right = &value[SIZE..SIZE * 2]
1005                    .try_into()
1006                    .expect("Invalid right branch size");
1007                let digest = &value[SIZE * 2..]
1008                    .try_into()
1009                    .expect("Invalid digest branch size");
1010                Ok((*left, *right, *digest))
1011            }
1012            Ok(None) => Err(PersistorAccessError(format!(
1013                "Branch {:?} not found",
1014                branch
1015            ))),
1016            Err(e) => Err(PersistorAccessError(format!("{}", e))),
1017        }
1018    }
1019
1020    fn leaf_set(&self, content: Vec<u8>) -> Result<Word, PersistorAccessError> {
1021        let leaf = Word::from(Sha256::digest(Sha256::digest(&content)));
1022        let db = self.db.write().expect("Failed to acquire db lock");
1023        let leaves = db.cf_handle("leaves").expect("Failed to get leaves handle");
1024        db.put_cf(leaves, leaf, content.clone())
1025            .expect("Failed to put leaf");
1026        Ok(leaf)
1027    }
1028
1029    fn leaf_get(&self, leaf: Word) -> Result<Vec<u8>, PersistorAccessError> {
1030        let db = self.db.read().expect("Failed to acquire db lock");
1031        let leaves = db.cf_handle("leaves").expect("Failed to get leaves handle");
1032        match db.get_cf(leaves, leaf) {
1033            Ok(Some(content)) => {
1034                assert!(leaf == *Sha256::digest(Sha256::digest(content.clone())));
1035                Ok(content.to_vec())
1036            }
1037            Ok(None) => Err(PersistorAccessError(format!("Leaf {:?} not found", leaf))),
1038            Err(e) => Err(PersistorAccessError(format!("{}", e))),
1039        }
1040    }
1041
1042    fn stump_set(&self, digest: Word) -> Result<Word, PersistorAccessError> {
1043        let stump = Word::from(Sha256::digest(Vec::from(digest)));
1044        let db = self.db.write().expect("Failed to acquire db lock");
1045        let stumps = db.cf_handle("stumps").expect("Failed to get stumps handle");
1046        db.put_cf(stumps, stump, digest)
1047            .expect("Failed to put stump");
1048        Ok(stump)
1049    }
1050
1051    fn stump_get(&self, stump: Word) -> Result<Word, PersistorAccessError> {
1052        let db = self.db.read().expect("Failed to acquire db lock");
1053        let stumps = db.cf_handle("stumps").expect("Failed to get stumps handle");
1054        match db.get_cf(stumps, stump) {
1055            Ok(Some(digest)) => {
1056                assert!(stump == *Sha256::digest(digest.clone()));
1057                Ok((&(*digest)[..SIZE])
1058                    .try_into()
1059                    .expect("Invalid left node bytes"))
1060            }
1061            Ok(None) => Err(PersistorAccessError(format!(
1062                "Stumps {:?} not found",
1063                stump
1064            ))),
1065            Err(e) => Err(PersistorAccessError(format!("{}", e))),
1066        }
1067    }
1068}
1069
1070#[cfg(test)]
1071mod tests {
1072    use super::{DatabasePersistor, MemoryPersistor, Persistor, SIZE, Word};
1073    use rocksdb::{DB, IteratorMode};
1074    use std::fs;
1075    use std::sync::RwLock;
1076
1077    fn test_persistence(persistor: Box<dyn Persistor>) {
1078        let zeros: Word = [0 as u8; SIZE];
1079        let handle = persistor
1080            .root_new(zeros, zeros)
1081            .expect("Failed to create new root");
1082
1083        assert!(
1084            persistor
1085                .root_delete(
1086                    persistor
1087                        .root_temp(zeros)
1088                        .expect("Failed to create temp root")
1089                )
1090                .expect("Failed to delete root")
1091                == ()
1092        );
1093
1094        assert!(
1095            persistor
1096                .root_get(
1097                    persistor
1098                        .root_set(handle, zeros, zeros, persistor.as_ref())
1099                        .expect("Failed to set root"),
1100                )
1101                .expect("Failed to get root")
1102                == zeros
1103        );
1104
1105        assert!(
1106            persistor
1107                .branch_get(
1108                    persistor
1109                        .branch_set(zeros, zeros, zeros)
1110                        .expect("Failed to set branch"),
1111                )
1112                .expect("Failed to get branch")
1113                == (zeros, zeros, zeros)
1114        );
1115
1116        assert!(
1117            persistor
1118                .leaf_get(persistor.leaf_set(vec!(0)).expect("Failed to set leaf"),)
1119                .expect("Failed to get leaf")
1120                == vec!(0)
1121        );
1122    }
1123
1124    #[test]
1125    fn test_memory_persistence() {
1126        test_persistence(Box::new(MemoryPersistor::new()));
1127    }
1128
1129    #[test]
1130    fn test_database_persistence() {
1131        let db = ".test-database-persistence";
1132        let _ = fs::remove_dir_all(db);
1133        test_persistence(Box::new(DatabasePersistor::new(db)));
1134        let _ = fs::remove_dir_all(db);
1135    }
1136
1137    #[test]
1138    fn test_memory_garbage() {
1139        let persistor = MemoryPersistor::new();
1140        let zeros: Word = [0 as u8; SIZE];
1141        let handle: Word = [0 as u8; SIZE];
1142
1143        let leaf_0 = persistor.leaf_set(vec![0]).expect("Failed to set leaf 0");
1144        let leaf_1 = persistor.leaf_set(vec![1]).expect("Failed to set leaf 1");
1145        let leaf_2 = persistor.leaf_set(vec![2]).expect("Failed to set leaf 2");
1146
1147        let stump_0 = persistor
1148            .stump_set([0 as u8; SIZE])
1149            .expect("Failed to set stump 0");
1150
1151        let branch_a = persistor
1152            .branch_set(leaf_0, leaf_1, zeros)
1153            .expect("Failed to set branch A");
1154        let branch_b = persistor
1155            .branch_set(branch_a, leaf_2, zeros)
1156            .expect("Failed to set branch B");
1157        let branch_c = persistor
1158            .branch_set(branch_b, stump_0, zeros)
1159            .expect("Failed to set branch B");
1160
1161        persistor
1162            .root_new(handle, branch_c)
1163            .expect("Failed to create new root");
1164
1165        assert!(persistor.roots.read().expect("Failed to lock roots").len() == 1);
1166        assert!(
1167            persistor
1168                .branches
1169                .read()
1170                .expect("Failed to lock branches")
1171                .len()
1172                == 3
1173        );
1174        assert!(
1175            persistor
1176                .leaves
1177                .read()
1178                .expect("Failed to lock leaves")
1179                .len()
1180                == 3
1181        );
1182        assert!(
1183            persistor
1184                .stumps
1185                .read()
1186                .expect("Failed to lock leaves")
1187                .len()
1188                == 1
1189        );
1190        assert!(
1191            persistor
1192                .references
1193                .read()
1194                .expect("Failed to lock references")
1195                .len()
1196                == 7
1197        );
1198
1199        let leaf_3 = persistor.leaf_set(vec![3]).expect("Failed to set leaf 3");
1200        let branch_d = persistor
1201            .branch_set(leaf_2, leaf_3, zeros)
1202            .expect("Failed to set branch D");
1203        persistor
1204            .root_set(handle, branch_c, branch_d, &persistor)
1205            .expect("Failed to set root");
1206
1207        assert!(persistor.roots.read().expect("Failed to lock roots").len() == 1);
1208        assert!(
1209            persistor
1210                .branches
1211                .read()
1212                .expect("Failed to lock branches")
1213                .len()
1214                == 1
1215        );
1216        assert!(
1217            persistor
1218                .leaves
1219                .read()
1220                .expect("Failed to lock leaves")
1221                .len()
1222                == 2
1223        );
1224        assert!(
1225            persistor
1226                .stumps
1227                .read()
1228                .expect("Failed to lock stumps")
1229                .len()
1230                == 0
1231        );
1232        assert!(
1233            persistor
1234                .references
1235                .read()
1236                .expect("Failed to lock references")
1237                .len()
1238                == 3
1239        );
1240    }
1241
1242    #[test]
1243    fn test_database_garbage() {
1244        let db = ".test-database-garbage";
1245        let _ = fs::remove_dir_all(db);
1246        let persistor = DatabasePersistor::new(db);
1247        let zeros: Word = [0 as u8; SIZE];
1248        let handle: Word = [0 as u8; SIZE];
1249        let leaf_0 = persistor.leaf_set(vec![0]).expect("Failed to set leaf 0");
1250        let leaf_1 = persistor.leaf_set(vec![1]).expect("Failed to set leaf 1");
1251        let leaf_2 = persistor.leaf_set(vec![2]).expect("Failed to set leaf 2");
1252
1253        let stump_0 = persistor
1254            .stump_set([0 as u8; SIZE])
1255            .expect("Failed to set stump 0");
1256
1257        let branch_a = persistor
1258            .branch_set(leaf_0, leaf_1, zeros)
1259            .expect("Failed to set branch A");
1260        let branch_b = persistor
1261            .branch_set(branch_a, leaf_2, zeros)
1262            .expect("Failed to set branch B");
1263        let branch_c = persistor
1264            .branch_set(branch_b, stump_0, zeros)
1265            .expect("Failed to set branch C");
1266
1267        persistor
1268            .root_new(handle, branch_c)
1269            .expect("Failed to create new root");
1270
1271        let cf_count = |db: &RwLock<DB>, cf| {
1272            let db_ = db.read().expect("Failed to lock database");
1273            db_.iterator_cf(
1274                db_.cf_handle(cf).expect("Failed to get CF handle"),
1275                IteratorMode::Start,
1276            )
1277            .count()
1278        };
1279
1280        {
1281            assert!(cf_count(&persistor.db, "roots") == 1);
1282            assert!(cf_count(&persistor.db, "branches") == 3);
1283            assert!(cf_count(&persistor.db, "leaves") == 3);
1284            assert!(cf_count(&persistor.db, "stumps") == 1);
1285            assert!(cf_count(&persistor.db, "references") == 7);
1286        }
1287
1288        let leaf_3 = persistor.leaf_set(vec![3]).expect("Failed to set leaf 3");
1289        let branch_d = persistor
1290            .branch_set(leaf_2, leaf_3, zeros)
1291            .expect("Failed to set branch D");
1292        persistor
1293            .root_set(handle, branch_c, branch_d, &persistor)
1294            .expect("Failed to set root");
1295
1296        {
1297            assert!(cf_count(&persistor.db, "roots") == 1);
1298            assert!(cf_count(&persistor.db, "branches") == 1);
1299            assert!(cf_count(&persistor.db, "leaves") == 2);
1300            assert!(cf_count(&persistor.db, "stumps") == 0);
1301            assert!(cf_count(&persistor.db, "references") == 3);
1302        }
1303
1304        let _ = fs::remove_dir_all(db);
1305    }
1306
1307    fn test_root_set_merge_from_source(persistor: Box<dyn Persistor>) {
1308        let zeros: Word = [0 as u8; SIZE];
1309        let handle: Word = [1 as u8; SIZE];
1310        let target = Box::new(MemoryPersistor::new());
1311
1312        let leaf_0 = persistor.leaf_set(vec![0]).expect("Failed to set leaf 0");
1313        let leaf_1 = persistor.leaf_set(vec![1]).expect("Failed to set leaf 1");
1314        let branch = persistor
1315            .branch_set(leaf_0, leaf_1, zeros)
1316            .expect("Failed to set source branch");
1317
1318        target
1319            .root_new(handle, zeros)
1320            .expect("Failed to create target root");
1321        target
1322            .root_set(handle, zeros, branch, persistor.as_ref())
1323            .expect("Failed to merge root from source");
1324
1325        assert!(target.root_get(handle).expect("Failed to get target root") == branch);
1326        assert!(target.branch_get(branch).expect("Failed to get merged branch") == (leaf_0, leaf_1, zeros));
1327        assert!(target.leaf_get(leaf_0).expect("Failed to get merged leaf 0") == vec![0]);
1328        assert!(target.leaf_get(leaf_1).expect("Failed to get merged leaf 1") == vec![1]);
1329    }
1330
1331    #[test]
1332    fn test_memory_root_set_merge_from_source() {
1333        test_root_set_merge_from_source(Box::new(MemoryPersistor::new()));
1334    }
1335
1336    #[test]
1337    fn test_database_root_set_merge_from_source() {
1338        let db = ".test-database-root-set-merge-from-source";
1339        let _ = fs::remove_dir_all(db);
1340        test_root_set_merge_from_source(Box::new(DatabasePersistor::new(db)));
1341        let _ = fs::remove_dir_all(db);
1342    }
1343
1344    #[test]
1345    fn test_memory_root_set_merge_from_source_garbage() {
1346        let zeros: Word = [0 as u8; SIZE];
1347        let handle: Word = [0 as u8; SIZE];
1348        let target = MemoryPersistor::new();
1349        let source = MemoryPersistor::new();
1350
1351        let leaf_0 = target.leaf_set(vec![0]).expect("Failed to set target leaf 0");
1352        let leaf_1 = target.leaf_set(vec![1]).expect("Failed to set target leaf 1");
1353        let leaf_2 = target.leaf_set(vec![2]).expect("Failed to set target leaf 2");
1354        let stump_0 = target
1355            .stump_set([0 as u8; SIZE])
1356            .expect("Failed to set target stump 0");
1357        let branch_a = target
1358            .branch_set(leaf_0, leaf_1, zeros)
1359            .expect("Failed to set target branch A");
1360        let branch_b = target
1361            .branch_set(branch_a, leaf_2, zeros)
1362            .expect("Failed to set target branch B");
1363        let branch_c = target
1364            .branch_set(branch_b, stump_0, zeros)
1365            .expect("Failed to set target branch C");
1366        target
1367            .root_new(handle, branch_c)
1368            .expect("Failed to create target root");
1369
1370        let leaf_2_source = source.leaf_set(vec![2]).expect("Failed to set source leaf 2");
1371        let leaf_3_source = source.leaf_set(vec![3]).expect("Failed to set source leaf 3");
1372        let branch_d = source
1373            .branch_set(leaf_2_source, leaf_3_source, zeros)
1374            .expect("Failed to set source branch D");
1375
1376        target
1377            .root_set(handle, branch_c, branch_d, &source)
1378            .expect("Failed to merge source root into target");
1379
1380        assert!(target.roots.read().expect("Failed to lock roots").len() == 1);
1381        assert!(target.branches.read().expect("Failed to lock branches").len() == 1);
1382        assert!(target.leaves.read().expect("Failed to lock leaves").len() == 2);
1383        assert!(target.stumps.read().expect("Failed to lock stumps").len() == 0);
1384        assert!(target.references.read().expect("Failed to lock references").len() == 3);
1385    }
1386}