summary refs log tree commit diff
path: root/ripple/fossil/src
diff options
context:
space:
mode:
Diffstat (limited to 'ripple/fossil/src')
-rw-r--r--	ripple/fossil/src/lib.rs	167
1 files changed, 103 insertions, 64 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index 8858196..753c3d6 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -2,14 +2,16 @@
 // SPDX-License-Identifier: OSL-3.0
 
 use {
-	anyhow::Result,
+	anyhow::{Context, Result},
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
+	sled::{transaction::ConflictableTransactionError, Transactional},
 	std::{
+		cell::{Cell, RefCell},
 		collections::BTreeMap,
 		fs,
-		io::{self, BufRead, Read, Write},
-		os::unix::fs::PermissionsExt,
+		io::{self, Read, Write},
+		os::unix::{fs::PermissionsExt, prelude::FileExt},
 		path::Path,
 	},
 };
@@ -20,16 +22,40 @@ pub mod store {
 
 const CHUNK_BYTES: usize = 0x400;
 const DIGEST_BYTES: usize = blake3::OUT_LEN;
-const OFFSET_BYTES: usize = 4;
 
 pub struct Store {
 	db: sled::Db,
+	meta: sled::Tree,
+	chunks: RefCell<fs::File>,
+	chunks_tail: Cell<u64>,
 }
 
 impl Store {
 	pub fn open(path: impl AsRef<Path>) -> Result<Store> {
+		let path = path.as_ref();
+
 		let db = sled::open(path)?;
-		Ok(Store { db })
+		let meta = db.open_tree("meta")?;
+
+		let chunks = fs::OpenOptions::new()
+			.read(true)
+			.append(true)
+			.create(true)
+			.open(path.join("chunks"))?;
+
+		let chunks_tail = meta
+			.get("chunks_tail")?
+			.map(|v| BigEndian::read_u64(&v))
+			.unwrap_or_default();
+
+		chunks.set_len(chunks_tail)?;
+
+		Ok(Store {
+			db,
+			meta,
+			chunks: RefCell::new(chunks),
+			chunks_tail: Cell::new(chunks_tail),
+		})
 	}
 
 	pub fn add_path(&self, path: impl AsRef<Path>) -> Node {
@@ -88,18 +114,47 @@ impl Store {
 			h.finalize()
 		};
 
+		if self.db.contains_key(&*ident.as_bytes()).unwrap() {
+			// key already exists
+			return ident;
+		}
+
+		let mut chunks_file = self.chunks.borrow_mut();
+		let offset = self.chunks_tail.get();
+
+		chunks_file.write_all(data).unwrap();
+		let chunks_tail = offset + data.len() as u64;
+
 		// TODO(edef): maybe don't use the default tree?
 		// we should probably have a "blob" tree,
 		// and reserve the default tree for DB metadata
 
-		self.db
-			.transaction::<_, _, sled::Error>(|db| {
-				for (n, chunk) in data.chunks(CHUNK_BYTES).enumerate() {
-					db.insert(chunk_key(&ident, n as u32).as_slice(), chunk)?;
-				}
-				Ok(())
+		let slice = Slice {
+			offset,
+			length: data.len() as u64,
+		};
+
+		let slice_buf = {
+			let mut buf = [0u8; 16];
+			BigEndian::write_u64_into(&[slice.offset, slice.length], &mut buf);
+			buf
+		};
+
+		let chunks_tail_buf = {
+			let mut buf = [0u8; 8];
+			BigEndian::write_u64(&mut buf, chunks_tail);
+			buf
+		};
+
+		// TODO(edef): figure out fsync for durability
+		(&*self.db, &self.meta)
+			.transaction(|(db, meta)| {
+				db.insert(&*ident.as_bytes(), &slice_buf)?;
+				meta.insert("chunks_tail", &chunks_tail_buf)?;
+				Ok::<_, ConflictableTransactionError>(())
 			})
 			.unwrap();
+		self.chunks_tail.set(chunks_tail);
 
 		ident.into()
 	}
@@ -116,9 +171,6 @@ impl Store {
 		};
 
 		if computed_ident != ident {
-			if buffer.is_empty() {
-				panic!("blob not found");
-			}
 			panic!("hash mismatch");
 		}
 
@@ -126,71 +178,58 @@ impl Store {
 	}
 
 	fn raw_blob(&self, ident: Digest) -> RawBlob<'_> {
+		let slice_buf = self
+			.db
+			.get(&*ident.as_bytes())
+			.unwrap()
+			.expect("blob not found");
+
+		let slice = Slice {
+			offset: BigEndian::read_u64(&slice_buf[0..]),
+			length: BigEndian::read_u64(&slice_buf[8..]),
+		};
+
 		RawBlob {
 			store: self,
-			ident,
-			buf: None,
-			off: 0,
+			slice,
+			position: 0,
 		}
 	}
 }
 
-fn chunk_key(ident: &Digest, chunk: u32) -> [u8; DIGEST_BYTES + OFFSET_BYTES] {
-	let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
-	key[..DIGEST_BYTES].copy_from_slice(ident.as_bytes());
-	BigEndian::write_u32(&mut key[DIGEST_BYTES..], chunk as u32);
-	key
-}
-
-fn chunk_id(offset: u64) -> u32 {
-	(offset / CHUNK_BYTES as u64).try_into().unwrap()
+/// a slice in the chunks file
+#[derive(Debug)]
+struct Slice {
+	offset: u64,
+	length: u64,
 }
 
 struct RawBlob<'a> {
 	store: &'a Store,
-	ident: Digest,
-	/// current chunk
-	buf: Option<sled::IVec>,
-	/// reader offset
-	/// LSBs are intra-chunk, MSBs are chunk number
-	off: u64,
+	slice: Slice,
+	position: u64,
 }
 
-impl io::BufRead for RawBlob<'_> {
-	fn fill_buf(&mut self) -> io::Result<&[u8]> {
-		let buf = match self.buf {
-			Some(ref buf) => buf,
-			None => {
-				let chunk = chunk_id(self.off);
-				match self.store.db.get(chunk_key(&self.ident, chunk))? {
-					None => return Ok(&[]),
-					Some(contents) => self.buf.insert(contents),
-				}
-			}
-		};
-
-		let off = (self.off % CHUNK_BYTES as u64) as usize;
-		Ok(buf.get(off..).unwrap_or_default())
-	}
-
-	fn consume(&mut self, amt: usize) {
-		let prev_offset = self.off;
-		let next_offset = self.off.saturating_add(amt as u64);
+impl io::Read for RawBlob<'_> {
+	fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+		let prev_pos = self.position;
+		let next_pos = Ord::min(
+			self.position.saturating_add(dst.len() as u64),
+			self.slice.length,
+		);
 
-		if chunk_id(next_offset) != chunk_id(prev_offset) {
-			self.buf.take();
-		}
+		let len = (next_pos - prev_pos) as usize;
+		let dst = &mut dst[..len];
 
-		self.off = next_offset;
-	}
-}
+		let offset = self.slice.offset + prev_pos;
+		self.store
+			.chunks
+			.borrow()
+			.read_exact_at(dst, offset)
+			.context("Couldn't read blob data")
+			.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
 
-impl io::Read for RawBlob<'_> {
-	fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
-		let src = self.fill_buf()?;
-		let len = Ord::min(src.len(), dst.len());
-		dst[..len].copy_from_slice(&src[..len]);
-		self.consume(len);
+		self.position = next_pos;
 		Ok(len)
 	}
 }