From 628f37139ed778616d2487e9890cddb424ed43de Mon Sep 17 00:00:00 2001 From: edef Date: Sat, 18 Feb 2023 19:22:56 +0000 Subject: WIP robust durability We implement an undo journal for blob and chunk writes, and run fsyncs on a dedicated thread. Change-Id: Ic68fdb5652211eaedfe6177656947f8e6d1230e3 --- ripple/fossil/src/transactor.rs | 127 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 ripple/fossil/src/transactor.rs (limited to 'ripple/fossil/src/transactor.rs') diff --git a/ripple/fossil/src/transactor.rs b/ripple/fossil/src/transactor.rs new file mode 100644 index 0000000..f13e63b --- /dev/null +++ b/ripple/fossil/src/transactor.rs @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: edef +// SPDX-License-Identifier: OSL-3.0 + +use std::{ + sync::{Arc, Condvar, Mutex}, + thread::{self, JoinHandle}, +}; + +#[derive(Debug)] +pub struct Transactor { + link: Arc, + syncer: Option>, +} + +impl Transactor { + pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc { + let link = Link::new(tail); + + let syncer = { + let link = link.clone(); + thread::spawn(move || link.run(f)) + }; + + Arc::new(Self { + link, + syncer: Some(syncer), + }) + } + + pub fn write(self: &Arc, tx: u64, tail: u64) { + self.link.write(tx, tail) + } + + pub fn wait(self: &Arc, tx: u64) { + self.link.wait(tx) + } +} + +impl Drop for Transactor { + fn drop(&mut self) { + if let Some(syncer) = self.syncer.take() { + self.link.close(); + syncer.join().unwrap(); + } + } +} + +#[derive(Debug)] +struct Link { + state: Mutex, + written: Condvar, + synced: Condvar, +} + +#[derive(Debug, Clone)] +pub struct State { + pub synced_tx: u64, + pub written_tx: u64, + pub written_tail: u64, +} + +impl Link { + fn new(tail: u64) -> Arc { + Arc::new(Self { + state: Mutex::new(State { + synced_tx: 0, + written_tx: 0, + written_tail: tail, + }), + written: Condvar::new(), + synced: Condvar::new(), + }) + } + + fn write(self: &Arc, tx: u64, tail: u64) { + { + let mut state = self.state.lock().unwrap(); + assert!(state.written_tx < !0, "already closed"); + assert!(state.written_tx < tx, "duplicate transaction ID"); + + state.written_tx = tx; + state.written_tail = tail; + } + + self.written.notify_one(); + } + + fn close(self: &Arc) { + { + let mut state = self.state.lock().unwrap(); + state.written_tx = !0; + } + + self.written.notify_one(); + } + + fn wait(self: &Arc, tx: u64) { + let state = self.state.lock().unwrap(); + + drop( + self.synced + .wait_while(state, |state| state.synced_tx < tx) + .unwrap(), + ); + } + + fn run(self: &Arc, mut f: impl FnMut(&State)) { + let mut state = self.state.lock().unwrap(); + + while state.synced_tx < !0 { + state = self + .written + .wait_while(state, |state| state.synced_tx == state.written_tx) + .unwrap(); + + let state_seen = state.clone(); + drop(state); + + f(&state_seen); + + state = self.state.lock().unwrap(); + state.synced_tx = state_seen.written_tx; + + self.synced.notify_all(); + } + } +} -- cgit 1.4.1