From ec4ff9199b75926fc0bed56f027035446ae7d021 Mon Sep 17 00:00:00 2001 From: V Date: Thu, 19 Aug 2021 22:10:03 +0200 Subject: fleet/pkgs/naut: a little commit notification bot After a couple of days wrangling Rust's async ecosystem, we now have an IRC bot that will announce new commits. This should hopefully give people a better view into what we're working on! Change-Id: Ie7b3be62afca3ad2a10cb04c15ff666c62408fa2 --- fleet/pkgs/naut/src/main.rs | 252 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 fleet/pkgs/naut/src/main.rs (limited to 'fleet/pkgs/naut/src') diff --git a/fleet/pkgs/naut/src/main.rs b/fleet/pkgs/naut/src/main.rs new file mode 100644 index 0000000..2349330 --- /dev/null +++ b/fleet/pkgs/naut/src/main.rs @@ -0,0 +1,252 @@ +// SPDX-FileCopyrightText: V +// SPDX-License-Identifier: OSL-3.0 + +use { + anyhow::{anyhow, Error, Result}, + git2::{Oid, Repository, Sort}, + irc::client::prelude::*, + pin_utils::pin_mut, + std::{ + collections::{HashMap, HashSet}, + env, + fs::{remove_file, File}, + io::{ErrorKind, Read}, + path::Path, + }, + tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, BufReader, Lines}, + net::UnixListener, + select, spawn, + sync::{mpsc, mpsc::UnboundedSender}, + }, + tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}, +}; + +#[derive(Debug)] +struct Batch { + repository: String, + lines: Vec, +} + +#[tokio::main] +async fn main() -> Result<()> { + let repo_by_channel = { + let mut buf = vec![]; + File::open(env::var("NAUT_CONFIG")?)?.read_to_end(&mut buf)?; + let tmp: HashMap> = toml::from_slice(&buf)?; + if tmp.is_empty() { + return Err(anyhow!("No channels configured!")); + } + tmp + }; + + let channels: Vec = repo_by_channel + .keys() + .clone() + .map(ToOwned::to_owned) + .collect(); + + // Invert the config, so we have a map of repositories to channel names + let channel_by_repo = { + let mut tmp = HashMap::new(); + for (channel, repos) in repo_by_channel { + for repo in repos { + tmp.entry(repo) + .or_insert_with(Vec::new) + .push(channel.to_string()); + } + } + tmp + }; + + let repositories: HashSet<_> = channel_by_repo + .keys() + .clone() + .map(ToOwned::to_owned) + .collect(); + + let (tx, rx) = mpsc::unbounded_channel::(); + + let listener = bind(env::var("NAUT_SOCK")?.as_str())?; + spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + + let tx = tx.clone(); + let repositories = repositories.clone(); + + let conn = async move { + let mut lines = BufReader::new(stream).lines(); + let path = lines.next_line().await?.unwrap(); + + let repo_name = Path::new(&path).file_name().unwrap().to_str().unwrap(); + if !repositories.contains(repo_name) { + return Err(anyhow!( + "Received a request for an unmanaged repository: {}", + repo_name + )); + } + + let repo = Repository::open(&path)?; + + handle(repo, repo_name, lines, tx).await?; + Ok::<(), Error>(()) + }; + + spawn(async move { + if let Err(e) = conn.await { + eprintln!("Failed to handle request: {}", e); + } + }); + } + }); + + let client_config = Config { + server: Some("irc.libera.chat".to_owned()), + password: Some(env::var("NAUT_PASS")?), + nickname: Some("naut".to_owned()), + realname: Some("blub blub".to_owned()), + version: Some(format!("naut {}", env!("CARGO_PKG_VERSION"))), + source: Some("https://src.unfathomable.blue/nixos-config/tree/pkgs/naut".to_owned()), + channels, + ..Default::default() + }; + + let rx = UnboundedReceiverStream::new(rx).fuse(); + pin_mut!(rx); + + loop { + let mut client = Client::from_config(client_config.clone()).await?; + client.identify()?; + + let sender = client.sender(); + + let stream = client.stream()?.fuse(); + pin_mut!(stream); + + loop { + select! { + message = stream.next() => match message { + Some(_) => {}, + None => break, + }, + Some(batch) = rx.next() => { + let channels = channel_by_repo.get(&batch.repository).unwrap(); + for line in batch.lines { + for channel in channels { + sender.send_privmsg(channel.to_owned(), line.to_owned())?; + } + } + }, + } + } + } +} + +fn bind(path: &str) -> Result { + match remove_file(path) { + Ok(()) => (), + Err(e) if e.kind() == ErrorKind::NotFound => (), + Err(e) => return Err(e.into()), + } + + UnixListener::bind(path).map_err(Error::from) +} + +async fn handle( + repo: Repository, + repo_name: &str, + mut lines: Lines, + tx: UnboundedSender, +) -> Result<()> { + while let Some(line) = lines.next_line().await? { + let args: Vec<_> = line.splitn(3, ' ').collect(); + + let old = Oid::from_str(args[0])?; + let new = Oid::from_str(args[1])?; + let r#ref = repo.find_reference(args[2])?; + let ref_name = r#ref.shorthand().unwrap(); + + let mut lines = vec![]; + + if r#ref.is_branch() { + if new.is_zero() { + lines.push(format!( + "[{}] branch {} deleted (was {})", + repo_name, ref_name, old + )); + } else { + let mut walker = repo.revwalk()?; + walker.set_sorting(Sort::REVERSE)?; + walker.push(new)?; + + if old.is_zero() { + lines.push(format!("[{}] new branch created: {}", repo_name, ref_name)); + + // We cannot use repo.head directly, as that comes resolved already. + let head = repo.find_reference("HEAD")?; + + // Hide commits also present from HEAD (unless this *is* HEAD, in which we do want them). + // This avoids duplicating notifications for commits that we've already seen, provided we + // only push branches that are forked directly from HEAD (or one of its ancestors). + if ref_name != head.symbolic_target().unwrap() { + if let Ok(base) = repo.merge_base(head.resolve()?.target().unwrap(), new) { + walker.hide(base)?; + } + } + } else { + walker.hide(old)?; + } + + let commits: Vec<_> = walker + .map(|x| repo.find_commit(x.unwrap()).unwrap()) + .collect(); + + lines.push(format!( + "[{}] {} commits pushed to {}", + repo_name, + commits.len(), + ref_name + )); + + for commit in commits { + lines.push(format!( + " {} \"{}\" by {}", + commit.as_object().short_id()?.as_str().unwrap(), + commit.summary().unwrap(), + commit.author().name().unwrap() + )); + } + } + } else if r#ref.is_tag() { + if new.is_zero() { + lines.push(format!( + "[{}] tag {} deleted (was {})", + repo_name, ref_name, old + )) + } else if old.is_zero() { + lines.push(format!( + "[{}] commit {} tagged as {}", + repo_name, new, ref_name + )) + } else { + lines.push(format!( + "[{}] tag {} modified (was {}, now {})", + repo_name, ref_name, old, new + )) + } + } else { + return Err(anyhow!( + "Received a reference that's neither a branch nor tag: {}", + args[2] + )); + } + + tx.send(Batch { + repository: repo_name.to_owned(), + lines, + })?; + } + + Ok(()) +} -- cgit 1.4.1