summary refs log tree commit diff
path: root/ripple/fossil
diff options
context:
space:
mode:
Diffstat (limited to 'ripple/fossil')
-rw-r--r--ripple/fossil/src/lib.rs111
-rw-r--r--ripple/fossil/src/transactor.rs127
2 files changed, 222 insertions, 16 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index 6420437..af931b0 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -3,6 +3,7 @@
 
 pub use crate::chunker::Chunker;
 use {
+	crate::transactor::Transactor,
 	anyhow::{anyhow, bail, Context, Result},
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
@@ -15,6 +16,7 @@ use {
 		os::unix::prelude::*,
 		path::Path,
 		str,
+		sync::Arc,
 	},
 };
 
@@ -23,15 +25,19 @@ pub mod store {
 }
 
 mod chunker;
+mod transactor;
 
 const DIGEST_BYTES: usize = blake3::OUT_LEN;
 
 pub struct Store {
-	meta: sled::Tree,
 	blobs: sled::Tree,
+	blobs_dirty: sled::Tree,
 	chunks: sled::Tree,
+	chunks_dirty: sled::Tree,
 	chunks_file: RefCell<fs::File>,
 	chunks_tail: Cell<u64>,
+	transactor: Arc<Transactor>,
+	last_tx_id: Cell<u64>,
 }
 
 impl Store {
@@ -41,7 +47,9 @@ impl Store {
 		let db = sled::open(path)?;
 		let meta = (&*db).clone();
 		let blobs = db.open_tree("blobs")?;
+		let blobs_dirty = db.open_tree("blobs_dirty")?;
 		let chunks = db.open_tree("chunks")?;
+		let chunks_dirty = db.open_tree("chunks_dirty")?;
 
 		let chunks_file = fs::OpenOptions::new()
 			.read(true)
@@ -54,25 +62,79 @@ impl Store {
 			.map(|v| BigEndian::read_u64(&v))
 			.unwrap_or_default();
 
-		chunks_file.set_len(chunks_tail)?;
+		// clear out dirty state after an unclean shutdown
+		{
+			chunks_file.set_len(chunks_tail)?;
+
+			for blob in &blobs_dirty {
+				let (tx, blob) = blob?;
+				blobs.remove(blob)?;
+				blobs_dirty.remove(tx)?;
+			}
+
+			for chunk in &chunks_dirty {
+				let (tx, chunk) = chunk?;
+				chunks.remove(chunk)?;
+				chunks_dirty.remove(tx)?;
+			}
+		}
+
+		let transactor = {
+			let chunks_file = chunks_file.try_clone()?;
+
+			let blobs_dirty = blobs_dirty.clone();
+			let chunks_dirty = chunks_dirty.clone();
+
+			Transactor::new(chunks_tail, move |state| {
+				chunks_file.sync_all().unwrap();
+
+				let written_tx = state.written_tx.to_be_bytes();
+
+				let mut blobs_batch = sled::Batch::default();
+				for tx in blobs_dirty.range(..=written_tx) {
+					let (tx, _) = tx.unwrap();
+					blobs_batch.remove(tx);
+				}
+
+				let mut chunks_batch = sled::Batch::default();
+				for tx in chunks_dirty.range(..=written_tx) {
+					let (tx, _) = tx.unwrap();
+					chunks_batch.remove(tx);
+				}
+
+				(&blobs_dirty, &chunks_dirty, &meta)
+					.transaction(|(blobs_dirty, chunks_dirty, meta)| {
+						blobs_dirty.apply_batch(&blobs_batch)?;
+						chunks_dirty.apply_batch(&chunks_batch)?;
+						meta.insert("chunks_tail", &state.written_tail.to_be_bytes())?;
+						Ok::<_, ConflictableTransactionError>(())
+					})
+					.unwrap();
+
+				db.flush().unwrap();
+			})
+		};
 
 		Ok(Store {
 			blobs,
-			meta,
+			blobs_dirty,
 			chunks,
+			chunks_dirty,
 			chunks_file: RefCell::new(chunks_file),
 			chunks_tail: Cell::new(chunks_tail),
+			transactor,
+			last_tx_id: Cell::new(0),
 		})
 	}
 
+	fn next_tx_id(&self) -> u64 {
+		let id = self.last_tx_id.get() + 1;
+		self.last_tx_id.set(id);
+		id
+	}
+
 	fn flush(&self) {
-		// NOTE: sled *can* flush without us explicitly asking for it, so it's
-		// possible for the store to end up containing pointers to chunks that
-		// aren't fsynced yet. The easiest fix is to always `chunks_file.sync_data()`
-		// before we write anything to the database, but that's kind of a performance hazard.
-		// TODO(edef): keep pending and known-durable blobs/chunks separate in the database
-		self.chunks_file.borrow_mut().sync_data().unwrap();
-		self.meta.flush().unwrap();
+		self.transactor.wait(self.last_tx_id.get());
 	}
 
 	pub fn add_git_tree(&self, repo: &git2::Repository, tree: git2::Oid) -> DirectoryRef {
@@ -209,11 +271,19 @@ impl Store {
 	fn write_blob_inner(&self, ident: &Digest, outboard: Vec<u8>, data: &[u8]) {
 		let mut chunks_file = self.chunks_file.borrow_mut();
 		let mut offset = self.chunks_tail.get();
+
 		let mut batch = sled::Batch::default();
+		let mut batch_dirty = sled::Batch::default();
 
 		let chunks = Chunker::from(data)
 			.map(|chunk_data| {
-				self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data)
+				self.write_chunk(
+					&mut chunks_file,
+					&mut offset,
+					&mut batch,
+					&mut batch_dirty,
+					chunk_data,
+				)
 			})
 			.collect::<Vec<_>>();
 
@@ -223,17 +293,24 @@ impl Store {
 		}
 		.encode_to_vec();
 
-		let chunks_tail_buf = offset.to_be_bytes();
-
-		(&self.blobs, &self.chunks, &self.meta)
-			.transaction(|(blobs, chunks, meta)| {
+		(
+			&self.blobs,
+			&self.blobs_dirty,
+			&self.chunks,
+			&self.chunks_dirty,
+		)
+			.transaction(|(blobs, blobs_dirty, chunks, chunks_dirty)| {
 				chunks.apply_batch(&batch)?;
+				chunks_dirty.apply_batch(&batch_dirty)?;
 				blobs.insert(&*ident.as_bytes(), &*blob_buf)?;
-				meta.insert("chunks_tail", &chunks_tail_buf)?;
+				blobs_dirty.insert(&self.next_tx_id().to_be_bytes(), &*ident.as_bytes())?;
 				Ok::<_, ConflictableTransactionError>(())
 			})
 			.unwrap();
+
 		self.chunks_tail.set(offset);
+		self.transactor
+			.write(self.last_tx_id.get(), self.chunks_tail.get());
 	}
 
 	fn write_chunk(
@@ -241,6 +318,7 @@ impl Store {
 		chunks_file: &mut fs::File,
 		offset: &mut u64,
 		batch: &mut sled::Batch,
+		batch_dirty: &mut sled::Batch,
 		data: &[u8],
 	) -> store::Chunk {
 		let ident = blake3::hash(data);
@@ -256,6 +334,7 @@ impl Store {
 		*offset += data.len() as u64;
 
 		batch.insert(ident.as_bytes(), chunk.encode_to_vec());
+		batch_dirty.insert(&self.next_tx_id().to_be_bytes(), ident.as_bytes());
 		chunk
 	}
 
diff --git a/ripple/fossil/src/transactor.rs b/ripple/fossil/src/transactor.rs
new file mode 100644
index 0000000..f13e63b
--- /dev/null
+++ b/ripple/fossil/src/transactor.rs
@@ -0,0 +1,127 @@
+// SPDX-FileCopyrightText: edef <edef@unfathomable.blue>
+// SPDX-License-Identifier: OSL-3.0
+
+use std::{
+	sync::{Arc, Condvar, Mutex},
+	thread::{self, JoinHandle},
+};
+
+#[derive(Debug)]
+pub struct Transactor {
+	link: Arc<Link>,
+	syncer: Option<JoinHandle<()>>,
+}
+
+impl Transactor {
+	pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> {
+		let link = Link::new(tail);
+
+		let syncer = {
+			let link = link.clone();
+			thread::spawn(move || link.run(f))
+		};
+
+		Arc::new(Self {
+			link,
+			syncer: Some(syncer),
+		})
+	}
+
+	pub fn write(self: &Arc<Self>, tx: u64, tail: u64) {
+		self.link.write(tx, tail)
+	}
+
+	pub fn wait(self: &Arc<Self>, tx: u64) {
+		self.link.wait(tx)
+	}
+}
+
+impl Drop for Transactor {
+	fn drop(&mut self) {
+		if let Some(syncer) = self.syncer.take() {
+			self.link.close();
+			syncer.join().unwrap();
+		}
+	}
+}
+
+#[derive(Debug)]
+struct Link {
+	state: Mutex<State>,
+	written: Condvar,
+	synced: Condvar,
+}
+
+#[derive(Debug, Clone)]
+pub struct State {
+	pub synced_tx: u64,
+	pub written_tx: u64,
+	pub written_tail: u64,
+}
+
+impl Link {
+	fn new(tail: u64) -> Arc<Self> {
+		Arc::new(Self {
+			state: Mutex::new(State {
+				synced_tx: 0,
+				written_tx: 0,
+				written_tail: tail,
+			}),
+			written: Condvar::new(),
+			synced: Condvar::new(),
+		})
+	}
+
+	fn write(self: &Arc<Self>, tx: u64, tail: u64) {
+		{
+			let mut state = self.state.lock().unwrap();
+			assert!(state.written_tx < !0, "already closed");
+			assert!(state.written_tx < tx, "duplicate transaction ID");
+
+			state.written_tx = tx;
+			state.written_tail = tail;
+		}
+
+		self.written.notify_one();
+	}
+
+	fn close(self: &Arc<Self>) {
+		{
+			let mut state = self.state.lock().unwrap();
+			state.written_tx = !0;
+		}
+
+		self.written.notify_one();
+	}
+
+	fn wait(self: &Arc<Self>, tx: u64) {
+		let state = self.state.lock().unwrap();
+
+		drop(
+			self.synced
+				.wait_while(state, |state| state.synced_tx < tx)
+				.unwrap(),
+		);
+	}
+
+	fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) {
+		let mut state = self.state.lock().unwrap();
+
+		while state.synced_tx < !0 {
+			state = self
+				.written
+				.wait_while(state, |state| state.synced_tx == state.written_tx)
+				.unwrap();
+
+			let state_seen = state.clone();
+			drop(state);
+
+			f(&state_seen);
+
+			state = self.state.lock().unwrap();
+			state.synced_tx = state_seen.written_tx;
+
+			self.synced.notify_all();
+		}
+	}
+}