diff options
author | edef <edef@unfathomable.blue> | 2023-02-18 19:22:56 +0000 |
---|---|---|
committer | edef <edef@unfathomable.blue> | 2023-02-18 19:22:56 +0000 |
commit | 628f37139ed778616d2487e9890cddb424ed43de (patch) | |
tree | 3d66ac8e05231bc5c023307158b9345056e88052 /ripple/fossil/src/transactor.rs | |
parent | 0d4906cbabb183caa96e763671810fb39bd0c935 (diff) | |
download | unf-legacy-wip/durability.tar.zst |
WIP robust durability wip/durability
We implement an undo journal for blob and chunk writes, and run fsyncs on a dedicated thread. Change-Id: Ic68fdb5652211eaedfe6177656947f8e6d1230e3
Diffstat (limited to 'ripple/fossil/src/transactor.rs')
-rw-r--r-- | ripple/fossil/src/transactor.rs | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/ripple/fossil/src/transactor.rs b/ripple/fossil/src/transactor.rs new file mode 100644 index 0000000..f13e63b --- /dev/null +++ b/ripple/fossil/src/transactor.rs @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: edef <edef@unfathomable.blue> +// SPDX-License-Identifier: OSL-3.0 + +use std::{ + sync::{Arc, Condvar, Mutex}, + thread::{self, JoinHandle}, +}; + +#[derive(Debug)] +pub struct Transactor { + link: Arc<Link>, + syncer: Option<JoinHandle<()>>, +} + +impl Transactor { + pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> { + let link = Link::new(tail); + + let syncer = { + let link = link.clone(); + thread::spawn(move || link.run(f)) + }; + + Arc::new(Self { + link, + syncer: Some(syncer), + }) + } + + pub fn write(self: &Arc<Self>, tx: u64, tail: u64) { + self.link.write(tx, tail) + } + + pub fn wait(self: &Arc<Self>, tx: u64) { + self.link.wait(tx) + } +} + +impl Drop for Transactor { + fn drop(&mut self) { + if let Some(syncer) = self.syncer.take() { + self.link.close(); + syncer.join().unwrap(); + } + } +} + +#[derive(Debug)] +struct Link { + state: Mutex<State>, + written: Condvar, + synced: Condvar, +} + +#[derive(Debug, Clone)] +pub struct State { + pub synced_tx: u64, + pub written_tx: u64, + pub written_tail: u64, +} + +impl Link { + fn new(tail: u64) -> Arc<Self> { + Arc::new(Self { + state: Mutex::new(State { + synced_tx: 0, + written_tx: 0, + written_tail: tail, + }), + written: Condvar::new(), + synced: Condvar::new(), + }) + } + + fn write(self: &Arc<Self>, tx: u64, tail: u64) { + { + let mut state = self.state.lock().unwrap(); + assert!(state.written_tx < !0, "already closed"); + assert!(state.written_tx < tx, "duplicate transaction ID"); + + state.written_tx = tx; + state.written_tail = tail; + } + + self.written.notify_one(); + } + + fn close(self: &Arc<Self>) { + { + let mut state = self.state.lock().unwrap(); + state.written_tx = !0; + } + + self.written.notify_one(); + } + + fn wait(self: &Arc<Self>, tx: u64) { + let state = self.state.lock().unwrap(); + + drop( + self.synced + .wait_while(state, |state| state.synced_tx < tx) + .unwrap(), + ); + } + + fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) { + let mut state = self.state.lock().unwrap(); + + while state.synced_tx < !0 { + state = self + .written + .wait_while(state, |state| state.synced_tx == state.written_tx) + .unwrap(); + + let state_seen = state.clone(); + drop(state); + + f(&state_seen); + + state = self.state.lock().unwrap(); + state.synced_tx = state_seen.written_tx; + + self.synced.notify_all(); + } + } +} |