diff options
Diffstat (limited to 'ripple/fossil/src/lib.rs')
-rw-r--r-- | ripple/fossil/src/lib.rs | 112 |
1 files changed, 87 insertions, 25 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs index 3b2b3f8..4cea16a 100644 --- a/ripple/fossil/src/lib.rs +++ b/ripple/fossil/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: OSL-3.0 use { + crate::chunker::Chunker, anyhow::{Context, Result}, byteorder::{BigEndian, ByteOrder}, prost::Message, @@ -20,13 +21,15 @@ pub mod store { include!(concat!(env!("OUT_DIR"), "/fossil.store.rs")); } -const CHUNK_BYTES: usize = 0x400; +mod chunker; + const DIGEST_BYTES: usize = blake3::OUT_LEN; pub struct Store { meta: sled::Tree, blobs: sled::Tree, - chunks: RefCell<fs::File>, + chunks: sled::Tree, + chunks_file: RefCell<fs::File>, chunks_tail: Cell<u64>, } @@ -37,8 +40,9 @@ impl Store { let db = sled::open(path)?; let meta = (&*db).clone(); let blobs = db.open_tree("blobs")?; + let chunks = db.open_tree("chunks")?; - let chunks = fs::OpenOptions::new() + let chunks_file = fs::OpenOptions::new() .read(true) .append(true) .create(true) @@ -49,12 +53,13 @@ impl Store { .map(|v| BigEndian::read_u64(&v)) .unwrap_or_default(); - chunks.set_len(chunks_tail)?; + chunks_file.set_len(chunks_tail)?; Ok(Store { blobs, meta, - chunks: RefCell::new(chunks), + chunks, + chunks_file: RefCell::new(chunks_file), chunks_tail: Cell::new(chunks_tail), }) } @@ -124,34 +129,66 @@ impl Store { } fn write_blob_inner(&self, ident: &Digest, outboard: Vec<u8>, data: &[u8]) { - let mut chunks_file = self.chunks.borrow_mut(); - let offset = self.chunks_tail.get(); + let mut chunks_file = self.chunks_file.borrow_mut(); + let mut offset = self.chunks_tail.get(); + let mut batch = sled::Batch::default(); - chunks_file.write_all(data).unwrap(); - let chunks_tail = offset + data.len() as u64; + let chunks = Chunker::from(data) + .map(|chunk_data| { + self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data) + }) + .collect::<Vec<_>>(); let blob_buf = store::Blob { - offset, - length: data.len() as u64, + chunks, bao_inline: outboard, } .encode_to_vec(); let chunks_tail_buf = { let mut buf = [0u8; 8]; - BigEndian::write_u64(&mut buf, chunks_tail); + BigEndian::write_u64(&mut buf, offset); buf }; // TODO(edef): figure out fsync for durability - (&self.blobs, &self.meta) - .transaction(|(blobs, meta)| { + (&self.blobs, &self.chunks, &self.meta) + .transaction(|(blobs, chunks, meta)| { + chunks.apply_batch(&batch)?; blobs.insert(&*ident.as_bytes(), &*blob_buf)?; meta.insert("chunks_tail", &chunks_tail_buf)?; Ok::<_, ConflictableTransactionError>(()) }) .unwrap(); - self.chunks_tail.set(chunks_tail); + self.chunks_tail.set(offset); + } + + fn write_chunk( + &self, + chunks_file: &mut fs::File, + offset: &mut u64, + batch: &mut sled::Batch, + data: &[u8], + ) -> store::Chunk { + let ident = blake3::hash(data); + if let Some(chunk) = self.get_chunk(&ident) { + return chunk; + } + + chunks_file.write_all(data).unwrap(); + let chunk = store::Chunk { + offset: *offset, + length: data.len() as u32, + }; + *offset += data.len() as u64; + + batch.insert(ident.as_bytes(), chunk.encode_to_vec()); + chunk + } + + fn get_chunk(&self, ident: &Digest) -> Option<store::Chunk> { + let buf = self.chunks.get(&*ident.as_bytes()).unwrap()?; + Some(store::Chunk::decode(&*buf).unwrap()) } pub fn read_blob(&self, ident: Digest) -> Vec<u8> { @@ -168,15 +205,31 @@ impl Store { .expect("blob not found"); let store::Blob { - offset, - length, + mut chunks, bao_inline, } = store::Blob::decode(&*buf).unwrap(); + let mut blob_length: u64 = 0; + let chunks = chunks + .drain(..) + .map(|chunk| { + let chunk_offset = blob_length; + blob_length += chunk.length as u64; + ( + chunk_offset, + Slice { + offset: chunk.offset, + length: chunk.length, + }, + ) + }) + .collect(); + Blob(bao::decode::Decoder::new_outboard( RawBlob { store: self, - slice: Slice { offset, length }, + chunks, + length: blob_length, position: 0, }, io::Cursor::new(bao_inline), @@ -211,29 +264,38 @@ impl io::Seek for Blob<'_> { #[derive(Debug)] struct Slice { offset: u64, - length: u64, + length: u32, } struct RawBlob<'a> { store: &'a Store, - slice: Slice, + chunks: BTreeMap<u64, Slice>, + length: u64, position: u64, } impl io::Read for RawBlob<'_> { fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + let (&chunk_offset, chunk_slice) = + if let Some(entry) = self.chunks.range(..=self.position).next_back() { + entry + } else { + // empty blob + return Ok(0); + }; + let prev_pos = self.position; let next_pos = Ord::min( self.position.saturating_add(dst.len() as u64), - self.slice.length, + chunk_offset + chunk_slice.length as u64, ); let len = (next_pos - prev_pos) as usize; let dst = &mut dst[..len]; - let offset = self.slice.offset + prev_pos; + let offset = prev_pos - chunk_offset + chunk_slice.offset; self.store - .chunks + .chunks_file .borrow() .read_exact_at(dst, offset) .context("Couldn't read blob data") @@ -257,12 +319,12 @@ impl io::Seek for RawBlob<'_> { fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { let pos = match pos { io::SeekFrom::Start(n) => Some(n), - io::SeekFrom::End(n) => checked_add_signed(self.slice.length, n), + io::SeekFrom::End(n) => checked_add_signed(self.length, n), io::SeekFrom::Current(n) => checked_add_signed(self.position, n), }; match pos { - Some(n) if n <= self.slice.length => { + Some(n) if n <= self.length => { self.position = n; Ok(self.position) } |