diff options
author | edef <edef@unfathomable.blue> | 2023-02-18 19:22:56 +0000 |
---|---|---|
committer | edef <edef@unfathomable.blue> | 2023-02-18 19:22:56 +0000 |
commit | 628f37139ed778616d2487e9890cddb424ed43de (patch) | |
tree | 3d66ac8e05231bc5c023307158b9345056e88052 | |
parent | 0d4906cbabb183caa96e763671810fb39bd0c935 (diff) | |
download | unf-legacy-628f37139ed778616d2487e9890cddb424ed43de.tar.zst |
WIP robust durability wip/durability
We implement an undo journal for blob and chunk writes, and run fsyncs on a dedicated thread. Change-Id: Ic68fdb5652211eaedfe6177656947f8e6d1230e3
-rw-r--r-- | ripple/fossil/src/lib.rs | 111 | ||||
-rw-r--r-- | ripple/fossil/src/transactor.rs | 127 |
2 files changed, 222 insertions, 16 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs index 6420437..af931b0 100644 --- a/ripple/fossil/src/lib.rs +++ b/ripple/fossil/src/lib.rs @@ -3,6 +3,7 @@ pub use crate::chunker::Chunker; use { + crate::transactor::Transactor, anyhow::{anyhow, bail, Context, Result}, byteorder::{BigEndian, ByteOrder}, prost::Message, @@ -15,6 +16,7 @@ use { os::unix::prelude::*, path::Path, str, + sync::Arc, }, }; @@ -23,15 +25,19 @@ pub mod store { } mod chunker; +mod transactor; const DIGEST_BYTES: usize = blake3::OUT_LEN; pub struct Store { - meta: sled::Tree, blobs: sled::Tree, + blobs_dirty: sled::Tree, chunks: sled::Tree, + chunks_dirty: sled::Tree, chunks_file: RefCell<fs::File>, chunks_tail: Cell<u64>, + transactor: Arc<Transactor>, + last_tx_id: Cell<u64>, } impl Store { @@ -41,7 +47,9 @@ impl Store { let db = sled::open(path)?; let meta = (&*db).clone(); let blobs = db.open_tree("blobs")?; + let blobs_dirty = db.open_tree("blobs_dirty")?; let chunks = db.open_tree("chunks")?; + let chunks_dirty = db.open_tree("chunks_dirty")?; let chunks_file = fs::OpenOptions::new() .read(true) @@ -54,25 +62,79 @@ impl Store { .map(|v| BigEndian::read_u64(&v)) .unwrap_or_default(); - chunks_file.set_len(chunks_tail)?; + // clear out dirty state after an unclean shutdown + { + chunks_file.set_len(chunks_tail)?; + + for blob in &blobs_dirty { + let (tx, blob) = blob?; + blobs.remove(blob)?; + blobs_dirty.remove(tx)?; + } + + for chunk in &chunks_dirty { + let (tx, chunk) = chunk?; + chunks.remove(chunk)?; + chunks_dirty.remove(tx)?; + } + } + + let transactor = { + let chunks_file = chunks_file.try_clone()?; + + let blobs_dirty = blobs_dirty.clone(); + let chunks_dirty = chunks_dirty.clone(); + + Transactor::new(chunks_tail, move |state| { + chunks_file.sync_all().unwrap(); + + let written_tx = state.written_tx.to_be_bytes(); + + let mut blobs_batch = sled::Batch::default(); + for tx in blobs_dirty.range(..=written_tx) { + let (tx, _) = tx.unwrap(); + blobs_batch.remove(tx); + } + + let mut chunks_batch = sled::Batch::default(); + for tx in chunks_dirty.range(..=written_tx) { + let (tx, _) = tx.unwrap(); + chunks_batch.remove(tx); + } + + (&blobs_dirty, &chunks_dirty, &meta) + .transaction(|(blobs_dirty, chunks_dirty, meta)| { + blobs_dirty.apply_batch(&blobs_batch)?; + chunks_dirty.apply_batch(&chunks_batch)?; + meta.insert("chunks_tail", &state.written_tail.to_be_bytes())?; + Ok::<_, ConflictableTransactionError>(()) + }) + .unwrap(); + + db.flush().unwrap(); + }) + }; Ok(Store { blobs, - meta, + blobs_dirty, chunks, + chunks_dirty, chunks_file: RefCell::new(chunks_file), chunks_tail: Cell::new(chunks_tail), + transactor, + last_tx_id: Cell::new(0), }) } + fn next_tx_id(&self) -> u64 { + let id = self.last_tx_id.get() + 1; + self.last_tx_id.set(id); + id + } + fn flush(&self) { - // NOTE: sled *can* flush without us explicitly asking for it, so it's - // possible for the store to end up containing pointers to chunks that - // aren't fsynced yet. The easiest fix is to always `chunks_file.sync_data()` - // before we write anything to the database, but that's kind of a performance hazard. - // TODO(edef): keep pending and known-durable blobs/chunks separate in the database - self.chunks_file.borrow_mut().sync_data().unwrap(); - self.meta.flush().unwrap(); + self.transactor.wait(self.last_tx_id.get()); } pub fn add_git_tree(&self, repo: &git2::Repository, tree: git2::Oid) -> DirectoryRef { @@ -209,11 +271,19 @@ impl Store { fn write_blob_inner(&self, ident: &Digest, outboard: Vec<u8>, data: &[u8]) { let mut chunks_file = self.chunks_file.borrow_mut(); let mut offset = self.chunks_tail.get(); + let mut batch = sled::Batch::default(); + let mut batch_dirty = sled::Batch::default(); let chunks = Chunker::from(data) .map(|chunk_data| { - self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data) + self.write_chunk( + &mut chunks_file, + &mut offset, + &mut batch, + &mut batch_dirty, + chunk_data, + ) }) .collect::<Vec<_>>(); @@ -223,17 +293,24 @@ impl Store { } .encode_to_vec(); - let chunks_tail_buf = offset.to_be_bytes(); - - (&self.blobs, &self.chunks, &self.meta) - .transaction(|(blobs, chunks, meta)| { + ( + &self.blobs, + &self.blobs_dirty, + &self.chunks, + &self.chunks_dirty, + ) + .transaction(|(blobs, blobs_dirty, chunks, chunks_dirty)| { chunks.apply_batch(&batch)?; + chunks_dirty.apply_batch(&batch_dirty)?; blobs.insert(&*ident.as_bytes(), &*blob_buf)?; - meta.insert("chunks_tail", &chunks_tail_buf)?; + blobs_dirty.insert(&self.next_tx_id().to_be_bytes(), &*ident.as_bytes())?; Ok::<_, ConflictableTransactionError>(()) }) .unwrap(); + self.chunks_tail.set(offset); + self.transactor + .write(self.last_tx_id.get(), self.chunks_tail.get()); } fn write_chunk( @@ -241,6 +318,7 @@ impl Store { chunks_file: &mut fs::File, offset: &mut u64, batch: &mut sled::Batch, + batch_dirty: &mut sled::Batch, data: &[u8], ) -> store::Chunk { let ident = blake3::hash(data); @@ -256,6 +334,7 @@ impl Store { *offset += data.len() as u64; batch.insert(ident.as_bytes(), chunk.encode_to_vec()); + batch_dirty.insert(&self.next_tx_id().to_be_bytes(), ident.as_bytes()); chunk } diff --git a/ripple/fossil/src/transactor.rs b/ripple/fossil/src/transactor.rs new file mode 100644 index 0000000..f13e63b --- /dev/null +++ b/ripple/fossil/src/transactor.rs @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: edef <edef@unfathomable.blue> +// SPDX-License-Identifier: OSL-3.0 + +use std::{ + sync::{Arc, Condvar, Mutex}, + thread::{self, JoinHandle}, +}; + +#[derive(Debug)] +pub struct Transactor { + link: Arc<Link>, + syncer: Option<JoinHandle<()>>, +} + +impl Transactor { + pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> { + let link = Link::new(tail); + + let syncer = { + let link = link.clone(); + thread::spawn(move || link.run(f)) + }; + + Arc::new(Self { + link, + syncer: Some(syncer), + }) + } + + pub fn write(self: &Arc<Self>, tx: u64, tail: u64) { + self.link.write(tx, tail) + } + + pub fn wait(self: &Arc<Self>, tx: u64) { + self.link.wait(tx) + } +} + +impl Drop for Transactor { + fn drop(&mut self) { + if let Some(syncer) = self.syncer.take() { + self.link.close(); + syncer.join().unwrap(); + } + } +} + +#[derive(Debug)] +struct Link { + state: Mutex<State>, + written: Condvar, + synced: Condvar, +} + +#[derive(Debug, Clone)] +pub struct State { + pub synced_tx: u64, + pub written_tx: u64, + pub written_tail: u64, +} + +impl Link { + fn new(tail: u64) -> Arc<Self> { + Arc::new(Self { + state: Mutex::new(State { + synced_tx: 0, + written_tx: 0, + written_tail: tail, + }), + written: Condvar::new(), + synced: Condvar::new(), + }) + } + + fn write(self: &Arc<Self>, tx: u64, tail: u64) { + { + let mut state = self.state.lock().unwrap(); + assert!(state.written_tx < !0, "already closed"); + assert!(state.written_tx < tx, "duplicate transaction ID"); + + state.written_tx = tx; + state.written_tail = tail; + } + + self.written.notify_one(); + } + + fn close(self: &Arc<Self>) { + { + let mut state = self.state.lock().unwrap(); + state.written_tx = !0; + } + + self.written.notify_one(); + } + + fn wait(self: &Arc<Self>, tx: u64) { + let state = self.state.lock().unwrap(); + + drop( + self.synced + .wait_while(state, |state| state.synced_tx < tx) + .unwrap(), + ); + } + + fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) { + let mut state = self.state.lock().unwrap(); + + while state.synced_tx < !0 { + state = self + .written + .wait_while(state, |state| state.synced_tx == state.written_tx) + .unwrap(); + + let state_seen = state.clone(); + drop(state); + + f(&state_seen); + + state = self.state.lock().unwrap(); + state.synced_tx = state_seen.written_tx; + + self.synced.notify_all(); + } + } +} |