diff options
Diffstat (limited to 'ripple')
-rw-r--r-- | ripple/fossil/src/lib.rs | 167 |
1 files changed, 103 insertions, 64 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs index 8858196..753c3d6 100644 --- a/ripple/fossil/src/lib.rs +++ b/ripple/fossil/src/lib.rs @@ -2,14 +2,16 @@ // SPDX-License-Identifier: OSL-3.0 use { - anyhow::Result, + anyhow::{Context, Result}, byteorder::{BigEndian, ByteOrder}, prost::Message, + sled::{transaction::ConflictableTransactionError, Transactional}, std::{ + cell::{Cell, RefCell}, collections::BTreeMap, fs, - io::{self, BufRead, Read, Write}, - os::unix::fs::PermissionsExt, + io::{self, Read, Write}, + os::unix::{fs::PermissionsExt, prelude::FileExt}, path::Path, }, }; @@ -20,16 +22,40 @@ pub mod store { const CHUNK_BYTES: usize = 0x400; const DIGEST_BYTES: usize = blake3::OUT_LEN; -const OFFSET_BYTES: usize = 4; pub struct Store { db: sled::Db, + meta: sled::Tree, + chunks: RefCell<fs::File>, + chunks_tail: Cell<u64>, } impl Store { pub fn open(path: impl AsRef<Path>) -> Result<Store> { + let path = path.as_ref(); + let db = sled::open(path)?; - Ok(Store { db }) + let meta = db.open_tree("meta")?; + + let chunks = fs::OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(path.join("chunks"))?; + + let chunks_tail = meta + .get("chunks_tail")? + .map(|v| BigEndian::read_u64(&v)) + .unwrap_or_default(); + + chunks.set_len(chunks_tail)?; + + Ok(Store { + db, + meta, + chunks: RefCell::new(chunks), + chunks_tail: Cell::new(chunks_tail), + }) } pub fn add_path(&self, path: impl AsRef<Path>) -> Node { @@ -88,18 +114,47 @@ impl Store { h.finalize() }; + if self.db.contains_key(&*ident.as_bytes()).unwrap() { + // key already exists + return ident; + } + + let mut chunks_file = self.chunks.borrow_mut(); + let offset = self.chunks_tail.get(); + + chunks_file.write_all(data).unwrap(); + let chunks_tail = offset + data.len() as u64; + // TODO(edef): maybe don't use the default tree? // we should probably have a "blob" tree, // and reserve the default tree for DB metadata - self.db - .transaction::<_, _, sled::Error>(|db| { - for (n, chunk) in data.chunks(CHUNK_BYTES).enumerate() { - db.insert(chunk_key(&ident, n as u32).as_slice(), chunk)?; - } - Ok(()) + let slice = Slice { + offset, + length: data.len() as u64, + }; + + let slice_buf = { + let mut buf = [0u8; 16]; + BigEndian::write_u64_into(&[slice.offset, slice.length], &mut buf); + buf + }; + + let chunks_tail_buf = { + let mut buf = [0u8; 8]; + BigEndian::write_u64(&mut buf, chunks_tail); + buf + }; + + // TODO(edef): figure out fsync for durability + (&*self.db, &self.meta) + .transaction(|(db, meta)| { + db.insert(&*ident.as_bytes(), &slice_buf)?; + meta.insert("chunks_tail", &chunks_tail_buf)?; + Ok::<_, ConflictableTransactionError>(()) }) .unwrap(); + self.chunks_tail.set(chunks_tail); ident.into() } @@ -116,9 +171,6 @@ impl Store { }; if computed_ident != ident { - if buffer.is_empty() { - panic!("blob not found"); - } panic!("hash mismatch"); } @@ -126,71 +178,58 @@ impl Store { } fn raw_blob(&self, ident: Digest) -> RawBlob<'_> { + let slice_buf = self + .db + .get(&*ident.as_bytes()) + .unwrap() + .expect("blob not found"); + + let slice = Slice { + offset: BigEndian::read_u64(&slice_buf[0..]), + length: BigEndian::read_u64(&slice_buf[8..]), + }; + RawBlob { store: self, - ident, - buf: None, - off: 0, + slice, + position: 0, } } } -fn chunk_key(ident: &Digest, chunk: u32) -> [u8; DIGEST_BYTES + OFFSET_BYTES] { - let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES]; - key[..DIGEST_BYTES].copy_from_slice(ident.as_bytes()); - BigEndian::write_u32(&mut key[DIGEST_BYTES..], chunk as u32); - key -} - -fn chunk_id(offset: u64) -> u32 { - (offset / CHUNK_BYTES as u64).try_into().unwrap() +/// a slice in the chunks file +#[derive(Debug)] +struct Slice { + offset: u64, + length: u64, } struct RawBlob<'a> { store: &'a Store, - ident: Digest, - /// current chunk - buf: Option<sled::IVec>, - /// reader offset - /// LSBs are intra-chunk, MSBs are chunk number - off: u64, + slice: Slice, + position: u64, } -impl io::BufRead for RawBlob<'_> { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - let buf = match self.buf { - Some(ref buf) => buf, - None => { - let chunk = chunk_id(self.off); - match self.store.db.get(chunk_key(&self.ident, chunk))? { - None => return Ok(&[]), - Some(contents) => self.buf.insert(contents), - } - } - }; - - let off = (self.off % CHUNK_BYTES as u64) as usize; - Ok(buf.get(off..).unwrap_or_default()) - } - - fn consume(&mut self, amt: usize) { - let prev_offset = self.off; - let next_offset = self.off.saturating_add(amt as u64); +impl io::Read for RawBlob<'_> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + let prev_pos = self.position; + let next_pos = Ord::min( + self.position.saturating_add(dst.len() as u64), + self.slice.length, + ); - if chunk_id(next_offset) != chunk_id(prev_offset) { - self.buf.take(); - } + let len = (next_pos - prev_pos) as usize; + let dst = &mut dst[..len]; - self.off = next_offset; - } -} + let offset = self.slice.offset + prev_pos; + self.store + .chunks + .borrow() + .read_exact_at(dst, offset) + .context("Couldn't read blob data") + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; -impl io::Read for RawBlob<'_> { - fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { - let src = self.fill_buf()?; - let len = Ord::min(src.len(), dst.len()); - dst[..len].copy_from_slice(&src[..len]); - self.consume(len); + self.position = next_pos; Ok(len) } } |