From 7831d9e831c923bf4577da714b220948e4295d34 Mon Sep 17 00:00:00 2001
From: edef
Date: Tue, 19 Apr 2022 00:57:34 +0000
Subject: ripple/fossil: prepare for seekable, streaming blob reading

This implements blob reading in terms of RawBlob, a fairly naive
streaming blob reader. For now, we still only use it for simple
one-shot reads.

Change-Id: Iecd4f926412b474ca6f3dde8c6055c0c3781301f
---
 ripple/fossil/src/lib.rs | 105 +++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 89 insertions(+), 16 deletions(-)

diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index d7818cd..e65253b 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -4,7 +4,13 @@
 use {
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
-	std::{collections::BTreeMap, fs, io, os::unix::fs::PermissionsExt, path::Path},
+	std::{
+		collections::BTreeMap,
+		fs,
+		io::{self, BufRead, Read},
+		os::unix::fs::PermissionsExt,
+		path::Path,
+	},
 };
 
 pub mod store {
@@ -75,10 +81,10 @@ impl Store {
 	}
 
 	fn write_blob(&self, data: &[u8]) -> Digest {
-		let digest = {
+		let ident = {
 			let mut h = blake3::Hasher::new();
 			h.update_with_join::<blake3::join::RayonJoin>(data);
-			*h.finalize().as_bytes()
+			h.finalize()
 		};
 
 		// TODO(edef): maybe don't use the default tree?
@@ -88,28 +94,26 @@
 		self.db
 			.transaction::<_, _, sled::Error>(|db| {
 				for (n, chunk) in data.chunks(CHUNK_BYTES).enumerate() {
-					let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
-					key[..DIGEST_BYTES].copy_from_slice(&digest);
-					BigEndian::write_u32(&mut key[DIGEST_BYTES..], n as u32);
-					db.insert(&key[..], chunk)?;
+					db.insert(chunk_key(&ident, n as u32).as_slice(), chunk)?;
 				}
 				Ok(())
 			})
 			.unwrap();
 
-		digest.into()
+		ident.into()
 	}
 
-	pub fn read_blob(&self, r#ref: Digest) -> Vec<u8> {
+	pub fn read_blob(&self, ident: Digest) -> Vec<u8> {
 		let mut buffer = Vec::new();
-		let mut h = blake3::Hasher::new();
-		for element in self.db.scan_prefix(r#ref.as_bytes()) {
-			let (_, chunk) = element.unwrap();
-			h.update(&chunk);
-			buffer.extend_from_slice(&chunk);
-		}
+		self.raw_blob(ident).read_to_end(&mut buffer).unwrap();
+
+		let computed_ident = {
+			let mut h = blake3::Hasher::new();
+			h.update_with_join::<blake3::join::RayonJoin>(&buffer);
+			h.finalize()
+		};
 
-		if h.finalize() != r#ref {
+		if computed_ident != ident {
 			if buffer.is_empty() {
 				panic!("blob not found");
 			}
@@ -118,6 +122,75 @@
 
 		buffer
 	}
+
+	fn raw_blob(&self, ident: Digest) -> RawBlob<'_> {
+		RawBlob {
+			store: self,
+			ident,
+			buf: None,
+			off: 0,
+		}
+	}
+}
+
+fn chunk_key(ident: &Digest, chunk: u32) -> [u8; DIGEST_BYTES + OFFSET_BYTES] {
+	let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
+	key[..DIGEST_BYTES].copy_from_slice(ident.as_bytes());
+	BigEndian::write_u32(&mut key[DIGEST_BYTES..], chunk);
+	key
+}
+
+fn chunk_id(offset: u64) -> u32 {
+	(offset / CHUNK_BYTES as u64).try_into().unwrap()
+}
+
+struct RawBlob<'a> {
+	store: &'a Store,
+	ident: Digest,
+	/// current chunk
+	buf: Option<sled::IVec>,
+	/// reader offset
+	/// LSBs are intra-chunk, MSBs are chunk number
+	off: u64,
+}
+
+impl io::BufRead for RawBlob<'_> {
+	fn fill_buf(&mut self) -> io::Result<&[u8]> {
+		let buf = match self.buf {
+			Some(ref buf) => buf,
+			None => {
+				let chunk = chunk_id(self.off);
+				match self.store.db.get(chunk_key(&self.ident, chunk))? {
+					None => return Ok(&[]),
+					Some(contents) => self.buf.insert(contents),
+				}
+			}
+		};
+
+		let off = (self.off % CHUNK_BYTES as u64) as usize;
+		Ok(buf.get(off..).unwrap_or_default())
+	}
+
+	fn consume(&mut self, amt: usize) {
+		let prev_offset = self.off;
+		let next_offset = self.off.saturating_add(amt as u64);
+
+		if chunk_id(next_offset) != chunk_id(prev_offset) {
+			self.buf.take();
+		}
+
+		self.off = next_offset;
+	}
+}
+
+impl io::Read for RawBlob<'_> {
+	fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+		let src = self.fill_buf()?;
+		let len = Ord::min(src.len(), dst.len());
+		dst[..len].copy_from_slice(&src[..len]);
+		self.consume(len);
+		Ok(len)
+	}
 }
 
 pub type Digest = blake3::Hash;
-- 
cgit 1.4.1
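
For readers skimming the diff, the core of the change is the offset arithmetic: a RawBlob position is a single u64 whose high part selects a chunk (via chunk_id) and whose low part is the intra-chunk offset, and each chunk lives under a db key of blob digest plus big-endian chunk index, so lexicographic key order equals chunk order. Below is a minimal, self-contained sketch of that addressing scheme; it is not part of the patch, the CHUNK_BYTES value is made up, and u32::to_be_bytes stands in for the byteorder crate used above.

```rust
// Sketch of the chunk-addressing scheme from the patch above.
// DIGEST_BYTES matches blake3's 32-byte digests; CHUNK_BYTES is an
// illustrative value (the real constant lives elsewhere in fossil).
const DIGEST_BYTES: usize = 32;
const OFFSET_BYTES: usize = 4;
const CHUNK_BYTES: usize = 1 << 16;

/// Key for chunk `chunk` of a blob: digest bytes followed by the chunk
/// index, big-endian so that byte-wise key order equals chunk order.
fn chunk_key(ident: &[u8; DIGEST_BYTES], chunk: u32) -> [u8; DIGEST_BYTES + OFFSET_BYTES] {
	let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
	key[..DIGEST_BYTES].copy_from_slice(ident);
	key[DIGEST_BYTES..].copy_from_slice(&chunk.to_be_bytes());
	key
}

/// Split a reader offset into (chunk number, offset within the chunk),
/// mirroring `chunk_id` and the `self.off % CHUNK_BYTES` in `fill_buf`.
fn split_offset(off: u64) -> (u32, usize) {
	(
		(off / CHUNK_BYTES as u64) as u32,
		(off % CHUNK_BYTES as u64) as usize,
	)
}

fn main() {
	let ident = [0xab; DIGEST_BYTES];
	// One byte past the first chunk boundary lands at byte 1 of chunk 1;
	// crossing that boundary is what makes `consume` drop the cached chunk.
	let (chunk, intra) = split_offset(CHUNK_BYTES as u64 + 1);
	assert_eq!((chunk, intra), (1, 1));
	assert_eq!(chunk_key(&ident, chunk)[DIGEST_BYTES..], 1u32.to_be_bytes());
	println!("chunk {chunk}, intra-chunk offset {intra}");
}
```

One consequence of this layout, visible in the Read impl: a single read call copies at most up to the end of the currently cached chunk and relies on callers such as read_to_end to loop, while consume only invalidates the cached chunk when the offset crosses a chunk boundary, which is presumably what the "seekable" in the subject line is preparing for.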