// SPDX-FileCopyrightText: edef // SPDX-License-Identifier: OSL-3.0 pub use crate::chunker::Chunker; use { anyhow::{anyhow, bail, Context, Result}, byteorder::{BigEndian, ByteOrder}, prost::Message, sled::{transaction::ConflictableTransactionError, Transactional}, std::{ cell::{Cell, RefCell}, collections::BTreeMap, fs, io::{self, Read, Write}, os::unix::{fs::PermissionsExt, prelude::FileExt}, path::Path, }, }; pub mod store { include!(concat!(env!("OUT_DIR"), "/fossil.store.rs")); } mod chunker; const DIGEST_BYTES: usize = blake3::OUT_LEN; pub struct Store { meta: sled::Tree, blobs: sled::Tree, chunks: sled::Tree, chunks_file: RefCell, chunks_tail: Cell, } impl Store { pub fn open(path: impl AsRef) -> Result { let path = path.as_ref(); let db = sled::open(path)?; let meta = (&*db).clone(); let blobs = db.open_tree("blobs")?; let chunks = db.open_tree("chunks")?; let chunks_file = fs::OpenOptions::new() .read(true) .append(true) .create(true) .open(path.join("chunks"))?; let chunks_tail = meta .get("chunks_tail")? .map(|v| BigEndian::read_u64(&v)) .unwrap_or_default(); chunks_file.set_len(chunks_tail)?; Ok(Store { blobs, meta, chunks, chunks_file: RefCell::new(chunks_file), chunks_tail: Cell::new(chunks_tail), }) } fn flush(&self) { // NOTE: sled *can* flush without us explicitly asking for it, so it's // possible for the store to end up containing pointers to chunks that // aren't fsynced yet. The easiest fix is to always `chunks_file.sync_data()` // before we write anything to the database, but that's kind of a performance hazard. // TODO(edef): keep pending and known-durable blobs/chunks separate in the database self.chunks_file.borrow_mut().sync_data().unwrap(); self.meta.flush().unwrap(); } pub fn add_directory(&self, path: impl AsRef) -> Directory { let (d, _) = self.add_directory_inner(path.as_ref()); self.flush(); d } pub fn add_path(&self, path: impl AsRef) -> Node { let node = self.add_path_inner(path.as_ref()); self.flush(); node } fn add_directory_inner(&self, path: &Path) -> (Directory, u32) { let mut d = Directory::new(); let mut size: u32 = 0; for entry in path.read_dir().unwrap() { let entry = entry.unwrap(); let name = entry.file_name().into_string().unwrap(); let child = self.add_path_inner(&entry.path()); size = size.checked_add(child.size()).expect("overflow"); d.children.insert(name, child); } (d, size) } fn add_path_inner(&self, path: &Path) -> Node { let meta = fs::symlink_metadata(path).unwrap(); match meta.file_type() { ty if ty.is_dir() => { let (d, size) = self.add_directory_inner(path); let blob = d.into_pb().encode_to_vec(); Node::Directory(DirectoryRef { ident: self.write_blob(&blob), size, }) } ty if ty.is_file() => { let executable = (meta.permissions().mode() & 0o100) != 0; let blob = fs::read(path).unwrap(); Node::File(FileRef { executable, ident: self.write_blob(&blob), size: blob.len().try_into().expect("overflow"), }) } ty if ty.is_symlink() => { let target = path .read_link() .unwrap() .to_str() .expect("symlink target is invalid UTF-8") .to_owned(); Node::Link { target } } _ => panic!("not a symlink or a regular file"), } } fn write_blob(&self, data: &[u8]) -> Digest { let mut outboard = Vec::new(); let ident: blake3::Hash = { let mut encoder = bao::encode::Encoder::new_outboard(io::Cursor::new(&mut outboard)); encoder.write_all(&data).unwrap(); encoder.finalize().unwrap() }; if !self.blobs.contains_key(&*ident.as_bytes()).unwrap() { self.write_blob_inner(&ident, outboard, data); } ident } fn write_blob_inner(&self, ident: &Digest, outboard: Vec, data: &[u8]) { let mut chunks_file = self.chunks_file.borrow_mut(); let mut offset = self.chunks_tail.get(); let mut batch = sled::Batch::default(); let chunks = Chunker::from(data) .map(|chunk_data| { self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data) }) .collect::>(); let blob_buf = store::Blob { chunks, bao_inline: outboard, } .encode_to_vec(); let chunks_tail_buf = offset.to_be_bytes(); (&self.blobs, &self.chunks, &self.meta) .transaction(|(blobs, chunks, meta)| { chunks.apply_batch(&batch)?; blobs.insert(&*ident.as_bytes(), &*blob_buf)?; meta.insert("chunks_tail", &chunks_tail_buf)?; Ok::<_, ConflictableTransactionError>(()) }) .unwrap(); self.chunks_tail.set(offset); } fn write_chunk( &self, chunks_file: &mut fs::File, offset: &mut u64, batch: &mut sled::Batch, data: &[u8], ) -> store::Chunk { let ident = blake3::hash(data); if let Some(chunk) = self.get_chunk(&ident) { return chunk; } chunks_file.write_all(data).unwrap(); let chunk = store::Chunk { offset: *offset, length: data.len() as u32, }; *offset += data.len() as u64; batch.insert(ident.as_bytes(), chunk.encode_to_vec()); chunk } fn get_chunk(&self, ident: &Digest) -> Option { let buf = self.chunks.get(&*ident.as_bytes()).unwrap()?; Some(store::Chunk::decode(&*buf).unwrap()) } pub fn read_blob(&self, ident: Digest) -> Vec { let mut buffer = Vec::new(); self.open_blob(ident).read_to_end(&mut buffer).unwrap(); buffer } pub fn open_blob(&self, ident: Digest) -> Blob { let buf = self .blobs .get(&*ident.as_bytes()) .unwrap() .expect("blob not found"); let store::Blob { mut chunks, bao_inline, } = store::Blob::decode(&*buf).unwrap(); let mut blob_length: u64 = 0; let chunks = chunks .drain(..) .map(|chunk| { let chunk_offset = blob_length; blob_length += chunk.length as u64; ( chunk_offset, Slice { offset: chunk.offset, length: chunk.length, }, ) }) .collect(); Blob(bao::decode::Decoder::new_outboard( RawBlob { store: self, chunks, length: blob_length, position: 0, }, io::Cursor::new(bao_inline), &ident, )) } } pub struct Blob<'a>(bao::decode::Decoder, std::io::Cursor>>); impl io::Read for Blob<'_> { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } impl io::Seek for Blob<'_> { fn seek(&mut self, pos: io::SeekFrom) -> io::Result { self.0.seek(pos) } fn rewind(&mut self) -> io::Result<()> { self.0.rewind() } fn stream_position(&mut self) -> io::Result { self.0.stream_position() } } /// a slice in the chunks file #[derive(Debug)] struct Slice { offset: u64, length: u32, } struct RawBlob<'a> { store: &'a Store, chunks: BTreeMap, length: u64, position: u64, } impl io::Read for RawBlob<'_> { fn read(&mut self, dst: &mut [u8]) -> io::Result { let (&chunk_offset, chunk_slice) = if let Some(entry) = self.chunks.range(..=self.position).next_back() { entry } else { // empty blob return Ok(0); }; let prev_pos = self.position; let next_pos = Ord::min( self.position.saturating_add(dst.len() as u64), chunk_offset + chunk_slice.length as u64, ); let len = (next_pos - prev_pos) as usize; let dst = &mut dst[..len]; let offset = prev_pos - chunk_offset + chunk_slice.offset; self.store .chunks_file .borrow() .read_exact_at(dst, offset) .context("Couldn't read blob data") .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; self.position = next_pos; Ok(len) } } // TODO(edef): use checked_add_signed when mixed_integer_ops stabilises fn checked_add_signed(lhs: u64, rhs: i64) -> Option { if rhs >= 0 { lhs.checked_add(rhs as u64) } else { lhs.checked_sub(rhs.unsigned_abs()) } } impl io::Seek for RawBlob<'_> { fn seek(&mut self, pos: io::SeekFrom) -> io::Result { let pos = match pos { io::SeekFrom::Start(n) => Some(n), io::SeekFrom::End(n) => checked_add_signed(self.length, n), io::SeekFrom::Current(n) => checked_add_signed(self.position, n), }; match pos { Some(n) if n <= self.length => { self.position = n; Ok(self.position) } _ => Err(io::Error::new( io::ErrorKind::InvalidInput, "seek out of range", )), } } fn rewind(&mut self) -> io::Result<()> { self.position = 0; Ok(()) } fn stream_position(&mut self) -> io::Result { Ok(self.position) } } pub type Digest = blake3::Hash; pub struct Directory { pub children: BTreeMap, } #[derive(Clone)] pub enum Node { Directory(DirectoryRef), File(FileRef), Link { target: String }, } #[derive(Clone)] pub struct DirectoryRef { pub ident: Digest, pub size: u32, } #[derive(Debug, Clone)] pub struct FileRef { pub ident: Digest, pub executable: bool, pub size: u32, } impl Node { fn size(&self) -> u32 { match self { &Node::Directory(DirectoryRef { size, .. }) => size.checked_add(1).expect("overflow"), _ => 1, } } } impl Directory { pub fn new() -> Directory { Directory { children: BTreeMap::new(), } } pub fn into_pb(self) -> store::Directory { let mut d = store::Directory::default(); for (name, node) in self.children.into_iter() { match node { Node::Directory(DirectoryRef { ident, size }) => { d.directories.push(store::DirectoryNode { name, size, r#ref: ident.as_bytes().to_vec(), }) } Node::File(FileRef { ident, executable, size, }) => d.files.push(store::FileNode { name, r#ref: ident.as_bytes().to_vec(), executable, size, }), Node::Link { target } => d.links.push(store::LinkNode { name, target }), } } d } pub fn from_pb(pb: store::Directory) -> Directory { let mut children = BTreeMap::new(); for child in pb.directories { children.insert( child.name, Node::Directory(DirectoryRef { ident: digest_from_bytes(&child.r#ref), size: child.size, }), ); } for child in pb.files { children.insert( child.name, Node::File(FileRef { ident: digest_from_bytes(&child.r#ref), executable: child.executable, size: child.size, }), ); } for child in pb.links { children.insert( child.name, Node::Link { target: child.target, }, ); } Directory { children } } } pub fn digest_str(digest: &Digest) -> String { let bytes = digest.as_bytes(); zbase32::encode_full_bytes(bytes) } pub fn digest_from_str(s: &str) -> Result { let bits = DIGEST_BYTES * 8; let chars = (bits + 5 - 1) / 5; if s.len() < chars { bail!("digest too short"); } let bytes_vec = zbase32::decode_str(s, bits as u64).map_err(|_| anyhow!("invalid digest"))?; if zbase32::encode_full_bytes(&bytes_vec) != s { // non-canonical input bail!("non-canonical digest"); } let mut bytes = [0; DIGEST_BYTES]; bytes.copy_from_slice(&bytes_vec); Ok(bytes.into()) } #[test] fn digest_str_golden() { let h = blake3::hash(b"hello fossil!"); let s = "xow7w4moszx4w5ngp7d7w3fyfk313tpw6fndik4imdhd18ynw6so"; assert_eq!(digest_str(&h), s); assert_eq!(digest_from_str(s).ok(), Some(h)); } #[test] fn digest_str_short() { assert!(digest_from_str("xow7w4moszx4w5ngp7d7w3fyfk313tpw6fndik4imdhd18ynw6s").is_err()); } #[track_caller] pub fn digest_from_bytes(bytes: &[u8]) -> Digest { if bytes.len() != DIGEST_BYTES { panic!( "digest is {} bytes, expecting {} bytes", bytes.len(), DIGEST_BYTES ); } let mut buffer = [0; DIGEST_BYTES]; buffer.copy_from_slice(bytes); buffer.into() } #[test] /// Write a blob, and read it back. fn read_write() { let data = { let mut h = blake3::Hasher::new(); h.update(b"test vector"); let mut buf = Vec::new(); h.finalize_xof().take(2468).read_to_end(&mut buf).unwrap(); buf }; // TODO(edef): use a temporary file let store = Store::open("fossil.db").unwrap(); let ident = store.write_blob(&data); assert_eq!(data, store.read_blob(ident)); }